aboutsummaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/basecomponent/base.go289
-rw-r--r--internal/basecomponent/flags.go61
-rw-r--r--internal/caching/immutablecache.go17
-rw-r--r--internal/caching/immutableinmemorylru.go95
-rw-r--r--internal/config/appservice.go329
-rw-r--r--internal/config/config.go804
-rw-r--r--internal/config/config_test.go145
-rw-r--r--internal/consumers.go125
-rw-r--r--internal/eventcontent.go86
-rw-r--r--internal/events.go150
-rw-r--r--internal/http/http.go57
-rw-r--r--internal/httpapi.go234
-rw-r--r--internal/httpapi_test.go95
-rw-r--r--internal/keydb/cache/keydb.go69
-rw-r--r--internal/keydb/interface.go13
-rw-r--r--internal/keydb/keydb.go50
-rw-r--r--internal/keydb/keydb_wasm.go48
-rw-r--r--internal/keydb/keyring.go74
-rw-r--r--internal/keydb/postgres/keydb.go115
-rw-r--r--internal/keydb/postgres/server_key_table.go144
-rw-r--r--internal/keydb/sqlite3/keydb.go116
-rw-r--r--internal/keydb/sqlite3/server_key_table.go152
-rw-r--r--internal/log.go188
-rw-r--r--internal/partition_offset_table.go111
-rw-r--r--internal/postgres.go25
-rw-r--r--internal/postgres_wasm.go22
-rw-r--r--internal/routing.go35
-rw-r--r--internal/sql.go109
-rw-r--r--internal/sqlutil/trace.go6
-rw-r--r--internal/test/client.go158
-rw-r--r--internal/test/config.go208
-rw-r--r--internal/test/kafka.go76
-rw-r--r--internal/test/server.go105
-rw-r--r--internal/test/slice.go34
-rw-r--r--internal/transactions/transactions.go95
-rw-r--r--internal/transactions/transactions_test.go77
-rw-r--r--internal/types.go66
37 files changed, 4580 insertions, 3 deletions
diff --git a/internal/basecomponent/base.go b/internal/basecomponent/base.go
new file mode 100644
index 00000000..e9a375a7
--- /dev/null
+++ b/internal/basecomponent/base.go
@@ -0,0 +1,289 @@
+// Copyright 2017 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package basecomponent
+
+import (
+ "database/sql"
+ "io"
+ "net/http"
+ "net/url"
+ "time"
+
+ "golang.org/x/crypto/ed25519"
+
+ "github.com/matrix-org/dendrite/internal/caching"
+ "github.com/matrix-org/dendrite/internal/keydb"
+ "github.com/matrix-org/dendrite/internal/keydb/cache"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/naffka"
+
+ "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
+ "github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
+ "github.com/matrix-org/dendrite/internal"
+
+ "github.com/Shopify/sarama"
+ "github.com/gorilla/mux"
+
+ appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
+ eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
+ federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
+ "github.com/matrix-org/dendrite/internal/config"
+ roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/sirupsen/logrus"
+
+ _ "net/http/pprof"
+)
+
+// BaseDendrite is a base for creating new instances of dendrite. It parses
+// command line flags and config, and exposes methods for creating various
+// resources. All errors are handled by logging then exiting, so all methods
+// should only be used during start up.
+// Must be closed when shutting down.
+type BaseDendrite struct {
+ componentName string
+ tracerCloser io.Closer
+
+ // APIMux should be used to register new public matrix api endpoints
+ APIMux *mux.Router
+ EnableHTTPAPIs bool
+ httpClient *http.Client
+ Cfg *config.Dendrite
+ ImmutableCache caching.ImmutableCache
+ KafkaConsumer sarama.Consumer
+ KafkaProducer sarama.SyncProducer
+}
+
+const HTTPServerTimeout = time.Minute * 5
+const HTTPClientTimeout = time.Second * 30
+
+// NewBaseDendrite creates a new instance to be used by a component.
+// The componentName is used for logging purposes, and should be a friendly name
+// of the compontent running, e.g. "SyncAPI"
+func NewBaseDendrite(cfg *config.Dendrite, componentName string, enableHTTPAPIs bool) *BaseDendrite {
+ internal.SetupStdLogging()
+ internal.SetupHookLogging(cfg.Logging, componentName)
+ internal.SetupPprof()
+
+ closer, err := cfg.SetupTracing("Dendrite" + componentName)
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to start opentracing")
+ }
+
+ var kafkaConsumer sarama.Consumer
+ var kafkaProducer sarama.SyncProducer
+ if cfg.Kafka.UseNaffka {
+ kafkaConsumer, kafkaProducer = setupNaffka(cfg)
+ } else {
+ kafkaConsumer, kafkaProducer = setupKafka(cfg)
+ }
+
+ cache, err := caching.NewImmutableInMemoryLRUCache()
+ if err != nil {
+ logrus.WithError(err).Warnf("Failed to create cache")
+ }
+
+ return &BaseDendrite{
+ componentName: componentName,
+ EnableHTTPAPIs: enableHTTPAPIs,
+ tracerCloser: closer,
+ Cfg: cfg,
+ ImmutableCache: cache,
+ APIMux: mux.NewRouter().UseEncodedPath(),
+ httpClient: &http.Client{Timeout: HTTPClientTimeout},
+ KafkaConsumer: kafkaConsumer,
+ KafkaProducer: kafkaProducer,
+ }
+}
+
+// Close implements io.Closer
+func (b *BaseDendrite) Close() error {
+ return b.tracerCloser.Close()
+}
+
+// CreateHTTPAppServiceAPIs returns the QueryAPI for hitting the appservice
+// component over HTTP.
+func (b *BaseDendrite) CreateHTTPAppServiceAPIs() appserviceAPI.AppServiceQueryAPI {
+ a, err := appserviceAPI.NewAppServiceQueryAPIHTTP(b.Cfg.AppServiceURL(), b.httpClient)
+ if err != nil {
+ logrus.WithError(err).Panic("CreateHTTPAppServiceAPIs failed")
+ }
+ return a
+}
+
+// CreateHTTPRoomserverAPIs returns the AliasAPI, InputAPI and QueryAPI for hitting
+// the roomserver over HTTP.
+func (b *BaseDendrite) CreateHTTPRoomserverAPIs() roomserverAPI.RoomserverInternalAPI {
+ rsAPI, err := roomserverAPI.NewRoomserverInternalAPIHTTP(b.Cfg.RoomServerURL(), b.httpClient, b.ImmutableCache)
+ if err != nil {
+ logrus.WithError(err).Panic("NewRoomserverInternalAPIHTTP failed", b.httpClient)
+ }
+ return rsAPI
+}
+
+// CreateHTTPEDUServerAPIs returns eduInputAPI for hitting the EDU
+// server over HTTP
+func (b *BaseDendrite) CreateHTTPEDUServerAPIs() eduServerAPI.EDUServerInputAPI {
+ e, err := eduServerAPI.NewEDUServerInputAPIHTTP(b.Cfg.EDUServerURL(), b.httpClient)
+ if err != nil {
+ logrus.WithError(err).Panic("NewEDUServerInputAPIHTTP failed", b.httpClient)
+ }
+ return e
+}
+
+// CreateHTTPFederationSenderAPIs returns FederationSenderInternalAPI for hitting
+// the federation sender over HTTP
+func (b *BaseDendrite) CreateHTTPFederationSenderAPIs() federationSenderAPI.FederationSenderInternalAPI {
+ f, err := federationSenderAPI.NewFederationSenderInternalAPIHTTP(b.Cfg.FederationSenderURL(), b.httpClient)
+ if err != nil {
+ logrus.WithError(err).Panic("NewFederationSenderInternalAPIHTTP failed", b.httpClient)
+ }
+ return f
+}
+
+// CreateDeviceDB creates a new instance of the device database. Should only be
+// called once per component.
+func (b *BaseDendrite) CreateDeviceDB() devices.Database {
+ db, err := devices.NewDatabase(string(b.Cfg.Database.Device), b.Cfg.DbProperties(), b.Cfg.Matrix.ServerName)
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to connect to devices db")
+ }
+
+ return db
+}
+
+// CreateAccountsDB creates a new instance of the accounts database. Should only
+// be called once per component.
+func (b *BaseDendrite) CreateAccountsDB() accounts.Database {
+ db, err := accounts.NewDatabase(string(b.Cfg.Database.Account), b.Cfg.DbProperties(), b.Cfg.Matrix.ServerName)
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to connect to accounts db")
+ }
+
+ return db
+}
+
+// CreateKeyDB creates a new instance of the key database. Should only be called
+// once per component.
+func (b *BaseDendrite) CreateKeyDB() keydb.Database {
+ db, err := keydb.NewDatabase(
+ string(b.Cfg.Database.ServerKey),
+ b.Cfg.DbProperties(),
+ b.Cfg.Matrix.ServerName,
+ b.Cfg.Matrix.PrivateKey.Public().(ed25519.PublicKey),
+ b.Cfg.Matrix.KeyID,
+ )
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to connect to keys db")
+ }
+
+ cachedDB, err := cache.NewKeyDatabase(db, b.ImmutableCache)
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to create key cache wrapper")
+ }
+ return cachedDB
+}
+
+// CreateFederationClient creates a new federation client. Should only be called
+// once per component.
+func (b *BaseDendrite) CreateFederationClient() *gomatrixserverlib.FederationClient {
+ return gomatrixserverlib.NewFederationClient(
+ b.Cfg.Matrix.ServerName, b.Cfg.Matrix.KeyID, b.Cfg.Matrix.PrivateKey,
+ )
+}
+
+// SetupAndServeHTTP sets up the HTTP server to serve endpoints registered on
+// ApiMux under /api/ and adds a prometheus handler under /metrics.
+func (b *BaseDendrite) SetupAndServeHTTP(bindaddr string, listenaddr string) {
+ // If a separate bind address is defined, listen on that. Otherwise use
+ // the listen address
+ var addr string
+ if bindaddr != "" {
+ addr = bindaddr
+ } else {
+ addr = listenaddr
+ }
+
+ serv := http.Server{
+ Addr: addr,
+ WriteTimeout: HTTPServerTimeout,
+ }
+
+ internal.SetupHTTPAPI(http.DefaultServeMux, internal.WrapHandlerInCORS(b.APIMux), b.Cfg)
+ logrus.Infof("Starting %s server on %s", b.componentName, serv.Addr)
+
+ err := serv.ListenAndServe()
+ if err != nil {
+ logrus.WithError(err).Fatal("failed to serve http")
+ }
+
+ logrus.Infof("Stopped %s server on %s", b.componentName, serv.Addr)
+}
+
+// setupKafka creates kafka consumer/producer pair from the config.
+func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
+ consumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
+ if err != nil {
+ logrus.WithError(err).Panic("failed to start kafka consumer")
+ }
+
+ producer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil)
+ if err != nil {
+ logrus.WithError(err).Panic("failed to setup kafka producers")
+ }
+
+ return consumer, producer
+}
+
+// setupNaffka creates kafka consumer/producer pair from the config.
+func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
+ var err error
+ var db *sql.DB
+ var naffkaDB *naffka.DatabaseImpl
+
+ uri, err := url.Parse(string(cfg.Database.Naffka))
+ if err != nil || uri.Scheme == "file" {
+ db, err = sqlutil.Open(internal.SQLiteDriverName(), string(cfg.Database.Naffka), nil)
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to open naffka database")
+ }
+
+ naffkaDB, err = naffka.NewSqliteDatabase(db)
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to setup naffka database")
+ }
+ } else {
+ db, err = sqlutil.Open("postgres", string(cfg.Database.Naffka), nil)
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to open naffka database")
+ }
+
+ naffkaDB, err = naffka.NewPostgresqlDatabase(db)
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to setup naffka database")
+ }
+ }
+
+ if naffkaDB == nil {
+ panic("naffka connection string not understood")
+ }
+
+ naff, err := naffka.New(naffkaDB)
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to setup naffka")
+ }
+
+ return naff, naff
+}
diff --git a/internal/basecomponent/flags.go b/internal/basecomponent/flags.go
new file mode 100644
index 00000000..ecef81f6
--- /dev/null
+++ b/internal/basecomponent/flags.go
@@ -0,0 +1,61 @@
+// Copyright 2017 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package basecomponent
+
+import (
+ "flag"
+
+ "github.com/matrix-org/dendrite/internal/config"
+
+ "github.com/sirupsen/logrus"
+)
+
+var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
+
+// ParseFlags parses the commandline flags and uses them to create a config.
+// If running as a monolith use `ParseMonolithFlags` instead.
+func ParseFlags() *config.Dendrite {
+ flag.Parse()
+
+ if *configPath == "" {
+ logrus.Fatal("--config must be supplied")
+ }
+
+ cfg, err := config.Load(*configPath)
+
+ if err != nil {
+ logrus.Fatalf("Invalid config file: %s", err)
+ }
+
+ return cfg
+}
+
+// ParseMonolithFlags parses the commandline flags and uses them to create a
+// config. Should only be used if running a monolith. See `ParseFlags`.
+func ParseMonolithFlags() *config.Dendrite {
+ flag.Parse()
+
+ if *configPath == "" {
+ logrus.Fatal("--config must be supplied")
+ }
+
+ cfg, err := config.LoadMonolithic(*configPath)
+
+ if err != nil {
+ logrus.Fatalf("Invalid config file: %s", err)
+ }
+
+ return cfg
+}
diff --git a/internal/caching/immutablecache.go b/internal/caching/immutablecache.go
new file mode 100644
index 00000000..fea05dd1
--- /dev/null
+++ b/internal/caching/immutablecache.go
@@ -0,0 +1,17 @@
+package caching
+
+import (
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const (
+ RoomVersionMaxCacheEntries = 1024
+ ServerKeysMaxCacheEntries = 1024
+)
+
+type ImmutableCache interface {
+ GetRoomVersion(roomId string) (gomatrixserverlib.RoomVersion, bool)
+ StoreRoomVersion(roomId string, roomVersion gomatrixserverlib.RoomVersion)
+ GetServerKey(request gomatrixserverlib.PublicKeyLookupRequest) (gomatrixserverlib.PublicKeyLookupResult, bool)
+ StoreServerKey(request gomatrixserverlib.PublicKeyLookupRequest, response gomatrixserverlib.PublicKeyLookupResult)
+}
diff --git a/internal/caching/immutableinmemorylru.go b/internal/caching/immutableinmemorylru.go
new file mode 100644
index 00000000..36cd56dc
--- /dev/null
+++ b/internal/caching/immutableinmemorylru.go
@@ -0,0 +1,95 @@
+package caching
+
+import (
+ "fmt"
+
+ lru "github.com/hashicorp/golang-lru"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+type ImmutableInMemoryLRUCache struct {
+ roomVersions *lru.Cache
+ serverKeys *lru.Cache
+}
+
+func NewImmutableInMemoryLRUCache() (*ImmutableInMemoryLRUCache, error) {
+ roomVersionCache, rvErr := lru.New(RoomVersionMaxCacheEntries)
+ if rvErr != nil {
+ return nil, rvErr
+ }
+ serverKeysCache, rvErr := lru.New(ServerKeysMaxCacheEntries)
+ if rvErr != nil {
+ return nil, rvErr
+ }
+ cache := &ImmutableInMemoryLRUCache{
+ roomVersions: roomVersionCache,
+ serverKeys: serverKeysCache,
+ }
+ cache.configureMetrics()
+ return cache, nil
+}
+
+func (c *ImmutableInMemoryLRUCache) configureMetrics() {
+ promauto.NewGaugeFunc(prometheus.GaugeOpts{
+ Namespace: "dendrite",
+ Subsystem: "caching",
+ Name: "number_room_version_entries",
+ Help: "The number of room version entries cached.",
+ }, func() float64 {
+ return float64(c.roomVersions.Len())
+ })
+
+ promauto.NewGaugeFunc(prometheus.GaugeOpts{
+ Namespace: "dendrite",
+ Subsystem: "caching",
+ Name: "number_server_key_entries",
+ Help: "The number of server key entries cached.",
+ }, func() float64 {
+ return float64(c.serverKeys.Len())
+ })
+}
+
+func checkForInvalidMutation(cache *lru.Cache, key string, value interface{}) {
+ if peek, ok := cache.Peek(key); ok && peek != value {
+ panic(fmt.Sprintf("invalid use of immutable cache tries to mutate existing value of %q", key))
+ }
+}
+
+func (c *ImmutableInMemoryLRUCache) GetRoomVersion(roomID string) (gomatrixserverlib.RoomVersion, bool) {
+ val, found := c.roomVersions.Get(roomID)
+ if found && val != nil {
+ if roomVersion, ok := val.(gomatrixserverlib.RoomVersion); ok {
+ return roomVersion, true
+ }
+ }
+ return "", false
+}
+
+func (c *ImmutableInMemoryLRUCache) StoreRoomVersion(roomID string, roomVersion gomatrixserverlib.RoomVersion) {
+ checkForInvalidMutation(c.roomVersions, roomID, roomVersion)
+ c.roomVersions.Add(roomID, roomVersion)
+}
+
+func (c *ImmutableInMemoryLRUCache) GetServerKey(
+ request gomatrixserverlib.PublicKeyLookupRequest,
+) (gomatrixserverlib.PublicKeyLookupResult, bool) {
+ key := fmt.Sprintf("%s/%s", request.ServerName, request.KeyID)
+ val, found := c.serverKeys.Get(key)
+ if found && val != nil {
+ if keyLookupResult, ok := val.(gomatrixserverlib.PublicKeyLookupResult); ok {
+ return keyLookupResult, true
+ }
+ }
+ return gomatrixserverlib.PublicKeyLookupResult{}, false
+}
+
+func (c *ImmutableInMemoryLRUCache) StoreServerKey(
+ request gomatrixserverlib.PublicKeyLookupRequest,
+ response gomatrixserverlib.PublicKeyLookupResult,
+) {
+ key := fmt.Sprintf("%s/%s", request.ServerName, request.KeyID)
+ checkForInvalidMutation(c.roomVersions, key, response)
+ c.serverKeys.Add(request, response)
+}
diff --git a/internal/config/appservice.go b/internal/config/appservice.go
new file mode 100644
index 00000000..bf5f089b
--- /dev/null
+++ b/internal/config/appservice.go
@@ -0,0 +1,329 @@
+// Copyright 2017 Andrew Morgan <andrew@amorgan.xyz>
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package config
+
+import (
+ "fmt"
+ "io/ioutil"
+ "path/filepath"
+ "regexp"
+ "strings"
+
+ log "github.com/sirupsen/logrus"
+ yaml "gopkg.in/yaml.v2"
+)
+
+// ApplicationServiceNamespace is the namespace that a specific application
+// service has management over.
+type ApplicationServiceNamespace struct {
+ // Whether or not the namespace is managed solely by this application service
+ Exclusive bool `yaml:"exclusive"`
+ // A regex pattern that represents the namespace
+ Regex string `yaml:"regex"`
+ // The ID of an existing group that all users of this application service will
+ // be added to. This field is only relevant to the `users` namespace.
+ // Note that users who are joined to this group through an application service
+ // are not to be listed when querying for the group's members, however the
+ // group should be listed when querying an application service user's groups.
+ // This is to prevent making spamming all users of an application service
+ // trivial.
+ GroupID string `yaml:"group_id"`
+ // Regex object representing our pattern. Saves having to recompile every time
+ RegexpObject *regexp.Regexp
+}
+
+// ApplicationService represents a Matrix application service.
+// https://matrix.org/docs/spec/application_service/unstable.html
+type ApplicationService struct {
+ // User-defined, unique, persistent ID of the application service
+ ID string `yaml:"id"`
+ // Base URL of the application service
+ URL string `yaml:"url"`
+ // Application service token provided in requests to a homeserver
+ ASToken string `yaml:"as_token"`
+ // Homeserver token provided in requests to an application service
+ HSToken string `yaml:"hs_token"`
+ // Localpart of application service user
+ SenderLocalpart string `yaml:"sender_localpart"`
+ // Information about an application service's namespaces. Key is either
+ // "users", "aliases" or "rooms"
+ NamespaceMap map[string][]ApplicationServiceNamespace `yaml:"namespaces"`
+ // Whether rate limiting is applied to each application service user
+ RateLimited bool `yaml:"rate_limited"`
+ // Any custom protocols that this application service provides (e.g. IRC)
+ Protocols []string `yaml:"protocols"`
+}
+
+// IsInterestedInRoomID returns a bool on whether an application service's
+// namespace includes the given room ID
+func (a *ApplicationService) IsInterestedInRoomID(
+ roomID string,
+) bool {
+ if namespaceSlice, ok := a.NamespaceMap["rooms"]; ok {
+ for _, namespace := range namespaceSlice {
+ if namespace.RegexpObject.MatchString(roomID) {
+ return true
+ }
+ }
+ }
+
+ return false
+}
+
+// IsInterestedInUserID returns a bool on whether an application service's
+// namespace includes the given user ID
+func (a *ApplicationService) IsInterestedInUserID(
+ userID string,
+) bool {
+ if namespaceSlice, ok := a.NamespaceMap["users"]; ok {
+ for _, namespace := range namespaceSlice {
+ if namespace.RegexpObject.MatchString(userID) {
+ return true
+ }
+ }
+ }
+
+ return false
+}
+
+// OwnsNamespaceCoveringUserId returns a bool on whether an application service's
+// namespace is exclusive and includes the given user ID
+func (a *ApplicationService) OwnsNamespaceCoveringUserId(
+ userID string,
+) bool {
+ if namespaceSlice, ok := a.NamespaceMap["users"]; ok {
+ for _, namespace := range namespaceSlice {
+ if namespace.Exclusive && namespace.RegexpObject.MatchString(userID) {
+ return true
+ }
+ }
+ }
+
+ return false
+}
+
+// IsInterestedInRoomAlias returns a bool on whether an application service's
+// namespace includes the given room alias
+func (a *ApplicationService) IsInterestedInRoomAlias(
+ roomAlias string,
+) bool {
+ if namespaceSlice, ok := a.NamespaceMap["aliases"]; ok {
+ for _, namespace := range namespaceSlice {
+ if namespace.RegexpObject.MatchString(roomAlias) {
+ return true
+ }
+ }
+ }
+
+ return false
+}
+
+// loadAppServices iterates through all application service config files
+// and loads their data into the config object for later access.
+func loadAppServices(config *Dendrite) error {
+ for _, configPath := range config.ApplicationServices.ConfigFiles {
+ // Create a new application service with default options
+ appservice := ApplicationService{
+ RateLimited: true,
+ }
+
+ // Create an absolute path from a potentially relative path
+ absPath, err := filepath.Abs(configPath)
+ if err != nil {
+ return err
+ }
+
+ // Read the application service's config file
+ configData, err := ioutil.ReadFile(absPath)
+ if err != nil {
+ return err
+ }
+
+ // Load the config data into our struct
+ if err = yaml.UnmarshalStrict(configData, &appservice); err != nil {
+ return err
+ }
+
+ // Append the parsed application service to the global config
+ config.Derived.ApplicationServices = append(
+ config.Derived.ApplicationServices, appservice,
+ )
+ }
+
+ // Check for any errors in the loaded application services
+ return checkErrors(config)
+}
+
+// setupRegexps will create regex objects for exclusive and non-exclusive
+// usernames, aliases and rooms of all application services, so that other
+// methods can quickly check if a particular string matches any of them.
+func setupRegexps(cfg *Dendrite) (err error) {
+ // Combine all exclusive namespaces for later string checking
+ var exclusiveUsernameStrings, exclusiveAliasStrings []string
+
+ // If an application service's regex is marked as exclusive, add
+ // its contents to the overall exlusive regex string. Room regex
+ // not necessary as we aren't denying exclusive room ID creation
+ for _, appservice := range cfg.Derived.ApplicationServices {
+ for key, namespaceSlice := range appservice.NamespaceMap {
+ switch key {
+ case "users":
+ appendExclusiveNamespaceRegexs(&exclusiveUsernameStrings, namespaceSlice)
+ case "aliases":
+ appendExclusiveNamespaceRegexs(&exclusiveAliasStrings, namespaceSlice)
+ }
+ }
+ }
+
+ // Join the regexes together into one big regex.
+ // i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)"
+ // Later we can check if a username or alias matches any exclusive regex and
+ // deny access if it isn't from an application service
+ exclusiveUsernames := strings.Join(exclusiveUsernameStrings, "|")
+ exclusiveAliases := strings.Join(exclusiveAliasStrings, "|")
+
+ // If there are no exclusive regexes, compile string so that it will not match
+ // any valid usernames/aliases/roomIDs
+ if exclusiveUsernames == "" {
+ exclusiveUsernames = "^$"
+ }
+ if exclusiveAliases == "" {
+ exclusiveAliases = "^$"
+ }
+
+ // Store compiled Regex
+ if cfg.Derived.ExclusiveApplicationServicesUsernameRegexp, err = regexp.Compile(exclusiveUsernames); err != nil {
+ return err
+ }
+ if cfg.Derived.ExclusiveApplicationServicesAliasRegexp, err = regexp.Compile(exclusiveAliases); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// appendExclusiveNamespaceRegexs takes a slice of strings and a slice of
+// namespaces and will append the regexes of only the exclusive namespaces
+// into the string slice
+func appendExclusiveNamespaceRegexs(
+ exclusiveStrings *[]string, namespaces []ApplicationServiceNamespace,
+) {
+ for index, namespace := range namespaces {
+ if namespace.Exclusive {
+ // We append parenthesis to later separate each regex when we compile
+ // i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)"
+ *exclusiveStrings = append(*exclusiveStrings, "("+namespace.Regex+")")
+ }
+
+ // Compile this regex into a Regexp object for later use
+ namespaces[index].RegexpObject, _ = regexp.Compile(namespace.Regex)
+ }
+}
+
+// checkErrors checks for any configuration errors amongst the loaded
+// application services according to the application service spec.
+func checkErrors(config *Dendrite) (err error) {
+ var idMap = make(map[string]bool)
+ var tokenMap = make(map[string]bool)
+
+ // Compile regexp object for checking groupIDs
+ groupIDRegexp := regexp.MustCompile(`\+.*:.*`)
+
+ // Check each application service for any config errors
+ for _, appservice := range config.Derived.ApplicationServices {
+ // Namespace-related checks
+ for key, namespaceSlice := range appservice.NamespaceMap {
+ for _, namespace := range namespaceSlice {
+ if err := validateNamespace(&appservice, key, &namespace, groupIDRegexp); err != nil {
+ return err
+ }
+ }
+ }
+
+ // Check if the url has trailing /'s. If so, remove them
+ appservice.URL = strings.TrimRight(appservice.URL, "/")
+
+ // Check if we've already seen this ID. No two application services
+ // can have the same ID or token.
+ if idMap[appservice.ID] {
+ return configErrors([]string{fmt.Sprintf(
+ "Application service ID %s must be unique", appservice.ID,
+ )})
+ }
+ // Check if we've already seen this token
+ if tokenMap[appservice.ASToken] {
+ return configErrors([]string{fmt.Sprintf(
+ "Application service Token %s must be unique", appservice.ASToken,
+ )})
+ }
+
+ // Add the id/token to their respective maps if we haven't already
+ // seen them.
+ idMap[appservice.ID] = true
+ tokenMap[appservice.ASToken] = true
+
+ // TODO: Remove once rate_limited is implemented
+ if appservice.RateLimited {
+ log.Warn("WARNING: Application service option rate_limited is currently unimplemented")
+ }
+ // TODO: Remove once protocols is implemented
+ if len(appservice.Protocols) > 0 {
+ log.Warn("WARNING: Application service option protocols is currently unimplemented")
+ }
+ }
+
+ return setupRegexps(config)
+}
+
+// validateNamespace returns nil or an error based on whether a given
+// application service namespace is valid. A namespace is valid if it has the
+// required fields, and its regex is correct.
+func validateNamespace(
+ appservice *ApplicationService,
+ key string,
+ namespace *ApplicationServiceNamespace,
+ groupIDRegexp *regexp.Regexp,
+) error {
+ // Check that namespace(s) are valid regex
+ if !IsValidRegex(namespace.Regex) {
+ return configErrors([]string{fmt.Sprintf(
+ "Invalid regex string for Application Service %s", appservice.ID,
+ )})
+ }
+
+ // Check if GroupID for the users namespace is in the correct format
+ if key == "users" && namespace.GroupID != "" {
+ // TODO: Remove once group_id is implemented
+ log.Warn("WARNING: Application service option group_id is currently unimplemented")
+
+ correctFormat := groupIDRegexp.MatchString(namespace.GroupID)
+ if !correctFormat {
+ return configErrors([]string{fmt.Sprintf(
+ "Invalid user group_id field for application service %s.",
+ appservice.ID,
+ )})
+ }
+ }
+
+ return nil
+}
+
+// IsValidRegex returns true or false based on whether the
+// given string is valid regex or not
+func IsValidRegex(regexString string) bool {
+ _, err := regexp.Compile(regexString)
+
+ return err == nil
+}
diff --git a/internal/config/config.go b/internal/config/config.go
new file mode 100644
index 00000000..e1e96f9d
--- /dev/null
+++ b/internal/config/config.go
@@ -0,0 +1,804 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package config
+
+import (
+ "bytes"
+ "crypto/sha256"
+ "encoding/pem"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "path/filepath"
+ "regexp"
+ "strings"
+ "time"
+
+ "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ed25519"
+ yaml "gopkg.in/yaml.v2"
+
+ jaegerconfig "github.com/uber/jaeger-client-go/config"
+ jaegermetrics "github.com/uber/jaeger-lib/metrics"
+)
+
+// Version is the current version of the config format.
+// This will change whenever we make breaking changes to the config format.
+const Version = 0
+
+// Dendrite contains all the config used by a dendrite process.
+// Relative paths are resolved relative to the current working directory
+type Dendrite struct {
+ // The version of the configuration file.
+ // If the version in a file doesn't match the current dendrite config
+ // version then we can give a clear error message telling the user
+ // to update their config file to the current version.
+ // The version of the file should only be different if there has
+ // been a breaking change to the config file format.
+ Version int `yaml:"version"`
+
+ // The configuration required for a matrix server.
+ Matrix struct {
+ // The name of the server. This is usually the domain name, e.g 'matrix.org', 'localhost'.
+ ServerName gomatrixserverlib.ServerName `yaml:"server_name"`
+ // Path to the private key which will be used to sign requests and events.
+ PrivateKeyPath Path `yaml:"private_key"`
+ // The private key which will be used to sign requests and events.
+ PrivateKey ed25519.PrivateKey `yaml:"-"`
+ // An arbitrary string used to uniquely identify the PrivateKey. Must start with the
+ // prefix "ed25519:".
+ KeyID gomatrixserverlib.KeyID `yaml:"-"`
+ // List of paths to X509 certificates used by the external federation listeners.
+ // These are used to calculate the TLS fingerprints to publish for this server.
+ // Other matrix servers talking to this server will expect the x509 certificate
+ // to match one of these certificates.
+ // The certificates should be in PEM format.
+ FederationCertificatePaths []Path `yaml:"federation_certificates"`
+ // A list of SHA256 TLS fingerprints for the X509 certificates used by the
+ // federation listener for this server.
+ TLSFingerPrints []gomatrixserverlib.TLSFingerprint `yaml:"-"`
+ // How long a remote server can cache our server key for before requesting it again.
+ // Increasing this number will reduce the number of requests made by remote servers
+ // for our key, but increases the period a compromised key will be considered valid
+ // by remote servers.
+ // Defaults to 24 hours.
+ KeyValidityPeriod time.Duration `yaml:"key_validity_period"`
+ // List of domains that the server will trust as identity servers to
+ // verify third-party identifiers.
+ // Defaults to an empty array.
+ TrustedIDServers []string `yaml:"trusted_third_party_id_servers"`
+ // If set, allows registration by anyone who also has the shared
+ // secret, even if registration is otherwise disabled.
+ RegistrationSharedSecret string `yaml:"registration_shared_secret"`
+ // This Home Server's ReCAPTCHA public key.
+ RecaptchaPublicKey string `yaml:"recaptcha_public_key"`
+ // This Home Server's ReCAPTCHA private key.
+ RecaptchaPrivateKey string `yaml:"recaptcha_private_key"`
+ // Boolean stating whether catpcha registration is enabled
+ // and required
+ RecaptchaEnabled bool `yaml:"enable_registration_captcha"`
+ // Secret used to bypass the captcha registration entirely
+ RecaptchaBypassSecret string `yaml:"captcha_bypass_secret"`
+ // HTTP API endpoint used to verify whether the captcha response
+ // was successful
+ RecaptchaSiteVerifyAPI string `yaml:"recaptcha_siteverify_api"`
+ // If set disables new users from registering (except via shared
+ // secrets)
+ RegistrationDisabled bool `yaml:"registration_disabled"`
+ // Perspective keyservers, to use as a backup when direct key fetch
+ // requests don't succeed
+ KeyPerspectives KeyPerspectives `yaml:"key_perspectives"`
+ } `yaml:"matrix"`
+
+ // The configuration specific to the media repostitory.
+ Media struct {
+ // The base path to where the media files will be stored. May be relative or absolute.
+ BasePath Path `yaml:"base_path"`
+ // The absolute base path to where media files will be stored.
+ AbsBasePath Path `yaml:"-"`
+ // The maximum file size in bytes that is allowed to be stored on this server.
+ // Note: if max_file_size_bytes is set to 0, the size is unlimited.
+ // Note: if max_file_size_bytes is not set, it will default to 10485760 (10MB)
+ MaxFileSizeBytes *FileSizeBytes `yaml:"max_file_size_bytes,omitempty"`
+ // Whether to dynamically generate thumbnails on-the-fly if the requested resolution is not already generated
+ DynamicThumbnails bool `yaml:"dynamic_thumbnails"`
+ // The maximum number of simultaneous thumbnail generators. default: 10
+ MaxThumbnailGenerators int `yaml:"max_thumbnail_generators"`
+ // A list of thumbnail sizes to be pre-generated for downloaded remote / uploaded content
+ ThumbnailSizes []ThumbnailSize `yaml:"thumbnail_sizes"`
+ } `yaml:"media"`
+
+ // The configuration to use for Prometheus metrics
+ Metrics struct {
+ // Whether or not the metrics are enabled
+ Enabled bool `yaml:"enabled"`
+ // Use BasicAuth for Authorization
+ BasicAuth struct {
+ // Authorization via Static Username & Password
+ // Hardcoded Username and Password
+ Username string `yaml:"username"`
+ Password string `yaml:"password"`
+ } `yaml:"basic_auth"`
+ } `yaml:"metrics"`
+
+ // The configuration for talking to kafka.
+ Kafka struct {
+ // A list of kafka addresses to connect to.
+ Addresses []string `yaml:"addresses"`
+ // Whether to use naffka instead of kafka.
+ // Naffka can only be used when running dendrite as a single monolithic server.
+ // Kafka can be used both with a monolithic server and when running the
+ // components as separate servers.
+ UseNaffka bool `yaml:"use_naffka,omitempty"`
+ // The names of the topics to use when reading and writing from kafka.
+ Topics struct {
+ // Topic for roomserver/api.OutputRoomEvent events.
+ OutputRoomEvent Topic `yaml:"output_room_event"`
+ // Topic for sending account data from client API to sync API
+ OutputClientData Topic `yaml:"output_client_data"`
+ // Topic for eduserver/api.OutputTypingEvent events.
+ OutputTypingEvent Topic `yaml:"output_typing_event"`
+ // Topic for user updates (profile, presence)
+ UserUpdates Topic `yaml:"user_updates"`
+ }
+ } `yaml:"kafka"`
+
+ // Postgres Config
+ Database struct {
+ // The Account database stores the login details and account information
+ // for local users. It is accessed by the ClientAPI.
+ Account DataSource `yaml:"account"`
+ // The Device database stores session information for the devices of logged
+ // in local users. It is accessed by the ClientAPI, the MediaAPI and the SyncAPI.
+ Device DataSource `yaml:"device"`
+ // The MediaAPI database stores information about files uploaded and downloaded
+ // by local users. It is only accessed by the MediaAPI.
+ MediaAPI DataSource `yaml:"media_api"`
+ // The ServerKey database caches the public keys of remote servers.
+ // It may be accessed by the FederationAPI, the ClientAPI, and the MediaAPI.
+ ServerKey DataSource `yaml:"server_key"`
+ // The SyncAPI stores information used by the SyncAPI server.
+ // It is only accessed by the SyncAPI server.
+ SyncAPI DataSource `yaml:"sync_api"`
+ // The RoomServer database stores information about matrix rooms.
+ // It is only accessed by the RoomServer.
+ RoomServer DataSource `yaml:"room_server"`
+ // The FederationSender database stores information used by the FederationSender
+ // It is only accessed by the FederationSender.
+ FederationSender DataSource `yaml:"federation_sender"`
+ // The AppServices database stores information used by the AppService component.
+ // It is only accessed by the AppService component.
+ AppService DataSource `yaml:"appservice"`
+ // The PublicRoomsAPI database stores information used to compute the public
+ // room directory. It is only accessed by the PublicRoomsAPI server.
+ PublicRoomsAPI DataSource `yaml:"public_rooms_api"`
+ // The Naffka database is used internally by the naffka library, if used.
+ Naffka DataSource `yaml:"naffka,omitempty"`
+ // Maximum open connections to the DB (0 = use default, negative means unlimited)
+ MaxOpenConns int `yaml:"max_open_conns"`
+ // Maximum idle connections to the DB (0 = use default, negative means unlimited)
+ MaxIdleConns int `yaml:"max_idle_conns"`
+ // maximum amount of time (in seconds) a connection may be reused (<= 0 means unlimited)
+ ConnMaxLifetimeSec int `yaml:"conn_max_lifetime"`
+ } `yaml:"database"`
+
+ // TURN Server Config
+ TURN struct {
+ // TODO Guest Support
+ // Whether or not guests can request TURN credentials
+ //AllowGuests bool `yaml:"turn_allow_guests"`
+ // How long the authorization should last
+ UserLifetime string `yaml:"turn_user_lifetime"`
+ // The list of TURN URIs to pass to clients
+ URIs []string `yaml:"turn_uris"`
+
+ // Authorization via Shared Secret
+ // The shared secret from coturn
+ SharedSecret string `yaml:"turn_shared_secret"`
+
+ // Authorization via Static Username & Password
+ // Hardcoded Username and Password
+ Username string `yaml:"turn_username"`
+ Password string `yaml:"turn_password"`
+ } `yaml:"turn"`
+
+ // The internal addresses the components will listen on.
+ // These should not be exposed externally as they expose metrics and debugging APIs.
+ // Falls back to addresses listed in Listen if not specified
+ Bind struct {
+ MediaAPI Address `yaml:"media_api"`
+ ClientAPI Address `yaml:"client_api"`
+ FederationAPI Address `yaml:"federation_api"`
+ AppServiceAPI Address `yaml:"appservice_api"`
+ SyncAPI Address `yaml:"sync_api"`
+ RoomServer Address `yaml:"room_server"`
+ FederationSender Address `yaml:"federation_sender"`
+ PublicRoomsAPI Address `yaml:"public_rooms_api"`
+ EDUServer Address `yaml:"edu_server"`
+ KeyServer Address `yaml:"key_server"`
+ } `yaml:"bind"`
+
+ // The addresses for talking to other microservices.
+ Listen struct {
+ MediaAPI Address `yaml:"media_api"`
+ ClientAPI Address `yaml:"client_api"`
+ FederationAPI Address `yaml:"federation_api"`
+ AppServiceAPI Address `yaml:"appservice_api"`
+ SyncAPI Address `yaml:"sync_api"`
+ RoomServer Address `yaml:"room_server"`
+ FederationSender Address `yaml:"federation_sender"`
+ PublicRoomsAPI Address `yaml:"public_rooms_api"`
+ EDUServer Address `yaml:"edu_server"`
+ KeyServer Address `yaml:"key_server"`
+ } `yaml:"listen"`
+
+ // The config for tracing the dendrite servers.
+ Tracing struct {
+ // Set to true to enable tracer hooks. If false, no tracing is set up.
+ Enabled bool `yaml:"enabled"`
+ // The config for the jaeger opentracing reporter.
+ Jaeger jaegerconfig.Configuration `yaml:"jaeger"`
+ } `yaml:"tracing"`
+
+ // Application Services
+ // https://matrix.org/docs/spec/application_service/unstable.html
+ ApplicationServices struct {
+ // Configuration files for various application services
+ ConfigFiles []string `yaml:"config_files"`
+ } `yaml:"application_services"`
+
+ // The config for logging informations. Each hook will be added to logrus.
+ Logging []LogrusHook `yaml:"logging"`
+
+ // Any information derived from the configuration options for later use.
+ Derived struct {
+ Registration struct {
+ // Flows is a slice of flows, which represent one possible way that the client can authenticate a request.
+ // http://matrix.org/docs/spec/HEAD/client_server/r0.3.0.html#user-interactive-authentication-api
+ // As long as the generated flows only rely on config file options,
+ // we can generate them on startup and store them until needed
+ Flows []authtypes.Flow `json:"flows"`
+
+ // Params that need to be returned to the client during
+ // registration in order to complete registration stages.
+ Params map[string]interface{} `json:"params"`
+ }
+
+ // Application services parsed from their config files
+ // The paths of which were given above in the main config file
+ ApplicationServices []ApplicationService
+
+ // Meta-regexes compiled from all exclusive application service
+ // Regexes.
+ //
+ // When a user registers, we check that their username does not match any
+ // exclusive application service namespaces
+ ExclusiveApplicationServicesUsernameRegexp *regexp.Regexp
+ // When a user creates a room alias, we check that it isn't already
+ // reserved by an application service
+ ExclusiveApplicationServicesAliasRegexp *regexp.Regexp
+ // Note: An Exclusive Regex for room ID isn't necessary as we aren't blocking
+ // servers from creating RoomIDs in exclusive application service namespaces
+ } `yaml:"-"`
+}
+
+// KeyPerspectives are used to configure perspective key servers for
+// retrieving server keys.
+type KeyPerspectives []struct {
+ // The server name of the perspective key server
+ ServerName gomatrixserverlib.ServerName `yaml:"server_name"`
+ // Server keys for the perspective user, used to verify the
+ // keys have been signed by the perspective server
+ Keys []struct {
+ // The key ID, e.g. ed25519:auto
+ KeyID gomatrixserverlib.KeyID `yaml:"key_id"`
+ // The public key in base64 unpadded format
+ PublicKey string `yaml:"public_key"`
+ } `yaml:"keys"`
+}
+
+// A Path on the filesystem.
+type Path string
+
+// A DataSource for opening a postgresql database using lib/pq.
+type DataSource string
+
+// A Topic in kafka.
+type Topic string
+
+// An Address to listen on.
+type Address string
+
+// FileSizeBytes is a file size in bytes
+type FileSizeBytes int64
+
+// ThumbnailSize contains a single thumbnail size configuration
+type ThumbnailSize struct {
+ // Maximum width of the thumbnail image
+ Width int `yaml:"width"`
+ // Maximum height of the thumbnail image
+ Height int `yaml:"height"`
+ // ResizeMethod is one of crop or scale.
+ // crop scales to fill the requested dimensions and crops the excess.
+ // scale scales to fit the requested dimensions and one dimension may be smaller than requested.
+ ResizeMethod string `yaml:"method,omitempty"`
+}
+
+// LogrusHook represents a single logrus hook. At this point, only parsing and
+// verification of the proper values for type and level are done.
+// Validity/integrity checks on the parameters are done when configuring logrus.
+type LogrusHook struct {
+ // The type of hook, currently only "file" is supported.
+ Type string `yaml:"type"`
+
+ // The level of the logs to produce. Will output only this level and above.
+ Level string `yaml:"level"`
+
+ // The parameters for this hook.
+ Params map[string]interface{} `yaml:"params"`
+}
+
+// configErrors stores problems encountered when parsing a config file.
+// It implements the error interface.
+type configErrors []string
+
+// Load a yaml config file for a server run as multiple processes.
+// Checks the config to ensure that it is valid.
+// The checks are different if the server is run as a monolithic process instead
+// of being split into multiple components
+func Load(configPath string) (*Dendrite, error) {
+ configData, err := ioutil.ReadFile(configPath)
+ if err != nil {
+ return nil, err
+ }
+ basePath, err := filepath.Abs(".")
+ if err != nil {
+ return nil, err
+ }
+ // Pass the current working directory and ioutil.ReadFile so that they can
+ // be mocked in the tests
+ monolithic := false
+ return loadConfig(basePath, configData, ioutil.ReadFile, monolithic)
+}
+
+// LoadMonolithic loads a yaml config file for a server run as a single monolith.
+// Checks the config to ensure that it is valid.
+// The checks are different if the server is run as a monolithic process instead
+// of being split into multiple components
+func LoadMonolithic(configPath string) (*Dendrite, error) {
+ configData, err := ioutil.ReadFile(configPath)
+ if err != nil {
+ return nil, err
+ }
+ basePath, err := filepath.Abs(".")
+ if err != nil {
+ return nil, err
+ }
+ // Pass the current working directory and ioutil.ReadFile so that they can
+ // be mocked in the tests
+ monolithic := true
+ return loadConfig(basePath, configData, ioutil.ReadFile, monolithic)
+}
+
+func loadConfig(
+ basePath string,
+ configData []byte,
+ readFile func(string) ([]byte, error),
+ monolithic bool,
+) (*Dendrite, error) {
+ var config Dendrite
+ var err error
+ if err = yaml.Unmarshal(configData, &config); err != nil {
+ return nil, err
+ }
+
+ config.SetDefaults()
+
+ if err = config.check(monolithic); err != nil {
+ return nil, err
+ }
+
+ privateKeyPath := absPath(basePath, config.Matrix.PrivateKeyPath)
+ privateKeyData, err := readFile(privateKeyPath)
+ if err != nil {
+ return nil, err
+ }
+
+ if config.Matrix.KeyID, config.Matrix.PrivateKey, err = readKeyPEM(privateKeyPath, privateKeyData); err != nil {
+ return nil, err
+ }
+
+ for _, certPath := range config.Matrix.FederationCertificatePaths {
+ absCertPath := absPath(basePath, certPath)
+ var pemData []byte
+ pemData, err = readFile(absCertPath)
+ if err != nil {
+ return nil, err
+ }
+ fingerprint := fingerprintPEM(pemData)
+ if fingerprint == nil {
+ return nil, fmt.Errorf("no certificate PEM data in %q", absCertPath)
+ }
+ config.Matrix.TLSFingerPrints = append(config.Matrix.TLSFingerPrints, *fingerprint)
+ }
+
+ config.Media.AbsBasePath = Path(absPath(basePath, config.Media.BasePath))
+
+ // Generate data from config options
+ err = config.Derive()
+ if err != nil {
+ return nil, err
+ }
+
+ return &config, nil
+}
+
+// Derive generates data that is derived from various values provided in
+// the config file.
+func (config *Dendrite) Derive() error {
+ // Determine registrations flows based off config values
+
+ config.Derived.Registration.Params = make(map[string]interface{})
+
+ // TODO: Add email auth type
+ // TODO: Add MSISDN auth type
+
+ if config.Matrix.RecaptchaEnabled {
+ config.Derived.Registration.Params[authtypes.LoginTypeRecaptcha] = map[string]string{"public_key": config.Matrix.RecaptchaPublicKey}
+ config.Derived.Registration.Flows = append(config.Derived.Registration.Flows,
+ authtypes.Flow{Stages: []authtypes.LoginType{authtypes.LoginTypeRecaptcha}})
+ } else {
+ config.Derived.Registration.Flows = append(config.Derived.Registration.Flows,
+ authtypes.Flow{Stages: []authtypes.LoginType{authtypes.LoginTypeDummy}})
+ }
+
+ // Load application service configuration files
+ if err := loadAppServices(config); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// SetDefaults sets default config values if they are not explicitly set.
+func (config *Dendrite) SetDefaults() {
+ if config.Matrix.KeyValidityPeriod == 0 {
+ config.Matrix.KeyValidityPeriod = 24 * time.Hour
+ }
+
+ if config.Matrix.TrustedIDServers == nil {
+ config.Matrix.TrustedIDServers = []string{}
+ }
+
+ if config.Media.MaxThumbnailGenerators == 0 {
+ config.Media.MaxThumbnailGenerators = 10
+ }
+
+ if config.Media.MaxFileSizeBytes == nil {
+ defaultMaxFileSizeBytes := FileSizeBytes(10485760)
+ config.Media.MaxFileSizeBytes = &defaultMaxFileSizeBytes
+ }
+
+ if config.Database.MaxIdleConns == 0 {
+ config.Database.MaxIdleConns = 2
+ }
+
+ if config.Database.MaxOpenConns == 0 {
+ config.Database.MaxOpenConns = 100
+ }
+
+}
+
+// Error returns a string detailing how many errors were contained within a
+// configErrors type.
+func (errs configErrors) Error() string {
+ if len(errs) == 1 {
+ return errs[0]
+ }
+ return fmt.Sprintf(
+ "%s (and %d other problems)", errs[0], len(errs)-1,
+ )
+}
+
+// Add appends an error to the list of errors in this configErrors.
+// It is a wrapper to the builtin append and hides pointers from
+// the client code.
+// This method is safe to use with an uninitialized configErrors because
+// if it is nil, it will be properly allocated.
+func (errs *configErrors) Add(str string) {
+ *errs = append(*errs, str)
+}
+
+// checkNotEmpty verifies the given value is not empty in the configuration.
+// If it is, adds an error to the list.
+func checkNotEmpty(configErrs *configErrors, key, value string) {
+ if value == "" {
+ configErrs.Add(fmt.Sprintf("missing config key %q", key))
+ }
+}
+
+// checkNotZero verifies the given value is not zero in the configuration.
+// If it is, adds an error to the list.
+func checkNotZero(configErrs *configErrors, key string, value int64) {
+ if value == 0 {
+ configErrs.Add(fmt.Sprintf("missing config key %q", key))
+ }
+}
+
+// checkPositive verifies the given value is positive (zero included)
+// in the configuration. If it is not, adds an error to the list.
+func checkPositive(configErrs *configErrors, key string, value int64) {
+ if value < 0 {
+ configErrs.Add(fmt.Sprintf("invalid value for config key %q: %d", key, value))
+ }
+}
+
+// checkTurn verifies the parameters turn.* are valid.
+func (config *Dendrite) checkTurn(configErrs *configErrors) {
+ value := config.TURN.UserLifetime
+ if value != "" {
+ if _, err := time.ParseDuration(value); err != nil {
+ configErrs.Add(fmt.Sprintf("invalid duration for config key %q: %s", "turn.turn_user_lifetime", value))
+ }
+ }
+}
+
+// checkMatrix verifies the parameters matrix.* are valid.
+func (config *Dendrite) checkMatrix(configErrs *configErrors) {
+ checkNotEmpty(configErrs, "matrix.server_name", string(config.Matrix.ServerName))
+ checkNotEmpty(configErrs, "matrix.private_key", string(config.Matrix.PrivateKeyPath))
+ checkNotZero(configErrs, "matrix.federation_certificates", int64(len(config.Matrix.FederationCertificatePaths)))
+ if config.Matrix.RecaptchaEnabled {
+ checkNotEmpty(configErrs, "matrix.recaptcha_public_key", string(config.Matrix.RecaptchaPublicKey))
+ checkNotEmpty(configErrs, "matrix.recaptcha_private_key", string(config.Matrix.RecaptchaPrivateKey))
+ checkNotEmpty(configErrs, "matrix.recaptcha_siteverify_api", string(config.Matrix.RecaptchaSiteVerifyAPI))
+ }
+}
+
+// checkMedia verifies the parameters media.* are valid.
+func (config *Dendrite) checkMedia(configErrs *configErrors) {
+ checkNotEmpty(configErrs, "media.base_path", string(config.Media.BasePath))
+ checkPositive(configErrs, "media.max_file_size_bytes", int64(*config.Media.MaxFileSizeBytes))
+ checkPositive(configErrs, "media.max_thumbnail_generators", int64(config.Media.MaxThumbnailGenerators))
+
+ for i, size := range config.Media.ThumbnailSizes {
+ checkPositive(configErrs, fmt.Sprintf("media.thumbnail_sizes[%d].width", i), int64(size.Width))
+ checkPositive(configErrs, fmt.Sprintf("media.thumbnail_sizes[%d].height", i), int64(size.Height))
+ }
+}
+
+// checkKafka verifies the parameters kafka.* and the related
+// database.naffka are valid.
+func (config *Dendrite) checkKafka(configErrs *configErrors, monolithic bool) {
+
+ if config.Kafka.UseNaffka {
+ if !monolithic {
+ configErrs.Add(fmt.Sprintf("naffka can only be used in a monolithic server"))
+ }
+
+ checkNotEmpty(configErrs, "database.naffka", string(config.Database.Naffka))
+ } else {
+ // If we aren't using naffka then we need to have at least one kafka
+ // server to talk to.
+ checkNotZero(configErrs, "kafka.addresses", int64(len(config.Kafka.Addresses)))
+ }
+ checkNotEmpty(configErrs, "kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent))
+ checkNotEmpty(configErrs, "kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData))
+ checkNotEmpty(configErrs, "kafka.topics.output_typing_event", string(config.Kafka.Topics.OutputTypingEvent))
+ checkNotEmpty(configErrs, "kafka.topics.user_updates", string(config.Kafka.Topics.UserUpdates))
+}
+
+// checkDatabase verifies the parameters database.* are valid.
+func (config *Dendrite) checkDatabase(configErrs *configErrors) {
+ checkNotEmpty(configErrs, "database.account", string(config.Database.Account))
+ checkNotEmpty(configErrs, "database.device", string(config.Database.Device))
+ checkNotEmpty(configErrs, "database.server_key", string(config.Database.ServerKey))
+ checkNotEmpty(configErrs, "database.media_api", string(config.Database.MediaAPI))
+ checkNotEmpty(configErrs, "database.sync_api", string(config.Database.SyncAPI))
+ checkNotEmpty(configErrs, "database.room_server", string(config.Database.RoomServer))
+}
+
+// checkListen verifies the parameters listen.* are valid.
+func (config *Dendrite) checkListen(configErrs *configErrors) {
+ checkNotEmpty(configErrs, "listen.media_api", string(config.Listen.MediaAPI))
+ checkNotEmpty(configErrs, "listen.client_api", string(config.Listen.ClientAPI))
+ checkNotEmpty(configErrs, "listen.federation_api", string(config.Listen.FederationAPI))
+ checkNotEmpty(configErrs, "listen.sync_api", string(config.Listen.SyncAPI))
+ checkNotEmpty(configErrs, "listen.room_server", string(config.Listen.RoomServer))
+ checkNotEmpty(configErrs, "listen.edu_server", string(config.Listen.EDUServer))
+}
+
+// checkLogging verifies the parameters logging.* are valid.
+func (config *Dendrite) checkLogging(configErrs *configErrors) {
+ for _, logrusHook := range config.Logging {
+ checkNotEmpty(configErrs, "logging.type", string(logrusHook.Type))
+ checkNotEmpty(configErrs, "logging.level", string(logrusHook.Level))
+ }
+}
+
+// check returns an error type containing all errors found within the config
+// file.
+func (config *Dendrite) check(monolithic bool) error {
+ var configErrs configErrors
+
+ if config.Version != Version {
+ configErrs.Add(fmt.Sprintf(
+ "unknown config version %q, expected %q", config.Version, Version,
+ ))
+ return configErrs
+ }
+
+ config.checkMatrix(&configErrs)
+ config.checkMedia(&configErrs)
+ config.checkTurn(&configErrs)
+ config.checkKafka(&configErrs, monolithic)
+ config.checkDatabase(&configErrs)
+ config.checkLogging(&configErrs)
+
+ if !monolithic {
+ config.checkListen(&configErrs)
+ }
+
+ // Due to how Golang manages its interface types, this condition is not redundant.
+ // In order to get the proper behaviour, it is necessary to return an explicit nil
+ // and not a nil configErrors.
+ // This is because the following equalities hold:
+ // error(nil) == nil
+ // error(configErrors(nil)) != nil
+ if configErrs != nil {
+ return configErrs
+ }
+ return nil
+}
+
+// absPath returns the absolute path for a given relative or absolute path.
+func absPath(dir string, path Path) string {
+ if filepath.IsAbs(string(path)) {
+ // filepath.Join cleans the path so we should clean the absolute paths as well for consistency.
+ return filepath.Clean(string(path))
+ }
+ return filepath.Join(dir, string(path))
+}
+
+func readKeyPEM(path string, data []byte) (gomatrixserverlib.KeyID, ed25519.PrivateKey, error) {
+ for {
+ var keyBlock *pem.Block
+ keyBlock, data = pem.Decode(data)
+ if data == nil {
+ return "", nil, fmt.Errorf("no matrix private key PEM data in %q", path)
+ }
+ if keyBlock == nil {
+ return "", nil, fmt.Errorf("keyBlock is nil %q", path)
+ }
+ if keyBlock.Type == "MATRIX PRIVATE KEY" {
+ keyID := keyBlock.Headers["Key-ID"]
+ if keyID == "" {
+ return "", nil, fmt.Errorf("missing key ID in PEM data in %q", path)
+ }
+ if !strings.HasPrefix(keyID, "ed25519:") {
+ return "", nil, fmt.Errorf("key ID %q doesn't start with \"ed25519:\" in %q", keyID, path)
+ }
+ _, privKey, err := ed25519.GenerateKey(bytes.NewReader(keyBlock.Bytes))
+ if err != nil {
+ return "", nil, err
+ }
+ return gomatrixserverlib.KeyID(keyID), privKey, nil
+ }
+ }
+}
+
+func fingerprintPEM(data []byte) *gomatrixserverlib.TLSFingerprint {
+ for {
+ var certDERBlock *pem.Block
+ certDERBlock, data = pem.Decode(data)
+ if data == nil {
+ return nil
+ }
+ if certDERBlock.Type == "CERTIFICATE" {
+ digest := sha256.Sum256(certDERBlock.Bytes)
+ return &gomatrixserverlib.TLSFingerprint{SHA256: digest[:]}
+ }
+ }
+}
+
+// AppServiceURL returns a HTTP URL for where the appservice component is listening.
+func (config *Dendrite) AppServiceURL() string {
+ // Hard code the appservice server to talk HTTP for now.
+ // If we support HTTPS we need to think of a practical way to do certificate validation.
+ // People setting up servers shouldn't need to get a certificate valid for the public
+ // internet for an internal API.
+ return "http://" + string(config.Listen.AppServiceAPI)
+}
+
+// RoomServerURL returns an HTTP URL for where the roomserver is listening.
+func (config *Dendrite) RoomServerURL() string {
+ // Hard code the roomserver to talk HTTP for now.
+ // If we support HTTPS we need to think of a practical way to do certificate validation.
+ // People setting up servers shouldn't need to get a certificate valid for the public
+ // internet for an internal API.
+ return "http://" + string(config.Listen.RoomServer)
+}
+
+// EDUServerURL returns an HTTP URL for where the EDU server is listening.
+func (config *Dendrite) EDUServerURL() string {
+ // Hard code the EDU server to talk HTTP for now.
+ // If we support HTTPS we need to think of a practical way to do certificate validation.
+ // People setting up servers shouldn't need to get a certificate valid for the public
+ // internet for an internal API.
+ return "http://" + string(config.Listen.EDUServer)
+}
+
+// FederationSenderURL returns an HTTP URL for where the federation sender is listening.
+func (config *Dendrite) FederationSenderURL() string {
+ // Hard code the federation sender server to talk HTTP for now.
+ // If we support HTTPS we need to think of a practical way to do certificate validation.
+ // People setting up servers shouldn't need to get a certificate valid for the public
+ // internet for an internal API.
+ return "http://" + string(config.Listen.FederationSender)
+}
+
+// SetupTracing configures the opentracing using the supplied configuration.
+func (config *Dendrite) SetupTracing(serviceName string) (closer io.Closer, err error) {
+ if !config.Tracing.Enabled {
+ return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
+ }
+ return config.Tracing.Jaeger.InitGlobalTracer(
+ serviceName,
+ jaegerconfig.Logger(logrusLogger{logrus.StandardLogger()}),
+ jaegerconfig.Metrics(jaegermetrics.NullFactory),
+ )
+}
+
+// MaxIdleConns returns maximum idle connections to the DB
+func (config Dendrite) MaxIdleConns() int {
+ return config.Database.MaxIdleConns
+}
+
+// MaxOpenConns returns maximum open connections to the DB
+func (config Dendrite) MaxOpenConns() int {
+ return config.Database.MaxOpenConns
+}
+
+// ConnMaxLifetime returns maximum amount of time a connection may be reused
+func (config Dendrite) ConnMaxLifetime() time.Duration {
+ return time.Duration(config.Database.ConnMaxLifetimeSec) * time.Second
+}
+
+// DbProperties functions return properties used by database/sql/DB
+type DbProperties interface {
+ MaxIdleConns() int
+ MaxOpenConns() int
+ ConnMaxLifetime() time.Duration
+}
+
+// DbProperties returns cfg as a DbProperties interface
+func (config Dendrite) DbProperties() DbProperties {
+ return config
+}
+
+// logrusLogger is a small wrapper that implements jaeger.Logger using logrus.
+type logrusLogger struct {
+ l *logrus.Logger
+}
+
+func (l logrusLogger) Error(msg string) {
+ l.l.Error(msg)
+}
+
+func (l logrusLogger) Infof(msg string, args ...interface{}) {
+ l.l.Infof(msg, args...)
+}
diff --git a/internal/config/config_test.go b/internal/config/config_test.go
new file mode 100644
index 00000000..b72f5fad
--- /dev/null
+++ b/internal/config/config_test.go
@@ -0,0 +1,145 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package config
+
+import (
+ "fmt"
+ "testing"
+)
+
+func TestLoadConfigRelative(t *testing.T) {
+ _, err := loadConfig("/my/config/dir", []byte(testConfig),
+ mockReadFile{
+ "/my/config/dir/matrix_key.pem": testKey,
+ "/my/config/dir/tls_cert.pem": testCert,
+ }.readFile,
+ false,
+ )
+ if err != nil {
+ t.Error("failed to load config:", err)
+ }
+}
+
+const testConfig = `
+version: 0
+matrix:
+ server_name: localhost
+ private_key: matrix_key.pem
+ federation_certificates: [tls_cert.pem]
+media:
+ base_path: media_store
+kafka:
+ addresses: ["localhost:9092"]
+ topics:
+ output_room_event: output.room
+ output_client_data: output.client
+ output_typing_event: output.typing
+ user_updates: output.user
+database:
+ media_api: "postgresql:///media_api"
+ account: "postgresql:///account"
+ device: "postgresql:///device"
+ server_key: "postgresql:///server_keys"
+ sync_api: "postgresql:///syn_api"
+ room_server: "postgresql:///room_server"
+ appservice: "postgresql:///appservice"
+listen:
+ room_server: "localhost:7770"
+ client_api: "localhost:7771"
+ federation_api: "localhost:7772"
+ sync_api: "localhost:7773"
+ media_api: "localhost:7774"
+ appservice_api: "localhost:7777"
+ edu_server: "localhost:7778"
+logging:
+ - type: "file"
+ level: "info"
+ params:
+ path: "/my/log/dir"
+`
+
+type mockReadFile map[string]string
+
+func (m mockReadFile) readFile(path string) ([]byte, error) {
+ data, ok := m[path]
+ if !ok {
+ return nil, fmt.Errorf("no such file %q", path)
+ }
+ return []byte(data), nil
+}
+
+func TestReadKey(t *testing.T) {
+ keyID, _, err := readKeyPEM("path/to/key", []byte(testKey))
+ if err != nil {
+ t.Error("failed to load private key:", err)
+ }
+ wantKeyID := testKeyID
+ if wantKeyID != string(keyID) {
+ t.Errorf("wanted key ID to be %q, got %q", wantKeyID, keyID)
+ }
+}
+
+const testKeyID = "ed25519:c8NsuQ"
+
+const testKey = `
+-----BEGIN MATRIX PRIVATE KEY-----
+Key-ID: ` + testKeyID + `
+7KRZiZ2sTyRR8uqqUjRwczuwRXXkUMYIUHq4Mc3t4bE=
+-----END MATRIX PRIVATE KEY-----
+`
+
+func TestFingerprintPEM(t *testing.T) {
+ got := fingerprintPEM([]byte(testCert))
+ if got == nil {
+ t.Error("failed to calculate fingerprint")
+ }
+ if string(got.SHA256) != testCertFingerprint {
+ t.Errorf("bad fingerprint: wanted %q got %q", got, testCertFingerprint)
+ }
+
+}
+
+const testCertFingerprint = "56.\\SPQxE\xd4\x95\xfb\xf6\xd5\x04\x91\xcb/\x07\xb1^\x88\x08\xe3\xc1p\xdfY\x04\x19w\xcb"
+
+const testCert = `
+-----BEGIN CERTIFICATE-----
+MIIE0zCCArugAwIBAgIJAPype3u24LJeMA0GCSqGSIb3DQEBCwUAMAAwHhcNMTcw
+NjEzMTQyODU4WhcNMTgwNjEzMTQyODU4WjAAMIICIjANBgkqhkiG9w0BAQEFAAOC
+Ag8AMIICCgKCAgEA3vNSr7lCh/alxPFqairp/PYohwdsqPvOD7zf7dJCNhy0gbdC
+9/APwIbPAPL9nU+o9ud1ACNCKBCQin/9LnI5vd5pa/Ne+mmRADDLB/BBBoywSJWG
+NSfKJ9n3XY1bjgtqi53uUh+RDdQ7sXudDqCUxiiJZmS7oqK/mp88XXAgCbuXUY29
+GmzbbDz37vntuSxDgUOnJ8uPSvRp5YPKogA3JwW1SyrlLt4Z30CQ6nH3Y2Q5SVfJ
+NIQyMrnwyjA9bCdXezv1cLXoTYn7U9BRyzXTZeXs3y3ldnRfISXN35CU04Az1F8j
+lfj7nXMEqI/qAj/qhxZ8nVBB+rpNOZy9RJko3O+G5Qa/EvzkQYV1rW4TM2Yme88A
+QyJspoV/0bXk6gG987PonK2Uk5djxSULhnGVIqswydyH0Nzb+slRp2bSoWbaNlee
++6TIeiyTQYc055pCHOp22gtLrC5LQGchksi02St2ZzRHdnlfqCJ8S9sS7x3trzds
+cYueg1sGI+O8szpQ3eUM7OhJOBrx6OlR7+QYnQg1wr/V+JAz1qcyTC1URcwfeqtg
+QjxFdBD9LfCtfK+AO51H9ugtsPJqOh33PmvfvUBEM05OHCA0lNaWJHROGpm4T4cc
+YQI9JQk/0lB7itF1qK5RG74qgKdjkBkfZxi0OqkUgHk6YHtJlKfET8zfrtcCAwEA
+AaNQME4wHQYDVR0OBBYEFGwb0NgH0Zr7Ga23njEJ85Ozf8M9MB8GA1UdIwQYMBaA
+FGwb0NgH0Zr7Ga23njEJ85Ozf8M9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEL
+BQADggIBAKU3RHXggbq/pLhGinU5q/9QT0TB/0bBnF1wNFkKQC0FrNJ+ZnBNmusy
+oqOn7DEohBCCDxT0kgOC05gLEsGLkSXlVyqCsPFfycCFhtu1QzSRtQNRxB3pW3Wq
+4/RFVYv0PGBjVBKxImQlEmXJWEDwemGKqDQZPtqR/FTHTbJcaT0xQr5+1oG6lawt
+I/2cW6GQ0kYW/Szps8FgNdSNgVqCjjNIzBYbWhRWMx/63qD1ReUbY7/Yw9KKT8nK
+zXERpbTM9k+Pnm0g9Gep+9HJ1dBFJeuTPugKeSeyqg2OJbENw1hxGs/HjBXw7580
+ioiMn/kMj6Tg/f3HCfKrdHHBFQw0/fJW6o17QImYIpPOPzc5RjXBrCJWb34kxqEd
+NQdKgejWiV/LlVsguIF8hVZH2kRzvoyypkVUtSUYGmjvA5UXoORQZfJ+b41llq1B
+GcSF6iaVbAFKnsUyyr1i9uHz/6Muqflphv/SfZxGheIn5u3PnhXrzDagvItjw0NS
+n0Xq64k7fc42HXJpF8CGBkSaIhtlzcruO+vqR80B9r62+D0V7VmHOnP135MT6noU
+8F0JQfEtP+I8NII5jHSF/khzSgP5g80LS9tEc2ILnIHK1StkInAoRQQ+/HsQsgbz
+ANAf5kxmMsM0zlN2hkxl0H6o7wKlBSw3RI3cjfilXiMWRPJrzlc4
+-----END CERTIFICATE-----
+`
diff --git a/internal/consumers.go b/internal/consumers.go
new file mode 100644
index 00000000..df68cbfa
--- /dev/null
+++ b/internal/consumers.go
@@ -0,0 +1,125 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/Shopify/sarama"
+)
+
+// A PartitionOffset is the offset into a partition of the input log.
+type PartitionOffset struct {
+ // The ID of the partition.
+ Partition int32
+ // The offset into the partition.
+ Offset int64
+}
+
+// A PartitionStorer has the storage APIs needed by the consumer.
+type PartitionStorer interface {
+ // PartitionOffsets returns the offsets the consumer has reached for each partition.
+ PartitionOffsets(ctx context.Context, topic string) ([]PartitionOffset, error)
+ // SetPartitionOffset records where the consumer has reached for a partition.
+ SetPartitionOffset(ctx context.Context, topic string, partition int32, offset int64) error
+}
+
+// A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to
+// remember the offset it reached.
+type ContinualConsumer struct {
+ // The kafkaesque topic to consume events from.
+ // This is the name used in kafka to identify the stream to consume events from.
+ Topic string
+ // A kafkaesque stream consumer providing the APIs for talking to the event source.
+ // The interface is taken from a client library for Apache Kafka.
+ // But any equivalent event streaming protocol could be made to implement the same interface.
+ Consumer sarama.Consumer
+ // A thing which can load and save partition offsets for a topic.
+ PartitionStore PartitionStorer
+ // ProcessMessage is a function which will be called for each message in the log. Return an error to
+ // stop processing messages. See ErrShutdown for specific control signals.
+ ProcessMessage func(msg *sarama.ConsumerMessage) error
+ // ShutdownCallback is called when ProcessMessage returns ErrShutdown, after the partition has been saved.
+ // It is optional.
+ ShutdownCallback func()
+}
+
+// ErrShutdown can be returned from ContinualConsumer.ProcessMessage to stop the ContinualConsumer.
+var ErrShutdown = fmt.Errorf("shutdown")
+
+// Start starts the consumer consuming.
+// Starts up a goroutine for each partition in the kafka stream.
+// Returns nil once all the goroutines are started.
+// Returns an error if it can't start consuming for any of the partitions.
+func (c *ContinualConsumer) Start() error {
+ offsets := map[int32]int64{}
+
+ partitions, err := c.Consumer.Partitions(c.Topic)
+ if err != nil {
+ return err
+ }
+ for _, partition := range partitions {
+ // Default all the offsets to the beginning of the stream.
+ offsets[partition] = sarama.OffsetOldest
+ }
+
+ storedOffsets, err := c.PartitionStore.PartitionOffsets(context.TODO(), c.Topic)
+ if err != nil {
+ return err
+ }
+ for _, offset := range storedOffsets {
+ // We've already processed events from this partition so advance the offset to where we got to.
+ // ConsumePartition will start streaming from the message with the given offset (inclusive),
+ // so increment 1 to avoid getting the same message a second time.
+ offsets[offset.Partition] = 1 + offset.Offset
+ }
+
+ var partitionConsumers []sarama.PartitionConsumer
+ for partition, offset := range offsets {
+ pc, err := c.Consumer.ConsumePartition(c.Topic, partition, offset)
+ if err != nil {
+ for _, p := range partitionConsumers {
+ p.Close() // nolint: errcheck
+ }
+ return err
+ }
+ partitionConsumers = append(partitionConsumers, pc)
+ }
+ for _, pc := range partitionConsumers {
+ go c.consumePartition(pc)
+ }
+
+ return nil
+}
+
+// consumePartition consumes the room events for a single partition of the kafkaesque stream.
+func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) {
+ defer pc.Close() // nolint: errcheck
+ for message := range pc.Messages() {
+ msgErr := c.ProcessMessage(message)
+ // Advance our position in the stream so that we will start at the right position after a restart.
+ if err := c.PartitionStore.SetPartitionOffset(context.TODO(), c.Topic, message.Partition, message.Offset); err != nil {
+ panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %w", err))
+ }
+ // Shutdown if we were told to do so.
+ if msgErr == ErrShutdown {
+ if c.ShutdownCallback != nil {
+ c.ShutdownCallback()
+ }
+ return
+ }
+ }
+}
diff --git a/internal/eventcontent.go b/internal/eventcontent.go
new file mode 100644
index 00000000..64512836
--- /dev/null
+++ b/internal/eventcontent.go
@@ -0,0 +1,86 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import "github.com/matrix-org/gomatrixserverlib"
+
+// NameContent is the event content for https://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-name
+type NameContent struct {
+ Name string `json:"name"`
+}
+
+// TopicContent is the event content for https://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-topic
+type TopicContent struct {
+ Topic string `json:"topic"`
+}
+
+// GuestAccessContent is the event content for https://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-guest-access
+type GuestAccessContent struct {
+ GuestAccess string `json:"guest_access"`
+}
+
+// HistoryVisibilityContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-history-visibility
+type HistoryVisibilityContent struct {
+ HistoryVisibility string `json:"history_visibility"`
+}
+
+// CanonicalAlias is the event content for https://matrix.org/docs/spec/client_server/r0.6.0#m-room-canonical-alias
+type CanonicalAlias struct {
+ Alias string `json:"alias"`
+}
+
+// InitialPowerLevelsContent returns the initial values for m.room.power_levels on room creation
+// if they have not been specified.
+// http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-power-levels
+// https://github.com/matrix-org/synapse/blob/v0.19.2/synapse/handlers/room.py#L294
+func InitialPowerLevelsContent(roomCreator string) (c gomatrixserverlib.PowerLevelContent) {
+ c.Defaults()
+ c.Events = map[string]int64{
+ "m.room.name": 50,
+ "m.room.power_levels": 100,
+ "m.room.history_visibility": 100,
+ "m.room.canonical_alias": 50,
+ "m.room.avatar": 50,
+ "m.room.aliases": 0, // anyone can publish aliases by default. Has to be 0 else state_default is used.
+ }
+ c.Users = map[string]int64{roomCreator: 100}
+ return c
+}
+
+// AliasesContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-aliases
+type AliasesContent struct {
+ Aliases []string `json:"aliases"`
+}
+
+// CanonicalAliasContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-canonical-alias
+type CanonicalAliasContent struct {
+ Alias string `json:"alias"`
+}
+
+// AvatarContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-avatar
+type AvatarContent struct {
+ Info ImageInfo `json:"info,omitempty"`
+ URL string `json:"url"`
+ ThumbnailURL string `json:"thumbnail_url,omitempty"`
+ ThumbnailInfo ImageInfo `json:"thumbnail_info,omitempty"`
+}
+
+// ImageInfo implements the ImageInfo structure from http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-avatar
+type ImageInfo struct {
+ Mimetype string `json:"mimetype"`
+ Height int64 `json:"h"`
+ Width int64 `json:"w"`
+ Size int64 `json:"size"`
+}
diff --git a/internal/events.go b/internal/events.go
new file mode 100644
index 00000000..89c82e03
--- /dev/null
+++ b/internal/events.go
@@ -0,0 +1,150 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "time"
+
+ "github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/dendrite/roomserver/api"
+
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// ErrRoomNoExists is returned when trying to lookup the state of a room that
+// doesn't exist
+var ErrRoomNoExists = errors.New("Room does not exist")
+
+// BuildEvent builds a Matrix event using the event builder and roomserver query
+// API client provided. If also fills roomserver query API response (if provided)
+// in case the function calling FillBuilder needs to use it.
+// Returns ErrRoomNoExists if the state of the room could not be retrieved because
+// the room doesn't exist
+// Returns an error if something else went wrong
+func BuildEvent(
+ ctx context.Context,
+ builder *gomatrixserverlib.EventBuilder, cfg *config.Dendrite, evTime time.Time,
+ rsAPI api.RoomserverInternalAPI, queryRes *api.QueryLatestEventsAndStateResponse,
+) (*gomatrixserverlib.Event, error) {
+ if queryRes == nil {
+ queryRes = &api.QueryLatestEventsAndStateResponse{}
+ }
+
+ err := AddPrevEventsToEvent(ctx, builder, rsAPI, queryRes)
+ if err != nil {
+ // This can pass through a ErrRoomNoExists to the caller
+ return nil, err
+ }
+
+ event, err := builder.Build(
+ evTime, cfg.Matrix.ServerName, cfg.Matrix.KeyID,
+ cfg.Matrix.PrivateKey, queryRes.RoomVersion,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ return &event, nil
+}
+
+// AddPrevEventsToEvent fills out the prev_events and auth_events fields in builder
+func AddPrevEventsToEvent(
+ ctx context.Context,
+ builder *gomatrixserverlib.EventBuilder,
+ rsAPI api.RoomserverInternalAPI, queryRes *api.QueryLatestEventsAndStateResponse,
+) error {
+ eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
+ if err != nil {
+ return fmt.Errorf("gomatrixserverlib.StateNeededForEventBuilder: %w", err)
+ }
+
+ if len(eventsNeeded.Tuples()) == 0 {
+ return errors.New("expecting state tuples for event builder, got none")
+ }
+
+ // Ask the roomserver for information about this room
+ queryReq := api.QueryLatestEventsAndStateRequest{
+ RoomID: builder.RoomID,
+ StateToFetch: eventsNeeded.Tuples(),
+ }
+ if err = rsAPI.QueryLatestEventsAndState(ctx, &queryReq, queryRes); err != nil {
+ return fmt.Errorf("rsAPI.QueryLatestEventsAndState: %w", err)
+ }
+
+ if !queryRes.RoomExists {
+ return ErrRoomNoExists
+ }
+
+ eventFormat, err := queryRes.RoomVersion.EventFormat()
+ if err != nil {
+ return fmt.Errorf("queryRes.RoomVersion.EventFormat: %w", err)
+ }
+
+ builder.Depth = queryRes.Depth
+
+ authEvents := gomatrixserverlib.NewAuthEvents(nil)
+
+ for i := range queryRes.StateEvents {
+ err = authEvents.AddEvent(&queryRes.StateEvents[i].Event)
+ if err != nil {
+ return fmt.Errorf("authEvents.AddEvent: %w", err)
+ }
+ }
+
+ refs, err := eventsNeeded.AuthEventReferences(&authEvents)
+ if err != nil {
+ return fmt.Errorf("eventsNeeded.AuthEventReferences: %w", err)
+ }
+
+ truncAuth, truncPrev := truncateAuthAndPrevEvents(refs, queryRes.LatestEvents)
+ switch eventFormat {
+ case gomatrixserverlib.EventFormatV1:
+ builder.AuthEvents = truncAuth
+ builder.PrevEvents = truncPrev
+ case gomatrixserverlib.EventFormatV2:
+ v2AuthRefs, v2PrevRefs := []string{}, []string{}
+ for _, ref := range truncAuth {
+ v2AuthRefs = append(v2AuthRefs, ref.EventID)
+ }
+ for _, ref := range truncPrev {
+ v2PrevRefs = append(v2PrevRefs, ref.EventID)
+ }
+ builder.AuthEvents = v2AuthRefs
+ builder.PrevEvents = v2PrevRefs
+ }
+
+ return nil
+}
+
+// truncateAuthAndPrevEvents limits the number of events we add into
+// an event as prev_events or auth_events.
+// NOTSPEC: The limits here feel a bit arbitrary but they are currently
+// here because of https://github.com/matrix-org/matrix-doc/issues/2307
+// and because Synapse will just drop events that don't comply.
+func truncateAuthAndPrevEvents(auth, prev []gomatrixserverlib.EventReference) (
+ truncAuth, truncPrev []gomatrixserverlib.EventReference,
+) {
+ truncAuth, truncPrev = auth, prev
+ if len(truncAuth) > 10 {
+ truncAuth = truncAuth[:10]
+ }
+ if len(truncPrev) > 20 {
+ truncPrev = truncPrev[:20]
+ }
+ return
+}
diff --git a/internal/http/http.go b/internal/http/http.go
new file mode 100644
index 00000000..3c647544
--- /dev/null
+++ b/internal/http/http.go
@@ -0,0 +1,57 @@
+package http
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "net/http"
+
+ opentracing "github.com/opentracing/opentracing-go"
+ "github.com/opentracing/opentracing-go/ext"
+)
+
+// PostJSON performs a POST request with JSON on an internal HTTP API
+func PostJSON(
+ ctx context.Context, span opentracing.Span, httpClient *http.Client,
+ apiURL string, request, response interface{},
+) error {
+ jsonBytes, err := json.Marshal(request)
+ if err != nil {
+ return err
+ }
+
+ req, err := http.NewRequest(http.MethodPost, apiURL, bytes.NewReader(jsonBytes))
+ if err != nil {
+ return err
+ }
+
+ // Mark the span as being an RPC client.
+ ext.SpanKindRPCClient.Set(span)
+ carrier := opentracing.HTTPHeadersCarrier(req.Header)
+ tracer := opentracing.GlobalTracer()
+
+ if err = tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier); err != nil {
+ return err
+ }
+
+ req.Header.Set("Content-Type", "application/json")
+
+ res, err := httpClient.Do(req.WithContext(ctx))
+ if res != nil {
+ defer (func() { err = res.Body.Close() })()
+ }
+ if err != nil {
+ return err
+ }
+ if res.StatusCode != http.StatusOK {
+ var errorBody struct {
+ Message string `json:"message"`
+ }
+ if err = json.NewDecoder(res.Body).Decode(&errorBody); err != nil {
+ return err
+ }
+ return fmt.Errorf("api: %d: %s", res.StatusCode, errorBody.Message)
+ }
+ return json.NewDecoder(res.Body).Decode(response)
+}
diff --git a/internal/httpapi.go b/internal/httpapi.go
new file mode 100644
index 00000000..b4c53f58
--- /dev/null
+++ b/internal/httpapi.go
@@ -0,0 +1,234 @@
+package internal
+
+import (
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "net/http/httputil"
+ "os"
+ "strings"
+ "time"
+
+ "github.com/matrix-org/dendrite/clientapi/auth"
+ "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
+ "github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ opentracing "github.com/opentracing/opentracing-go"
+ "github.com/opentracing/opentracing-go/ext"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/sirupsen/logrus"
+)
+
+// BasicAuth is used for authorization on /metrics handlers
+type BasicAuth struct {
+ Username string `yaml:"username"`
+ Password string `yaml:"password"`
+}
+
+// MakeAuthAPI turns a util.JSONRequestHandler function into an http.Handler which authenticates the request.
+func MakeAuthAPI(
+ metricsName string, data auth.Data,
+ f func(*http.Request, *authtypes.Device) util.JSONResponse,
+) http.Handler {
+ h := func(req *http.Request) util.JSONResponse {
+ device, err := auth.VerifyUserFromRequest(req, data)
+ if err != nil {
+ return *err
+ }
+ // add the user ID to the logger
+ logger := util.GetLogger((req.Context()))
+ logger = logger.WithField("user_id", device.UserID)
+ req = req.WithContext(util.ContextWithLogger(req.Context(), logger))
+
+ return f(req, device)
+ }
+ return MakeExternalAPI(metricsName, h)
+}
+
+// MakeExternalAPI turns a util.JSONRequestHandler function into an http.Handler.
+// This is used for APIs that are called from the internet.
+func MakeExternalAPI(metricsName string, f func(*http.Request) util.JSONResponse) http.Handler {
+ // TODO: We shouldn't be directly reading env vars here, inject it in instead.
+ // Refactor this when we split out config structs.
+ verbose := false
+ if os.Getenv("DENDRITE_TRACE_HTTP") == "1" {
+ verbose = true
+ }
+ h := util.MakeJSONAPI(util.NewJSONRequestHandler(f))
+ withSpan := func(w http.ResponseWriter, req *http.Request) {
+ nextWriter := w
+ if verbose {
+ logger := logrus.NewEntry(logrus.StandardLogger())
+ // Log outgoing response
+ rec := httptest.NewRecorder()
+ nextWriter = rec
+ defer func() {
+ resp := rec.Result()
+ dump, err := httputil.DumpResponse(resp, true)
+ if err != nil {
+ logger.Debugf("Failed to dump outgoing response: %s", err)
+ } else {
+ strSlice := strings.Split(string(dump), "\n")
+ for _, s := range strSlice {
+ logger.Debug(s)
+ }
+ }
+ // copy the response to the client
+ for hdr, vals := range resp.Header {
+ for _, val := range vals {
+ w.Header().Add(hdr, val)
+ }
+ }
+ w.WriteHeader(resp.StatusCode)
+ // discard errors as this is for debugging
+ _, _ = io.Copy(w, resp.Body)
+ _ = resp.Body.Close()
+ }()
+
+ // Log incoming request
+ dump, err := httputil.DumpRequest(req, true)
+ if err != nil {
+ logger.Debugf("Failed to dump incoming request: %s", err)
+ } else {
+ strSlice := strings.Split(string(dump), "\n")
+ for _, s := range strSlice {
+ logger.Debug(s)
+ }
+ }
+ }
+
+ span := opentracing.StartSpan(metricsName)
+ defer span.Finish()
+ req = req.WithContext(opentracing.ContextWithSpan(req.Context(), span))
+ h.ServeHTTP(nextWriter, req)
+
+ }
+
+ return http.HandlerFunc(withSpan)
+}
+
+// MakeHTMLAPI adds Span metrics to the HTML Handler function
+// This is used to serve HTML alongside JSON error messages
+func MakeHTMLAPI(metricsName string, f func(http.ResponseWriter, *http.Request) *util.JSONResponse) http.Handler {
+ withSpan := func(w http.ResponseWriter, req *http.Request) {
+ span := opentracing.StartSpan(metricsName)
+ defer span.Finish()
+ req = req.WithContext(opentracing.ContextWithSpan(req.Context(), span))
+ if err := f(w, req); err != nil {
+ h := util.MakeJSONAPI(util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse {
+ return *err
+ }))
+ h.ServeHTTP(w, req)
+ }
+ }
+
+ return promhttp.InstrumentHandlerCounter(
+ promauto.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: metricsName,
+ Help: "Total number of http requests for HTML resources",
+ },
+ []string{"code"},
+ ),
+ http.HandlerFunc(withSpan),
+ )
+}
+
+// MakeInternalAPI turns a util.JSONRequestHandler function into an http.Handler.
+// This is used for APIs that are internal to dendrite.
+// If we are passed a tracing context in the request headers then we use that
+// as the parent of any tracing spans we create.
+func MakeInternalAPI(metricsName string, f func(*http.Request) util.JSONResponse) http.Handler {
+ h := util.MakeJSONAPI(util.NewJSONRequestHandler(f))
+ withSpan := func(w http.ResponseWriter, req *http.Request) {
+ carrier := opentracing.HTTPHeadersCarrier(req.Header)
+ tracer := opentracing.GlobalTracer()
+ clientContext, err := tracer.Extract(opentracing.HTTPHeaders, carrier)
+ var span opentracing.Span
+ if err == nil {
+ // Default to a span without RPC context.
+ span = tracer.StartSpan(metricsName)
+ } else {
+ // Set the RPC context.
+ span = tracer.StartSpan(metricsName, ext.RPCServerOption(clientContext))
+ }
+ defer span.Finish()
+ req = req.WithContext(opentracing.ContextWithSpan(req.Context(), span))
+ h.ServeHTTP(w, req)
+ }
+
+ return http.HandlerFunc(withSpan)
+}
+
+// MakeFedAPI makes an http.Handler that checks matrix federation authentication.
+func MakeFedAPI(
+ metricsName string,
+ serverName gomatrixserverlib.ServerName,
+ keyRing gomatrixserverlib.KeyRing,
+ f func(*http.Request, *gomatrixserverlib.FederationRequest) util.JSONResponse,
+) http.Handler {
+ h := func(req *http.Request) util.JSONResponse {
+ fedReq, errResp := gomatrixserverlib.VerifyHTTPRequest(
+ req, time.Now(), serverName, keyRing,
+ )
+ if fedReq == nil {
+ return errResp
+ }
+ return f(req, fedReq)
+ }
+ return MakeExternalAPI(metricsName, h)
+}
+
+// SetupHTTPAPI registers an HTTP API mux under /api and sets up a metrics
+// listener.
+func SetupHTTPAPI(servMux *http.ServeMux, apiMux http.Handler, cfg *config.Dendrite) {
+ if cfg.Metrics.Enabled {
+ servMux.Handle("/metrics", WrapHandlerInBasicAuth(promhttp.Handler(), cfg.Metrics.BasicAuth))
+ }
+ servMux.Handle("/api/", http.StripPrefix("/api", apiMux))
+}
+
+// WrapHandlerInBasicAuth adds basic auth to a handler. Only used for /metrics
+func WrapHandlerInBasicAuth(h http.Handler, b BasicAuth) http.HandlerFunc {
+ if b.Username == "" || b.Password == "" {
+ logrus.Warn("Metrics are exposed without protection. Make sure you set up protection at proxy level.")
+ }
+ return func(w http.ResponseWriter, r *http.Request) {
+ // Serve without authorization if either Username or Password is unset
+ if b.Username == "" || b.Password == "" {
+ h.ServeHTTP(w, r)
+ return
+ }
+ user, pass, ok := r.BasicAuth()
+
+ if !ok || user != b.Username || pass != b.Password {
+ http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden)
+ return
+ }
+ h.ServeHTTP(w, r)
+ }
+}
+
+// WrapHandlerInCORS adds CORS headers to all responses, including all error
+// responses.
+// Handles OPTIONS requests directly.
+func WrapHandlerInCORS(h http.Handler) http.HandlerFunc {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
+ w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept, Authorization")
+
+ if r.Method == http.MethodOptions && r.Header.Get("Access-Control-Request-Method") != "" {
+ // Its easiest just to always return a 200 OK for everything. Whether
+ // this is technically correct or not is a question, but in the end this
+ // is what a lot of other people do (including synapse) and the clients
+ // are perfectly happy with it.
+ w.WriteHeader(http.StatusOK)
+ } else {
+ h.ServeHTTP(w, r)
+ }
+ })
+}
diff --git a/internal/httpapi_test.go b/internal/httpapi_test.go
new file mode 100644
index 00000000..6f159c8d
--- /dev/null
+++ b/internal/httpapi_test.go
@@ -0,0 +1,95 @@
+package internal
+
+import (
+ "net/http"
+ "net/http/httptest"
+ "testing"
+)
+
+func TestWrapHandlerInBasicAuth(t *testing.T) {
+ type args struct {
+ h http.Handler
+ b BasicAuth
+ }
+
+ dummyHandler := http.HandlerFunc(func(h http.ResponseWriter, r *http.Request) {
+ h.WriteHeader(http.StatusOK)
+ })
+
+ tests := []struct {
+ name string
+ args args
+ want int
+ reqAuth bool
+ }{
+ {
+ name: "no user or password setup",
+ args: args{h: dummyHandler},
+ want: http.StatusOK,
+ reqAuth: false,
+ },
+ {
+ name: "only user set",
+ args: args{
+ h: dummyHandler,
+ b: BasicAuth{Username: "test"}, // no basic auth
+ },
+ want: http.StatusOK,
+ reqAuth: false,
+ },
+ {
+ name: "only pass set",
+ args: args{
+ h: dummyHandler,
+ b: BasicAuth{Password: "test"}, // no basic auth
+ },
+ want: http.StatusOK,
+ reqAuth: false,
+ },
+ {
+ name: "credentials correct",
+ args: args{
+ h: dummyHandler,
+ b: BasicAuth{Username: "test", Password: "test"}, // basic auth enabled
+ },
+ want: http.StatusOK,
+ reqAuth: true,
+ },
+ {
+ name: "credentials wrong",
+ args: args{
+ h: dummyHandler,
+ b: BasicAuth{Username: "test1", Password: "test"}, // basic auth enabled
+ },
+ want: http.StatusForbidden,
+ reqAuth: true,
+ },
+ {
+ name: "no basic auth in request",
+ args: args{
+ h: dummyHandler,
+ b: BasicAuth{Username: "test", Password: "test"}, // basic auth enabled
+ },
+ want: http.StatusForbidden,
+ reqAuth: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ baHandler := WrapHandlerInBasicAuth(tt.args.h, tt.args.b)
+
+ req := httptest.NewRequest("GET", "http://localhost/metrics", nil)
+ if tt.reqAuth {
+ req.SetBasicAuth("test", "test")
+ }
+
+ w := httptest.NewRecorder()
+ baHandler(w, req)
+ resp := w.Result()
+
+ if resp.StatusCode != tt.want {
+ t.Errorf("Expected status code %d, got %d", resp.StatusCode, tt.want)
+ }
+ })
+ }
+}
diff --git a/internal/keydb/cache/keydb.go b/internal/keydb/cache/keydb.go
new file mode 100644
index 00000000..87573ed2
--- /dev/null
+++ b/internal/keydb/cache/keydb.go
@@ -0,0 +1,69 @@
+package cache
+
+import (
+ "context"
+ "errors"
+
+ "github.com/matrix-org/dendrite/internal/caching"
+ "github.com/matrix-org/dendrite/internal/keydb"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// A Database implements gomatrixserverlib.KeyDatabase and is used to store
+// the public keys for other matrix servers.
+type KeyDatabase struct {
+ inner keydb.Database
+ cache caching.ImmutableCache
+}
+
+func NewKeyDatabase(inner keydb.Database, cache caching.ImmutableCache) (*KeyDatabase, error) {
+ if inner == nil {
+ return nil, errors.New("inner database can't be nil")
+ }
+ if cache == nil {
+ return nil, errors.New("cache can't be nil")
+ }
+ return &KeyDatabase{
+ inner: inner,
+ cache: cache,
+ }, nil
+}
+
+// FetcherName implements KeyFetcher
+func (d KeyDatabase) FetcherName() string {
+ return "InMemoryKeyCache"
+}
+
+// FetchKeys implements gomatrixserverlib.KeyDatabase
+func (d *KeyDatabase) FetchKeys(
+ ctx context.Context,
+ requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
+) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) {
+ results := make(map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult)
+ for req := range requests {
+ if res, cached := d.cache.GetServerKey(req); cached {
+ results[req] = res
+ delete(requests, req)
+ }
+ }
+ fromDB, err := d.inner.FetchKeys(ctx, requests)
+ if err != nil {
+ return results, err
+ }
+ for req, res := range fromDB {
+ results[req] = res
+ d.cache.StoreServerKey(req, res)
+ }
+ return results, nil
+}
+
+// StoreKeys implements gomatrixserverlib.KeyDatabase
+func (d *KeyDatabase) StoreKeys(
+ ctx context.Context,
+ keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult,
+) error {
+ for req, res := range keyMap {
+ d.cache.StoreServerKey(req, res)
+ }
+ return d.inner.StoreKeys(ctx, keyMap)
+}
diff --git a/internal/keydb/interface.go b/internal/keydb/interface.go
new file mode 100644
index 00000000..c9a20fdd
--- /dev/null
+++ b/internal/keydb/interface.go
@@ -0,0 +1,13 @@
+package keydb
+
+import (
+ "context"
+
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type Database interface {
+ FetcherName() string
+ FetchKeys(ctx context.Context, requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error)
+ StoreKeys(ctx context.Context, keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult) error
+}
diff --git a/internal/keydb/keydb.go b/internal/keydb/keydb.go
new file mode 100644
index 00000000..ad6d56b8
--- /dev/null
+++ b/internal/keydb/keydb.go
@@ -0,0 +1,50 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build !wasm
+
+package keydb
+
+import (
+ "net/url"
+
+ "golang.org/x/crypto/ed25519"
+
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/keydb/postgres"
+ "github.com/matrix-org/dendrite/internal/keydb/sqlite3"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// NewDatabase opens a database connection.
+func NewDatabase(
+ dataSourceName string,
+ dbProperties internal.DbProperties,
+ serverName gomatrixserverlib.ServerName,
+ serverKey ed25519.PublicKey,
+ serverKeyID gomatrixserverlib.KeyID,
+) (Database, error) {
+ uri, err := url.Parse(dataSourceName)
+ if err != nil {
+ return postgres.NewDatabase(dataSourceName, dbProperties, serverName, serverKey, serverKeyID)
+ }
+ switch uri.Scheme {
+ case "postgres":
+ return postgres.NewDatabase(dataSourceName, dbProperties, serverName, serverKey, serverKeyID)
+ case "file":
+ return sqlite3.NewDatabase(dataSourceName, serverName, serverKey, serverKeyID)
+ default:
+ return postgres.NewDatabase(dataSourceName, dbProperties, serverName, serverKey, serverKeyID)
+ }
+}
diff --git a/internal/keydb/keydb_wasm.go b/internal/keydb/keydb_wasm.go
new file mode 100644
index 00000000..349381ee
--- /dev/null
+++ b/internal/keydb/keydb_wasm.go
@@ -0,0 +1,48 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package keydb
+
+import (
+ "fmt"
+ "net/url"
+
+ "golang.org/x/crypto/ed25519"
+
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/keydb/sqlite3"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// NewDatabase opens a database connection.
+func NewDatabase(
+ dataSourceName string,
+ dbProperties internal.DbProperties, // nolint:unparam
+ serverName gomatrixserverlib.ServerName,
+ serverKey ed25519.PublicKey,
+ serverKeyID gomatrixserverlib.KeyID,
+) (Database, error) {
+ uri, err := url.Parse(dataSourceName)
+ if err != nil {
+ return nil, err
+ }
+ switch uri.Scheme {
+ case "postgres":
+ return nil, fmt.Errorf("Cannot use postgres implementation")
+ case "file":
+ return sqlite3.NewDatabase(dataSourceName, serverName, serverKey, serverKeyID)
+ default:
+ return nil, fmt.Errorf("Cannot use postgres implementation")
+ }
+}
diff --git a/internal/keydb/keyring.go b/internal/keydb/keyring.go
new file mode 100644
index 00000000..d0b1904e
--- /dev/null
+++ b/internal/keydb/keyring.go
@@ -0,0 +1,74 @@
+// Copyright 2017 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package keydb
+
+import (
+ "encoding/base64"
+
+ "github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ed25519"
+)
+
+// CreateKeyRing creates and configures a KeyRing object.
+//
+// It creates the necessary key fetchers and collects them into a KeyRing
+// backed by the given KeyDatabase.
+func CreateKeyRing(client gomatrixserverlib.Client,
+ keyDB gomatrixserverlib.KeyDatabase,
+ cfg config.KeyPerspectives) gomatrixserverlib.KeyRing {
+
+ fetchers := gomatrixserverlib.KeyRing{
+ KeyFetchers: []gomatrixserverlib.KeyFetcher{
+ &gomatrixserverlib.DirectKeyFetcher{
+ Client: client,
+ },
+ },
+ KeyDatabase: keyDB,
+ }
+
+ logrus.Info("Enabled direct key fetcher")
+
+ var b64e = base64.StdEncoding.WithPadding(base64.NoPadding)
+ for _, ps := range cfg {
+ perspective := &gomatrixserverlib.PerspectiveKeyFetcher{
+ PerspectiveServerName: ps.ServerName,
+ PerspectiveServerKeys: map[gomatrixserverlib.KeyID]ed25519.PublicKey{},
+ Client: client,
+ }
+
+ for _, key := range ps.Keys {
+ rawkey, err := b64e.DecodeString(key.PublicKey)
+ if err != nil {
+ logrus.WithError(err).WithFields(logrus.Fields{
+ "server_name": ps.ServerName,
+ "public_key": key.PublicKey,
+ }).Warn("Couldn't parse perspective key")
+ continue
+ }
+ perspective.PerspectiveServerKeys[key.KeyID] = rawkey
+ }
+
+ fetchers.KeyFetchers = append(fetchers.KeyFetchers, perspective)
+
+ logrus.WithFields(logrus.Fields{
+ "server_name": ps.ServerName,
+ "num_public_keys": len(ps.Keys),
+ }).Info("Enabled perspective key fetcher")
+ }
+
+ return fetchers
+}
diff --git a/internal/keydb/postgres/keydb.go b/internal/keydb/postgres/keydb.go
new file mode 100644
index 00000000..da3a4d37
--- /dev/null
+++ b/internal/keydb/postgres/keydb.go
@@ -0,0 +1,115 @@
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "time"
+
+ "golang.org/x/crypto/ed25519"
+
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// A Database implements gomatrixserverlib.KeyDatabase and is used to store
+// the public keys for other matrix servers.
+type Database struct {
+ statements serverKeyStatements
+}
+
+// NewDatabase prepares a new key database.
+// It creates the necessary tables if they don't already exist.
+// It prepares all the SQL statements that it will use.
+// Returns an error if there was a problem talking to the database.
+func NewDatabase(
+ dataSourceName string,
+ dbProperties internal.DbProperties,
+ serverName gomatrixserverlib.ServerName,
+ serverKey ed25519.PublicKey,
+ serverKeyID gomatrixserverlib.KeyID,
+) (*Database, error) {
+ db, err := sqlutil.Open("postgres", dataSourceName, dbProperties)
+ if err != nil {
+ return nil, err
+ }
+ d := &Database{}
+ err = d.statements.prepare(db)
+ if err != nil {
+ return nil, err
+ }
+ // Store our own keys so that we don't end up making HTTP requests to find our
+ // own keys
+ index := gomatrixserverlib.PublicKeyLookupRequest{
+ ServerName: serverName,
+ KeyID: serverKeyID,
+ }
+ value := gomatrixserverlib.PublicKeyLookupResult{
+ VerifyKey: gomatrixserverlib.VerifyKey{
+ Key: gomatrixserverlib.Base64String(serverKey),
+ },
+ ValidUntilTS: gomatrixserverlib.AsTimestamp(time.Now().Add(100 * 365 * 24 * time.Hour)),
+ ExpiredTS: gomatrixserverlib.PublicKeyNotExpired,
+ }
+ err = d.StoreKeys(
+ context.Background(),
+ map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{
+ index: value,
+ },
+ )
+ if err != nil {
+ return nil, err
+ }
+ return d, nil
+}
+
+// FetcherName implements KeyFetcher
+func (d Database) FetcherName() string {
+ return "PostgresKeyDatabase"
+}
+
+// FetchKeys implements gomatrixserverlib.KeyDatabase
+func (d *Database) FetchKeys(
+ ctx context.Context,
+ requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
+) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) {
+ return d.statements.bulkSelectServerKeys(ctx, requests)
+}
+
+// StoreKeys implements gomatrixserverlib.KeyDatabase
+func (d *Database) StoreKeys(
+ ctx context.Context,
+ keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult,
+) error {
+ // TODO: Inserting all the keys within a single transaction may
+ // be more efficient since the transaction overhead can be quite
+ // high for a single insert statement.
+ var lastErr error
+ for request, keys := range keyMap {
+ if err := d.statements.upsertServerKeys(ctx, request, keys); err != nil {
+ // Rather than returning immediately on error we try to insert the
+ // remaining keys.
+ // Since we are inserting the keys outside of a transaction it is
+ // possible for some of the inserts to succeed even though some
+ // of the inserts have failed.
+ // Ensuring that we always insert all the keys we can means that
+ // this behaviour won't depend on the iteration order of the map.
+ lastErr = err
+ }
+ }
+ return lastErr
+}
diff --git a/internal/keydb/postgres/server_key_table.go b/internal/keydb/postgres/server_key_table.go
new file mode 100644
index 00000000..b3c26a48
--- /dev/null
+++ b/internal/keydb/postgres/server_key_table.go
@@ -0,0 +1,144 @@
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/internal"
+
+ "github.com/lib/pq"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const serverKeysSchema = `
+-- A cache of signing keys downloaded from remote servers.
+CREATE TABLE IF NOT EXISTS keydb_server_keys (
+ -- The name of the matrix server the key is for.
+ server_name TEXT NOT NULL,
+ -- The ID of the server key.
+ server_key_id TEXT NOT NULL,
+ -- Combined server name and key ID separated by the ASCII unit separator
+ -- to make it easier to run bulk queries.
+ server_name_and_key_id TEXT NOT NULL,
+ -- When the key is valid until as a millisecond timestamp.
+ -- 0 if this is an expired key (in which case expired_ts will be non-zero)
+ valid_until_ts BIGINT NOT NULL,
+ -- When the key expired as a millisecond timestamp.
+ -- 0 if this is an active key (in which case valid_until_ts will be non-zero)
+ expired_ts BIGINT NOT NULL,
+ -- The base64-encoded public key.
+ server_key TEXT NOT NULL,
+ CONSTRAINT keydb_server_keys_unique UNIQUE (server_name, server_key_id)
+);
+
+CREATE INDEX IF NOT EXISTS keydb_server_name_and_key_id ON keydb_server_keys (server_name_and_key_id);
+`
+
+const bulkSelectServerKeysSQL = "" +
+ "SELECT server_name, server_key_id, valid_until_ts, expired_ts, " +
+ " server_key FROM keydb_server_keys" +
+ " WHERE server_name_and_key_id = ANY($1)"
+
+const upsertServerKeysSQL = "" +
+ "INSERT INTO keydb_server_keys (server_name, server_key_id," +
+ " server_name_and_key_id, valid_until_ts, expired_ts, server_key)" +
+ " VALUES ($1, $2, $3, $4, $5, $6)" +
+ " ON CONFLICT ON CONSTRAINT keydb_server_keys_unique" +
+ " DO UPDATE SET valid_until_ts = $4, expired_ts = $5, server_key = $6"
+
+type serverKeyStatements struct {
+ bulkSelectServerKeysStmt *sql.Stmt
+ upsertServerKeysStmt *sql.Stmt
+}
+
+func (s *serverKeyStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(serverKeysSchema)
+ if err != nil {
+ return
+ }
+ if s.bulkSelectServerKeysStmt, err = db.Prepare(bulkSelectServerKeysSQL); err != nil {
+ return
+ }
+ if s.upsertServerKeysStmt, err = db.Prepare(upsertServerKeysSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *serverKeyStatements) bulkSelectServerKeys(
+ ctx context.Context,
+ requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
+) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) {
+ var nameAndKeyIDs []string
+ for request := range requests {
+ nameAndKeyIDs = append(nameAndKeyIDs, nameAndKeyID(request))
+ }
+ stmt := s.bulkSelectServerKeysStmt
+ rows, err := stmt.QueryContext(ctx, pq.StringArray(nameAndKeyIDs))
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectServerKeys: rows.close() failed")
+ results := map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{}
+ for rows.Next() {
+ var serverName string
+ var keyID string
+ var key string
+ var validUntilTS int64
+ var expiredTS int64
+ if err = rows.Scan(&serverName, &keyID, &validUntilTS, &expiredTS, &key); err != nil {
+ return nil, err
+ }
+ r := gomatrixserverlib.PublicKeyLookupRequest{
+ ServerName: gomatrixserverlib.ServerName(serverName),
+ KeyID: gomatrixserverlib.KeyID(keyID),
+ }
+ vk := gomatrixserverlib.VerifyKey{}
+ err = vk.Key.Decode(key)
+ if err != nil {
+ return nil, err
+ }
+ results[r] = gomatrixserverlib.PublicKeyLookupResult{
+ VerifyKey: vk,
+ ValidUntilTS: gomatrixserverlib.Timestamp(validUntilTS),
+ ExpiredTS: gomatrixserverlib.Timestamp(expiredTS),
+ }
+ }
+ return results, rows.Err()
+}
+
+func (s *serverKeyStatements) upsertServerKeys(
+ ctx context.Context,
+ request gomatrixserverlib.PublicKeyLookupRequest,
+ key gomatrixserverlib.PublicKeyLookupResult,
+) error {
+ _, err := s.upsertServerKeysStmt.ExecContext(
+ ctx,
+ string(request.ServerName),
+ string(request.KeyID),
+ nameAndKeyID(request),
+ key.ValidUntilTS,
+ key.ExpiredTS,
+ key.Key.Encode(),
+ )
+ return err
+}
+
+func nameAndKeyID(request gomatrixserverlib.PublicKeyLookupRequest) string {
+ return string(request.ServerName) + "\x1F" + string(request.KeyID)
+}
diff --git a/internal/keydb/sqlite3/keydb.go b/internal/keydb/sqlite3/keydb.go
new file mode 100644
index 00000000..d1dc61c9
--- /dev/null
+++ b/internal/keydb/sqlite3/keydb.go
@@ -0,0 +1,116 @@
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "time"
+
+ "golang.org/x/crypto/ed25519"
+
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+
+ _ "github.com/mattn/go-sqlite3"
+)
+
+// A Database implements gomatrixserverlib.KeyDatabase and is used to store
+// the public keys for other matrix servers.
+type Database struct {
+ statements serverKeyStatements
+}
+
+// NewDatabase prepares a new key database.
+// It creates the necessary tables if they don't already exist.
+// It prepares all the SQL statements that it will use.
+// Returns an error if there was a problem talking to the database.
+func NewDatabase(
+ dataSourceName string,
+ serverName gomatrixserverlib.ServerName,
+ serverKey ed25519.PublicKey,
+ serverKeyID gomatrixserverlib.KeyID,
+) (*Database, error) {
+ db, err := sqlutil.Open(internal.SQLiteDriverName(), dataSourceName, nil)
+ if err != nil {
+ return nil, err
+ }
+ d := &Database{}
+ err = d.statements.prepare(db)
+ if err != nil {
+ return nil, err
+ }
+ // Store our own keys so that we don't end up making HTTP requests to find our
+ // own keys
+ index := gomatrixserverlib.PublicKeyLookupRequest{
+ ServerName: serverName,
+ KeyID: serverKeyID,
+ }
+ value := gomatrixserverlib.PublicKeyLookupResult{
+ VerifyKey: gomatrixserverlib.VerifyKey{
+ Key: gomatrixserverlib.Base64String(serverKey),
+ },
+ ValidUntilTS: gomatrixserverlib.AsTimestamp(time.Now().Add(100 * 365 * 24 * time.Hour)),
+ ExpiredTS: gomatrixserverlib.PublicKeyNotExpired,
+ }
+ err = d.StoreKeys(
+ context.Background(),
+ map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{
+ index: value,
+ },
+ )
+ if err != nil {
+ return nil, err
+ }
+ return d, nil
+}
+
+// FetcherName implements KeyFetcher
+func (d Database) FetcherName() string {
+ return "SqliteKeyDatabase"
+}
+
+// FetchKeys implements gomatrixserverlib.KeyDatabase
+func (d *Database) FetchKeys(
+ ctx context.Context,
+ requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
+) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) {
+ return d.statements.bulkSelectServerKeys(ctx, requests)
+}
+
+// StoreKeys implements gomatrixserverlib.KeyDatabase
+func (d *Database) StoreKeys(
+ ctx context.Context,
+ keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult,
+) error {
+ // TODO: Inserting all the keys within a single transaction may
+ // be more efficient since the transaction overhead can be quite
+ // high for a single insert statement.
+ var lastErr error
+ for request, keys := range keyMap {
+ if err := d.statements.upsertServerKeys(ctx, request, keys); err != nil {
+ // Rather than returning immediately on error we try to insert the
+ // remaining keys.
+ // Since we are inserting the keys outside of a transaction it is
+ // possible for some of the inserts to succeed even though some
+ // of the inserts have failed.
+ // Ensuring that we always insert all the keys we can means that
+ // this behaviour won't depend on the iteration order of the map.
+ lastErr = err
+ }
+ }
+ return lastErr
+}
diff --git a/internal/keydb/sqlite3/server_key_table.go b/internal/keydb/sqlite3/server_key_table.go
new file mode 100644
index 00000000..ae24a14d
--- /dev/null
+++ b/internal/keydb/sqlite3/server_key_table.go
@@ -0,0 +1,152 @@
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+ "strings"
+
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const serverKeysSchema = `
+-- A cache of signing keys downloaded from remote servers.
+CREATE TABLE IF NOT EXISTS keydb_server_keys (
+ -- The name of the matrix server the key is for.
+ server_name TEXT NOT NULL,
+ -- The ID of the server key.
+ server_key_id TEXT NOT NULL,
+ -- Combined server name and key ID separated by the ASCII unit separator
+ -- to make it easier to run bulk queries.
+ server_name_and_key_id TEXT NOT NULL,
+ -- When the key is valid until as a millisecond timestamp.
+ -- 0 if this is an expired key (in which case expired_ts will be non-zero)
+ valid_until_ts BIGINT NOT NULL,
+ -- When the key expired as a millisecond timestamp.
+ -- 0 if this is an active key (in which case valid_until_ts will be non-zero)
+ expired_ts BIGINT NOT NULL,
+ -- The base64-encoded public key.
+ server_key TEXT NOT NULL,
+ UNIQUE (server_name, server_key_id)
+);
+
+CREATE INDEX IF NOT EXISTS keydb_server_name_and_key_id ON keydb_server_keys (server_name_and_key_id);
+`
+
+const bulkSelectServerKeysSQL = "" +
+ "SELECT server_name, server_key_id, valid_until_ts, expired_ts, " +
+ " server_key FROM keydb_server_keys" +
+ " WHERE server_name_and_key_id IN ($1)"
+
+const upsertServerKeysSQL = "" +
+ "INSERT INTO keydb_server_keys (server_name, server_key_id," +
+ " server_name_and_key_id, valid_until_ts, expired_ts, server_key)" +
+ " VALUES ($1, $2, $3, $4, $5, $6)" +
+ " ON CONFLICT (server_name, server_key_id)" +
+ " DO UPDATE SET valid_until_ts = $4, expired_ts = $5, server_key = $6"
+
+type serverKeyStatements struct {
+ db *sql.DB
+ bulkSelectServerKeysStmt *sql.Stmt
+ upsertServerKeysStmt *sql.Stmt
+}
+
+func (s *serverKeyStatements) prepare(db *sql.DB) (err error) {
+ s.db = db
+ _, err = db.Exec(serverKeysSchema)
+ if err != nil {
+ return
+ }
+ if s.bulkSelectServerKeysStmt, err = db.Prepare(bulkSelectServerKeysSQL); err != nil {
+ return
+ }
+ if s.upsertServerKeysStmt, err = db.Prepare(upsertServerKeysSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *serverKeyStatements) bulkSelectServerKeys(
+ ctx context.Context,
+ requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
+) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) {
+ var nameAndKeyIDs []string
+ for request := range requests {
+ nameAndKeyIDs = append(nameAndKeyIDs, nameAndKeyID(request))
+ }
+
+ query := strings.Replace(bulkSelectServerKeysSQL, "($1)", internal.QueryVariadic(len(nameAndKeyIDs)), 1)
+
+ iKeyIDs := make([]interface{}, len(nameAndKeyIDs))
+ for i, v := range nameAndKeyIDs {
+ iKeyIDs[i] = v
+ }
+
+ rows, err := s.db.QueryContext(ctx, query, iKeyIDs...)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectServerKeys: rows.close() failed")
+ results := map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{}
+ for rows.Next() {
+ var serverName string
+ var keyID string
+ var key string
+ var validUntilTS int64
+ var expiredTS int64
+ if err = rows.Scan(&serverName, &keyID, &validUntilTS, &expiredTS, &key); err != nil {
+ return nil, err
+ }
+ r := gomatrixserverlib.PublicKeyLookupRequest{
+ ServerName: gomatrixserverlib.ServerName(serverName),
+ KeyID: gomatrixserverlib.KeyID(keyID),
+ }
+ vk := gomatrixserverlib.VerifyKey{}
+ err = vk.Key.Decode(key)
+ if err != nil {
+ return nil, err
+ }
+ results[r] = gomatrixserverlib.PublicKeyLookupResult{
+ VerifyKey: vk,
+ ValidUntilTS: gomatrixserverlib.Timestamp(validUntilTS),
+ ExpiredTS: gomatrixserverlib.Timestamp(expiredTS),
+ }
+ }
+ return results, nil
+}
+
+func (s *serverKeyStatements) upsertServerKeys(
+ ctx context.Context,
+ request gomatrixserverlib.PublicKeyLookupRequest,
+ key gomatrixserverlib.PublicKeyLookupResult,
+) error {
+ _, err := s.upsertServerKeysStmt.ExecContext(
+ ctx,
+ string(request.ServerName),
+ string(request.KeyID),
+ nameAndKeyID(request),
+ key.ValidUntilTS,
+ key.ExpiredTS,
+ key.Key.Encode(),
+ )
+ return err
+}
+
+func nameAndKeyID(request gomatrixserverlib.PublicKeyLookupRequest) string {
+ return string(request.ServerName) + "\x1F" + string(request.KeyID)
+}
diff --git a/internal/log.go b/internal/log.go
new file mode 100644
index 00000000..fd2b84ab
--- /dev/null
+++ b/internal/log.go
@@ -0,0 +1,188 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "net/http"
+ "os"
+ "path"
+ "path/filepath"
+ "runtime"
+ "strings"
+
+ "github.com/matrix-org/util"
+
+ "github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/dugong"
+ "github.com/sirupsen/logrus"
+)
+
+type utcFormatter struct {
+ logrus.Formatter
+}
+
+func (f utcFormatter) Format(entry *logrus.Entry) ([]byte, error) {
+ entry.Time = entry.Time.UTC()
+ return f.Formatter.Format(entry)
+}
+
+// Logrus hook which wraps another hook and filters log entries according to their level.
+// (Note that we cannot use solely logrus.SetLevel, because Dendrite supports multiple
+// levels of logging at the same time.)
+type logLevelHook struct {
+ level logrus.Level
+ logrus.Hook
+}
+
+// Levels returns all the levels supported by this hook.
+func (h *logLevelHook) Levels() []logrus.Level {
+ levels := make([]logrus.Level, 0)
+
+ for _, level := range logrus.AllLevels {
+ if level <= h.level {
+ levels = append(levels, level)
+ }
+ }
+
+ return levels
+}
+
+// callerPrettyfier is a function that given a runtime.Frame object, will
+// extract the calling function's name and file, and return them in a nicely
+// formatted way
+func callerPrettyfier(f *runtime.Frame) (string, string) {
+ // Retrieve just the function name
+ s := strings.Split(f.Function, ".")
+ funcname := s[len(s)-1]
+
+ // Append a newline + tab to it to move the actual log content to its own line
+ funcname += "\n\t"
+
+ // Surround the filepath in brackets and append line number so IDEs can quickly
+ // navigate
+ filename := fmt.Sprintf(" [%s:%d]", f.File, f.Line)
+
+ return funcname, filename
+}
+
+// SetupPprof starts a pprof listener. We use the DefaultServeMux here because it is
+// simplest, and it gives us the freedom to run pprof on a separate port.
+func SetupPprof() {
+ if hostPort := os.Getenv("PPROFLISTEN"); hostPort != "" {
+ logrus.Warn("Starting pprof on ", hostPort)
+ go func() {
+ logrus.WithError(http.ListenAndServe(hostPort, nil)).Error("Failed to setup pprof listener")
+ }()
+ }
+}
+
+// SetupStdLogging configures the logging format to standard output. Typically, it is called when the config is not yet loaded.
+func SetupStdLogging() {
+ logrus.SetReportCaller(true)
+ logrus.SetFormatter(&utcFormatter{
+ &logrus.TextFormatter{
+ TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
+ FullTimestamp: true,
+ DisableColors: false,
+ DisableTimestamp: false,
+ QuoteEmptyFields: true,
+ CallerPrettyfier: callerPrettyfier,
+ },
+ })
+}
+
+// SetupHookLogging configures the logging hooks defined in the configuration.
+// If something fails here it means that the logging was improperly configured,
+// so we just exit with the error
+func SetupHookLogging(hooks []config.LogrusHook, componentName string) {
+ logrus.SetReportCaller(true)
+ for _, hook := range hooks {
+ // Check we received a proper logging level
+ level, err := logrus.ParseLevel(hook.Level)
+ if err != nil {
+ logrus.Fatalf("Unrecognised logging level %s: %q", hook.Level, err)
+ }
+
+ // Perform a first filter on the logs according to the lowest level of all
+ // (Eg: If we have hook for info and above, prevent logrus from processing debug logs)
+ if logrus.GetLevel() < level {
+ logrus.SetLevel(level)
+ }
+
+ switch hook.Type {
+ case "file":
+ checkFileHookParams(hook.Params)
+ setupFileHook(hook, level, componentName)
+ default:
+ logrus.Fatalf("Unrecognised logging hook type: %s", hook.Type)
+ }
+ }
+}
+
+// File type hooks should be provided a path to a directory to store log files
+func checkFileHookParams(params map[string]interface{}) {
+ path, ok := params["path"]
+ if !ok {
+ logrus.Fatalf("Expecting a parameter \"path\" for logging hook of type \"file\"")
+ }
+
+ if _, ok := path.(string); !ok {
+ logrus.Fatalf("Parameter \"path\" for logging hook of type \"file\" should be a string")
+ }
+}
+
+// Add a new FSHook to the logger. Each component will log in its own file
+func setupFileHook(hook config.LogrusHook, level logrus.Level, componentName string) {
+ dirPath := (hook.Params["path"]).(string)
+ fullPath := filepath.Join(dirPath, componentName+".log")
+
+ if err := os.MkdirAll(path.Dir(fullPath), os.ModePerm); err != nil {
+ logrus.Fatalf("Couldn't create directory %s: %q", path.Dir(fullPath), err)
+ }
+
+ logrus.AddHook(&logLevelHook{
+ level,
+ dugong.NewFSHook(
+ fullPath,
+ &utcFormatter{
+ &logrus.TextFormatter{
+ TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
+ DisableColors: true,
+ DisableTimestamp: false,
+ DisableSorting: false,
+ QuoteEmptyFields: true,
+ },
+ },
+ &dugong.DailyRotationSchedule{GZip: true},
+ ),
+ })
+}
+
+//CloseAndLogIfError Closes io.Closer and logs the error if any
+func CloseAndLogIfError(ctx context.Context, closer io.Closer, message string) {
+ if closer == nil {
+ return
+ }
+ err := closer.Close()
+ if ctx == nil {
+ ctx = context.TODO()
+ }
+ if err != nil {
+ util.GetLogger(ctx).WithError(err).Error(message)
+ }
+}
diff --git a/internal/partition_offset_table.go b/internal/partition_offset_table.go
new file mode 100644
index 00000000..8b72819b
--- /dev/null
+++ b/internal/partition_offset_table.go
@@ -0,0 +1,111 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "context"
+ "database/sql"
+ "strings"
+)
+
+const partitionOffsetsSchema = `
+-- The offsets that the server has processed up to.
+CREATE TABLE IF NOT EXISTS ${prefix}_partition_offsets (
+ -- The name of the topic.
+ topic TEXT NOT NULL,
+ -- The 32-bit partition ID
+ partition INTEGER NOT NULL,
+ -- The 64-bit offset.
+ partition_offset BIGINT NOT NULL,
+ UNIQUE (topic, partition)
+);
+`
+
+const selectPartitionOffsetsSQL = "" +
+ "SELECT partition, partition_offset FROM ${prefix}_partition_offsets WHERE topic = $1"
+
+const upsertPartitionOffsetsSQL = "" +
+ "INSERT INTO ${prefix}_partition_offsets (topic, partition, partition_offset) VALUES ($1, $2, $3)" +
+ " ON CONFLICT (topic, partition)" +
+ " DO UPDATE SET partition_offset = $3"
+
+// PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table.
+type PartitionOffsetStatements struct {
+ selectPartitionOffsetsStmt *sql.Stmt
+ upsertPartitionOffsetStmt *sql.Stmt
+}
+
+// Prepare converts the raw SQL statements into prepared statements.
+// Takes a prefix to prepend to the table name used to store the partition offsets.
+// This allows multiple components to share the same database schema.
+func (s *PartitionOffsetStatements) Prepare(db *sql.DB, prefix string) (err error) {
+ _, err = db.Exec(strings.Replace(partitionOffsetsSchema, "${prefix}", prefix, -1))
+ if err != nil {
+ return
+ }
+ if s.selectPartitionOffsetsStmt, err = db.Prepare(
+ strings.Replace(selectPartitionOffsetsSQL, "${prefix}", prefix, -1),
+ ); err != nil {
+ return
+ }
+ if s.upsertPartitionOffsetStmt, err = db.Prepare(
+ strings.Replace(upsertPartitionOffsetsSQL, "${prefix}", prefix, -1),
+ ); err != nil {
+ return
+ }
+ return
+}
+
+// PartitionOffsets implements PartitionStorer
+func (s *PartitionOffsetStatements) PartitionOffsets(
+ ctx context.Context, topic string,
+) ([]PartitionOffset, error) {
+ return s.selectPartitionOffsets(ctx, topic)
+}
+
+// SetPartitionOffset implements PartitionStorer
+func (s *PartitionOffsetStatements) SetPartitionOffset(
+ ctx context.Context, topic string, partition int32, offset int64,
+) error {
+ return s.upsertPartitionOffset(ctx, topic, partition, offset)
+}
+
+// selectPartitionOffsets returns all the partition offsets for the given topic.
+func (s *PartitionOffsetStatements) selectPartitionOffsets(
+ ctx context.Context, topic string,
+) ([]PartitionOffset, error) {
+ rows, err := s.selectPartitionOffsetsStmt.QueryContext(ctx, topic)
+ if err != nil {
+ return nil, err
+ }
+ defer CloseAndLogIfError(ctx, rows, "selectPartitionOffsets: rows.close() failed")
+ var results []PartitionOffset
+ for rows.Next() {
+ var offset PartitionOffset
+ if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil {
+ return nil, err
+ }
+ results = append(results, offset)
+ }
+ return results, rows.Err()
+}
+
+// UpsertPartitionOffset updates or inserts the partition offset for the given topic.
+func (s *PartitionOffsetStatements) upsertPartitionOffset(
+ ctx context.Context, topic string, partition int32, offset int64,
+) error {
+ _, err := s.upsertPartitionOffsetStmt.ExecContext(ctx, topic, partition, offset)
+ return err
+}
diff --git a/internal/postgres.go b/internal/postgres.go
new file mode 100644
index 00000000..7ae40d8f
--- /dev/null
+++ b/internal/postgres.go
@@ -0,0 +1,25 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build !wasm
+
+package internal
+
+import "github.com/lib/pq"
+
+// IsUniqueConstraintViolationErr returns true if the error is a postgresql unique_violation error
+func IsUniqueConstraintViolationErr(err error) bool {
+ pqErr, ok := err.(*pq.Error)
+ return ok && pqErr.Code == "23505"
+}
diff --git a/internal/postgres_wasm.go b/internal/postgres_wasm.go
new file mode 100644
index 00000000..64d32829
--- /dev/null
+++ b/internal/postgres_wasm.go
@@ -0,0 +1,22 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build wasm
+
+package internal
+
+// IsUniqueConstraintViolationErr no-ops for this architecture
+func IsUniqueConstraintViolationErr(err error) bool {
+ return false
+}
diff --git a/internal/routing.go b/internal/routing.go
new file mode 100644
index 00000000..4462c70c
--- /dev/null
+++ b/internal/routing.go
@@ -0,0 +1,35 @@
+// Copyright 2019 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "net/url"
+)
+
+// URLDecodeMapValues is a function that iterates through each of the items in a
+// map, URL decodes the value, and returns a new map with the decoded values
+// under the same key names
+func URLDecodeMapValues(vmap map[string]string) (map[string]string, error) {
+ decoded := make(map[string]string, len(vmap))
+ for key, value := range vmap {
+ decodedVal, err := url.PathUnescape(value)
+ if err != nil {
+ return make(map[string]string), err
+ }
+ decoded[key] = decodedVal
+ }
+
+ return decoded, nil
+}
diff --git a/internal/sql.go b/internal/sql.go
new file mode 100644
index 00000000..d6a5a308
--- /dev/null
+++ b/internal/sql.go
@@ -0,0 +1,109 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "database/sql"
+ "fmt"
+ "runtime"
+ "time"
+)
+
+// A Transaction is something that can be committed or rolledback.
+type Transaction interface {
+ // Commit the transaction
+ Commit() error
+ // Rollback the transaction.
+ Rollback() error
+}
+
+// EndTransaction ends a transaction.
+// If the transaction succeeded then it is committed, otherwise it is rolledback.
+// You MUST check the error returned from this function to be sure that the transaction
+// was applied correctly. For example, 'database is locked' errors in sqlite will happen here.
+func EndTransaction(txn Transaction, succeeded *bool) error {
+ if *succeeded {
+ return txn.Commit() // nolint: errcheck
+ } else {
+ return txn.Rollback() // nolint: errcheck
+ }
+}
+
+// WithTransaction runs a block of code passing in an SQL transaction
+// If the code returns an error or panics then the transactions is rolledback
+// Otherwise the transaction is committed.
+func WithTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
+ txn, err := db.Begin()
+ if err != nil {
+ return
+ }
+ succeeded := false
+ defer func() {
+ err2 := EndTransaction(txn, &succeeded)
+ if err == nil && err2 != nil { // failed to commit/rollback
+ err = err2
+ }
+ }()
+
+ err = fn(txn)
+ if err != nil {
+ return
+ }
+
+ succeeded = true
+ return
+}
+
+// TxStmt wraps an SQL stmt inside an optional transaction.
+// If the transaction is nil then it returns the original statement that will
+// run outside of a transaction.
+// Otherwise returns a copy of the statement that will run inside the transaction.
+func TxStmt(transaction *sql.Tx, statement *sql.Stmt) *sql.Stmt {
+ if transaction != nil {
+ statement = transaction.Stmt(statement)
+ }
+ return statement
+}
+
+// Hack of the century
+func QueryVariadic(count int) string {
+ return QueryVariadicOffset(count, 0)
+}
+
+func QueryVariadicOffset(count, offset int) string {
+ str := "("
+ for i := 0; i < count; i++ {
+ str += fmt.Sprintf("$%d", i+offset+1)
+ if i < (count - 1) {
+ str += ", "
+ }
+ }
+ str += ")"
+ return str
+}
+
+func SQLiteDriverName() string {
+ if runtime.GOOS == "js" {
+ return "sqlite3_js"
+ }
+ return "sqlite3"
+}
+
+// DbProperties functions return properties used by database/sql/DB
+type DbProperties interface {
+ MaxIdleConns() int
+ MaxOpenConns() int
+ ConnMaxLifetime() time.Duration
+}
diff --git a/internal/sqlutil/trace.go b/internal/sqlutil/trace.go
index 42ac4e58..1b008e1b 100644
--- a/internal/sqlutil/trace.go
+++ b/internal/sqlutil/trace.go
@@ -25,7 +25,7 @@ import (
"strings"
"time"
- "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/internal"
"github.com/ngrok/sqlmw"
"github.com/sirupsen/logrus"
)
@@ -78,7 +78,7 @@ func (in *traceInterceptor) RowsNext(c context.Context, rows driver.Rows, dest [
// Open opens a database specified by its database driver name and a driver-specific data source name,
// usually consisting of at least a database name and connection information. Includes tracing driver
// if DENDRITE_TRACE_SQL=1
-func Open(driverName, dsn string, dbProperties common.DbProperties) (*sql.DB, error) {
+func Open(driverName, dsn string, dbProperties internal.DbProperties) (*sql.DB, error) {
if tracingEnabled {
// install the wrapped driver
driverName += "-trace"
@@ -87,7 +87,7 @@ func Open(driverName, dsn string, dbProperties common.DbProperties) (*sql.DB, er
if err != nil {
return nil, err
}
- if driverName != common.SQLiteDriverName() && dbProperties != nil {
+ if driverName != internal.SQLiteDriverName() && dbProperties != nil {
logrus.WithFields(logrus.Fields{
"MaxOpenConns": dbProperties.MaxOpenConns(),
"MaxIdleConns": dbProperties.MaxIdleConns(),
diff --git a/internal/test/client.go b/internal/test/client.go
new file mode 100644
index 00000000..a38540ac
--- /dev/null
+++ b/internal/test/client.go
@@ -0,0 +1,158 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package test
+
+import (
+ "crypto/tls"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// Request contains the information necessary to issue a request and test its result
+type Request struct {
+ Req *http.Request
+ WantedBody string
+ WantedStatusCode int
+ LastErr *LastRequestErr
+}
+
+// LastRequestErr is a synchronised error wrapper
+// Useful for obtaining the last error from a set of requests
+type LastRequestErr struct {
+ sync.Mutex
+ Err error
+}
+
+// Set sets the error
+func (r *LastRequestErr) Set(err error) {
+ r.Lock()
+ defer r.Unlock()
+ r.Err = err
+}
+
+// Get gets the error
+func (r *LastRequestErr) Get() error {
+ r.Lock()
+ defer r.Unlock()
+ return r.Err
+}
+
+// CanonicalJSONInput canonicalises a slice of JSON strings
+// Useful for test input
+func CanonicalJSONInput(jsonData []string) []string {
+ for i := range jsonData {
+ jsonBytes, err := gomatrixserverlib.CanonicalJSON([]byte(jsonData[i]))
+ if err != nil && err != io.EOF {
+ panic(err)
+ }
+ jsonData[i] = string(jsonBytes)
+ }
+ return jsonData
+}
+
+// Do issues a request and checks the status code and body of the response
+func (r *Request) Do() (err error) {
+ client := &http.Client{
+ Timeout: 5 * time.Second,
+ Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: true,
+ },
+ },
+ }
+ res, err := client.Do(r.Req)
+ if err != nil {
+ return err
+ }
+ defer (func() { err = res.Body.Close() })()
+
+ if res.StatusCode != r.WantedStatusCode {
+ return fmt.Errorf("incorrect status code. Expected: %d Got: %d", r.WantedStatusCode, res.StatusCode)
+ }
+
+ if r.WantedBody != "" {
+ resBytes, err := ioutil.ReadAll(res.Body)
+ if err != nil {
+ return err
+ }
+ jsonBytes, err := gomatrixserverlib.CanonicalJSON(resBytes)
+ if err != nil {
+ return err
+ }
+ if string(jsonBytes) != r.WantedBody {
+ return fmt.Errorf("returned wrong bytes. Expected:\n%s\n\nGot:\n%s", r.WantedBody, string(jsonBytes))
+ }
+ }
+
+ return nil
+}
+
+// DoUntilSuccess blocks and repeats the same request until the response returns the desired status code and body.
+// It then closes the given channel and returns.
+func (r *Request) DoUntilSuccess(done chan error) {
+ r.LastErr = &LastRequestErr{}
+ for {
+ if err := r.Do(); err != nil {
+ r.LastErr.Set(err)
+ time.Sleep(1 * time.Second) // don't tightloop
+ continue
+ }
+ close(done)
+ return
+ }
+}
+
+// Run repeatedly issues a request until success, error or a timeout is reached
+func (r *Request) Run(label string, timeout time.Duration, serverCmdChan chan error) {
+ fmt.Printf("==TESTING== %v (timeout: %v)\n", label, timeout)
+ done := make(chan error, 1)
+
+ // We need to wait for the server to:
+ // - have connected to the database
+ // - have created the tables
+ // - be listening on the given port
+ go r.DoUntilSuccess(done)
+
+ // wait for one of:
+ // - the test to pass (done channel is closed)
+ // - the server to exit with an error (error sent on serverCmdChan)
+ // - our test timeout to expire
+ // We don't need to clean up since the main() function handles that in the event we panic
+ select {
+ case <-time.After(timeout):
+ fmt.Printf("==TESTING== %v TIMEOUT\n", label)
+ if reqErr := r.LastErr.Get(); reqErr != nil {
+ fmt.Println("Last /sync request error:")
+ fmt.Println(reqErr)
+ }
+ panic(fmt.Sprintf("%v server timed out", label))
+ case err := <-serverCmdChan:
+ if err != nil {
+ fmt.Println("=============================================================================================")
+ fmt.Printf("%v server failed to run. If failing with 'pq: password authentication failed for user' try:", label)
+ fmt.Println(" export PGHOST=/var/run/postgresql")
+ fmt.Println("=============================================================================================")
+ panic(err)
+ }
+ case <-done:
+ fmt.Printf("==TESTING== %v PASSED\n", label)
+ }
+}
diff --git a/internal/test/config.go b/internal/test/config.go
new file mode 100644
index 00000000..06510c8b
--- /dev/null
+++ b/internal/test/config.go
@@ -0,0 +1,208 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package test
+
+import (
+ "crypto/rand"
+ "crypto/rsa"
+ "crypto/x509"
+ "encoding/base64"
+ "encoding/pem"
+ "fmt"
+ "io/ioutil"
+ "math/big"
+ "os"
+ "path/filepath"
+ "time"
+
+ "github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/gomatrixserverlib"
+ "gopkg.in/yaml.v2"
+)
+
+const (
+ // ConfigFile is the name of the config file for a server.
+ ConfigFile = "dendrite.yaml"
+ // ServerKeyFile is the name of the file holding the matrix server private key.
+ ServerKeyFile = "server_key.pem"
+ // TLSCertFile is the name of the file holding the TLS certificate used for federation.
+ TLSCertFile = "tls_cert.pem"
+ // TLSKeyFile is the name of the file holding the TLS key used for federation.
+ TLSKeyFile = "tls_key.pem"
+ // MediaDir is the name of the directory used to store media.
+ MediaDir = "media"
+)
+
+// MakeConfig makes a config suitable for running integration tests.
+// Generates new matrix and TLS keys for the server.
+func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*config.Dendrite, int, error) {
+ var cfg config.Dendrite
+
+ port := startPort
+ assignAddress := func() config.Address {
+ result := config.Address(fmt.Sprintf("%s:%d", host, port))
+ port++
+ return result
+ }
+
+ serverKeyPath := filepath.Join(configDir, ServerKeyFile)
+ tlsCertPath := filepath.Join(configDir, TLSKeyFile)
+ tlsKeyPath := filepath.Join(configDir, TLSCertFile)
+ mediaBasePath := filepath.Join(configDir, MediaDir)
+
+ if err := NewMatrixKey(serverKeyPath); err != nil {
+ return nil, 0, err
+ }
+
+ if err := NewTLSKey(tlsKeyPath, tlsCertPath); err != nil {
+ return nil, 0, err
+ }
+
+ cfg.Version = config.Version
+
+ cfg.Matrix.ServerName = gomatrixserverlib.ServerName(assignAddress())
+ cfg.Matrix.PrivateKeyPath = config.Path(serverKeyPath)
+ cfg.Matrix.FederationCertificatePaths = []config.Path{config.Path(tlsCertPath)}
+
+ cfg.Media.BasePath = config.Path(mediaBasePath)
+
+ cfg.Kafka.Addresses = []string{kafkaURI}
+ // TODO: Different servers should be using different topics.
+ // Make this configurable somehow?
+ cfg.Kafka.Topics.OutputRoomEvent = "test.room.output"
+ cfg.Kafka.Topics.OutputClientData = "test.clientapi.output"
+ cfg.Kafka.Topics.OutputTypingEvent = "test.typing.output"
+ cfg.Kafka.Topics.UserUpdates = "test.user.output"
+
+ // TODO: Use different databases for the different schemas.
+ // Using the same database for every schema currently works because
+ // the table names are globally unique. But we might not want to
+ // rely on that in the future.
+ cfg.Database.Account = config.DataSource(database)
+ cfg.Database.AppService = config.DataSource(database)
+ cfg.Database.Device = config.DataSource(database)
+ cfg.Database.MediaAPI = config.DataSource(database)
+ cfg.Database.RoomServer = config.DataSource(database)
+ cfg.Database.ServerKey = config.DataSource(database)
+ cfg.Database.SyncAPI = config.DataSource(database)
+ cfg.Database.PublicRoomsAPI = config.DataSource(database)
+
+ cfg.Listen.ClientAPI = assignAddress()
+ cfg.Listen.AppServiceAPI = assignAddress()
+ cfg.Listen.FederationAPI = assignAddress()
+ cfg.Listen.MediaAPI = assignAddress()
+ cfg.Listen.RoomServer = assignAddress()
+ cfg.Listen.SyncAPI = assignAddress()
+ cfg.Listen.PublicRoomsAPI = assignAddress()
+ cfg.Listen.EDUServer = assignAddress()
+
+ // Bind to the same address as the listen address
+ // All microservices are run on the same host in testing
+ cfg.Bind.ClientAPI = cfg.Listen.ClientAPI
+ cfg.Bind.AppServiceAPI = cfg.Listen.AppServiceAPI
+ cfg.Bind.FederationAPI = cfg.Listen.FederationAPI
+ cfg.Bind.MediaAPI = cfg.Listen.MediaAPI
+ cfg.Bind.RoomServer = cfg.Listen.RoomServer
+ cfg.Bind.SyncAPI = cfg.Listen.SyncAPI
+ cfg.Bind.PublicRoomsAPI = cfg.Listen.PublicRoomsAPI
+ cfg.Bind.EDUServer = cfg.Listen.EDUServer
+
+ return &cfg, port, nil
+}
+
+// WriteConfig writes the config file to the directory.
+func WriteConfig(cfg *config.Dendrite, configDir string) error {
+ data, err := yaml.Marshal(cfg)
+ if err != nil {
+ return err
+ }
+ return ioutil.WriteFile(filepath.Join(configDir, ConfigFile), data, 0666)
+}
+
+// NewMatrixKey generates a new ed25519 matrix server key and writes it to a file.
+func NewMatrixKey(matrixKeyPath string) (err error) {
+ var data [35]byte
+ _, err = rand.Read(data[:])
+ if err != nil {
+ return err
+ }
+ keyOut, err := os.OpenFile(matrixKeyPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
+ if err != nil {
+ return err
+ }
+
+ defer (func() {
+ err = keyOut.Close()
+ })()
+
+ err = pem.Encode(keyOut, &pem.Block{
+ Type: "MATRIX PRIVATE KEY",
+ Headers: map[string]string{
+ "Key-ID": "ed25519:" + base64.RawStdEncoding.EncodeToString(data[:3]),
+ },
+ Bytes: data[3:],
+ })
+ return err
+}
+
+const certificateDuration = time.Hour * 24 * 365 * 10
+
+// NewTLSKey generates a new RSA TLS key and certificate and writes it to a file.
+func NewTLSKey(tlsKeyPath, tlsCertPath string) error {
+ priv, err := rsa.GenerateKey(rand.Reader, 4096)
+ if err != nil {
+ return err
+ }
+
+ notBefore := time.Now()
+ notAfter := notBefore.Add(certificateDuration)
+ serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128)
+ serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
+ if err != nil {
+ return err
+ }
+
+ template := x509.Certificate{
+ SerialNumber: serialNumber,
+ NotBefore: notBefore,
+ NotAfter: notAfter,
+ KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
+ ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
+ BasicConstraintsValid: true,
+ }
+ derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv)
+ if err != nil {
+ return err
+ }
+ certOut, err := os.Create(tlsCertPath)
+ if err != nil {
+ return err
+ }
+ defer certOut.Close() // nolint: errcheck
+ if err = pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}); err != nil {
+ return err
+ }
+
+ keyOut, err := os.OpenFile(tlsKeyPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
+ if err != nil {
+ return err
+ }
+ defer keyOut.Close() // nolint: errcheck
+ err = pem.Encode(keyOut, &pem.Block{
+ Type: "RSA PRIVATE KEY",
+ Bytes: x509.MarshalPKCS1PrivateKey(priv),
+ })
+ return err
+}
diff --git a/internal/test/kafka.go b/internal/test/kafka.go
new file mode 100644
index 00000000..cbf24630
--- /dev/null
+++ b/internal/test/kafka.go
@@ -0,0 +1,76 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package test
+
+import (
+ "io"
+ "os/exec"
+ "path/filepath"
+ "strings"
+)
+
+// KafkaExecutor executes kafka scripts.
+type KafkaExecutor struct {
+ // The location of Zookeeper. Typically this is `localhost:2181`.
+ ZookeeperURI string
+ // The directory where Kafka is installed to. Used to locate kafka scripts.
+ KafkaDirectory string
+ // The location of the Kafka logs. Typically this is `localhost:9092`.
+ KafkaURI string
+ // Where stdout and stderr should be written to. Typically this is `os.Stderr`.
+ OutputWriter io.Writer
+}
+
+// CreateTopic creates a new kafka topic. This is created with a single partition.
+func (e *KafkaExecutor) CreateTopic(topic string) error {
+ cmd := exec.Command(
+ filepath.Join(e.KafkaDirectory, "bin", "kafka-topics.sh"),
+ "--create",
+ "--zookeeper", e.ZookeeperURI,
+ "--replication-factor", "1",
+ "--partitions", "1",
+ "--topic", topic,
+ )
+ cmd.Stdout = e.OutputWriter
+ cmd.Stderr = e.OutputWriter
+ return cmd.Run()
+}
+
+// WriteToTopic writes data to a kafka topic.
+func (e *KafkaExecutor) WriteToTopic(topic string, data []string) error {
+ cmd := exec.Command(
+ filepath.Join(e.KafkaDirectory, "bin", "kafka-console-producer.sh"),
+ "--broker-list", e.KafkaURI,
+ "--topic", topic,
+ )
+ cmd.Stdout = e.OutputWriter
+ cmd.Stderr = e.OutputWriter
+ cmd.Stdin = strings.NewReader(strings.Join(data, "\n"))
+ return cmd.Run()
+}
+
+// DeleteTopic deletes a given kafka topic if it exists.
+func (e *KafkaExecutor) DeleteTopic(topic string) error {
+ cmd := exec.Command(
+ filepath.Join(e.KafkaDirectory, "bin", "kafka-topics.sh"),
+ "--delete",
+ "--if-exists",
+ "--zookeeper", e.ZookeeperURI,
+ "--topic", topic,
+ )
+ cmd.Stderr = e.OutputWriter
+ cmd.Stdout = e.OutputWriter
+ return cmd.Run()
+}
diff --git a/internal/test/server.go b/internal/test/server.go
new file mode 100644
index 00000000..1493dac6
--- /dev/null
+++ b/internal/test/server.go
@@ -0,0 +1,105 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package test
+
+import (
+ "fmt"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+
+ "github.com/matrix-org/dendrite/internal/config"
+)
+
+// Defaulting allows assignment of string variables with a fallback default value
+// Useful for use with os.Getenv() for example
+func Defaulting(value, defaultValue string) string {
+ if value == "" {
+ value = defaultValue
+ }
+ return value
+}
+
+// CreateDatabase creates a new database, dropping it first if it exists
+func CreateDatabase(command string, args []string, database string) error {
+ cmd := exec.Command(command, args...)
+ cmd.Stdin = strings.NewReader(
+ fmt.Sprintf("DROP DATABASE IF EXISTS %s; CREATE DATABASE %s;", database, database),
+ )
+ // Send stdout and stderr to our stderr so that we see error messages from
+ // the psql process
+ cmd.Stdout = os.Stderr
+ cmd.Stderr = os.Stderr
+ return cmd.Run()
+}
+
+// CreateBackgroundCommand creates an executable command
+// The Cmd being executed is returned. A channel is also returned,
+// which will have any termination errors sent down it, followed immediately by the channel being closed.
+func CreateBackgroundCommand(command string, args []string) (*exec.Cmd, chan error) {
+ cmd := exec.Command(command, args...)
+ cmd.Stderr = os.Stderr
+ cmd.Stdout = os.Stderr
+
+ if err := cmd.Start(); err != nil {
+ panic("failed to start server: " + err.Error())
+ }
+ cmdChan := make(chan error, 1)
+ go func() {
+ cmdChan <- cmd.Wait()
+ close(cmdChan)
+ }()
+ return cmd, cmdChan
+}
+
+// InitDatabase creates the database and config file needed for the server to run
+func InitDatabase(postgresDatabase, postgresContainerName string, databases []string) {
+ if len(databases) > 0 {
+ var dbCmd string
+ var dbArgs []string
+ if postgresContainerName == "" {
+ dbCmd = "psql"
+ dbArgs = []string{postgresDatabase}
+ } else {
+ dbCmd = "docker"
+ dbArgs = []string{
+ "exec", "-i", postgresContainerName, "psql", "-U", "postgres", postgresDatabase,
+ }
+ }
+ for _, database := range databases {
+ if err := CreateDatabase(dbCmd, dbArgs, database); err != nil {
+ panic(err)
+ }
+ }
+ }
+}
+
+// StartProxy creates a reverse proxy
+func StartProxy(bindAddr string, cfg *config.Dendrite) (*exec.Cmd, chan error) {
+ proxyArgs := []string{
+ "--bind-address", bindAddr,
+ "--sync-api-server-url", "http://" + string(cfg.Listen.SyncAPI),
+ "--client-api-server-url", "http://" + string(cfg.Listen.ClientAPI),
+ "--media-api-server-url", "http://" + string(cfg.Listen.MediaAPI),
+ "--public-rooms-api-server-url", "http://" + string(cfg.Listen.PublicRoomsAPI),
+ "--tls-cert", "server.crt",
+ "--tls-key", "server.key",
+ }
+ return CreateBackgroundCommand(
+ filepath.Join(filepath.Dir(os.Args[0]), "client-api-proxy"),
+ proxyArgs,
+ )
+}
diff --git a/internal/test/slice.go b/internal/test/slice.go
new file mode 100644
index 00000000..00c740db
--- /dev/null
+++ b/internal/test/slice.go
@@ -0,0 +1,34 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package test
+
+import "sort"
+
+// UnsortedStringSliceEqual returns true if the slices have same length & elements.
+// Does not modify the given slice.
+func UnsortedStringSliceEqual(first, second []string) bool {
+ if len(first) != len(second) {
+ return false
+ }
+
+ a, b := first[:], second[:]
+ sort.Strings(a)
+ sort.Strings(b)
+ for i := range a {
+ if a[i] != b[i] {
+ return false
+ }
+ }
+
+ return true
+}
diff --git a/internal/transactions/transactions.go b/internal/transactions/transactions.go
new file mode 100644
index 00000000..d2eb0f27
--- /dev/null
+++ b/internal/transactions/transactions.go
@@ -0,0 +1,95 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package transactions
+
+import (
+ "sync"
+ "time"
+
+ "github.com/matrix-org/util"
+)
+
+// DefaultCleanupPeriod represents the default time duration after which cacheCleanService runs.
+const DefaultCleanupPeriod time.Duration = 30 * time.Minute
+
+type txnsMap map[CacheKey]*util.JSONResponse
+
+// CacheKey is the type for the key in a transactions cache.
+// This is needed because the spec requires transaction IDs to have a per-access token scope.
+type CacheKey struct {
+ AccessToken string
+ TxnID string
+}
+
+// Cache represents a temporary store for response entries.
+// Entries are evicted after a certain period, defined by cleanupPeriod.
+// This works by keeping two maps of entries, and cycling the maps after the cleanupPeriod.
+type Cache struct {
+ sync.RWMutex
+ txnsMaps [2]txnsMap
+ cleanupPeriod time.Duration
+}
+
+// New is a wrapper which calls NewWithCleanupPeriod with DefaultCleanupPeriod as argument.
+func New() *Cache {
+ return NewWithCleanupPeriod(DefaultCleanupPeriod)
+}
+
+// NewWithCleanupPeriod creates a new Cache object, starts cacheCleanService.
+// Takes cleanupPeriod as argument.
+// Returns a reference to newly created Cache.
+func NewWithCleanupPeriod(cleanupPeriod time.Duration) *Cache {
+ t := Cache{txnsMaps: [2]txnsMap{make(txnsMap), make(txnsMap)}}
+ t.cleanupPeriod = cleanupPeriod
+
+ // Start clean service as the Cache is created
+ go cacheCleanService(&t)
+ return &t
+}
+
+// FetchTransaction looks up an entry for the (accessToken, txnID) tuple in Cache.
+// Looks in both the txnMaps.
+// Returns (JSON response, true) if txnID is found, else the returned bool is false.
+func (t *Cache) FetchTransaction(accessToken, txnID string) (*util.JSONResponse, bool) {
+ t.RLock()
+ defer t.RUnlock()
+ for _, txns := range t.txnsMaps {
+ res, ok := txns[CacheKey{accessToken, txnID}]
+ if ok {
+ return res, true
+ }
+ }
+ return nil, false
+}
+
+// AddTransaction adds an entry for the (accessToken, txnID) tuple in Cache.
+// Adds to the front txnMap.
+func (t *Cache) AddTransaction(accessToken, txnID string, res *util.JSONResponse) {
+ t.Lock()
+ defer t.Unlock()
+
+ t.txnsMaps[0][CacheKey{accessToken, txnID}] = res
+}
+
+// cacheCleanService is responsible for cleaning up entries after cleanupPeriod.
+// It guarantees that an entry will be present in cache for at least cleanupPeriod & at most 2 * cleanupPeriod.
+// This cycles the txnMaps forward, i.e. back map is assigned the front and front is assigned an empty map.
+func cacheCleanService(t *Cache) {
+ ticker := time.NewTicker(t.cleanupPeriod).C
+ for range ticker {
+ t.Lock()
+ t.txnsMaps[1] = t.txnsMaps[0]
+ t.txnsMaps[0] = make(txnsMap)
+ t.Unlock()
+ }
+}
diff --git a/internal/transactions/transactions_test.go b/internal/transactions/transactions_test.go
new file mode 100644
index 00000000..f565e484
--- /dev/null
+++ b/internal/transactions/transactions_test.go
@@ -0,0 +1,77 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package transactions
+
+import (
+ "net/http"
+ "testing"
+
+ "github.com/matrix-org/util"
+)
+
+type fakeType struct {
+ ID string `json:"ID"`
+}
+
+var (
+ fakeAccessToken = "aRandomAccessToken"
+ fakeAccessToken2 = "anotherRandomAccessToken"
+ fakeTxnID = "aRandomTxnID"
+ fakeResponse = &util.JSONResponse{
+ Code: http.StatusOK, JSON: fakeType{ID: "0"},
+ }
+ fakeResponse2 = &util.JSONResponse{
+ Code: http.StatusOK, JSON: fakeType{ID: "1"},
+ }
+)
+
+// TestCache creates a New Cache and tests AddTransaction & FetchTransaction
+func TestCache(t *testing.T) {
+ fakeTxnCache := New()
+ fakeTxnCache.AddTransaction(fakeAccessToken, fakeTxnID, fakeResponse)
+
+ // Add entries for noise.
+ for i := 1; i <= 100; i++ {
+ fakeTxnCache.AddTransaction(
+ fakeAccessToken,
+ fakeTxnID+string(i),
+ &util.JSONResponse{Code: http.StatusOK, JSON: fakeType{ID: string(i)}},
+ )
+ }
+
+ testResponse, ok := fakeTxnCache.FetchTransaction(fakeAccessToken, fakeTxnID)
+ if !ok {
+ t.Error("Failed to retrieve entry for txnID: ", fakeTxnID)
+ } else if testResponse.JSON != fakeResponse.JSON {
+ t.Error("Fetched response incorrect. Expected: ", fakeResponse.JSON, " got: ", testResponse.JSON)
+ }
+}
+
+// TestCacheScope ensures transactions with the same transaction ID are not shared
+// across multiple access tokens.
+func TestCacheScope(t *testing.T) {
+ cache := New()
+ cache.AddTransaction(fakeAccessToken, fakeTxnID, fakeResponse)
+ cache.AddTransaction(fakeAccessToken2, fakeTxnID, fakeResponse2)
+
+ if res, ok := cache.FetchTransaction(fakeAccessToken, fakeTxnID); !ok {
+ t.Errorf("failed to retrieve entry for (%s, %s)", fakeAccessToken, fakeTxnID)
+ } else if res.JSON != fakeResponse.JSON {
+ t.Errorf("Wrong cache entry for (%s, %s). Expected: %v; got: %v", fakeAccessToken, fakeTxnID, fakeResponse.JSON, res.JSON)
+ }
+ if res, ok := cache.FetchTransaction(fakeAccessToken2, fakeTxnID); !ok {
+ t.Errorf("failed to retrieve entry for (%s, %s)", fakeAccessToken, fakeTxnID)
+ } else if res.JSON != fakeResponse2.JSON {
+ t.Errorf("Wrong cache entry for (%s, %s). Expected: %v; got: %v", fakeAccessToken, fakeTxnID, fakeResponse2.JSON, res.JSON)
+ }
+}
diff --git a/internal/types.go b/internal/types.go
new file mode 100644
index 00000000..be2717f3
--- /dev/null
+++ b/internal/types.go
@@ -0,0 +1,66 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "errors"
+ "strconv"
+)
+
+// ErrProfileNoExists is returned when trying to lookup a user's profile that
+// doesn't exist locally.
+var ErrProfileNoExists = errors.New("no known profile for given user ID")
+
+// AccountData represents account data sent from the client API server to the
+// sync API server
+type AccountData struct {
+ RoomID string `json:"room_id"`
+ Type string `json:"type"`
+}
+
+// ProfileResponse is a struct containing all known user profile data
+type ProfileResponse struct {
+ AvatarURL string `json:"avatar_url"`
+ DisplayName string `json:"displayname"`
+}
+
+// AvatarURL is a struct containing only the URL to a user's avatar
+type AvatarURL struct {
+ AvatarURL string `json:"avatar_url"`
+}
+
+// DisplayName is a struct containing only a user's display name
+type DisplayName struct {
+ DisplayName string `json:"displayname"`
+}
+
+// WeakBoolean is a type that will Unmarshal to true or false even if the encoded
+// representation is "true"/1 or "false"/0, as well as whatever other forms are
+// recognised by strconv.ParseBool
+type WeakBoolean bool
+
+// UnmarshalJSON is overridden here to allow strings vaguely representing a true
+// or false boolean to be set as their closest counterpart
+func (b *WeakBoolean) UnmarshalJSON(data []byte) error {
+ result, err := strconv.ParseBool(string(data))
+ if err != nil {
+ return err
+ }
+
+ // Set boolean value based on string input
+ *b = WeakBoolean(result)
+
+ return nil
+}