diff options
Diffstat (limited to 'internal')
37 files changed, 4580 insertions, 3 deletions
diff --git a/internal/basecomponent/base.go b/internal/basecomponent/base.go new file mode 100644 index 00000000..e9a375a7 --- /dev/null +++ b/internal/basecomponent/base.go @@ -0,0 +1,289 @@ +// Copyright 2017 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package basecomponent + +import ( + "database/sql" + "io" + "net/http" + "net/url" + "time" + + "golang.org/x/crypto/ed25519" + + "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/keydb" + "github.com/matrix-org/dendrite/internal/keydb/cache" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/naffka" + + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" + "github.com/matrix-org/dendrite/internal" + + "github.com/Shopify/sarama" + "github.com/gorilla/mux" + + appserviceAPI "github.com/matrix-org/dendrite/appservice/api" + eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" + federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/internal/config" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/sirupsen/logrus" + + _ "net/http/pprof" +) + +// BaseDendrite is a base for creating new instances of dendrite. It parses +// command line flags and config, and exposes methods for creating various +// resources. All errors are handled by logging then exiting, so all methods +// should only be used during start up. +// Must be closed when shutting down. +type BaseDendrite struct { + componentName string + tracerCloser io.Closer + + // APIMux should be used to register new public matrix api endpoints + APIMux *mux.Router + EnableHTTPAPIs bool + httpClient *http.Client + Cfg *config.Dendrite + ImmutableCache caching.ImmutableCache + KafkaConsumer sarama.Consumer + KafkaProducer sarama.SyncProducer +} + +const HTTPServerTimeout = time.Minute * 5 +const HTTPClientTimeout = time.Second * 30 + +// NewBaseDendrite creates a new instance to be used by a component. +// The componentName is used for logging purposes, and should be a friendly name +// of the compontent running, e.g. "SyncAPI" +func NewBaseDendrite(cfg *config.Dendrite, componentName string, enableHTTPAPIs bool) *BaseDendrite { + internal.SetupStdLogging() + internal.SetupHookLogging(cfg.Logging, componentName) + internal.SetupPprof() + + closer, err := cfg.SetupTracing("Dendrite" + componentName) + if err != nil { + logrus.WithError(err).Panicf("failed to start opentracing") + } + + var kafkaConsumer sarama.Consumer + var kafkaProducer sarama.SyncProducer + if cfg.Kafka.UseNaffka { + kafkaConsumer, kafkaProducer = setupNaffka(cfg) + } else { + kafkaConsumer, kafkaProducer = setupKafka(cfg) + } + + cache, err := caching.NewImmutableInMemoryLRUCache() + if err != nil { + logrus.WithError(err).Warnf("Failed to create cache") + } + + return &BaseDendrite{ + componentName: componentName, + EnableHTTPAPIs: enableHTTPAPIs, + tracerCloser: closer, + Cfg: cfg, + ImmutableCache: cache, + APIMux: mux.NewRouter().UseEncodedPath(), + httpClient: &http.Client{Timeout: HTTPClientTimeout}, + KafkaConsumer: kafkaConsumer, + KafkaProducer: kafkaProducer, + } +} + +// Close implements io.Closer +func (b *BaseDendrite) Close() error { + return b.tracerCloser.Close() +} + +// CreateHTTPAppServiceAPIs returns the QueryAPI for hitting the appservice +// component over HTTP. +func (b *BaseDendrite) CreateHTTPAppServiceAPIs() appserviceAPI.AppServiceQueryAPI { + a, err := appserviceAPI.NewAppServiceQueryAPIHTTP(b.Cfg.AppServiceURL(), b.httpClient) + if err != nil { + logrus.WithError(err).Panic("CreateHTTPAppServiceAPIs failed") + } + return a +} + +// CreateHTTPRoomserverAPIs returns the AliasAPI, InputAPI and QueryAPI for hitting +// the roomserver over HTTP. +func (b *BaseDendrite) CreateHTTPRoomserverAPIs() roomserverAPI.RoomserverInternalAPI { + rsAPI, err := roomserverAPI.NewRoomserverInternalAPIHTTP(b.Cfg.RoomServerURL(), b.httpClient, b.ImmutableCache) + if err != nil { + logrus.WithError(err).Panic("NewRoomserverInternalAPIHTTP failed", b.httpClient) + } + return rsAPI +} + +// CreateHTTPEDUServerAPIs returns eduInputAPI for hitting the EDU +// server over HTTP +func (b *BaseDendrite) CreateHTTPEDUServerAPIs() eduServerAPI.EDUServerInputAPI { + e, err := eduServerAPI.NewEDUServerInputAPIHTTP(b.Cfg.EDUServerURL(), b.httpClient) + if err != nil { + logrus.WithError(err).Panic("NewEDUServerInputAPIHTTP failed", b.httpClient) + } + return e +} + +// CreateHTTPFederationSenderAPIs returns FederationSenderInternalAPI for hitting +// the federation sender over HTTP +func (b *BaseDendrite) CreateHTTPFederationSenderAPIs() federationSenderAPI.FederationSenderInternalAPI { + f, err := federationSenderAPI.NewFederationSenderInternalAPIHTTP(b.Cfg.FederationSenderURL(), b.httpClient) + if err != nil { + logrus.WithError(err).Panic("NewFederationSenderInternalAPIHTTP failed", b.httpClient) + } + return f +} + +// CreateDeviceDB creates a new instance of the device database. Should only be +// called once per component. +func (b *BaseDendrite) CreateDeviceDB() devices.Database { + db, err := devices.NewDatabase(string(b.Cfg.Database.Device), b.Cfg.DbProperties(), b.Cfg.Matrix.ServerName) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to devices db") + } + + return db +} + +// CreateAccountsDB creates a new instance of the accounts database. Should only +// be called once per component. +func (b *BaseDendrite) CreateAccountsDB() accounts.Database { + db, err := accounts.NewDatabase(string(b.Cfg.Database.Account), b.Cfg.DbProperties(), b.Cfg.Matrix.ServerName) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to accounts db") + } + + return db +} + +// CreateKeyDB creates a new instance of the key database. Should only be called +// once per component. +func (b *BaseDendrite) CreateKeyDB() keydb.Database { + db, err := keydb.NewDatabase( + string(b.Cfg.Database.ServerKey), + b.Cfg.DbProperties(), + b.Cfg.Matrix.ServerName, + b.Cfg.Matrix.PrivateKey.Public().(ed25519.PublicKey), + b.Cfg.Matrix.KeyID, + ) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to keys db") + } + + cachedDB, err := cache.NewKeyDatabase(db, b.ImmutableCache) + if err != nil { + logrus.WithError(err).Panicf("failed to create key cache wrapper") + } + return cachedDB +} + +// CreateFederationClient creates a new federation client. Should only be called +// once per component. +func (b *BaseDendrite) CreateFederationClient() *gomatrixserverlib.FederationClient { + return gomatrixserverlib.NewFederationClient( + b.Cfg.Matrix.ServerName, b.Cfg.Matrix.KeyID, b.Cfg.Matrix.PrivateKey, + ) +} + +// SetupAndServeHTTP sets up the HTTP server to serve endpoints registered on +// ApiMux under /api/ and adds a prometheus handler under /metrics. +func (b *BaseDendrite) SetupAndServeHTTP(bindaddr string, listenaddr string) { + // If a separate bind address is defined, listen on that. Otherwise use + // the listen address + var addr string + if bindaddr != "" { + addr = bindaddr + } else { + addr = listenaddr + } + + serv := http.Server{ + Addr: addr, + WriteTimeout: HTTPServerTimeout, + } + + internal.SetupHTTPAPI(http.DefaultServeMux, internal.WrapHandlerInCORS(b.APIMux), b.Cfg) + logrus.Infof("Starting %s server on %s", b.componentName, serv.Addr) + + err := serv.ListenAndServe() + if err != nil { + logrus.WithError(err).Fatal("failed to serve http") + } + + logrus.Infof("Stopped %s server on %s", b.componentName, serv.Addr) +} + +// setupKafka creates kafka consumer/producer pair from the config. +func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { + consumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) + if err != nil { + logrus.WithError(err).Panic("failed to start kafka consumer") + } + + producer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil) + if err != nil { + logrus.WithError(err).Panic("failed to setup kafka producers") + } + + return consumer, producer +} + +// setupNaffka creates kafka consumer/producer pair from the config. +func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { + var err error + var db *sql.DB + var naffkaDB *naffka.DatabaseImpl + + uri, err := url.Parse(string(cfg.Database.Naffka)) + if err != nil || uri.Scheme == "file" { + db, err = sqlutil.Open(internal.SQLiteDriverName(), string(cfg.Database.Naffka), nil) + if err != nil { + logrus.WithError(err).Panic("Failed to open naffka database") + } + + naffkaDB, err = naffka.NewSqliteDatabase(db) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka database") + } + } else { + db, err = sqlutil.Open("postgres", string(cfg.Database.Naffka), nil) + if err != nil { + logrus.WithError(err).Panic("Failed to open naffka database") + } + + naffkaDB, err = naffka.NewPostgresqlDatabase(db) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka database") + } + } + + if naffkaDB == nil { + panic("naffka connection string not understood") + } + + naff, err := naffka.New(naffkaDB) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka") + } + + return naff, naff +} diff --git a/internal/basecomponent/flags.go b/internal/basecomponent/flags.go new file mode 100644 index 00000000..ecef81f6 --- /dev/null +++ b/internal/basecomponent/flags.go @@ -0,0 +1,61 @@ +// Copyright 2017 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package basecomponent + +import ( + "flag" + + "github.com/matrix-org/dendrite/internal/config" + + "github.com/sirupsen/logrus" +) + +var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") + +// ParseFlags parses the commandline flags and uses them to create a config. +// If running as a monolith use `ParseMonolithFlags` instead. +func ParseFlags() *config.Dendrite { + flag.Parse() + + if *configPath == "" { + logrus.Fatal("--config must be supplied") + } + + cfg, err := config.Load(*configPath) + + if err != nil { + logrus.Fatalf("Invalid config file: %s", err) + } + + return cfg +} + +// ParseMonolithFlags parses the commandline flags and uses them to create a +// config. Should only be used if running a monolith. See `ParseFlags`. +func ParseMonolithFlags() *config.Dendrite { + flag.Parse() + + if *configPath == "" { + logrus.Fatal("--config must be supplied") + } + + cfg, err := config.LoadMonolithic(*configPath) + + if err != nil { + logrus.Fatalf("Invalid config file: %s", err) + } + + return cfg +} diff --git a/internal/caching/immutablecache.go b/internal/caching/immutablecache.go new file mode 100644 index 00000000..fea05dd1 --- /dev/null +++ b/internal/caching/immutablecache.go @@ -0,0 +1,17 @@ +package caching + +import ( + "github.com/matrix-org/gomatrixserverlib" +) + +const ( + RoomVersionMaxCacheEntries = 1024 + ServerKeysMaxCacheEntries = 1024 +) + +type ImmutableCache interface { + GetRoomVersion(roomId string) (gomatrixserverlib.RoomVersion, bool) + StoreRoomVersion(roomId string, roomVersion gomatrixserverlib.RoomVersion) + GetServerKey(request gomatrixserverlib.PublicKeyLookupRequest) (gomatrixserverlib.PublicKeyLookupResult, bool) + StoreServerKey(request gomatrixserverlib.PublicKeyLookupRequest, response gomatrixserverlib.PublicKeyLookupResult) +} diff --git a/internal/caching/immutableinmemorylru.go b/internal/caching/immutableinmemorylru.go new file mode 100644 index 00000000..36cd56dc --- /dev/null +++ b/internal/caching/immutableinmemorylru.go @@ -0,0 +1,95 @@ +package caching + +import ( + "fmt" + + lru "github.com/hashicorp/golang-lru" + "github.com/matrix-org/gomatrixserverlib" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type ImmutableInMemoryLRUCache struct { + roomVersions *lru.Cache + serverKeys *lru.Cache +} + +func NewImmutableInMemoryLRUCache() (*ImmutableInMemoryLRUCache, error) { + roomVersionCache, rvErr := lru.New(RoomVersionMaxCacheEntries) + if rvErr != nil { + return nil, rvErr + } + serverKeysCache, rvErr := lru.New(ServerKeysMaxCacheEntries) + if rvErr != nil { + return nil, rvErr + } + cache := &ImmutableInMemoryLRUCache{ + roomVersions: roomVersionCache, + serverKeys: serverKeysCache, + } + cache.configureMetrics() + return cache, nil +} + +func (c *ImmutableInMemoryLRUCache) configureMetrics() { + promauto.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "dendrite", + Subsystem: "caching", + Name: "number_room_version_entries", + Help: "The number of room version entries cached.", + }, func() float64 { + return float64(c.roomVersions.Len()) + }) + + promauto.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "dendrite", + Subsystem: "caching", + Name: "number_server_key_entries", + Help: "The number of server key entries cached.", + }, func() float64 { + return float64(c.serverKeys.Len()) + }) +} + +func checkForInvalidMutation(cache *lru.Cache, key string, value interface{}) { + if peek, ok := cache.Peek(key); ok && peek != value { + panic(fmt.Sprintf("invalid use of immutable cache tries to mutate existing value of %q", key)) + } +} + +func (c *ImmutableInMemoryLRUCache) GetRoomVersion(roomID string) (gomatrixserverlib.RoomVersion, bool) { + val, found := c.roomVersions.Get(roomID) + if found && val != nil { + if roomVersion, ok := val.(gomatrixserverlib.RoomVersion); ok { + return roomVersion, true + } + } + return "", false +} + +func (c *ImmutableInMemoryLRUCache) StoreRoomVersion(roomID string, roomVersion gomatrixserverlib.RoomVersion) { + checkForInvalidMutation(c.roomVersions, roomID, roomVersion) + c.roomVersions.Add(roomID, roomVersion) +} + +func (c *ImmutableInMemoryLRUCache) GetServerKey( + request gomatrixserverlib.PublicKeyLookupRequest, +) (gomatrixserverlib.PublicKeyLookupResult, bool) { + key := fmt.Sprintf("%s/%s", request.ServerName, request.KeyID) + val, found := c.serverKeys.Get(key) + if found && val != nil { + if keyLookupResult, ok := val.(gomatrixserverlib.PublicKeyLookupResult); ok { + return keyLookupResult, true + } + } + return gomatrixserverlib.PublicKeyLookupResult{}, false +} + +func (c *ImmutableInMemoryLRUCache) StoreServerKey( + request gomatrixserverlib.PublicKeyLookupRequest, + response gomatrixserverlib.PublicKeyLookupResult, +) { + key := fmt.Sprintf("%s/%s", request.ServerName, request.KeyID) + checkForInvalidMutation(c.roomVersions, key, response) + c.serverKeys.Add(request, response) +} diff --git a/internal/config/appservice.go b/internal/config/appservice.go new file mode 100644 index 00000000..bf5f089b --- /dev/null +++ b/internal/config/appservice.go @@ -0,0 +1,329 @@ +// Copyright 2017 Andrew Morgan <andrew@amorgan.xyz> +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "fmt" + "io/ioutil" + "path/filepath" + "regexp" + "strings" + + log "github.com/sirupsen/logrus" + yaml "gopkg.in/yaml.v2" +) + +// ApplicationServiceNamespace is the namespace that a specific application +// service has management over. +type ApplicationServiceNamespace struct { + // Whether or not the namespace is managed solely by this application service + Exclusive bool `yaml:"exclusive"` + // A regex pattern that represents the namespace + Regex string `yaml:"regex"` + // The ID of an existing group that all users of this application service will + // be added to. This field is only relevant to the `users` namespace. + // Note that users who are joined to this group through an application service + // are not to be listed when querying for the group's members, however the + // group should be listed when querying an application service user's groups. + // This is to prevent making spamming all users of an application service + // trivial. + GroupID string `yaml:"group_id"` + // Regex object representing our pattern. Saves having to recompile every time + RegexpObject *regexp.Regexp +} + +// ApplicationService represents a Matrix application service. +// https://matrix.org/docs/spec/application_service/unstable.html +type ApplicationService struct { + // User-defined, unique, persistent ID of the application service + ID string `yaml:"id"` + // Base URL of the application service + URL string `yaml:"url"` + // Application service token provided in requests to a homeserver + ASToken string `yaml:"as_token"` + // Homeserver token provided in requests to an application service + HSToken string `yaml:"hs_token"` + // Localpart of application service user + SenderLocalpart string `yaml:"sender_localpart"` + // Information about an application service's namespaces. Key is either + // "users", "aliases" or "rooms" + NamespaceMap map[string][]ApplicationServiceNamespace `yaml:"namespaces"` + // Whether rate limiting is applied to each application service user + RateLimited bool `yaml:"rate_limited"` + // Any custom protocols that this application service provides (e.g. IRC) + Protocols []string `yaml:"protocols"` +} + +// IsInterestedInRoomID returns a bool on whether an application service's +// namespace includes the given room ID +func (a *ApplicationService) IsInterestedInRoomID( + roomID string, +) bool { + if namespaceSlice, ok := a.NamespaceMap["rooms"]; ok { + for _, namespace := range namespaceSlice { + if namespace.RegexpObject.MatchString(roomID) { + return true + } + } + } + + return false +} + +// IsInterestedInUserID returns a bool on whether an application service's +// namespace includes the given user ID +func (a *ApplicationService) IsInterestedInUserID( + userID string, +) bool { + if namespaceSlice, ok := a.NamespaceMap["users"]; ok { + for _, namespace := range namespaceSlice { + if namespace.RegexpObject.MatchString(userID) { + return true + } + } + } + + return false +} + +// OwnsNamespaceCoveringUserId returns a bool on whether an application service's +// namespace is exclusive and includes the given user ID +func (a *ApplicationService) OwnsNamespaceCoveringUserId( + userID string, +) bool { + if namespaceSlice, ok := a.NamespaceMap["users"]; ok { + for _, namespace := range namespaceSlice { + if namespace.Exclusive && namespace.RegexpObject.MatchString(userID) { + return true + } + } + } + + return false +} + +// IsInterestedInRoomAlias returns a bool on whether an application service's +// namespace includes the given room alias +func (a *ApplicationService) IsInterestedInRoomAlias( + roomAlias string, +) bool { + if namespaceSlice, ok := a.NamespaceMap["aliases"]; ok { + for _, namespace := range namespaceSlice { + if namespace.RegexpObject.MatchString(roomAlias) { + return true + } + } + } + + return false +} + +// loadAppServices iterates through all application service config files +// and loads their data into the config object for later access. +func loadAppServices(config *Dendrite) error { + for _, configPath := range config.ApplicationServices.ConfigFiles { + // Create a new application service with default options + appservice := ApplicationService{ + RateLimited: true, + } + + // Create an absolute path from a potentially relative path + absPath, err := filepath.Abs(configPath) + if err != nil { + return err + } + + // Read the application service's config file + configData, err := ioutil.ReadFile(absPath) + if err != nil { + return err + } + + // Load the config data into our struct + if err = yaml.UnmarshalStrict(configData, &appservice); err != nil { + return err + } + + // Append the parsed application service to the global config + config.Derived.ApplicationServices = append( + config.Derived.ApplicationServices, appservice, + ) + } + + // Check for any errors in the loaded application services + return checkErrors(config) +} + +// setupRegexps will create regex objects for exclusive and non-exclusive +// usernames, aliases and rooms of all application services, so that other +// methods can quickly check if a particular string matches any of them. +func setupRegexps(cfg *Dendrite) (err error) { + // Combine all exclusive namespaces for later string checking + var exclusiveUsernameStrings, exclusiveAliasStrings []string + + // If an application service's regex is marked as exclusive, add + // its contents to the overall exlusive regex string. Room regex + // not necessary as we aren't denying exclusive room ID creation + for _, appservice := range cfg.Derived.ApplicationServices { + for key, namespaceSlice := range appservice.NamespaceMap { + switch key { + case "users": + appendExclusiveNamespaceRegexs(&exclusiveUsernameStrings, namespaceSlice) + case "aliases": + appendExclusiveNamespaceRegexs(&exclusiveAliasStrings, namespaceSlice) + } + } + } + + // Join the regexes together into one big regex. + // i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)" + // Later we can check if a username or alias matches any exclusive regex and + // deny access if it isn't from an application service + exclusiveUsernames := strings.Join(exclusiveUsernameStrings, "|") + exclusiveAliases := strings.Join(exclusiveAliasStrings, "|") + + // If there are no exclusive regexes, compile string so that it will not match + // any valid usernames/aliases/roomIDs + if exclusiveUsernames == "" { + exclusiveUsernames = "^$" + } + if exclusiveAliases == "" { + exclusiveAliases = "^$" + } + + // Store compiled Regex + if cfg.Derived.ExclusiveApplicationServicesUsernameRegexp, err = regexp.Compile(exclusiveUsernames); err != nil { + return err + } + if cfg.Derived.ExclusiveApplicationServicesAliasRegexp, err = regexp.Compile(exclusiveAliases); err != nil { + return err + } + + return nil +} + +// appendExclusiveNamespaceRegexs takes a slice of strings and a slice of +// namespaces and will append the regexes of only the exclusive namespaces +// into the string slice +func appendExclusiveNamespaceRegexs( + exclusiveStrings *[]string, namespaces []ApplicationServiceNamespace, +) { + for index, namespace := range namespaces { + if namespace.Exclusive { + // We append parenthesis to later separate each regex when we compile + // i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)" + *exclusiveStrings = append(*exclusiveStrings, "("+namespace.Regex+")") + } + + // Compile this regex into a Regexp object for later use + namespaces[index].RegexpObject, _ = regexp.Compile(namespace.Regex) + } +} + +// checkErrors checks for any configuration errors amongst the loaded +// application services according to the application service spec. +func checkErrors(config *Dendrite) (err error) { + var idMap = make(map[string]bool) + var tokenMap = make(map[string]bool) + + // Compile regexp object for checking groupIDs + groupIDRegexp := regexp.MustCompile(`\+.*:.*`) + + // Check each application service for any config errors + for _, appservice := range config.Derived.ApplicationServices { + // Namespace-related checks + for key, namespaceSlice := range appservice.NamespaceMap { + for _, namespace := range namespaceSlice { + if err := validateNamespace(&appservice, key, &namespace, groupIDRegexp); err != nil { + return err + } + } + } + + // Check if the url has trailing /'s. If so, remove them + appservice.URL = strings.TrimRight(appservice.URL, "/") + + // Check if we've already seen this ID. No two application services + // can have the same ID or token. + if idMap[appservice.ID] { + return configErrors([]string{fmt.Sprintf( + "Application service ID %s must be unique", appservice.ID, + )}) + } + // Check if we've already seen this token + if tokenMap[appservice.ASToken] { + return configErrors([]string{fmt.Sprintf( + "Application service Token %s must be unique", appservice.ASToken, + )}) + } + + // Add the id/token to their respective maps if we haven't already + // seen them. + idMap[appservice.ID] = true + tokenMap[appservice.ASToken] = true + + // TODO: Remove once rate_limited is implemented + if appservice.RateLimited { + log.Warn("WARNING: Application service option rate_limited is currently unimplemented") + } + // TODO: Remove once protocols is implemented + if len(appservice.Protocols) > 0 { + log.Warn("WARNING: Application service option protocols is currently unimplemented") + } + } + + return setupRegexps(config) +} + +// validateNamespace returns nil or an error based on whether a given +// application service namespace is valid. A namespace is valid if it has the +// required fields, and its regex is correct. +func validateNamespace( + appservice *ApplicationService, + key string, + namespace *ApplicationServiceNamespace, + groupIDRegexp *regexp.Regexp, +) error { + // Check that namespace(s) are valid regex + if !IsValidRegex(namespace.Regex) { + return configErrors([]string{fmt.Sprintf( + "Invalid regex string for Application Service %s", appservice.ID, + )}) + } + + // Check if GroupID for the users namespace is in the correct format + if key == "users" && namespace.GroupID != "" { + // TODO: Remove once group_id is implemented + log.Warn("WARNING: Application service option group_id is currently unimplemented") + + correctFormat := groupIDRegexp.MatchString(namespace.GroupID) + if !correctFormat { + return configErrors([]string{fmt.Sprintf( + "Invalid user group_id field for application service %s.", + appservice.ID, + )}) + } + } + + return nil +} + +// IsValidRegex returns true or false based on whether the +// given string is valid regex or not +func IsValidRegex(regexString string) bool { + _, err := regexp.Compile(regexString) + + return err == nil +} diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 00000000..e1e96f9d --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,804 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "bytes" + "crypto/sha256" + "encoding/pem" + "fmt" + "io" + "io/ioutil" + "path/filepath" + "regexp" + "strings" + "time" + + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" + "golang.org/x/crypto/ed25519" + yaml "gopkg.in/yaml.v2" + + jaegerconfig "github.com/uber/jaeger-client-go/config" + jaegermetrics "github.com/uber/jaeger-lib/metrics" +) + +// Version is the current version of the config format. +// This will change whenever we make breaking changes to the config format. +const Version = 0 + +// Dendrite contains all the config used by a dendrite process. +// Relative paths are resolved relative to the current working directory +type Dendrite struct { + // The version of the configuration file. + // If the version in a file doesn't match the current dendrite config + // version then we can give a clear error message telling the user + // to update their config file to the current version. + // The version of the file should only be different if there has + // been a breaking change to the config file format. + Version int `yaml:"version"` + + // The configuration required for a matrix server. + Matrix struct { + // The name of the server. This is usually the domain name, e.g 'matrix.org', 'localhost'. + ServerName gomatrixserverlib.ServerName `yaml:"server_name"` + // Path to the private key which will be used to sign requests and events. + PrivateKeyPath Path `yaml:"private_key"` + // The private key which will be used to sign requests and events. + PrivateKey ed25519.PrivateKey `yaml:"-"` + // An arbitrary string used to uniquely identify the PrivateKey. Must start with the + // prefix "ed25519:". + KeyID gomatrixserverlib.KeyID `yaml:"-"` + // List of paths to X509 certificates used by the external federation listeners. + // These are used to calculate the TLS fingerprints to publish for this server. + // Other matrix servers talking to this server will expect the x509 certificate + // to match one of these certificates. + // The certificates should be in PEM format. + FederationCertificatePaths []Path `yaml:"federation_certificates"` + // A list of SHA256 TLS fingerprints for the X509 certificates used by the + // federation listener for this server. + TLSFingerPrints []gomatrixserverlib.TLSFingerprint `yaml:"-"` + // How long a remote server can cache our server key for before requesting it again. + // Increasing this number will reduce the number of requests made by remote servers + // for our key, but increases the period a compromised key will be considered valid + // by remote servers. + // Defaults to 24 hours. + KeyValidityPeriod time.Duration `yaml:"key_validity_period"` + // List of domains that the server will trust as identity servers to + // verify third-party identifiers. + // Defaults to an empty array. + TrustedIDServers []string `yaml:"trusted_third_party_id_servers"` + // If set, allows registration by anyone who also has the shared + // secret, even if registration is otherwise disabled. + RegistrationSharedSecret string `yaml:"registration_shared_secret"` + // This Home Server's ReCAPTCHA public key. + RecaptchaPublicKey string `yaml:"recaptcha_public_key"` + // This Home Server's ReCAPTCHA private key. + RecaptchaPrivateKey string `yaml:"recaptcha_private_key"` + // Boolean stating whether catpcha registration is enabled + // and required + RecaptchaEnabled bool `yaml:"enable_registration_captcha"` + // Secret used to bypass the captcha registration entirely + RecaptchaBypassSecret string `yaml:"captcha_bypass_secret"` + // HTTP API endpoint used to verify whether the captcha response + // was successful + RecaptchaSiteVerifyAPI string `yaml:"recaptcha_siteverify_api"` + // If set disables new users from registering (except via shared + // secrets) + RegistrationDisabled bool `yaml:"registration_disabled"` + // Perspective keyservers, to use as a backup when direct key fetch + // requests don't succeed + KeyPerspectives KeyPerspectives `yaml:"key_perspectives"` + } `yaml:"matrix"` + + // The configuration specific to the media repostitory. + Media struct { + // The base path to where the media files will be stored. May be relative or absolute. + BasePath Path `yaml:"base_path"` + // The absolute base path to where media files will be stored. + AbsBasePath Path `yaml:"-"` + // The maximum file size in bytes that is allowed to be stored on this server. + // Note: if max_file_size_bytes is set to 0, the size is unlimited. + // Note: if max_file_size_bytes is not set, it will default to 10485760 (10MB) + MaxFileSizeBytes *FileSizeBytes `yaml:"max_file_size_bytes,omitempty"` + // Whether to dynamically generate thumbnails on-the-fly if the requested resolution is not already generated + DynamicThumbnails bool `yaml:"dynamic_thumbnails"` + // The maximum number of simultaneous thumbnail generators. default: 10 + MaxThumbnailGenerators int `yaml:"max_thumbnail_generators"` + // A list of thumbnail sizes to be pre-generated for downloaded remote / uploaded content + ThumbnailSizes []ThumbnailSize `yaml:"thumbnail_sizes"` + } `yaml:"media"` + + // The configuration to use for Prometheus metrics + Metrics struct { + // Whether or not the metrics are enabled + Enabled bool `yaml:"enabled"` + // Use BasicAuth for Authorization + BasicAuth struct { + // Authorization via Static Username & Password + // Hardcoded Username and Password + Username string `yaml:"username"` + Password string `yaml:"password"` + } `yaml:"basic_auth"` + } `yaml:"metrics"` + + // The configuration for talking to kafka. + Kafka struct { + // A list of kafka addresses to connect to. + Addresses []string `yaml:"addresses"` + // Whether to use naffka instead of kafka. + // Naffka can only be used when running dendrite as a single monolithic server. + // Kafka can be used both with a monolithic server and when running the + // components as separate servers. + UseNaffka bool `yaml:"use_naffka,omitempty"` + // The names of the topics to use when reading and writing from kafka. + Topics struct { + // Topic for roomserver/api.OutputRoomEvent events. + OutputRoomEvent Topic `yaml:"output_room_event"` + // Topic for sending account data from client API to sync API + OutputClientData Topic `yaml:"output_client_data"` + // Topic for eduserver/api.OutputTypingEvent events. + OutputTypingEvent Topic `yaml:"output_typing_event"` + // Topic for user updates (profile, presence) + UserUpdates Topic `yaml:"user_updates"` + } + } `yaml:"kafka"` + + // Postgres Config + Database struct { + // The Account database stores the login details and account information + // for local users. It is accessed by the ClientAPI. + Account DataSource `yaml:"account"` + // The Device database stores session information for the devices of logged + // in local users. It is accessed by the ClientAPI, the MediaAPI and the SyncAPI. + Device DataSource `yaml:"device"` + // The MediaAPI database stores information about files uploaded and downloaded + // by local users. It is only accessed by the MediaAPI. + MediaAPI DataSource `yaml:"media_api"` + // The ServerKey database caches the public keys of remote servers. + // It may be accessed by the FederationAPI, the ClientAPI, and the MediaAPI. + ServerKey DataSource `yaml:"server_key"` + // The SyncAPI stores information used by the SyncAPI server. + // It is only accessed by the SyncAPI server. + SyncAPI DataSource `yaml:"sync_api"` + // The RoomServer database stores information about matrix rooms. + // It is only accessed by the RoomServer. + RoomServer DataSource `yaml:"room_server"` + // The FederationSender database stores information used by the FederationSender + // It is only accessed by the FederationSender. + FederationSender DataSource `yaml:"federation_sender"` + // The AppServices database stores information used by the AppService component. + // It is only accessed by the AppService component. + AppService DataSource `yaml:"appservice"` + // The PublicRoomsAPI database stores information used to compute the public + // room directory. It is only accessed by the PublicRoomsAPI server. + PublicRoomsAPI DataSource `yaml:"public_rooms_api"` + // The Naffka database is used internally by the naffka library, if used. + Naffka DataSource `yaml:"naffka,omitempty"` + // Maximum open connections to the DB (0 = use default, negative means unlimited) + MaxOpenConns int `yaml:"max_open_conns"` + // Maximum idle connections to the DB (0 = use default, negative means unlimited) + MaxIdleConns int `yaml:"max_idle_conns"` + // maximum amount of time (in seconds) a connection may be reused (<= 0 means unlimited) + ConnMaxLifetimeSec int `yaml:"conn_max_lifetime"` + } `yaml:"database"` + + // TURN Server Config + TURN struct { + // TODO Guest Support + // Whether or not guests can request TURN credentials + //AllowGuests bool `yaml:"turn_allow_guests"` + // How long the authorization should last + UserLifetime string `yaml:"turn_user_lifetime"` + // The list of TURN URIs to pass to clients + URIs []string `yaml:"turn_uris"` + + // Authorization via Shared Secret + // The shared secret from coturn + SharedSecret string `yaml:"turn_shared_secret"` + + // Authorization via Static Username & Password + // Hardcoded Username and Password + Username string `yaml:"turn_username"` + Password string `yaml:"turn_password"` + } `yaml:"turn"` + + // The internal addresses the components will listen on. + // These should not be exposed externally as they expose metrics and debugging APIs. + // Falls back to addresses listed in Listen if not specified + Bind struct { + MediaAPI Address `yaml:"media_api"` + ClientAPI Address `yaml:"client_api"` + FederationAPI Address `yaml:"federation_api"` + AppServiceAPI Address `yaml:"appservice_api"` + SyncAPI Address `yaml:"sync_api"` + RoomServer Address `yaml:"room_server"` + FederationSender Address `yaml:"federation_sender"` + PublicRoomsAPI Address `yaml:"public_rooms_api"` + EDUServer Address `yaml:"edu_server"` + KeyServer Address `yaml:"key_server"` + } `yaml:"bind"` + + // The addresses for talking to other microservices. + Listen struct { + MediaAPI Address `yaml:"media_api"` + ClientAPI Address `yaml:"client_api"` + FederationAPI Address `yaml:"federation_api"` + AppServiceAPI Address `yaml:"appservice_api"` + SyncAPI Address `yaml:"sync_api"` + RoomServer Address `yaml:"room_server"` + FederationSender Address `yaml:"federation_sender"` + PublicRoomsAPI Address `yaml:"public_rooms_api"` + EDUServer Address `yaml:"edu_server"` + KeyServer Address `yaml:"key_server"` + } `yaml:"listen"` + + // The config for tracing the dendrite servers. + Tracing struct { + // Set to true to enable tracer hooks. If false, no tracing is set up. + Enabled bool `yaml:"enabled"` + // The config for the jaeger opentracing reporter. + Jaeger jaegerconfig.Configuration `yaml:"jaeger"` + } `yaml:"tracing"` + + // Application Services + // https://matrix.org/docs/spec/application_service/unstable.html + ApplicationServices struct { + // Configuration files for various application services + ConfigFiles []string `yaml:"config_files"` + } `yaml:"application_services"` + + // The config for logging informations. Each hook will be added to logrus. + Logging []LogrusHook `yaml:"logging"` + + // Any information derived from the configuration options for later use. + Derived struct { + Registration struct { + // Flows is a slice of flows, which represent one possible way that the client can authenticate a request. + // http://matrix.org/docs/spec/HEAD/client_server/r0.3.0.html#user-interactive-authentication-api + // As long as the generated flows only rely on config file options, + // we can generate them on startup and store them until needed + Flows []authtypes.Flow `json:"flows"` + + // Params that need to be returned to the client during + // registration in order to complete registration stages. + Params map[string]interface{} `json:"params"` + } + + // Application services parsed from their config files + // The paths of which were given above in the main config file + ApplicationServices []ApplicationService + + // Meta-regexes compiled from all exclusive application service + // Regexes. + // + // When a user registers, we check that their username does not match any + // exclusive application service namespaces + ExclusiveApplicationServicesUsernameRegexp *regexp.Regexp + // When a user creates a room alias, we check that it isn't already + // reserved by an application service + ExclusiveApplicationServicesAliasRegexp *regexp.Regexp + // Note: An Exclusive Regex for room ID isn't necessary as we aren't blocking + // servers from creating RoomIDs in exclusive application service namespaces + } `yaml:"-"` +} + +// KeyPerspectives are used to configure perspective key servers for +// retrieving server keys. +type KeyPerspectives []struct { + // The server name of the perspective key server + ServerName gomatrixserverlib.ServerName `yaml:"server_name"` + // Server keys for the perspective user, used to verify the + // keys have been signed by the perspective server + Keys []struct { + // The key ID, e.g. ed25519:auto + KeyID gomatrixserverlib.KeyID `yaml:"key_id"` + // The public key in base64 unpadded format + PublicKey string `yaml:"public_key"` + } `yaml:"keys"` +} + +// A Path on the filesystem. +type Path string + +// A DataSource for opening a postgresql database using lib/pq. +type DataSource string + +// A Topic in kafka. +type Topic string + +// An Address to listen on. +type Address string + +// FileSizeBytes is a file size in bytes +type FileSizeBytes int64 + +// ThumbnailSize contains a single thumbnail size configuration +type ThumbnailSize struct { + // Maximum width of the thumbnail image + Width int `yaml:"width"` + // Maximum height of the thumbnail image + Height int `yaml:"height"` + // ResizeMethod is one of crop or scale. + // crop scales to fill the requested dimensions and crops the excess. + // scale scales to fit the requested dimensions and one dimension may be smaller than requested. + ResizeMethod string `yaml:"method,omitempty"` +} + +// LogrusHook represents a single logrus hook. At this point, only parsing and +// verification of the proper values for type and level are done. +// Validity/integrity checks on the parameters are done when configuring logrus. +type LogrusHook struct { + // The type of hook, currently only "file" is supported. + Type string `yaml:"type"` + + // The level of the logs to produce. Will output only this level and above. + Level string `yaml:"level"` + + // The parameters for this hook. + Params map[string]interface{} `yaml:"params"` +} + +// configErrors stores problems encountered when parsing a config file. +// It implements the error interface. +type configErrors []string + +// Load a yaml config file for a server run as multiple processes. +// Checks the config to ensure that it is valid. +// The checks are different if the server is run as a monolithic process instead +// of being split into multiple components +func Load(configPath string) (*Dendrite, error) { + configData, err := ioutil.ReadFile(configPath) + if err != nil { + return nil, err + } + basePath, err := filepath.Abs(".") + if err != nil { + return nil, err + } + // Pass the current working directory and ioutil.ReadFile so that they can + // be mocked in the tests + monolithic := false + return loadConfig(basePath, configData, ioutil.ReadFile, monolithic) +} + +// LoadMonolithic loads a yaml config file for a server run as a single monolith. +// Checks the config to ensure that it is valid. +// The checks are different if the server is run as a monolithic process instead +// of being split into multiple components +func LoadMonolithic(configPath string) (*Dendrite, error) { + configData, err := ioutil.ReadFile(configPath) + if err != nil { + return nil, err + } + basePath, err := filepath.Abs(".") + if err != nil { + return nil, err + } + // Pass the current working directory and ioutil.ReadFile so that they can + // be mocked in the tests + monolithic := true + return loadConfig(basePath, configData, ioutil.ReadFile, monolithic) +} + +func loadConfig( + basePath string, + configData []byte, + readFile func(string) ([]byte, error), + monolithic bool, +) (*Dendrite, error) { + var config Dendrite + var err error + if err = yaml.Unmarshal(configData, &config); err != nil { + return nil, err + } + + config.SetDefaults() + + if err = config.check(monolithic); err != nil { + return nil, err + } + + privateKeyPath := absPath(basePath, config.Matrix.PrivateKeyPath) + privateKeyData, err := readFile(privateKeyPath) + if err != nil { + return nil, err + } + + if config.Matrix.KeyID, config.Matrix.PrivateKey, err = readKeyPEM(privateKeyPath, privateKeyData); err != nil { + return nil, err + } + + for _, certPath := range config.Matrix.FederationCertificatePaths { + absCertPath := absPath(basePath, certPath) + var pemData []byte + pemData, err = readFile(absCertPath) + if err != nil { + return nil, err + } + fingerprint := fingerprintPEM(pemData) + if fingerprint == nil { + return nil, fmt.Errorf("no certificate PEM data in %q", absCertPath) + } + config.Matrix.TLSFingerPrints = append(config.Matrix.TLSFingerPrints, *fingerprint) + } + + config.Media.AbsBasePath = Path(absPath(basePath, config.Media.BasePath)) + + // Generate data from config options + err = config.Derive() + if err != nil { + return nil, err + } + + return &config, nil +} + +// Derive generates data that is derived from various values provided in +// the config file. +func (config *Dendrite) Derive() error { + // Determine registrations flows based off config values + + config.Derived.Registration.Params = make(map[string]interface{}) + + // TODO: Add email auth type + // TODO: Add MSISDN auth type + + if config.Matrix.RecaptchaEnabled { + config.Derived.Registration.Params[authtypes.LoginTypeRecaptcha] = map[string]string{"public_key": config.Matrix.RecaptchaPublicKey} + config.Derived.Registration.Flows = append(config.Derived.Registration.Flows, + authtypes.Flow{Stages: []authtypes.LoginType{authtypes.LoginTypeRecaptcha}}) + } else { + config.Derived.Registration.Flows = append(config.Derived.Registration.Flows, + authtypes.Flow{Stages: []authtypes.LoginType{authtypes.LoginTypeDummy}}) + } + + // Load application service configuration files + if err := loadAppServices(config); err != nil { + return err + } + + return nil +} + +// SetDefaults sets default config values if they are not explicitly set. +func (config *Dendrite) SetDefaults() { + if config.Matrix.KeyValidityPeriod == 0 { + config.Matrix.KeyValidityPeriod = 24 * time.Hour + } + + if config.Matrix.TrustedIDServers == nil { + config.Matrix.TrustedIDServers = []string{} + } + + if config.Media.MaxThumbnailGenerators == 0 { + config.Media.MaxThumbnailGenerators = 10 + } + + if config.Media.MaxFileSizeBytes == nil { + defaultMaxFileSizeBytes := FileSizeBytes(10485760) + config.Media.MaxFileSizeBytes = &defaultMaxFileSizeBytes + } + + if config.Database.MaxIdleConns == 0 { + config.Database.MaxIdleConns = 2 + } + + if config.Database.MaxOpenConns == 0 { + config.Database.MaxOpenConns = 100 + } + +} + +// Error returns a string detailing how many errors were contained within a +// configErrors type. +func (errs configErrors) Error() string { + if len(errs) == 1 { + return errs[0] + } + return fmt.Sprintf( + "%s (and %d other problems)", errs[0], len(errs)-1, + ) +} + +// Add appends an error to the list of errors in this configErrors. +// It is a wrapper to the builtin append and hides pointers from +// the client code. +// This method is safe to use with an uninitialized configErrors because +// if it is nil, it will be properly allocated. +func (errs *configErrors) Add(str string) { + *errs = append(*errs, str) +} + +// checkNotEmpty verifies the given value is not empty in the configuration. +// If it is, adds an error to the list. +func checkNotEmpty(configErrs *configErrors, key, value string) { + if value == "" { + configErrs.Add(fmt.Sprintf("missing config key %q", key)) + } +} + +// checkNotZero verifies the given value is not zero in the configuration. +// If it is, adds an error to the list. +func checkNotZero(configErrs *configErrors, key string, value int64) { + if value == 0 { + configErrs.Add(fmt.Sprintf("missing config key %q", key)) + } +} + +// checkPositive verifies the given value is positive (zero included) +// in the configuration. If it is not, adds an error to the list. +func checkPositive(configErrs *configErrors, key string, value int64) { + if value < 0 { + configErrs.Add(fmt.Sprintf("invalid value for config key %q: %d", key, value)) + } +} + +// checkTurn verifies the parameters turn.* are valid. +func (config *Dendrite) checkTurn(configErrs *configErrors) { + value := config.TURN.UserLifetime + if value != "" { + if _, err := time.ParseDuration(value); err != nil { + configErrs.Add(fmt.Sprintf("invalid duration for config key %q: %s", "turn.turn_user_lifetime", value)) + } + } +} + +// checkMatrix verifies the parameters matrix.* are valid. +func (config *Dendrite) checkMatrix(configErrs *configErrors) { + checkNotEmpty(configErrs, "matrix.server_name", string(config.Matrix.ServerName)) + checkNotEmpty(configErrs, "matrix.private_key", string(config.Matrix.PrivateKeyPath)) + checkNotZero(configErrs, "matrix.federation_certificates", int64(len(config.Matrix.FederationCertificatePaths))) + if config.Matrix.RecaptchaEnabled { + checkNotEmpty(configErrs, "matrix.recaptcha_public_key", string(config.Matrix.RecaptchaPublicKey)) + checkNotEmpty(configErrs, "matrix.recaptcha_private_key", string(config.Matrix.RecaptchaPrivateKey)) + checkNotEmpty(configErrs, "matrix.recaptcha_siteverify_api", string(config.Matrix.RecaptchaSiteVerifyAPI)) + } +} + +// checkMedia verifies the parameters media.* are valid. +func (config *Dendrite) checkMedia(configErrs *configErrors) { + checkNotEmpty(configErrs, "media.base_path", string(config.Media.BasePath)) + checkPositive(configErrs, "media.max_file_size_bytes", int64(*config.Media.MaxFileSizeBytes)) + checkPositive(configErrs, "media.max_thumbnail_generators", int64(config.Media.MaxThumbnailGenerators)) + + for i, size := range config.Media.ThumbnailSizes { + checkPositive(configErrs, fmt.Sprintf("media.thumbnail_sizes[%d].width", i), int64(size.Width)) + checkPositive(configErrs, fmt.Sprintf("media.thumbnail_sizes[%d].height", i), int64(size.Height)) + } +} + +// checkKafka verifies the parameters kafka.* and the related +// database.naffka are valid. +func (config *Dendrite) checkKafka(configErrs *configErrors, monolithic bool) { + + if config.Kafka.UseNaffka { + if !monolithic { + configErrs.Add(fmt.Sprintf("naffka can only be used in a monolithic server")) + } + + checkNotEmpty(configErrs, "database.naffka", string(config.Database.Naffka)) + } else { + // If we aren't using naffka then we need to have at least one kafka + // server to talk to. + checkNotZero(configErrs, "kafka.addresses", int64(len(config.Kafka.Addresses))) + } + checkNotEmpty(configErrs, "kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent)) + checkNotEmpty(configErrs, "kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData)) + checkNotEmpty(configErrs, "kafka.topics.output_typing_event", string(config.Kafka.Topics.OutputTypingEvent)) + checkNotEmpty(configErrs, "kafka.topics.user_updates", string(config.Kafka.Topics.UserUpdates)) +} + +// checkDatabase verifies the parameters database.* are valid. +func (config *Dendrite) checkDatabase(configErrs *configErrors) { + checkNotEmpty(configErrs, "database.account", string(config.Database.Account)) + checkNotEmpty(configErrs, "database.device", string(config.Database.Device)) + checkNotEmpty(configErrs, "database.server_key", string(config.Database.ServerKey)) + checkNotEmpty(configErrs, "database.media_api", string(config.Database.MediaAPI)) + checkNotEmpty(configErrs, "database.sync_api", string(config.Database.SyncAPI)) + checkNotEmpty(configErrs, "database.room_server", string(config.Database.RoomServer)) +} + +// checkListen verifies the parameters listen.* are valid. +func (config *Dendrite) checkListen(configErrs *configErrors) { + checkNotEmpty(configErrs, "listen.media_api", string(config.Listen.MediaAPI)) + checkNotEmpty(configErrs, "listen.client_api", string(config.Listen.ClientAPI)) + checkNotEmpty(configErrs, "listen.federation_api", string(config.Listen.FederationAPI)) + checkNotEmpty(configErrs, "listen.sync_api", string(config.Listen.SyncAPI)) + checkNotEmpty(configErrs, "listen.room_server", string(config.Listen.RoomServer)) + checkNotEmpty(configErrs, "listen.edu_server", string(config.Listen.EDUServer)) +} + +// checkLogging verifies the parameters logging.* are valid. +func (config *Dendrite) checkLogging(configErrs *configErrors) { + for _, logrusHook := range config.Logging { + checkNotEmpty(configErrs, "logging.type", string(logrusHook.Type)) + checkNotEmpty(configErrs, "logging.level", string(logrusHook.Level)) + } +} + +// check returns an error type containing all errors found within the config +// file. +func (config *Dendrite) check(monolithic bool) error { + var configErrs configErrors + + if config.Version != Version { + configErrs.Add(fmt.Sprintf( + "unknown config version %q, expected %q", config.Version, Version, + )) + return configErrs + } + + config.checkMatrix(&configErrs) + config.checkMedia(&configErrs) + config.checkTurn(&configErrs) + config.checkKafka(&configErrs, monolithic) + config.checkDatabase(&configErrs) + config.checkLogging(&configErrs) + + if !monolithic { + config.checkListen(&configErrs) + } + + // Due to how Golang manages its interface types, this condition is not redundant. + // In order to get the proper behaviour, it is necessary to return an explicit nil + // and not a nil configErrors. + // This is because the following equalities hold: + // error(nil) == nil + // error(configErrors(nil)) != nil + if configErrs != nil { + return configErrs + } + return nil +} + +// absPath returns the absolute path for a given relative or absolute path. +func absPath(dir string, path Path) string { + if filepath.IsAbs(string(path)) { + // filepath.Join cleans the path so we should clean the absolute paths as well for consistency. + return filepath.Clean(string(path)) + } + return filepath.Join(dir, string(path)) +} + +func readKeyPEM(path string, data []byte) (gomatrixserverlib.KeyID, ed25519.PrivateKey, error) { + for { + var keyBlock *pem.Block + keyBlock, data = pem.Decode(data) + if data == nil { + return "", nil, fmt.Errorf("no matrix private key PEM data in %q", path) + } + if keyBlock == nil { + return "", nil, fmt.Errorf("keyBlock is nil %q", path) + } + if keyBlock.Type == "MATRIX PRIVATE KEY" { + keyID := keyBlock.Headers["Key-ID"] + if keyID == "" { + return "", nil, fmt.Errorf("missing key ID in PEM data in %q", path) + } + if !strings.HasPrefix(keyID, "ed25519:") { + return "", nil, fmt.Errorf("key ID %q doesn't start with \"ed25519:\" in %q", keyID, path) + } + _, privKey, err := ed25519.GenerateKey(bytes.NewReader(keyBlock.Bytes)) + if err != nil { + return "", nil, err + } + return gomatrixserverlib.KeyID(keyID), privKey, nil + } + } +} + +func fingerprintPEM(data []byte) *gomatrixserverlib.TLSFingerprint { + for { + var certDERBlock *pem.Block + certDERBlock, data = pem.Decode(data) + if data == nil { + return nil + } + if certDERBlock.Type == "CERTIFICATE" { + digest := sha256.Sum256(certDERBlock.Bytes) + return &gomatrixserverlib.TLSFingerprint{SHA256: digest[:]} + } + } +} + +// AppServiceURL returns a HTTP URL for where the appservice component is listening. +func (config *Dendrite) AppServiceURL() string { + // Hard code the appservice server to talk HTTP for now. + // If we support HTTPS we need to think of a practical way to do certificate validation. + // People setting up servers shouldn't need to get a certificate valid for the public + // internet for an internal API. + return "http://" + string(config.Listen.AppServiceAPI) +} + +// RoomServerURL returns an HTTP URL for where the roomserver is listening. +func (config *Dendrite) RoomServerURL() string { + // Hard code the roomserver to talk HTTP for now. + // If we support HTTPS we need to think of a practical way to do certificate validation. + // People setting up servers shouldn't need to get a certificate valid for the public + // internet for an internal API. + return "http://" + string(config.Listen.RoomServer) +} + +// EDUServerURL returns an HTTP URL for where the EDU server is listening. +func (config *Dendrite) EDUServerURL() string { + // Hard code the EDU server to talk HTTP for now. + // If we support HTTPS we need to think of a practical way to do certificate validation. + // People setting up servers shouldn't need to get a certificate valid for the public + // internet for an internal API. + return "http://" + string(config.Listen.EDUServer) +} + +// FederationSenderURL returns an HTTP URL for where the federation sender is listening. +func (config *Dendrite) FederationSenderURL() string { + // Hard code the federation sender server to talk HTTP for now. + // If we support HTTPS we need to think of a practical way to do certificate validation. + // People setting up servers shouldn't need to get a certificate valid for the public + // internet for an internal API. + return "http://" + string(config.Listen.FederationSender) +} + +// SetupTracing configures the opentracing using the supplied configuration. +func (config *Dendrite) SetupTracing(serviceName string) (closer io.Closer, err error) { + if !config.Tracing.Enabled { + return ioutil.NopCloser(bytes.NewReader([]byte{})), nil + } + return config.Tracing.Jaeger.InitGlobalTracer( + serviceName, + jaegerconfig.Logger(logrusLogger{logrus.StandardLogger()}), + jaegerconfig.Metrics(jaegermetrics.NullFactory), + ) +} + +// MaxIdleConns returns maximum idle connections to the DB +func (config Dendrite) MaxIdleConns() int { + return config.Database.MaxIdleConns +} + +// MaxOpenConns returns maximum open connections to the DB +func (config Dendrite) MaxOpenConns() int { + return config.Database.MaxOpenConns +} + +// ConnMaxLifetime returns maximum amount of time a connection may be reused +func (config Dendrite) ConnMaxLifetime() time.Duration { + return time.Duration(config.Database.ConnMaxLifetimeSec) * time.Second +} + +// DbProperties functions return properties used by database/sql/DB +type DbProperties interface { + MaxIdleConns() int + MaxOpenConns() int + ConnMaxLifetime() time.Duration +} + +// DbProperties returns cfg as a DbProperties interface +func (config Dendrite) DbProperties() DbProperties { + return config +} + +// logrusLogger is a small wrapper that implements jaeger.Logger using logrus. +type logrusLogger struct { + l *logrus.Logger +} + +func (l logrusLogger) Error(msg string) { + l.l.Error(msg) +} + +func (l logrusLogger) Infof(msg string, args ...interface{}) { + l.l.Infof(msg, args...) +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 00000000..b72f5fad --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,145 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "fmt" + "testing" +) + +func TestLoadConfigRelative(t *testing.T) { + _, err := loadConfig("/my/config/dir", []byte(testConfig), + mockReadFile{ + "/my/config/dir/matrix_key.pem": testKey, + "/my/config/dir/tls_cert.pem": testCert, + }.readFile, + false, + ) + if err != nil { + t.Error("failed to load config:", err) + } +} + +const testConfig = ` +version: 0 +matrix: + server_name: localhost + private_key: matrix_key.pem + federation_certificates: [tls_cert.pem] +media: + base_path: media_store +kafka: + addresses: ["localhost:9092"] + topics: + output_room_event: output.room + output_client_data: output.client + output_typing_event: output.typing + user_updates: output.user +database: + media_api: "postgresql:///media_api" + account: "postgresql:///account" + device: "postgresql:///device" + server_key: "postgresql:///server_keys" + sync_api: "postgresql:///syn_api" + room_server: "postgresql:///room_server" + appservice: "postgresql:///appservice" +listen: + room_server: "localhost:7770" + client_api: "localhost:7771" + federation_api: "localhost:7772" + sync_api: "localhost:7773" + media_api: "localhost:7774" + appservice_api: "localhost:7777" + edu_server: "localhost:7778" +logging: + - type: "file" + level: "info" + params: + path: "/my/log/dir" +` + +type mockReadFile map[string]string + +func (m mockReadFile) readFile(path string) ([]byte, error) { + data, ok := m[path] + if !ok { + return nil, fmt.Errorf("no such file %q", path) + } + return []byte(data), nil +} + +func TestReadKey(t *testing.T) { + keyID, _, err := readKeyPEM("path/to/key", []byte(testKey)) + if err != nil { + t.Error("failed to load private key:", err) + } + wantKeyID := testKeyID + if wantKeyID != string(keyID) { + t.Errorf("wanted key ID to be %q, got %q", wantKeyID, keyID) + } +} + +const testKeyID = "ed25519:c8NsuQ" + +const testKey = ` +-----BEGIN MATRIX PRIVATE KEY----- +Key-ID: ` + testKeyID + ` +7KRZiZ2sTyRR8uqqUjRwczuwRXXkUMYIUHq4Mc3t4bE= +-----END MATRIX PRIVATE KEY----- +` + +func TestFingerprintPEM(t *testing.T) { + got := fingerprintPEM([]byte(testCert)) + if got == nil { + t.Error("failed to calculate fingerprint") + } + if string(got.SHA256) != testCertFingerprint { + t.Errorf("bad fingerprint: wanted %q got %q", got, testCertFingerprint) + } + +} + +const testCertFingerprint = "56.\\SPQxE\xd4\x95\xfb\xf6\xd5\x04\x91\xcb/\x07\xb1^\x88\x08\xe3\xc1p\xdfY\x04\x19w\xcb" + +const testCert = ` +-----BEGIN CERTIFICATE----- +MIIE0zCCArugAwIBAgIJAPype3u24LJeMA0GCSqGSIb3DQEBCwUAMAAwHhcNMTcw +NjEzMTQyODU4WhcNMTgwNjEzMTQyODU4WjAAMIICIjANBgkqhkiG9w0BAQEFAAOC +Ag8AMIICCgKCAgEA3vNSr7lCh/alxPFqairp/PYohwdsqPvOD7zf7dJCNhy0gbdC +9/APwIbPAPL9nU+o9ud1ACNCKBCQin/9LnI5vd5pa/Ne+mmRADDLB/BBBoywSJWG +NSfKJ9n3XY1bjgtqi53uUh+RDdQ7sXudDqCUxiiJZmS7oqK/mp88XXAgCbuXUY29 +GmzbbDz37vntuSxDgUOnJ8uPSvRp5YPKogA3JwW1SyrlLt4Z30CQ6nH3Y2Q5SVfJ +NIQyMrnwyjA9bCdXezv1cLXoTYn7U9BRyzXTZeXs3y3ldnRfISXN35CU04Az1F8j +lfj7nXMEqI/qAj/qhxZ8nVBB+rpNOZy9RJko3O+G5Qa/EvzkQYV1rW4TM2Yme88A +QyJspoV/0bXk6gG987PonK2Uk5djxSULhnGVIqswydyH0Nzb+slRp2bSoWbaNlee ++6TIeiyTQYc055pCHOp22gtLrC5LQGchksi02St2ZzRHdnlfqCJ8S9sS7x3trzds +cYueg1sGI+O8szpQ3eUM7OhJOBrx6OlR7+QYnQg1wr/V+JAz1qcyTC1URcwfeqtg +QjxFdBD9LfCtfK+AO51H9ugtsPJqOh33PmvfvUBEM05OHCA0lNaWJHROGpm4T4cc +YQI9JQk/0lB7itF1qK5RG74qgKdjkBkfZxi0OqkUgHk6YHtJlKfET8zfrtcCAwEA +AaNQME4wHQYDVR0OBBYEFGwb0NgH0Zr7Ga23njEJ85Ozf8M9MB8GA1UdIwQYMBaA +FGwb0NgH0Zr7Ga23njEJ85Ozf8M9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEL +BQADggIBAKU3RHXggbq/pLhGinU5q/9QT0TB/0bBnF1wNFkKQC0FrNJ+ZnBNmusy +oqOn7DEohBCCDxT0kgOC05gLEsGLkSXlVyqCsPFfycCFhtu1QzSRtQNRxB3pW3Wq +4/RFVYv0PGBjVBKxImQlEmXJWEDwemGKqDQZPtqR/FTHTbJcaT0xQr5+1oG6lawt +I/2cW6GQ0kYW/Szps8FgNdSNgVqCjjNIzBYbWhRWMx/63qD1ReUbY7/Yw9KKT8nK +zXERpbTM9k+Pnm0g9Gep+9HJ1dBFJeuTPugKeSeyqg2OJbENw1hxGs/HjBXw7580 +ioiMn/kMj6Tg/f3HCfKrdHHBFQw0/fJW6o17QImYIpPOPzc5RjXBrCJWb34kxqEd +NQdKgejWiV/LlVsguIF8hVZH2kRzvoyypkVUtSUYGmjvA5UXoORQZfJ+b41llq1B +GcSF6iaVbAFKnsUyyr1i9uHz/6Muqflphv/SfZxGheIn5u3PnhXrzDagvItjw0NS +n0Xq64k7fc42HXJpF8CGBkSaIhtlzcruO+vqR80B9r62+D0V7VmHOnP135MT6noU +8F0JQfEtP+I8NII5jHSF/khzSgP5g80LS9tEc2ILnIHK1StkInAoRQQ+/HsQsgbz +ANAf5kxmMsM0zlN2hkxl0H6o7wKlBSw3RI3cjfilXiMWRPJrzlc4 +-----END CERTIFICATE----- +` diff --git a/internal/consumers.go b/internal/consumers.go new file mode 100644 index 00000000..df68cbfa --- /dev/null +++ b/internal/consumers.go @@ -0,0 +1,125 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "fmt" + + "github.com/Shopify/sarama" +) + +// A PartitionOffset is the offset into a partition of the input log. +type PartitionOffset struct { + // The ID of the partition. + Partition int32 + // The offset into the partition. + Offset int64 +} + +// A PartitionStorer has the storage APIs needed by the consumer. +type PartitionStorer interface { + // PartitionOffsets returns the offsets the consumer has reached for each partition. + PartitionOffsets(ctx context.Context, topic string) ([]PartitionOffset, error) + // SetPartitionOffset records where the consumer has reached for a partition. + SetPartitionOffset(ctx context.Context, topic string, partition int32, offset int64) error +} + +// A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to +// remember the offset it reached. +type ContinualConsumer struct { + // The kafkaesque topic to consume events from. + // This is the name used in kafka to identify the stream to consume events from. + Topic string + // A kafkaesque stream consumer providing the APIs for talking to the event source. + // The interface is taken from a client library for Apache Kafka. + // But any equivalent event streaming protocol could be made to implement the same interface. + Consumer sarama.Consumer + // A thing which can load and save partition offsets for a topic. + PartitionStore PartitionStorer + // ProcessMessage is a function which will be called for each message in the log. Return an error to + // stop processing messages. See ErrShutdown for specific control signals. + ProcessMessage func(msg *sarama.ConsumerMessage) error + // ShutdownCallback is called when ProcessMessage returns ErrShutdown, after the partition has been saved. + // It is optional. + ShutdownCallback func() +} + +// ErrShutdown can be returned from ContinualConsumer.ProcessMessage to stop the ContinualConsumer. +var ErrShutdown = fmt.Errorf("shutdown") + +// Start starts the consumer consuming. +// Starts up a goroutine for each partition in the kafka stream. +// Returns nil once all the goroutines are started. +// Returns an error if it can't start consuming for any of the partitions. +func (c *ContinualConsumer) Start() error { + offsets := map[int32]int64{} + + partitions, err := c.Consumer.Partitions(c.Topic) + if err != nil { + return err + } + for _, partition := range partitions { + // Default all the offsets to the beginning of the stream. + offsets[partition] = sarama.OffsetOldest + } + + storedOffsets, err := c.PartitionStore.PartitionOffsets(context.TODO(), c.Topic) + if err != nil { + return err + } + for _, offset := range storedOffsets { + // We've already processed events from this partition so advance the offset to where we got to. + // ConsumePartition will start streaming from the message with the given offset (inclusive), + // so increment 1 to avoid getting the same message a second time. + offsets[offset.Partition] = 1 + offset.Offset + } + + var partitionConsumers []sarama.PartitionConsumer + for partition, offset := range offsets { + pc, err := c.Consumer.ConsumePartition(c.Topic, partition, offset) + if err != nil { + for _, p := range partitionConsumers { + p.Close() // nolint: errcheck + } + return err + } + partitionConsumers = append(partitionConsumers, pc) + } + for _, pc := range partitionConsumers { + go c.consumePartition(pc) + } + + return nil +} + +// consumePartition consumes the room events for a single partition of the kafkaesque stream. +func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) { + defer pc.Close() // nolint: errcheck + for message := range pc.Messages() { + msgErr := c.ProcessMessage(message) + // Advance our position in the stream so that we will start at the right position after a restart. + if err := c.PartitionStore.SetPartitionOffset(context.TODO(), c.Topic, message.Partition, message.Offset); err != nil { + panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %w", err)) + } + // Shutdown if we were told to do so. + if msgErr == ErrShutdown { + if c.ShutdownCallback != nil { + c.ShutdownCallback() + } + return + } + } +} diff --git a/internal/eventcontent.go b/internal/eventcontent.go new file mode 100644 index 00000000..64512836 --- /dev/null +++ b/internal/eventcontent.go @@ -0,0 +1,86 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import "github.com/matrix-org/gomatrixserverlib" + +// NameContent is the event content for https://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-name +type NameContent struct { + Name string `json:"name"` +} + +// TopicContent is the event content for https://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-topic +type TopicContent struct { + Topic string `json:"topic"` +} + +// GuestAccessContent is the event content for https://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-guest-access +type GuestAccessContent struct { + GuestAccess string `json:"guest_access"` +} + +// HistoryVisibilityContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-history-visibility +type HistoryVisibilityContent struct { + HistoryVisibility string `json:"history_visibility"` +} + +// CanonicalAlias is the event content for https://matrix.org/docs/spec/client_server/r0.6.0#m-room-canonical-alias +type CanonicalAlias struct { + Alias string `json:"alias"` +} + +// InitialPowerLevelsContent returns the initial values for m.room.power_levels on room creation +// if they have not been specified. +// http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-power-levels +// https://github.com/matrix-org/synapse/blob/v0.19.2/synapse/handlers/room.py#L294 +func InitialPowerLevelsContent(roomCreator string) (c gomatrixserverlib.PowerLevelContent) { + c.Defaults() + c.Events = map[string]int64{ + "m.room.name": 50, + "m.room.power_levels": 100, + "m.room.history_visibility": 100, + "m.room.canonical_alias": 50, + "m.room.avatar": 50, + "m.room.aliases": 0, // anyone can publish aliases by default. Has to be 0 else state_default is used. + } + c.Users = map[string]int64{roomCreator: 100} + return c +} + +// AliasesContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-aliases +type AliasesContent struct { + Aliases []string `json:"aliases"` +} + +// CanonicalAliasContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-canonical-alias +type CanonicalAliasContent struct { + Alias string `json:"alias"` +} + +// AvatarContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-avatar +type AvatarContent struct { + Info ImageInfo `json:"info,omitempty"` + URL string `json:"url"` + ThumbnailURL string `json:"thumbnail_url,omitempty"` + ThumbnailInfo ImageInfo `json:"thumbnail_info,omitempty"` +} + +// ImageInfo implements the ImageInfo structure from http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-avatar +type ImageInfo struct { + Mimetype string `json:"mimetype"` + Height int64 `json:"h"` + Width int64 `json:"w"` + Size int64 `json:"size"` +} diff --git a/internal/events.go b/internal/events.go new file mode 100644 index 00000000..89c82e03 --- /dev/null +++ b/internal/events.go @@ -0,0 +1,150 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/roomserver/api" + + "github.com/matrix-org/gomatrixserverlib" +) + +// ErrRoomNoExists is returned when trying to lookup the state of a room that +// doesn't exist +var ErrRoomNoExists = errors.New("Room does not exist") + +// BuildEvent builds a Matrix event using the event builder and roomserver query +// API client provided. If also fills roomserver query API response (if provided) +// in case the function calling FillBuilder needs to use it. +// Returns ErrRoomNoExists if the state of the room could not be retrieved because +// the room doesn't exist +// Returns an error if something else went wrong +func BuildEvent( + ctx context.Context, + builder *gomatrixserverlib.EventBuilder, cfg *config.Dendrite, evTime time.Time, + rsAPI api.RoomserverInternalAPI, queryRes *api.QueryLatestEventsAndStateResponse, +) (*gomatrixserverlib.Event, error) { + if queryRes == nil { + queryRes = &api.QueryLatestEventsAndStateResponse{} + } + + err := AddPrevEventsToEvent(ctx, builder, rsAPI, queryRes) + if err != nil { + // This can pass through a ErrRoomNoExists to the caller + return nil, err + } + + event, err := builder.Build( + evTime, cfg.Matrix.ServerName, cfg.Matrix.KeyID, + cfg.Matrix.PrivateKey, queryRes.RoomVersion, + ) + if err != nil { + return nil, err + } + + return &event, nil +} + +// AddPrevEventsToEvent fills out the prev_events and auth_events fields in builder +func AddPrevEventsToEvent( + ctx context.Context, + builder *gomatrixserverlib.EventBuilder, + rsAPI api.RoomserverInternalAPI, queryRes *api.QueryLatestEventsAndStateResponse, +) error { + eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder) + if err != nil { + return fmt.Errorf("gomatrixserverlib.StateNeededForEventBuilder: %w", err) + } + + if len(eventsNeeded.Tuples()) == 0 { + return errors.New("expecting state tuples for event builder, got none") + } + + // Ask the roomserver for information about this room + queryReq := api.QueryLatestEventsAndStateRequest{ + RoomID: builder.RoomID, + StateToFetch: eventsNeeded.Tuples(), + } + if err = rsAPI.QueryLatestEventsAndState(ctx, &queryReq, queryRes); err != nil { + return fmt.Errorf("rsAPI.QueryLatestEventsAndState: %w", err) + } + + if !queryRes.RoomExists { + return ErrRoomNoExists + } + + eventFormat, err := queryRes.RoomVersion.EventFormat() + if err != nil { + return fmt.Errorf("queryRes.RoomVersion.EventFormat: %w", err) + } + + builder.Depth = queryRes.Depth + + authEvents := gomatrixserverlib.NewAuthEvents(nil) + + for i := range queryRes.StateEvents { + err = authEvents.AddEvent(&queryRes.StateEvents[i].Event) + if err != nil { + return fmt.Errorf("authEvents.AddEvent: %w", err) + } + } + + refs, err := eventsNeeded.AuthEventReferences(&authEvents) + if err != nil { + return fmt.Errorf("eventsNeeded.AuthEventReferences: %w", err) + } + + truncAuth, truncPrev := truncateAuthAndPrevEvents(refs, queryRes.LatestEvents) + switch eventFormat { + case gomatrixserverlib.EventFormatV1: + builder.AuthEvents = truncAuth + builder.PrevEvents = truncPrev + case gomatrixserverlib.EventFormatV2: + v2AuthRefs, v2PrevRefs := []string{}, []string{} + for _, ref := range truncAuth { + v2AuthRefs = append(v2AuthRefs, ref.EventID) + } + for _, ref := range truncPrev { + v2PrevRefs = append(v2PrevRefs, ref.EventID) + } + builder.AuthEvents = v2AuthRefs + builder.PrevEvents = v2PrevRefs + } + + return nil +} + +// truncateAuthAndPrevEvents limits the number of events we add into +// an event as prev_events or auth_events. +// NOTSPEC: The limits here feel a bit arbitrary but they are currently +// here because of https://github.com/matrix-org/matrix-doc/issues/2307 +// and because Synapse will just drop events that don't comply. +func truncateAuthAndPrevEvents(auth, prev []gomatrixserverlib.EventReference) ( + truncAuth, truncPrev []gomatrixserverlib.EventReference, +) { + truncAuth, truncPrev = auth, prev + if len(truncAuth) > 10 { + truncAuth = truncAuth[:10] + } + if len(truncPrev) > 20 { + truncPrev = truncPrev[:20] + } + return +} diff --git a/internal/http/http.go b/internal/http/http.go new file mode 100644 index 00000000..3c647544 --- /dev/null +++ b/internal/http/http.go @@ -0,0 +1,57 @@ +package http + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + + opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" +) + +// PostJSON performs a POST request with JSON on an internal HTTP API +func PostJSON( + ctx context.Context, span opentracing.Span, httpClient *http.Client, + apiURL string, request, response interface{}, +) error { + jsonBytes, err := json.Marshal(request) + if err != nil { + return err + } + + req, err := http.NewRequest(http.MethodPost, apiURL, bytes.NewReader(jsonBytes)) + if err != nil { + return err + } + + // Mark the span as being an RPC client. + ext.SpanKindRPCClient.Set(span) + carrier := opentracing.HTTPHeadersCarrier(req.Header) + tracer := opentracing.GlobalTracer() + + if err = tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier); err != nil { + return err + } + + req.Header.Set("Content-Type", "application/json") + + res, err := httpClient.Do(req.WithContext(ctx)) + if res != nil { + defer (func() { err = res.Body.Close() })() + } + if err != nil { + return err + } + if res.StatusCode != http.StatusOK { + var errorBody struct { + Message string `json:"message"` + } + if err = json.NewDecoder(res.Body).Decode(&errorBody); err != nil { + return err + } + return fmt.Errorf("api: %d: %s", res.StatusCode, errorBody.Message) + } + return json.NewDecoder(res.Body).Decode(response) +} diff --git a/internal/httpapi.go b/internal/httpapi.go new file mode 100644 index 00000000..b4c53f58 --- /dev/null +++ b/internal/httpapi.go @@ -0,0 +1,234 @@ +package internal + +import ( + "io" + "net/http" + "net/http/httptest" + "net/http/httputil" + "os" + "strings" + "time" + + "github.com/matrix-org/dendrite/clientapi/auth" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" +) + +// BasicAuth is used for authorization on /metrics handlers +type BasicAuth struct { + Username string `yaml:"username"` + Password string `yaml:"password"` +} + +// MakeAuthAPI turns a util.JSONRequestHandler function into an http.Handler which authenticates the request. +func MakeAuthAPI( + metricsName string, data auth.Data, + f func(*http.Request, *authtypes.Device) util.JSONResponse, +) http.Handler { + h := func(req *http.Request) util.JSONResponse { + device, err := auth.VerifyUserFromRequest(req, data) + if err != nil { + return *err + } + // add the user ID to the logger + logger := util.GetLogger((req.Context())) + logger = logger.WithField("user_id", device.UserID) + req = req.WithContext(util.ContextWithLogger(req.Context(), logger)) + + return f(req, device) + } + return MakeExternalAPI(metricsName, h) +} + +// MakeExternalAPI turns a util.JSONRequestHandler function into an http.Handler. +// This is used for APIs that are called from the internet. +func MakeExternalAPI(metricsName string, f func(*http.Request) util.JSONResponse) http.Handler { + // TODO: We shouldn't be directly reading env vars here, inject it in instead. + // Refactor this when we split out config structs. + verbose := false + if os.Getenv("DENDRITE_TRACE_HTTP") == "1" { + verbose = true + } + h := util.MakeJSONAPI(util.NewJSONRequestHandler(f)) + withSpan := func(w http.ResponseWriter, req *http.Request) { + nextWriter := w + if verbose { + logger := logrus.NewEntry(logrus.StandardLogger()) + // Log outgoing response + rec := httptest.NewRecorder() + nextWriter = rec + defer func() { + resp := rec.Result() + dump, err := httputil.DumpResponse(resp, true) + if err != nil { + logger.Debugf("Failed to dump outgoing response: %s", err) + } else { + strSlice := strings.Split(string(dump), "\n") + for _, s := range strSlice { + logger.Debug(s) + } + } + // copy the response to the client + for hdr, vals := range resp.Header { + for _, val := range vals { + w.Header().Add(hdr, val) + } + } + w.WriteHeader(resp.StatusCode) + // discard errors as this is for debugging + _, _ = io.Copy(w, resp.Body) + _ = resp.Body.Close() + }() + + // Log incoming request + dump, err := httputil.DumpRequest(req, true) + if err != nil { + logger.Debugf("Failed to dump incoming request: %s", err) + } else { + strSlice := strings.Split(string(dump), "\n") + for _, s := range strSlice { + logger.Debug(s) + } + } + } + + span := opentracing.StartSpan(metricsName) + defer span.Finish() + req = req.WithContext(opentracing.ContextWithSpan(req.Context(), span)) + h.ServeHTTP(nextWriter, req) + + } + + return http.HandlerFunc(withSpan) +} + +// MakeHTMLAPI adds Span metrics to the HTML Handler function +// This is used to serve HTML alongside JSON error messages +func MakeHTMLAPI(metricsName string, f func(http.ResponseWriter, *http.Request) *util.JSONResponse) http.Handler { + withSpan := func(w http.ResponseWriter, req *http.Request) { + span := opentracing.StartSpan(metricsName) + defer span.Finish() + req = req.WithContext(opentracing.ContextWithSpan(req.Context(), span)) + if err := f(w, req); err != nil { + h := util.MakeJSONAPI(util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { + return *err + })) + h.ServeHTTP(w, req) + } + } + + return promhttp.InstrumentHandlerCounter( + promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: metricsName, + Help: "Total number of http requests for HTML resources", + }, + []string{"code"}, + ), + http.HandlerFunc(withSpan), + ) +} + +// MakeInternalAPI turns a util.JSONRequestHandler function into an http.Handler. +// This is used for APIs that are internal to dendrite. +// If we are passed a tracing context in the request headers then we use that +// as the parent of any tracing spans we create. +func MakeInternalAPI(metricsName string, f func(*http.Request) util.JSONResponse) http.Handler { + h := util.MakeJSONAPI(util.NewJSONRequestHandler(f)) + withSpan := func(w http.ResponseWriter, req *http.Request) { + carrier := opentracing.HTTPHeadersCarrier(req.Header) + tracer := opentracing.GlobalTracer() + clientContext, err := tracer.Extract(opentracing.HTTPHeaders, carrier) + var span opentracing.Span + if err == nil { + // Default to a span without RPC context. + span = tracer.StartSpan(metricsName) + } else { + // Set the RPC context. + span = tracer.StartSpan(metricsName, ext.RPCServerOption(clientContext)) + } + defer span.Finish() + req = req.WithContext(opentracing.ContextWithSpan(req.Context(), span)) + h.ServeHTTP(w, req) + } + + return http.HandlerFunc(withSpan) +} + +// MakeFedAPI makes an http.Handler that checks matrix federation authentication. +func MakeFedAPI( + metricsName string, + serverName gomatrixserverlib.ServerName, + keyRing gomatrixserverlib.KeyRing, + f func(*http.Request, *gomatrixserverlib.FederationRequest) util.JSONResponse, +) http.Handler { + h := func(req *http.Request) util.JSONResponse { + fedReq, errResp := gomatrixserverlib.VerifyHTTPRequest( + req, time.Now(), serverName, keyRing, + ) + if fedReq == nil { + return errResp + } + return f(req, fedReq) + } + return MakeExternalAPI(metricsName, h) +} + +// SetupHTTPAPI registers an HTTP API mux under /api and sets up a metrics +// listener. +func SetupHTTPAPI(servMux *http.ServeMux, apiMux http.Handler, cfg *config.Dendrite) { + if cfg.Metrics.Enabled { + servMux.Handle("/metrics", WrapHandlerInBasicAuth(promhttp.Handler(), cfg.Metrics.BasicAuth)) + } + servMux.Handle("/api/", http.StripPrefix("/api", apiMux)) +} + +// WrapHandlerInBasicAuth adds basic auth to a handler. Only used for /metrics +func WrapHandlerInBasicAuth(h http.Handler, b BasicAuth) http.HandlerFunc { + if b.Username == "" || b.Password == "" { + logrus.Warn("Metrics are exposed without protection. Make sure you set up protection at proxy level.") + } + return func(w http.ResponseWriter, r *http.Request) { + // Serve without authorization if either Username or Password is unset + if b.Username == "" || b.Password == "" { + h.ServeHTTP(w, r) + return + } + user, pass, ok := r.BasicAuth() + + if !ok || user != b.Username || pass != b.Password { + http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden) + return + } + h.ServeHTTP(w, r) + } +} + +// WrapHandlerInCORS adds CORS headers to all responses, including all error +// responses. +// Handles OPTIONS requests directly. +func WrapHandlerInCORS(h http.Handler) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept, Authorization") + + if r.Method == http.MethodOptions && r.Header.Get("Access-Control-Request-Method") != "" { + // Its easiest just to always return a 200 OK for everything. Whether + // this is technically correct or not is a question, but in the end this + // is what a lot of other people do (including synapse) and the clients + // are perfectly happy with it. + w.WriteHeader(http.StatusOK) + } else { + h.ServeHTTP(w, r) + } + }) +} diff --git a/internal/httpapi_test.go b/internal/httpapi_test.go new file mode 100644 index 00000000..6f159c8d --- /dev/null +++ b/internal/httpapi_test.go @@ -0,0 +1,95 @@ +package internal + +import ( + "net/http" + "net/http/httptest" + "testing" +) + +func TestWrapHandlerInBasicAuth(t *testing.T) { + type args struct { + h http.Handler + b BasicAuth + } + + dummyHandler := http.HandlerFunc(func(h http.ResponseWriter, r *http.Request) { + h.WriteHeader(http.StatusOK) + }) + + tests := []struct { + name string + args args + want int + reqAuth bool + }{ + { + name: "no user or password setup", + args: args{h: dummyHandler}, + want: http.StatusOK, + reqAuth: false, + }, + { + name: "only user set", + args: args{ + h: dummyHandler, + b: BasicAuth{Username: "test"}, // no basic auth + }, + want: http.StatusOK, + reqAuth: false, + }, + { + name: "only pass set", + args: args{ + h: dummyHandler, + b: BasicAuth{Password: "test"}, // no basic auth + }, + want: http.StatusOK, + reqAuth: false, + }, + { + name: "credentials correct", + args: args{ + h: dummyHandler, + b: BasicAuth{Username: "test", Password: "test"}, // basic auth enabled + }, + want: http.StatusOK, + reqAuth: true, + }, + { + name: "credentials wrong", + args: args{ + h: dummyHandler, + b: BasicAuth{Username: "test1", Password: "test"}, // basic auth enabled + }, + want: http.StatusForbidden, + reqAuth: true, + }, + { + name: "no basic auth in request", + args: args{ + h: dummyHandler, + b: BasicAuth{Username: "test", Password: "test"}, // basic auth enabled + }, + want: http.StatusForbidden, + reqAuth: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + baHandler := WrapHandlerInBasicAuth(tt.args.h, tt.args.b) + + req := httptest.NewRequest("GET", "http://localhost/metrics", nil) + if tt.reqAuth { + req.SetBasicAuth("test", "test") + } + + w := httptest.NewRecorder() + baHandler(w, req) + resp := w.Result() + + if resp.StatusCode != tt.want { + t.Errorf("Expected status code %d, got %d", resp.StatusCode, tt.want) + } + }) + } +} diff --git a/internal/keydb/cache/keydb.go b/internal/keydb/cache/keydb.go new file mode 100644 index 00000000..87573ed2 --- /dev/null +++ b/internal/keydb/cache/keydb.go @@ -0,0 +1,69 @@ +package cache + +import ( + "context" + "errors" + + "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/keydb" + "github.com/matrix-org/gomatrixserverlib" +) + +// A Database implements gomatrixserverlib.KeyDatabase and is used to store +// the public keys for other matrix servers. +type KeyDatabase struct { + inner keydb.Database + cache caching.ImmutableCache +} + +func NewKeyDatabase(inner keydb.Database, cache caching.ImmutableCache) (*KeyDatabase, error) { + if inner == nil { + return nil, errors.New("inner database can't be nil") + } + if cache == nil { + return nil, errors.New("cache can't be nil") + } + return &KeyDatabase{ + inner: inner, + cache: cache, + }, nil +} + +// FetcherName implements KeyFetcher +func (d KeyDatabase) FetcherName() string { + return "InMemoryKeyCache" +} + +// FetchKeys implements gomatrixserverlib.KeyDatabase +func (d *KeyDatabase) FetchKeys( + ctx context.Context, + requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, +) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { + results := make(map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult) + for req := range requests { + if res, cached := d.cache.GetServerKey(req); cached { + results[req] = res + delete(requests, req) + } + } + fromDB, err := d.inner.FetchKeys(ctx, requests) + if err != nil { + return results, err + } + for req, res := range fromDB { + results[req] = res + d.cache.StoreServerKey(req, res) + } + return results, nil +} + +// StoreKeys implements gomatrixserverlib.KeyDatabase +func (d *KeyDatabase) StoreKeys( + ctx context.Context, + keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, +) error { + for req, res := range keyMap { + d.cache.StoreServerKey(req, res) + } + return d.inner.StoreKeys(ctx, keyMap) +} diff --git a/internal/keydb/interface.go b/internal/keydb/interface.go new file mode 100644 index 00000000..c9a20fdd --- /dev/null +++ b/internal/keydb/interface.go @@ -0,0 +1,13 @@ +package keydb + +import ( + "context" + + "github.com/matrix-org/gomatrixserverlib" +) + +type Database interface { + FetcherName() string + FetchKeys(ctx context.Context, requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) + StoreKeys(ctx context.Context, keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult) error +} diff --git a/internal/keydb/keydb.go b/internal/keydb/keydb.go new file mode 100644 index 00000000..ad6d56b8 --- /dev/null +++ b/internal/keydb/keydb.go @@ -0,0 +1,50 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !wasm + +package keydb + +import ( + "net/url" + + "golang.org/x/crypto/ed25519" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/keydb/postgres" + "github.com/matrix-org/dendrite/internal/keydb/sqlite3" + "github.com/matrix-org/gomatrixserverlib" +) + +// NewDatabase opens a database connection. +func NewDatabase( + dataSourceName string, + dbProperties internal.DbProperties, + serverName gomatrixserverlib.ServerName, + serverKey ed25519.PublicKey, + serverKeyID gomatrixserverlib.KeyID, +) (Database, error) { + uri, err := url.Parse(dataSourceName) + if err != nil { + return postgres.NewDatabase(dataSourceName, dbProperties, serverName, serverKey, serverKeyID) + } + switch uri.Scheme { + case "postgres": + return postgres.NewDatabase(dataSourceName, dbProperties, serverName, serverKey, serverKeyID) + case "file": + return sqlite3.NewDatabase(dataSourceName, serverName, serverKey, serverKeyID) + default: + return postgres.NewDatabase(dataSourceName, dbProperties, serverName, serverKey, serverKeyID) + } +} diff --git a/internal/keydb/keydb_wasm.go b/internal/keydb/keydb_wasm.go new file mode 100644 index 00000000..349381ee --- /dev/null +++ b/internal/keydb/keydb_wasm.go @@ -0,0 +1,48 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keydb + +import ( + "fmt" + "net/url" + + "golang.org/x/crypto/ed25519" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/keydb/sqlite3" + "github.com/matrix-org/gomatrixserverlib" +) + +// NewDatabase opens a database connection. +func NewDatabase( + dataSourceName string, + dbProperties internal.DbProperties, // nolint:unparam + serverName gomatrixserverlib.ServerName, + serverKey ed25519.PublicKey, + serverKeyID gomatrixserverlib.KeyID, +) (Database, error) { + uri, err := url.Parse(dataSourceName) + if err != nil { + return nil, err + } + switch uri.Scheme { + case "postgres": + return nil, fmt.Errorf("Cannot use postgres implementation") + case "file": + return sqlite3.NewDatabase(dataSourceName, serverName, serverKey, serverKeyID) + default: + return nil, fmt.Errorf("Cannot use postgres implementation") + } +} diff --git a/internal/keydb/keyring.go b/internal/keydb/keyring.go new file mode 100644 index 00000000..d0b1904e --- /dev/null +++ b/internal/keydb/keyring.go @@ -0,0 +1,74 @@ +// Copyright 2017 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keydb + +import ( + "encoding/base64" + + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" + "golang.org/x/crypto/ed25519" +) + +// CreateKeyRing creates and configures a KeyRing object. +// +// It creates the necessary key fetchers and collects them into a KeyRing +// backed by the given KeyDatabase. +func CreateKeyRing(client gomatrixserverlib.Client, + keyDB gomatrixserverlib.KeyDatabase, + cfg config.KeyPerspectives) gomatrixserverlib.KeyRing { + + fetchers := gomatrixserverlib.KeyRing{ + KeyFetchers: []gomatrixserverlib.KeyFetcher{ + &gomatrixserverlib.DirectKeyFetcher{ + Client: client, + }, + }, + KeyDatabase: keyDB, + } + + logrus.Info("Enabled direct key fetcher") + + var b64e = base64.StdEncoding.WithPadding(base64.NoPadding) + for _, ps := range cfg { + perspective := &gomatrixserverlib.PerspectiveKeyFetcher{ + PerspectiveServerName: ps.ServerName, + PerspectiveServerKeys: map[gomatrixserverlib.KeyID]ed25519.PublicKey{}, + Client: client, + } + + for _, key := range ps.Keys { + rawkey, err := b64e.DecodeString(key.PublicKey) + if err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "server_name": ps.ServerName, + "public_key": key.PublicKey, + }).Warn("Couldn't parse perspective key") + continue + } + perspective.PerspectiveServerKeys[key.KeyID] = rawkey + } + + fetchers.KeyFetchers = append(fetchers.KeyFetchers, perspective) + + logrus.WithFields(logrus.Fields{ + "server_name": ps.ServerName, + "num_public_keys": len(ps.Keys), + }).Info("Enabled perspective key fetcher") + } + + return fetchers +} diff --git a/internal/keydb/postgres/keydb.go b/internal/keydb/postgres/keydb.go new file mode 100644 index 00000000..da3a4d37 --- /dev/null +++ b/internal/keydb/postgres/keydb.go @@ -0,0 +1,115 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "time" + + "golang.org/x/crypto/ed25519" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" +) + +// A Database implements gomatrixserverlib.KeyDatabase and is used to store +// the public keys for other matrix servers. +type Database struct { + statements serverKeyStatements +} + +// NewDatabase prepares a new key database. +// It creates the necessary tables if they don't already exist. +// It prepares all the SQL statements that it will use. +// Returns an error if there was a problem talking to the database. +func NewDatabase( + dataSourceName string, + dbProperties internal.DbProperties, + serverName gomatrixserverlib.ServerName, + serverKey ed25519.PublicKey, + serverKeyID gomatrixserverlib.KeyID, +) (*Database, error) { + db, err := sqlutil.Open("postgres", dataSourceName, dbProperties) + if err != nil { + return nil, err + } + d := &Database{} + err = d.statements.prepare(db) + if err != nil { + return nil, err + } + // Store our own keys so that we don't end up making HTTP requests to find our + // own keys + index := gomatrixserverlib.PublicKeyLookupRequest{ + ServerName: serverName, + KeyID: serverKeyID, + } + value := gomatrixserverlib.PublicKeyLookupResult{ + VerifyKey: gomatrixserverlib.VerifyKey{ + Key: gomatrixserverlib.Base64String(serverKey), + }, + ValidUntilTS: gomatrixserverlib.AsTimestamp(time.Now().Add(100 * 365 * 24 * time.Hour)), + ExpiredTS: gomatrixserverlib.PublicKeyNotExpired, + } + err = d.StoreKeys( + context.Background(), + map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{ + index: value, + }, + ) + if err != nil { + return nil, err + } + return d, nil +} + +// FetcherName implements KeyFetcher +func (d Database) FetcherName() string { + return "PostgresKeyDatabase" +} + +// FetchKeys implements gomatrixserverlib.KeyDatabase +func (d *Database) FetchKeys( + ctx context.Context, + requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, +) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { + return d.statements.bulkSelectServerKeys(ctx, requests) +} + +// StoreKeys implements gomatrixserverlib.KeyDatabase +func (d *Database) StoreKeys( + ctx context.Context, + keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, +) error { + // TODO: Inserting all the keys within a single transaction may + // be more efficient since the transaction overhead can be quite + // high for a single insert statement. + var lastErr error + for request, keys := range keyMap { + if err := d.statements.upsertServerKeys(ctx, request, keys); err != nil { + // Rather than returning immediately on error we try to insert the + // remaining keys. + // Since we are inserting the keys outside of a transaction it is + // possible for some of the inserts to succeed even though some + // of the inserts have failed. + // Ensuring that we always insert all the keys we can means that + // this behaviour won't depend on the iteration order of the map. + lastErr = err + } + } + return lastErr +} diff --git a/internal/keydb/postgres/server_key_table.go b/internal/keydb/postgres/server_key_table.go new file mode 100644 index 00000000..b3c26a48 --- /dev/null +++ b/internal/keydb/postgres/server_key_table.go @@ -0,0 +1,144 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal" + + "github.com/lib/pq" + "github.com/matrix-org/gomatrixserverlib" +) + +const serverKeysSchema = ` +-- A cache of signing keys downloaded from remote servers. +CREATE TABLE IF NOT EXISTS keydb_server_keys ( + -- The name of the matrix server the key is for. + server_name TEXT NOT NULL, + -- The ID of the server key. + server_key_id TEXT NOT NULL, + -- Combined server name and key ID separated by the ASCII unit separator + -- to make it easier to run bulk queries. + server_name_and_key_id TEXT NOT NULL, + -- When the key is valid until as a millisecond timestamp. + -- 0 if this is an expired key (in which case expired_ts will be non-zero) + valid_until_ts BIGINT NOT NULL, + -- When the key expired as a millisecond timestamp. + -- 0 if this is an active key (in which case valid_until_ts will be non-zero) + expired_ts BIGINT NOT NULL, + -- The base64-encoded public key. + server_key TEXT NOT NULL, + CONSTRAINT keydb_server_keys_unique UNIQUE (server_name, server_key_id) +); + +CREATE INDEX IF NOT EXISTS keydb_server_name_and_key_id ON keydb_server_keys (server_name_and_key_id); +` + +const bulkSelectServerKeysSQL = "" + + "SELECT server_name, server_key_id, valid_until_ts, expired_ts, " + + " server_key FROM keydb_server_keys" + + " WHERE server_name_and_key_id = ANY($1)" + +const upsertServerKeysSQL = "" + + "INSERT INTO keydb_server_keys (server_name, server_key_id," + + " server_name_and_key_id, valid_until_ts, expired_ts, server_key)" + + " VALUES ($1, $2, $3, $4, $5, $6)" + + " ON CONFLICT ON CONSTRAINT keydb_server_keys_unique" + + " DO UPDATE SET valid_until_ts = $4, expired_ts = $5, server_key = $6" + +type serverKeyStatements struct { + bulkSelectServerKeysStmt *sql.Stmt + upsertServerKeysStmt *sql.Stmt +} + +func (s *serverKeyStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(serverKeysSchema) + if err != nil { + return + } + if s.bulkSelectServerKeysStmt, err = db.Prepare(bulkSelectServerKeysSQL); err != nil { + return + } + if s.upsertServerKeysStmt, err = db.Prepare(upsertServerKeysSQL); err != nil { + return + } + return +} + +func (s *serverKeyStatements) bulkSelectServerKeys( + ctx context.Context, + requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, +) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { + var nameAndKeyIDs []string + for request := range requests { + nameAndKeyIDs = append(nameAndKeyIDs, nameAndKeyID(request)) + } + stmt := s.bulkSelectServerKeysStmt + rows, err := stmt.QueryContext(ctx, pq.StringArray(nameAndKeyIDs)) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectServerKeys: rows.close() failed") + results := map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{} + for rows.Next() { + var serverName string + var keyID string + var key string + var validUntilTS int64 + var expiredTS int64 + if err = rows.Scan(&serverName, &keyID, &validUntilTS, &expiredTS, &key); err != nil { + return nil, err + } + r := gomatrixserverlib.PublicKeyLookupRequest{ + ServerName: gomatrixserverlib.ServerName(serverName), + KeyID: gomatrixserverlib.KeyID(keyID), + } + vk := gomatrixserverlib.VerifyKey{} + err = vk.Key.Decode(key) + if err != nil { + return nil, err + } + results[r] = gomatrixserverlib.PublicKeyLookupResult{ + VerifyKey: vk, + ValidUntilTS: gomatrixserverlib.Timestamp(validUntilTS), + ExpiredTS: gomatrixserverlib.Timestamp(expiredTS), + } + } + return results, rows.Err() +} + +func (s *serverKeyStatements) upsertServerKeys( + ctx context.Context, + request gomatrixserverlib.PublicKeyLookupRequest, + key gomatrixserverlib.PublicKeyLookupResult, +) error { + _, err := s.upsertServerKeysStmt.ExecContext( + ctx, + string(request.ServerName), + string(request.KeyID), + nameAndKeyID(request), + key.ValidUntilTS, + key.ExpiredTS, + key.Key.Encode(), + ) + return err +} + +func nameAndKeyID(request gomatrixserverlib.PublicKeyLookupRequest) string { + return string(request.ServerName) + "\x1F" + string(request.KeyID) +} diff --git a/internal/keydb/sqlite3/keydb.go b/internal/keydb/sqlite3/keydb.go new file mode 100644 index 00000000..d1dc61c9 --- /dev/null +++ b/internal/keydb/sqlite3/keydb.go @@ -0,0 +1,116 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "time" + + "golang.org/x/crypto/ed25519" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" + + _ "github.com/mattn/go-sqlite3" +) + +// A Database implements gomatrixserverlib.KeyDatabase and is used to store +// the public keys for other matrix servers. +type Database struct { + statements serverKeyStatements +} + +// NewDatabase prepares a new key database. +// It creates the necessary tables if they don't already exist. +// It prepares all the SQL statements that it will use. +// Returns an error if there was a problem talking to the database. +func NewDatabase( + dataSourceName string, + serverName gomatrixserverlib.ServerName, + serverKey ed25519.PublicKey, + serverKeyID gomatrixserverlib.KeyID, +) (*Database, error) { + db, err := sqlutil.Open(internal.SQLiteDriverName(), dataSourceName, nil) + if err != nil { + return nil, err + } + d := &Database{} + err = d.statements.prepare(db) + if err != nil { + return nil, err + } + // Store our own keys so that we don't end up making HTTP requests to find our + // own keys + index := gomatrixserverlib.PublicKeyLookupRequest{ + ServerName: serverName, + KeyID: serverKeyID, + } + value := gomatrixserverlib.PublicKeyLookupResult{ + VerifyKey: gomatrixserverlib.VerifyKey{ + Key: gomatrixserverlib.Base64String(serverKey), + }, + ValidUntilTS: gomatrixserverlib.AsTimestamp(time.Now().Add(100 * 365 * 24 * time.Hour)), + ExpiredTS: gomatrixserverlib.PublicKeyNotExpired, + } + err = d.StoreKeys( + context.Background(), + map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{ + index: value, + }, + ) + if err != nil { + return nil, err + } + return d, nil +} + +// FetcherName implements KeyFetcher +func (d Database) FetcherName() string { + return "SqliteKeyDatabase" +} + +// FetchKeys implements gomatrixserverlib.KeyDatabase +func (d *Database) FetchKeys( + ctx context.Context, + requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, +) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { + return d.statements.bulkSelectServerKeys(ctx, requests) +} + +// StoreKeys implements gomatrixserverlib.KeyDatabase +func (d *Database) StoreKeys( + ctx context.Context, + keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, +) error { + // TODO: Inserting all the keys within a single transaction may + // be more efficient since the transaction overhead can be quite + // high for a single insert statement. + var lastErr error + for request, keys := range keyMap { + if err := d.statements.upsertServerKeys(ctx, request, keys); err != nil { + // Rather than returning immediately on error we try to insert the + // remaining keys. + // Since we are inserting the keys outside of a transaction it is + // possible for some of the inserts to succeed even though some + // of the inserts have failed. + // Ensuring that we always insert all the keys we can means that + // this behaviour won't depend on the iteration order of the map. + lastErr = err + } + } + return lastErr +} diff --git a/internal/keydb/sqlite3/server_key_table.go b/internal/keydb/sqlite3/server_key_table.go new file mode 100644 index 00000000..ae24a14d --- /dev/null +++ b/internal/keydb/sqlite3/server_key_table.go @@ -0,0 +1,152 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + "strings" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/gomatrixserverlib" +) + +const serverKeysSchema = ` +-- A cache of signing keys downloaded from remote servers. +CREATE TABLE IF NOT EXISTS keydb_server_keys ( + -- The name of the matrix server the key is for. + server_name TEXT NOT NULL, + -- The ID of the server key. + server_key_id TEXT NOT NULL, + -- Combined server name and key ID separated by the ASCII unit separator + -- to make it easier to run bulk queries. + server_name_and_key_id TEXT NOT NULL, + -- When the key is valid until as a millisecond timestamp. + -- 0 if this is an expired key (in which case expired_ts will be non-zero) + valid_until_ts BIGINT NOT NULL, + -- When the key expired as a millisecond timestamp. + -- 0 if this is an active key (in which case valid_until_ts will be non-zero) + expired_ts BIGINT NOT NULL, + -- The base64-encoded public key. + server_key TEXT NOT NULL, + UNIQUE (server_name, server_key_id) +); + +CREATE INDEX IF NOT EXISTS keydb_server_name_and_key_id ON keydb_server_keys (server_name_and_key_id); +` + +const bulkSelectServerKeysSQL = "" + + "SELECT server_name, server_key_id, valid_until_ts, expired_ts, " + + " server_key FROM keydb_server_keys" + + " WHERE server_name_and_key_id IN ($1)" + +const upsertServerKeysSQL = "" + + "INSERT INTO keydb_server_keys (server_name, server_key_id," + + " server_name_and_key_id, valid_until_ts, expired_ts, server_key)" + + " VALUES ($1, $2, $3, $4, $5, $6)" + + " ON CONFLICT (server_name, server_key_id)" + + " DO UPDATE SET valid_until_ts = $4, expired_ts = $5, server_key = $6" + +type serverKeyStatements struct { + db *sql.DB + bulkSelectServerKeysStmt *sql.Stmt + upsertServerKeysStmt *sql.Stmt +} + +func (s *serverKeyStatements) prepare(db *sql.DB) (err error) { + s.db = db + _, err = db.Exec(serverKeysSchema) + if err != nil { + return + } + if s.bulkSelectServerKeysStmt, err = db.Prepare(bulkSelectServerKeysSQL); err != nil { + return + } + if s.upsertServerKeysStmt, err = db.Prepare(upsertServerKeysSQL); err != nil { + return + } + return +} + +func (s *serverKeyStatements) bulkSelectServerKeys( + ctx context.Context, + requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, +) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { + var nameAndKeyIDs []string + for request := range requests { + nameAndKeyIDs = append(nameAndKeyIDs, nameAndKeyID(request)) + } + + query := strings.Replace(bulkSelectServerKeysSQL, "($1)", internal.QueryVariadic(len(nameAndKeyIDs)), 1) + + iKeyIDs := make([]interface{}, len(nameAndKeyIDs)) + for i, v := range nameAndKeyIDs { + iKeyIDs[i] = v + } + + rows, err := s.db.QueryContext(ctx, query, iKeyIDs...) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectServerKeys: rows.close() failed") + results := map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{} + for rows.Next() { + var serverName string + var keyID string + var key string + var validUntilTS int64 + var expiredTS int64 + if err = rows.Scan(&serverName, &keyID, &validUntilTS, &expiredTS, &key); err != nil { + return nil, err + } + r := gomatrixserverlib.PublicKeyLookupRequest{ + ServerName: gomatrixserverlib.ServerName(serverName), + KeyID: gomatrixserverlib.KeyID(keyID), + } + vk := gomatrixserverlib.VerifyKey{} + err = vk.Key.Decode(key) + if err != nil { + return nil, err + } + results[r] = gomatrixserverlib.PublicKeyLookupResult{ + VerifyKey: vk, + ValidUntilTS: gomatrixserverlib.Timestamp(validUntilTS), + ExpiredTS: gomatrixserverlib.Timestamp(expiredTS), + } + } + return results, nil +} + +func (s *serverKeyStatements) upsertServerKeys( + ctx context.Context, + request gomatrixserverlib.PublicKeyLookupRequest, + key gomatrixserverlib.PublicKeyLookupResult, +) error { + _, err := s.upsertServerKeysStmt.ExecContext( + ctx, + string(request.ServerName), + string(request.KeyID), + nameAndKeyID(request), + key.ValidUntilTS, + key.ExpiredTS, + key.Key.Encode(), + ) + return err +} + +func nameAndKeyID(request gomatrixserverlib.PublicKeyLookupRequest) string { + return string(request.ServerName) + "\x1F" + string(request.KeyID) +} diff --git a/internal/log.go b/internal/log.go new file mode 100644 index 00000000..fd2b84ab --- /dev/null +++ b/internal/log.go @@ -0,0 +1,188 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "fmt" + "io" + "net/http" + "os" + "path" + "path/filepath" + "runtime" + "strings" + + "github.com/matrix-org/util" + + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dugong" + "github.com/sirupsen/logrus" +) + +type utcFormatter struct { + logrus.Formatter +} + +func (f utcFormatter) Format(entry *logrus.Entry) ([]byte, error) { + entry.Time = entry.Time.UTC() + return f.Formatter.Format(entry) +} + +// Logrus hook which wraps another hook and filters log entries according to their level. +// (Note that we cannot use solely logrus.SetLevel, because Dendrite supports multiple +// levels of logging at the same time.) +type logLevelHook struct { + level logrus.Level + logrus.Hook +} + +// Levels returns all the levels supported by this hook. +func (h *logLevelHook) Levels() []logrus.Level { + levels := make([]logrus.Level, 0) + + for _, level := range logrus.AllLevels { + if level <= h.level { + levels = append(levels, level) + } + } + + return levels +} + +// callerPrettyfier is a function that given a runtime.Frame object, will +// extract the calling function's name and file, and return them in a nicely +// formatted way +func callerPrettyfier(f *runtime.Frame) (string, string) { + // Retrieve just the function name + s := strings.Split(f.Function, ".") + funcname := s[len(s)-1] + + // Append a newline + tab to it to move the actual log content to its own line + funcname += "\n\t" + + // Surround the filepath in brackets and append line number so IDEs can quickly + // navigate + filename := fmt.Sprintf(" [%s:%d]", f.File, f.Line) + + return funcname, filename +} + +// SetupPprof starts a pprof listener. We use the DefaultServeMux here because it is +// simplest, and it gives us the freedom to run pprof on a separate port. +func SetupPprof() { + if hostPort := os.Getenv("PPROFLISTEN"); hostPort != "" { + logrus.Warn("Starting pprof on ", hostPort) + go func() { + logrus.WithError(http.ListenAndServe(hostPort, nil)).Error("Failed to setup pprof listener") + }() + } +} + +// SetupStdLogging configures the logging format to standard output. Typically, it is called when the config is not yet loaded. +func SetupStdLogging() { + logrus.SetReportCaller(true) + logrus.SetFormatter(&utcFormatter{ + &logrus.TextFormatter{ + TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00", + FullTimestamp: true, + DisableColors: false, + DisableTimestamp: false, + QuoteEmptyFields: true, + CallerPrettyfier: callerPrettyfier, + }, + }) +} + +// SetupHookLogging configures the logging hooks defined in the configuration. +// If something fails here it means that the logging was improperly configured, +// so we just exit with the error +func SetupHookLogging(hooks []config.LogrusHook, componentName string) { + logrus.SetReportCaller(true) + for _, hook := range hooks { + // Check we received a proper logging level + level, err := logrus.ParseLevel(hook.Level) + if err != nil { + logrus.Fatalf("Unrecognised logging level %s: %q", hook.Level, err) + } + + // Perform a first filter on the logs according to the lowest level of all + // (Eg: If we have hook for info and above, prevent logrus from processing debug logs) + if logrus.GetLevel() < level { + logrus.SetLevel(level) + } + + switch hook.Type { + case "file": + checkFileHookParams(hook.Params) + setupFileHook(hook, level, componentName) + default: + logrus.Fatalf("Unrecognised logging hook type: %s", hook.Type) + } + } +} + +// File type hooks should be provided a path to a directory to store log files +func checkFileHookParams(params map[string]interface{}) { + path, ok := params["path"] + if !ok { + logrus.Fatalf("Expecting a parameter \"path\" for logging hook of type \"file\"") + } + + if _, ok := path.(string); !ok { + logrus.Fatalf("Parameter \"path\" for logging hook of type \"file\" should be a string") + } +} + +// Add a new FSHook to the logger. Each component will log in its own file +func setupFileHook(hook config.LogrusHook, level logrus.Level, componentName string) { + dirPath := (hook.Params["path"]).(string) + fullPath := filepath.Join(dirPath, componentName+".log") + + if err := os.MkdirAll(path.Dir(fullPath), os.ModePerm); err != nil { + logrus.Fatalf("Couldn't create directory %s: %q", path.Dir(fullPath), err) + } + + logrus.AddHook(&logLevelHook{ + level, + dugong.NewFSHook( + fullPath, + &utcFormatter{ + &logrus.TextFormatter{ + TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00", + DisableColors: true, + DisableTimestamp: false, + DisableSorting: false, + QuoteEmptyFields: true, + }, + }, + &dugong.DailyRotationSchedule{GZip: true}, + ), + }) +} + +//CloseAndLogIfError Closes io.Closer and logs the error if any +func CloseAndLogIfError(ctx context.Context, closer io.Closer, message string) { + if closer == nil { + return + } + err := closer.Close() + if ctx == nil { + ctx = context.TODO() + } + if err != nil { + util.GetLogger(ctx).WithError(err).Error(message) + } +} diff --git a/internal/partition_offset_table.go b/internal/partition_offset_table.go new file mode 100644 index 00000000..8b72819b --- /dev/null +++ b/internal/partition_offset_table.go @@ -0,0 +1,111 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "database/sql" + "strings" +) + +const partitionOffsetsSchema = ` +-- The offsets that the server has processed up to. +CREATE TABLE IF NOT EXISTS ${prefix}_partition_offsets ( + -- The name of the topic. + topic TEXT NOT NULL, + -- The 32-bit partition ID + partition INTEGER NOT NULL, + -- The 64-bit offset. + partition_offset BIGINT NOT NULL, + UNIQUE (topic, partition) +); +` + +const selectPartitionOffsetsSQL = "" + + "SELECT partition, partition_offset FROM ${prefix}_partition_offsets WHERE topic = $1" + +const upsertPartitionOffsetsSQL = "" + + "INSERT INTO ${prefix}_partition_offsets (topic, partition, partition_offset) VALUES ($1, $2, $3)" + + " ON CONFLICT (topic, partition)" + + " DO UPDATE SET partition_offset = $3" + +// PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table. +type PartitionOffsetStatements struct { + selectPartitionOffsetsStmt *sql.Stmt + upsertPartitionOffsetStmt *sql.Stmt +} + +// Prepare converts the raw SQL statements into prepared statements. +// Takes a prefix to prepend to the table name used to store the partition offsets. +// This allows multiple components to share the same database schema. +func (s *PartitionOffsetStatements) Prepare(db *sql.DB, prefix string) (err error) { + _, err = db.Exec(strings.Replace(partitionOffsetsSchema, "${prefix}", prefix, -1)) + if err != nil { + return + } + if s.selectPartitionOffsetsStmt, err = db.Prepare( + strings.Replace(selectPartitionOffsetsSQL, "${prefix}", prefix, -1), + ); err != nil { + return + } + if s.upsertPartitionOffsetStmt, err = db.Prepare( + strings.Replace(upsertPartitionOffsetsSQL, "${prefix}", prefix, -1), + ); err != nil { + return + } + return +} + +// PartitionOffsets implements PartitionStorer +func (s *PartitionOffsetStatements) PartitionOffsets( + ctx context.Context, topic string, +) ([]PartitionOffset, error) { + return s.selectPartitionOffsets(ctx, topic) +} + +// SetPartitionOffset implements PartitionStorer +func (s *PartitionOffsetStatements) SetPartitionOffset( + ctx context.Context, topic string, partition int32, offset int64, +) error { + return s.upsertPartitionOffset(ctx, topic, partition, offset) +} + +// selectPartitionOffsets returns all the partition offsets for the given topic. +func (s *PartitionOffsetStatements) selectPartitionOffsets( + ctx context.Context, topic string, +) ([]PartitionOffset, error) { + rows, err := s.selectPartitionOffsetsStmt.QueryContext(ctx, topic) + if err != nil { + return nil, err + } + defer CloseAndLogIfError(ctx, rows, "selectPartitionOffsets: rows.close() failed") + var results []PartitionOffset + for rows.Next() { + var offset PartitionOffset + if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil { + return nil, err + } + results = append(results, offset) + } + return results, rows.Err() +} + +// UpsertPartitionOffset updates or inserts the partition offset for the given topic. +func (s *PartitionOffsetStatements) upsertPartitionOffset( + ctx context.Context, topic string, partition int32, offset int64, +) error { + _, err := s.upsertPartitionOffsetStmt.ExecContext(ctx, topic, partition, offset) + return err +} diff --git a/internal/postgres.go b/internal/postgres.go new file mode 100644 index 00000000..7ae40d8f --- /dev/null +++ b/internal/postgres.go @@ -0,0 +1,25 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !wasm + +package internal + +import "github.com/lib/pq" + +// IsUniqueConstraintViolationErr returns true if the error is a postgresql unique_violation error +func IsUniqueConstraintViolationErr(err error) bool { + pqErr, ok := err.(*pq.Error) + return ok && pqErr.Code == "23505" +} diff --git a/internal/postgres_wasm.go b/internal/postgres_wasm.go new file mode 100644 index 00000000..64d32829 --- /dev/null +++ b/internal/postgres_wasm.go @@ -0,0 +1,22 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build wasm + +package internal + +// IsUniqueConstraintViolationErr no-ops for this architecture +func IsUniqueConstraintViolationErr(err error) bool { + return false +} diff --git a/internal/routing.go b/internal/routing.go new file mode 100644 index 00000000..4462c70c --- /dev/null +++ b/internal/routing.go @@ -0,0 +1,35 @@ +// Copyright 2019 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "net/url" +) + +// URLDecodeMapValues is a function that iterates through each of the items in a +// map, URL decodes the value, and returns a new map with the decoded values +// under the same key names +func URLDecodeMapValues(vmap map[string]string) (map[string]string, error) { + decoded := make(map[string]string, len(vmap)) + for key, value := range vmap { + decodedVal, err := url.PathUnescape(value) + if err != nil { + return make(map[string]string), err + } + decoded[key] = decodedVal + } + + return decoded, nil +} diff --git a/internal/sql.go b/internal/sql.go new file mode 100644 index 00000000..d6a5a308 --- /dev/null +++ b/internal/sql.go @@ -0,0 +1,109 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "database/sql" + "fmt" + "runtime" + "time" +) + +// A Transaction is something that can be committed or rolledback. +type Transaction interface { + // Commit the transaction + Commit() error + // Rollback the transaction. + Rollback() error +} + +// EndTransaction ends a transaction. +// If the transaction succeeded then it is committed, otherwise it is rolledback. +// You MUST check the error returned from this function to be sure that the transaction +// was applied correctly. For example, 'database is locked' errors in sqlite will happen here. +func EndTransaction(txn Transaction, succeeded *bool) error { + if *succeeded { + return txn.Commit() // nolint: errcheck + } else { + return txn.Rollback() // nolint: errcheck + } +} + +// WithTransaction runs a block of code passing in an SQL transaction +// If the code returns an error or panics then the transactions is rolledback +// Otherwise the transaction is committed. +func WithTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { + txn, err := db.Begin() + if err != nil { + return + } + succeeded := false + defer func() { + err2 := EndTransaction(txn, &succeeded) + if err == nil && err2 != nil { // failed to commit/rollback + err = err2 + } + }() + + err = fn(txn) + if err != nil { + return + } + + succeeded = true + return +} + +// TxStmt wraps an SQL stmt inside an optional transaction. +// If the transaction is nil then it returns the original statement that will +// run outside of a transaction. +// Otherwise returns a copy of the statement that will run inside the transaction. +func TxStmt(transaction *sql.Tx, statement *sql.Stmt) *sql.Stmt { + if transaction != nil { + statement = transaction.Stmt(statement) + } + return statement +} + +// Hack of the century +func QueryVariadic(count int) string { + return QueryVariadicOffset(count, 0) +} + +func QueryVariadicOffset(count, offset int) string { + str := "(" + for i := 0; i < count; i++ { + str += fmt.Sprintf("$%d", i+offset+1) + if i < (count - 1) { + str += ", " + } + } + str += ")" + return str +} + +func SQLiteDriverName() string { + if runtime.GOOS == "js" { + return "sqlite3_js" + } + return "sqlite3" +} + +// DbProperties functions return properties used by database/sql/DB +type DbProperties interface { + MaxIdleConns() int + MaxOpenConns() int + ConnMaxLifetime() time.Duration +} diff --git a/internal/sqlutil/trace.go b/internal/sqlutil/trace.go index 42ac4e58..1b008e1b 100644 --- a/internal/sqlutil/trace.go +++ b/internal/sqlutil/trace.go @@ -25,7 +25,7 @@ import ( "strings" "time" - "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/internal" "github.com/ngrok/sqlmw" "github.com/sirupsen/logrus" ) @@ -78,7 +78,7 @@ func (in *traceInterceptor) RowsNext(c context.Context, rows driver.Rows, dest [ // Open opens a database specified by its database driver name and a driver-specific data source name, // usually consisting of at least a database name and connection information. Includes tracing driver // if DENDRITE_TRACE_SQL=1 -func Open(driverName, dsn string, dbProperties common.DbProperties) (*sql.DB, error) { +func Open(driverName, dsn string, dbProperties internal.DbProperties) (*sql.DB, error) { if tracingEnabled { // install the wrapped driver driverName += "-trace" @@ -87,7 +87,7 @@ func Open(driverName, dsn string, dbProperties common.DbProperties) (*sql.DB, er if err != nil { return nil, err } - if driverName != common.SQLiteDriverName() && dbProperties != nil { + if driverName != internal.SQLiteDriverName() && dbProperties != nil { logrus.WithFields(logrus.Fields{ "MaxOpenConns": dbProperties.MaxOpenConns(), "MaxIdleConns": dbProperties.MaxIdleConns(), diff --git a/internal/test/client.go b/internal/test/client.go new file mode 100644 index 00000000..a38540ac --- /dev/null +++ b/internal/test/client.go @@ -0,0 +1,158 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "crypto/tls" + "fmt" + "io" + "io/ioutil" + "net/http" + "sync" + "time" + + "github.com/matrix-org/gomatrixserverlib" +) + +// Request contains the information necessary to issue a request and test its result +type Request struct { + Req *http.Request + WantedBody string + WantedStatusCode int + LastErr *LastRequestErr +} + +// LastRequestErr is a synchronised error wrapper +// Useful for obtaining the last error from a set of requests +type LastRequestErr struct { + sync.Mutex + Err error +} + +// Set sets the error +func (r *LastRequestErr) Set(err error) { + r.Lock() + defer r.Unlock() + r.Err = err +} + +// Get gets the error +func (r *LastRequestErr) Get() error { + r.Lock() + defer r.Unlock() + return r.Err +} + +// CanonicalJSONInput canonicalises a slice of JSON strings +// Useful for test input +func CanonicalJSONInput(jsonData []string) []string { + for i := range jsonData { + jsonBytes, err := gomatrixserverlib.CanonicalJSON([]byte(jsonData[i])) + if err != nil && err != io.EOF { + panic(err) + } + jsonData[i] = string(jsonBytes) + } + return jsonData +} + +// Do issues a request and checks the status code and body of the response +func (r *Request) Do() (err error) { + client := &http.Client{ + Timeout: 5 * time.Second, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + }, + } + res, err := client.Do(r.Req) + if err != nil { + return err + } + defer (func() { err = res.Body.Close() })() + + if res.StatusCode != r.WantedStatusCode { + return fmt.Errorf("incorrect status code. Expected: %d Got: %d", r.WantedStatusCode, res.StatusCode) + } + + if r.WantedBody != "" { + resBytes, err := ioutil.ReadAll(res.Body) + if err != nil { + return err + } + jsonBytes, err := gomatrixserverlib.CanonicalJSON(resBytes) + if err != nil { + return err + } + if string(jsonBytes) != r.WantedBody { + return fmt.Errorf("returned wrong bytes. Expected:\n%s\n\nGot:\n%s", r.WantedBody, string(jsonBytes)) + } + } + + return nil +} + +// DoUntilSuccess blocks and repeats the same request until the response returns the desired status code and body. +// It then closes the given channel and returns. +func (r *Request) DoUntilSuccess(done chan error) { + r.LastErr = &LastRequestErr{} + for { + if err := r.Do(); err != nil { + r.LastErr.Set(err) + time.Sleep(1 * time.Second) // don't tightloop + continue + } + close(done) + return + } +} + +// Run repeatedly issues a request until success, error or a timeout is reached +func (r *Request) Run(label string, timeout time.Duration, serverCmdChan chan error) { + fmt.Printf("==TESTING== %v (timeout: %v)\n", label, timeout) + done := make(chan error, 1) + + // We need to wait for the server to: + // - have connected to the database + // - have created the tables + // - be listening on the given port + go r.DoUntilSuccess(done) + + // wait for one of: + // - the test to pass (done channel is closed) + // - the server to exit with an error (error sent on serverCmdChan) + // - our test timeout to expire + // We don't need to clean up since the main() function handles that in the event we panic + select { + case <-time.After(timeout): + fmt.Printf("==TESTING== %v TIMEOUT\n", label) + if reqErr := r.LastErr.Get(); reqErr != nil { + fmt.Println("Last /sync request error:") + fmt.Println(reqErr) + } + panic(fmt.Sprintf("%v server timed out", label)) + case err := <-serverCmdChan: + if err != nil { + fmt.Println("=============================================================================================") + fmt.Printf("%v server failed to run. If failing with 'pq: password authentication failed for user' try:", label) + fmt.Println(" export PGHOST=/var/run/postgresql") + fmt.Println("=============================================================================================") + panic(err) + } + case <-done: + fmt.Printf("==TESTING== %v PASSED\n", label) + } +} diff --git a/internal/test/config.go b/internal/test/config.go new file mode 100644 index 00000000..06510c8b --- /dev/null +++ b/internal/test/config.go @@ -0,0 +1,208 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "encoding/base64" + "encoding/pem" + "fmt" + "io/ioutil" + "math/big" + "os" + "path/filepath" + "time" + + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/gomatrixserverlib" + "gopkg.in/yaml.v2" +) + +const ( + // ConfigFile is the name of the config file for a server. + ConfigFile = "dendrite.yaml" + // ServerKeyFile is the name of the file holding the matrix server private key. + ServerKeyFile = "server_key.pem" + // TLSCertFile is the name of the file holding the TLS certificate used for federation. + TLSCertFile = "tls_cert.pem" + // TLSKeyFile is the name of the file holding the TLS key used for federation. + TLSKeyFile = "tls_key.pem" + // MediaDir is the name of the directory used to store media. + MediaDir = "media" +) + +// MakeConfig makes a config suitable for running integration tests. +// Generates new matrix and TLS keys for the server. +func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*config.Dendrite, int, error) { + var cfg config.Dendrite + + port := startPort + assignAddress := func() config.Address { + result := config.Address(fmt.Sprintf("%s:%d", host, port)) + port++ + return result + } + + serverKeyPath := filepath.Join(configDir, ServerKeyFile) + tlsCertPath := filepath.Join(configDir, TLSKeyFile) + tlsKeyPath := filepath.Join(configDir, TLSCertFile) + mediaBasePath := filepath.Join(configDir, MediaDir) + + if err := NewMatrixKey(serverKeyPath); err != nil { + return nil, 0, err + } + + if err := NewTLSKey(tlsKeyPath, tlsCertPath); err != nil { + return nil, 0, err + } + + cfg.Version = config.Version + + cfg.Matrix.ServerName = gomatrixserverlib.ServerName(assignAddress()) + cfg.Matrix.PrivateKeyPath = config.Path(serverKeyPath) + cfg.Matrix.FederationCertificatePaths = []config.Path{config.Path(tlsCertPath)} + + cfg.Media.BasePath = config.Path(mediaBasePath) + + cfg.Kafka.Addresses = []string{kafkaURI} + // TODO: Different servers should be using different topics. + // Make this configurable somehow? + cfg.Kafka.Topics.OutputRoomEvent = "test.room.output" + cfg.Kafka.Topics.OutputClientData = "test.clientapi.output" + cfg.Kafka.Topics.OutputTypingEvent = "test.typing.output" + cfg.Kafka.Topics.UserUpdates = "test.user.output" + + // TODO: Use different databases for the different schemas. + // Using the same database for every schema currently works because + // the table names are globally unique. But we might not want to + // rely on that in the future. + cfg.Database.Account = config.DataSource(database) + cfg.Database.AppService = config.DataSource(database) + cfg.Database.Device = config.DataSource(database) + cfg.Database.MediaAPI = config.DataSource(database) + cfg.Database.RoomServer = config.DataSource(database) + cfg.Database.ServerKey = config.DataSource(database) + cfg.Database.SyncAPI = config.DataSource(database) + cfg.Database.PublicRoomsAPI = config.DataSource(database) + + cfg.Listen.ClientAPI = assignAddress() + cfg.Listen.AppServiceAPI = assignAddress() + cfg.Listen.FederationAPI = assignAddress() + cfg.Listen.MediaAPI = assignAddress() + cfg.Listen.RoomServer = assignAddress() + cfg.Listen.SyncAPI = assignAddress() + cfg.Listen.PublicRoomsAPI = assignAddress() + cfg.Listen.EDUServer = assignAddress() + + // Bind to the same address as the listen address + // All microservices are run on the same host in testing + cfg.Bind.ClientAPI = cfg.Listen.ClientAPI + cfg.Bind.AppServiceAPI = cfg.Listen.AppServiceAPI + cfg.Bind.FederationAPI = cfg.Listen.FederationAPI + cfg.Bind.MediaAPI = cfg.Listen.MediaAPI + cfg.Bind.RoomServer = cfg.Listen.RoomServer + cfg.Bind.SyncAPI = cfg.Listen.SyncAPI + cfg.Bind.PublicRoomsAPI = cfg.Listen.PublicRoomsAPI + cfg.Bind.EDUServer = cfg.Listen.EDUServer + + return &cfg, port, nil +} + +// WriteConfig writes the config file to the directory. +func WriteConfig(cfg *config.Dendrite, configDir string) error { + data, err := yaml.Marshal(cfg) + if err != nil { + return err + } + return ioutil.WriteFile(filepath.Join(configDir, ConfigFile), data, 0666) +} + +// NewMatrixKey generates a new ed25519 matrix server key and writes it to a file. +func NewMatrixKey(matrixKeyPath string) (err error) { + var data [35]byte + _, err = rand.Read(data[:]) + if err != nil { + return err + } + keyOut, err := os.OpenFile(matrixKeyPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return err + } + + defer (func() { + err = keyOut.Close() + })() + + err = pem.Encode(keyOut, &pem.Block{ + Type: "MATRIX PRIVATE KEY", + Headers: map[string]string{ + "Key-ID": "ed25519:" + base64.RawStdEncoding.EncodeToString(data[:3]), + }, + Bytes: data[3:], + }) + return err +} + +const certificateDuration = time.Hour * 24 * 365 * 10 + +// NewTLSKey generates a new RSA TLS key and certificate and writes it to a file. +func NewTLSKey(tlsKeyPath, tlsCertPath string) error { + priv, err := rsa.GenerateKey(rand.Reader, 4096) + if err != nil { + return err + } + + notBefore := time.Now() + notAfter := notBefore.Add(certificateDuration) + serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128) + serialNumber, err := rand.Int(rand.Reader, serialNumberLimit) + if err != nil { + return err + } + + template := x509.Certificate{ + SerialNumber: serialNumber, + NotBefore: notBefore, + NotAfter: notAfter, + KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + } + derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv) + if err != nil { + return err + } + certOut, err := os.Create(tlsCertPath) + if err != nil { + return err + } + defer certOut.Close() // nolint: errcheck + if err = pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}); err != nil { + return err + } + + keyOut, err := os.OpenFile(tlsKeyPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return err + } + defer keyOut.Close() // nolint: errcheck + err = pem.Encode(keyOut, &pem.Block{ + Type: "RSA PRIVATE KEY", + Bytes: x509.MarshalPKCS1PrivateKey(priv), + }) + return err +} diff --git a/internal/test/kafka.go b/internal/test/kafka.go new file mode 100644 index 00000000..cbf24630 --- /dev/null +++ b/internal/test/kafka.go @@ -0,0 +1,76 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "io" + "os/exec" + "path/filepath" + "strings" +) + +// KafkaExecutor executes kafka scripts. +type KafkaExecutor struct { + // The location of Zookeeper. Typically this is `localhost:2181`. + ZookeeperURI string + // The directory where Kafka is installed to. Used to locate kafka scripts. + KafkaDirectory string + // The location of the Kafka logs. Typically this is `localhost:9092`. + KafkaURI string + // Where stdout and stderr should be written to. Typically this is `os.Stderr`. + OutputWriter io.Writer +} + +// CreateTopic creates a new kafka topic. This is created with a single partition. +func (e *KafkaExecutor) CreateTopic(topic string) error { + cmd := exec.Command( + filepath.Join(e.KafkaDirectory, "bin", "kafka-topics.sh"), + "--create", + "--zookeeper", e.ZookeeperURI, + "--replication-factor", "1", + "--partitions", "1", + "--topic", topic, + ) + cmd.Stdout = e.OutputWriter + cmd.Stderr = e.OutputWriter + return cmd.Run() +} + +// WriteToTopic writes data to a kafka topic. +func (e *KafkaExecutor) WriteToTopic(topic string, data []string) error { + cmd := exec.Command( + filepath.Join(e.KafkaDirectory, "bin", "kafka-console-producer.sh"), + "--broker-list", e.KafkaURI, + "--topic", topic, + ) + cmd.Stdout = e.OutputWriter + cmd.Stderr = e.OutputWriter + cmd.Stdin = strings.NewReader(strings.Join(data, "\n")) + return cmd.Run() +} + +// DeleteTopic deletes a given kafka topic if it exists. +func (e *KafkaExecutor) DeleteTopic(topic string) error { + cmd := exec.Command( + filepath.Join(e.KafkaDirectory, "bin", "kafka-topics.sh"), + "--delete", + "--if-exists", + "--zookeeper", e.ZookeeperURI, + "--topic", topic, + ) + cmd.Stderr = e.OutputWriter + cmd.Stdout = e.OutputWriter + return cmd.Run() +} diff --git a/internal/test/server.go b/internal/test/server.go new file mode 100644 index 00000000..1493dac6 --- /dev/null +++ b/internal/test/server.go @@ -0,0 +1,105 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + + "github.com/matrix-org/dendrite/internal/config" +) + +// Defaulting allows assignment of string variables with a fallback default value +// Useful for use with os.Getenv() for example +func Defaulting(value, defaultValue string) string { + if value == "" { + value = defaultValue + } + return value +} + +// CreateDatabase creates a new database, dropping it first if it exists +func CreateDatabase(command string, args []string, database string) error { + cmd := exec.Command(command, args...) + cmd.Stdin = strings.NewReader( + fmt.Sprintf("DROP DATABASE IF EXISTS %s; CREATE DATABASE %s;", database, database), + ) + // Send stdout and stderr to our stderr so that we see error messages from + // the psql process + cmd.Stdout = os.Stderr + cmd.Stderr = os.Stderr + return cmd.Run() +} + +// CreateBackgroundCommand creates an executable command +// The Cmd being executed is returned. A channel is also returned, +// which will have any termination errors sent down it, followed immediately by the channel being closed. +func CreateBackgroundCommand(command string, args []string) (*exec.Cmd, chan error) { + cmd := exec.Command(command, args...) + cmd.Stderr = os.Stderr + cmd.Stdout = os.Stderr + + if err := cmd.Start(); err != nil { + panic("failed to start server: " + err.Error()) + } + cmdChan := make(chan error, 1) + go func() { + cmdChan <- cmd.Wait() + close(cmdChan) + }() + return cmd, cmdChan +} + +// InitDatabase creates the database and config file needed for the server to run +func InitDatabase(postgresDatabase, postgresContainerName string, databases []string) { + if len(databases) > 0 { + var dbCmd string + var dbArgs []string + if postgresContainerName == "" { + dbCmd = "psql" + dbArgs = []string{postgresDatabase} + } else { + dbCmd = "docker" + dbArgs = []string{ + "exec", "-i", postgresContainerName, "psql", "-U", "postgres", postgresDatabase, + } + } + for _, database := range databases { + if err := CreateDatabase(dbCmd, dbArgs, database); err != nil { + panic(err) + } + } + } +} + +// StartProxy creates a reverse proxy +func StartProxy(bindAddr string, cfg *config.Dendrite) (*exec.Cmd, chan error) { + proxyArgs := []string{ + "--bind-address", bindAddr, + "--sync-api-server-url", "http://" + string(cfg.Listen.SyncAPI), + "--client-api-server-url", "http://" + string(cfg.Listen.ClientAPI), + "--media-api-server-url", "http://" + string(cfg.Listen.MediaAPI), + "--public-rooms-api-server-url", "http://" + string(cfg.Listen.PublicRoomsAPI), + "--tls-cert", "server.crt", + "--tls-key", "server.key", + } + return CreateBackgroundCommand( + filepath.Join(filepath.Dir(os.Args[0]), "client-api-proxy"), + proxyArgs, + ) +} diff --git a/internal/test/slice.go b/internal/test/slice.go new file mode 100644 index 00000000..00c740db --- /dev/null +++ b/internal/test/slice.go @@ -0,0 +1,34 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import "sort" + +// UnsortedStringSliceEqual returns true if the slices have same length & elements. +// Does not modify the given slice. +func UnsortedStringSliceEqual(first, second []string) bool { + if len(first) != len(second) { + return false + } + + a, b := first[:], second[:] + sort.Strings(a) + sort.Strings(b) + for i := range a { + if a[i] != b[i] { + return false + } + } + + return true +} diff --git a/internal/transactions/transactions.go b/internal/transactions/transactions.go new file mode 100644 index 00000000..d2eb0f27 --- /dev/null +++ b/internal/transactions/transactions.go @@ -0,0 +1,95 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transactions + +import ( + "sync" + "time" + + "github.com/matrix-org/util" +) + +// DefaultCleanupPeriod represents the default time duration after which cacheCleanService runs. +const DefaultCleanupPeriod time.Duration = 30 * time.Minute + +type txnsMap map[CacheKey]*util.JSONResponse + +// CacheKey is the type for the key in a transactions cache. +// This is needed because the spec requires transaction IDs to have a per-access token scope. +type CacheKey struct { + AccessToken string + TxnID string +} + +// Cache represents a temporary store for response entries. +// Entries are evicted after a certain period, defined by cleanupPeriod. +// This works by keeping two maps of entries, and cycling the maps after the cleanupPeriod. +type Cache struct { + sync.RWMutex + txnsMaps [2]txnsMap + cleanupPeriod time.Duration +} + +// New is a wrapper which calls NewWithCleanupPeriod with DefaultCleanupPeriod as argument. +func New() *Cache { + return NewWithCleanupPeriod(DefaultCleanupPeriod) +} + +// NewWithCleanupPeriod creates a new Cache object, starts cacheCleanService. +// Takes cleanupPeriod as argument. +// Returns a reference to newly created Cache. +func NewWithCleanupPeriod(cleanupPeriod time.Duration) *Cache { + t := Cache{txnsMaps: [2]txnsMap{make(txnsMap), make(txnsMap)}} + t.cleanupPeriod = cleanupPeriod + + // Start clean service as the Cache is created + go cacheCleanService(&t) + return &t +} + +// FetchTransaction looks up an entry for the (accessToken, txnID) tuple in Cache. +// Looks in both the txnMaps. +// Returns (JSON response, true) if txnID is found, else the returned bool is false. +func (t *Cache) FetchTransaction(accessToken, txnID string) (*util.JSONResponse, bool) { + t.RLock() + defer t.RUnlock() + for _, txns := range t.txnsMaps { + res, ok := txns[CacheKey{accessToken, txnID}] + if ok { + return res, true + } + } + return nil, false +} + +// AddTransaction adds an entry for the (accessToken, txnID) tuple in Cache. +// Adds to the front txnMap. +func (t *Cache) AddTransaction(accessToken, txnID string, res *util.JSONResponse) { + t.Lock() + defer t.Unlock() + + t.txnsMaps[0][CacheKey{accessToken, txnID}] = res +} + +// cacheCleanService is responsible for cleaning up entries after cleanupPeriod. +// It guarantees that an entry will be present in cache for at least cleanupPeriod & at most 2 * cleanupPeriod. +// This cycles the txnMaps forward, i.e. back map is assigned the front and front is assigned an empty map. +func cacheCleanService(t *Cache) { + ticker := time.NewTicker(t.cleanupPeriod).C + for range ticker { + t.Lock() + t.txnsMaps[1] = t.txnsMaps[0] + t.txnsMaps[0] = make(txnsMap) + t.Unlock() + } +} diff --git a/internal/transactions/transactions_test.go b/internal/transactions/transactions_test.go new file mode 100644 index 00000000..f565e484 --- /dev/null +++ b/internal/transactions/transactions_test.go @@ -0,0 +1,77 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transactions + +import ( + "net/http" + "testing" + + "github.com/matrix-org/util" +) + +type fakeType struct { + ID string `json:"ID"` +} + +var ( + fakeAccessToken = "aRandomAccessToken" + fakeAccessToken2 = "anotherRandomAccessToken" + fakeTxnID = "aRandomTxnID" + fakeResponse = &util.JSONResponse{ + Code: http.StatusOK, JSON: fakeType{ID: "0"}, + } + fakeResponse2 = &util.JSONResponse{ + Code: http.StatusOK, JSON: fakeType{ID: "1"}, + } +) + +// TestCache creates a New Cache and tests AddTransaction & FetchTransaction +func TestCache(t *testing.T) { + fakeTxnCache := New() + fakeTxnCache.AddTransaction(fakeAccessToken, fakeTxnID, fakeResponse) + + // Add entries for noise. + for i := 1; i <= 100; i++ { + fakeTxnCache.AddTransaction( + fakeAccessToken, + fakeTxnID+string(i), + &util.JSONResponse{Code: http.StatusOK, JSON: fakeType{ID: string(i)}}, + ) + } + + testResponse, ok := fakeTxnCache.FetchTransaction(fakeAccessToken, fakeTxnID) + if !ok { + t.Error("Failed to retrieve entry for txnID: ", fakeTxnID) + } else if testResponse.JSON != fakeResponse.JSON { + t.Error("Fetched response incorrect. Expected: ", fakeResponse.JSON, " got: ", testResponse.JSON) + } +} + +// TestCacheScope ensures transactions with the same transaction ID are not shared +// across multiple access tokens. +func TestCacheScope(t *testing.T) { + cache := New() + cache.AddTransaction(fakeAccessToken, fakeTxnID, fakeResponse) + cache.AddTransaction(fakeAccessToken2, fakeTxnID, fakeResponse2) + + if res, ok := cache.FetchTransaction(fakeAccessToken, fakeTxnID); !ok { + t.Errorf("failed to retrieve entry for (%s, %s)", fakeAccessToken, fakeTxnID) + } else if res.JSON != fakeResponse.JSON { + t.Errorf("Wrong cache entry for (%s, %s). Expected: %v; got: %v", fakeAccessToken, fakeTxnID, fakeResponse.JSON, res.JSON) + } + if res, ok := cache.FetchTransaction(fakeAccessToken2, fakeTxnID); !ok { + t.Errorf("failed to retrieve entry for (%s, %s)", fakeAccessToken, fakeTxnID) + } else if res.JSON != fakeResponse2.JSON { + t.Errorf("Wrong cache entry for (%s, %s). Expected: %v; got: %v", fakeAccessToken, fakeTxnID, fakeResponse2.JSON, res.JSON) + } +} diff --git a/internal/types.go b/internal/types.go new file mode 100644 index 00000000..be2717f3 --- /dev/null +++ b/internal/types.go @@ -0,0 +1,66 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "errors" + "strconv" +) + +// ErrProfileNoExists is returned when trying to lookup a user's profile that +// doesn't exist locally. +var ErrProfileNoExists = errors.New("no known profile for given user ID") + +// AccountData represents account data sent from the client API server to the +// sync API server +type AccountData struct { + RoomID string `json:"room_id"` + Type string `json:"type"` +} + +// ProfileResponse is a struct containing all known user profile data +type ProfileResponse struct { + AvatarURL string `json:"avatar_url"` + DisplayName string `json:"displayname"` +} + +// AvatarURL is a struct containing only the URL to a user's avatar +type AvatarURL struct { + AvatarURL string `json:"avatar_url"` +} + +// DisplayName is a struct containing only a user's display name +type DisplayName struct { + DisplayName string `json:"displayname"` +} + +// WeakBoolean is a type that will Unmarshal to true or false even if the encoded +// representation is "true"/1 or "false"/0, as well as whatever other forms are +// recognised by strconv.ParseBool +type WeakBoolean bool + +// UnmarshalJSON is overridden here to allow strings vaguely representing a true +// or false boolean to be set as their closest counterpart +func (b *WeakBoolean) UnmarshalJSON(data []byte) error { + result, err := strconv.ParseBool(string(data)) + if err != nil { + return err + } + + // Set boolean value based on string input + *b = WeakBoolean(result) + + return nil +} |