diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-12-02 17:41:00 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-02 17:41:00 +0000 |
commit | b5aa7ca3ab1c91397700637c91d60860a0535f1e (patch) | |
tree | 9da277c7b22027f09a7f45b0b0d771e44949e8f0 /setup | |
parent | 3ef6187e96ca2d68b3014bbd150e69971b6f7800 (diff) |
Top-level setup package (#1605)
* Move config, setup, mscs into "setup" top-level folder
* oops, forgot the EDU server
* Add setup
* goimports
Diffstat (limited to 'setup')
25 files changed, 3840 insertions, 0 deletions
diff --git a/setup/base.go b/setup/base.go new file mode 100644 index 00000000..acbf2d35 --- /dev/null +++ b/setup/base.go @@ -0,0 +1,359 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package setup + +import ( + "crypto/tls" + "fmt" + "io" + "net" + "net/http" + "net/url" + "time" + + "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/gomatrixserverlib" + "github.com/prometheus/client_golang/prometheus/promhttp" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/userapi/storage/accounts" + + "github.com/gorilla/mux" + + appserviceAPI "github.com/matrix-org/dendrite/appservice/api" + asinthttp "github.com/matrix-org/dendrite/appservice/inthttp" + eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" + eduinthttp "github.com/matrix-org/dendrite/eduserver/inthttp" + federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" + fsinthttp "github.com/matrix-org/dendrite/federationsender/inthttp" + keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" + keyinthttp "github.com/matrix-org/dendrite/keyserver/inthttp" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + rsinthttp "github.com/matrix-org/dendrite/roomserver/inthttp" + "github.com/matrix-org/dendrite/setup/config" + skapi "github.com/matrix-org/dendrite/signingkeyserver/api" + skinthttp "github.com/matrix-org/dendrite/signingkeyserver/inthttp" + userapi "github.com/matrix-org/dendrite/userapi/api" + userapiinthttp "github.com/matrix-org/dendrite/userapi/inthttp" + "github.com/sirupsen/logrus" + + _ "net/http/pprof" +) + +// BaseDendrite is a base for creating new instances of dendrite. It parses +// command line flags and config, and exposes methods for creating various +// resources. All errors are handled by logging then exiting, so all methods +// should only be used during start up. +// Must be closed when shutting down. +type BaseDendrite struct { + componentName string + tracerCloser io.Closer + PublicClientAPIMux *mux.Router + PublicFederationAPIMux *mux.Router + PublicKeyAPIMux *mux.Router + PublicMediaAPIMux *mux.Router + InternalAPIMux *mux.Router + UseHTTPAPIs bool + apiHttpClient *http.Client + httpClient *http.Client + Cfg *config.Dendrite + Caches *caching.Caches + // KafkaConsumer sarama.Consumer + // KafkaProducer sarama.SyncProducer +} + +const HTTPServerTimeout = time.Minute * 5 +const HTTPClientTimeout = time.Second * 30 + +const NoListener = "" + +// NewBaseDendrite creates a new instance to be used by a component. +// The componentName is used for logging purposes, and should be a friendly name +// of the compontent running, e.g. "SyncAPI" +func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs bool) *BaseDendrite { + configErrors := &config.ConfigErrors{} + cfg.Verify(configErrors, componentName == "Monolith") // TODO: better way? + if len(*configErrors) > 0 { + for _, err := range *configErrors { + logrus.Errorf("Configuration error: %s", err) + } + logrus.Fatalf("Failed to start due to configuration errors") + } + + internal.SetupStdLogging() + internal.SetupHookLogging(cfg.Logging, componentName) + internal.SetupPprof() + + logrus.Infof("Dendrite version %s", internal.VersionString()) + + closer, err := cfg.SetupTracing("Dendrite" + componentName) + if err != nil { + logrus.WithError(err).Panicf("failed to start opentracing") + } + + cache, err := caching.NewInMemoryLRUCache(true) + if err != nil { + logrus.WithError(err).Warnf("Failed to create cache") + } + + apiClient := http.Client{ + Timeout: time.Minute * 10, + Transport: &http2.Transport{ + AllowHTTP: true, + DialTLS: func(network, addr string, _ *tls.Config) (net.Conn, error) { + // Ordinarily HTTP/2 would expect TLS, but the remote listener is + // H2C-enabled (HTTP/2 without encryption). Overriding the DialTLS + // function with a plain Dial allows us to trick the HTTP client + // into establishing a HTTP/2 connection without TLS. + // TODO: Eventually we will want to look at authenticating and + // encrypting these internal HTTP APIs, at which point we will have + // to reconsider H2C and change all this anyway. + return net.Dial(network, addr) + }, + }, + } + client := http.Client{Timeout: HTTPClientTimeout} + if cfg.FederationSender.Proxy.Enabled { + client.Transport = &http.Transport{Proxy: http.ProxyURL(&url.URL{ + Scheme: cfg.FederationSender.Proxy.Protocol, + Host: fmt.Sprintf("%s:%d", cfg.FederationSender.Proxy.Host, cfg.FederationSender.Proxy.Port), + })} + } + + // Ideally we would only use SkipClean on routes which we know can allow '/' but due to + // https://github.com/gorilla/mux/issues/460 we have to attach this at the top router. + // When used in conjunction with UseEncodedPath() we get the behaviour we want when parsing + // path parameters: + // /foo/bar%2Fbaz == [foo, bar%2Fbaz] (from UseEncodedPath) + // /foo/bar%2F%2Fbaz == [foo, bar%2F%2Fbaz] (from SkipClean) + // In particular, rooms v3 event IDs are not urlsafe and can include '/' and because they + // are randomly generated it results in flakey tests. + // We need to be careful with media APIs if they read from a filesystem to make sure they + // are not inadvertently reading paths without cleaning, else this could introduce a + // directory traversal attack e.g /../../../etc/passwd + return &BaseDendrite{ + componentName: componentName, + UseHTTPAPIs: useHTTPAPIs, + tracerCloser: closer, + Cfg: cfg, + Caches: cache, + PublicClientAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicClientPathPrefix).Subrouter().UseEncodedPath(), + PublicFederationAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicFederationPathPrefix).Subrouter().UseEncodedPath(), + PublicKeyAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicKeyPathPrefix).Subrouter().UseEncodedPath(), + PublicMediaAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicMediaPathPrefix).Subrouter().UseEncodedPath(), + InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(), + apiHttpClient: &apiClient, + httpClient: &client, + } +} + +// Close implements io.Closer +func (b *BaseDendrite) Close() error { + return b.tracerCloser.Close() +} + +// AppserviceHTTPClient returns the AppServiceQueryAPI for hitting the appservice component over HTTP. +func (b *BaseDendrite) AppserviceHTTPClient() appserviceAPI.AppServiceQueryAPI { + a, err := asinthttp.NewAppserviceClient(b.Cfg.AppServiceURL(), b.apiHttpClient) + if err != nil { + logrus.WithError(err).Panic("CreateHTTPAppServiceAPIs failed") + } + return a +} + +// RoomserverHTTPClient returns RoomserverInternalAPI for hitting the roomserver over HTTP. +func (b *BaseDendrite) RoomserverHTTPClient() roomserverAPI.RoomserverInternalAPI { + rsAPI, err := rsinthttp.NewRoomserverClient(b.Cfg.RoomServerURL(), b.apiHttpClient, b.Caches) + if err != nil { + logrus.WithError(err).Panic("RoomserverHTTPClient failed", b.apiHttpClient) + } + return rsAPI +} + +// UserAPIClient returns UserInternalAPI for hitting the userapi over HTTP. +func (b *BaseDendrite) UserAPIClient() userapi.UserInternalAPI { + userAPI, err := userapiinthttp.NewUserAPIClient(b.Cfg.UserAPIURL(), b.apiHttpClient) + if err != nil { + logrus.WithError(err).Panic("UserAPIClient failed", b.apiHttpClient) + } + return userAPI +} + +// EDUServerClient returns EDUServerInputAPI for hitting the EDU server over HTTP +func (b *BaseDendrite) EDUServerClient() eduServerAPI.EDUServerInputAPI { + e, err := eduinthttp.NewEDUServerClient(b.Cfg.EDUServerURL(), b.apiHttpClient) + if err != nil { + logrus.WithError(err).Panic("EDUServerClient failed", b.apiHttpClient) + } + return e +} + +// FederationSenderHTTPClient returns FederationSenderInternalAPI for hitting +// the federation sender over HTTP +func (b *BaseDendrite) FederationSenderHTTPClient() federationSenderAPI.FederationSenderInternalAPI { + f, err := fsinthttp.NewFederationSenderClient(b.Cfg.FederationSenderURL(), b.apiHttpClient) + if err != nil { + logrus.WithError(err).Panic("FederationSenderHTTPClient failed", b.apiHttpClient) + } + return f +} + +// SigningKeyServerHTTPClient returns SigningKeyServer for hitting the signing key server over HTTP +func (b *BaseDendrite) SigningKeyServerHTTPClient() skapi.SigningKeyServerAPI { + f, err := skinthttp.NewSigningKeyServerClient( + b.Cfg.SigningKeyServerURL(), + b.apiHttpClient, + b.Caches, + ) + if err != nil { + logrus.WithError(err).Panic("SigningKeyServerHTTPClient failed", b.httpClient) + } + return f +} + +// KeyServerHTTPClient returns KeyInternalAPI for hitting the key server over HTTP +func (b *BaseDendrite) KeyServerHTTPClient() keyserverAPI.KeyInternalAPI { + f, err := keyinthttp.NewKeyServerClient(b.Cfg.KeyServerURL(), b.apiHttpClient) + if err != nil { + logrus.WithError(err).Panic("KeyServerHTTPClient failed", b.apiHttpClient) + } + return f +} + +// CreateAccountsDB creates a new instance of the accounts database. Should only +// be called once per component. +func (b *BaseDendrite) CreateAccountsDB() accounts.Database { + db, err := accounts.NewDatabase(&b.Cfg.UserAPI.AccountDatabase, b.Cfg.Global.ServerName) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to accounts db") + } + + return db +} + +// CreateClient creates a new client (normally used for media fetch requests). +// Should only be called once per component. +func (b *BaseDendrite) CreateClient() *gomatrixserverlib.Client { + if b.Cfg.Global.DisableFederation { + return gomatrixserverlib.NewClientWithTransport(noOpHTTPTransport) + } + client := gomatrixserverlib.NewClient( + b.Cfg.FederationSender.DisableTLSValidation, + ) + client.SetUserAgent(fmt.Sprintf("Dendrite/%s", internal.VersionString())) + return client +} + +// CreateFederationClient creates a new federation client. Should only be called +// once per component. +func (b *BaseDendrite) CreateFederationClient() *gomatrixserverlib.FederationClient { + if b.Cfg.Global.DisableFederation { + return gomatrixserverlib.NewFederationClientWithTransport( + b.Cfg.Global.ServerName, b.Cfg.Global.KeyID, b.Cfg.Global.PrivateKey, + b.Cfg.FederationSender.DisableTLSValidation, noOpHTTPTransport, + ) + } + client := gomatrixserverlib.NewFederationClientWithTimeout( + b.Cfg.Global.ServerName, b.Cfg.Global.KeyID, b.Cfg.Global.PrivateKey, + b.Cfg.FederationSender.DisableTLSValidation, time.Minute*5, + ) + client.SetUserAgent(fmt.Sprintf("Dendrite/%s", internal.VersionString())) + return client +} + +// SetupAndServeHTTP sets up the HTTP server to serve endpoints registered on +// ApiMux under /api/ and adds a prometheus handler under /metrics. +// nolint:gocyclo +func (b *BaseDendrite) SetupAndServeHTTP( + internalHTTPAddr, externalHTTPAddr config.HTTPAddress, + certFile, keyFile *string, +) { + internalAddr, _ := internalHTTPAddr.Address() + externalAddr, _ := externalHTTPAddr.Address() + + externalRouter := mux.NewRouter().SkipClean(true).UseEncodedPath() + internalRouter := externalRouter + + externalServ := &http.Server{ + Addr: string(externalAddr), + WriteTimeout: HTTPServerTimeout, + Handler: externalRouter, + } + internalServ := externalServ + + if internalAddr != NoListener && externalAddr != internalAddr { + // H2C allows us to accept HTTP/2 connections without TLS + // encryption. Since we don't currently require any form of + // authentication or encryption on these internal HTTP APIs, + // H2C gives us all of the advantages of HTTP/2 (such as + // stream multiplexing and avoiding head-of-line blocking) + // without enabling TLS. + internalH2S := &http2.Server{} + internalRouter = mux.NewRouter().SkipClean(true).UseEncodedPath() + internalServ = &http.Server{ + Addr: string(internalAddr), + Handler: h2c.NewHandler(internalRouter, internalH2S), + } + } + + internalRouter.PathPrefix(httputil.InternalPathPrefix).Handler(b.InternalAPIMux) + if b.Cfg.Global.Metrics.Enabled { + internalRouter.Handle("/metrics", httputil.WrapHandlerInBasicAuth(promhttp.Handler(), b.Cfg.Global.Metrics.BasicAuth)) + } + + externalRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(b.PublicClientAPIMux) + if !b.Cfg.Global.DisableFederation { + externalRouter.PathPrefix(httputil.PublicKeyPathPrefix).Handler(b.PublicKeyAPIMux) + externalRouter.PathPrefix(httputil.PublicFederationPathPrefix).Handler(b.PublicFederationAPIMux) + } + externalRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(b.PublicMediaAPIMux) + + if internalAddr != NoListener && internalAddr != externalAddr { + go func() { + logrus.Infof("Starting internal %s listener on %s", b.componentName, internalServ.Addr) + if certFile != nil && keyFile != nil { + if err := internalServ.ListenAndServeTLS(*certFile, *keyFile); err != nil { + logrus.WithError(err).Fatal("failed to serve HTTPS") + } + } else { + if err := internalServ.ListenAndServe(); err != nil { + logrus.WithError(err).Fatal("failed to serve HTTP") + } + } + logrus.Infof("Stopped internal %s listener on %s", b.componentName, internalServ.Addr) + }() + } + + if externalAddr != NoListener { + go func() { + logrus.Infof("Starting external %s listener on %s", b.componentName, externalServ.Addr) + if certFile != nil && keyFile != nil { + if err := externalServ.ListenAndServeTLS(*certFile, *keyFile); err != nil { + logrus.WithError(err).Fatal("failed to serve HTTPS") + } + } else { + if err := externalServ.ListenAndServe(); err != nil { + logrus.WithError(err).Fatal("failed to serve HTTP") + } + } + logrus.Infof("Stopped external %s listener on %s", b.componentName, externalServ.Addr) + }() + } + + select {} +} diff --git a/setup/config/config.go b/setup/config/config.go new file mode 100644 index 00000000..b8b12d0c --- /dev/null +++ b/setup/config/config.go @@ -0,0 +1,572 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "bytes" + "encoding/pem" + "fmt" + "io" + "io/ioutil" + "net/url" + "path/filepath" + "regexp" + "strings" + + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" + "golang.org/x/crypto/ed25519" + yaml "gopkg.in/yaml.v2" + + jaegerconfig "github.com/uber/jaeger-client-go/config" + jaegermetrics "github.com/uber/jaeger-lib/metrics" +) + +// keyIDRegexp defines allowable characters in Key IDs. +var keyIDRegexp = regexp.MustCompile("^ed25519:[a-zA-Z0-9_]+$") + +// Version is the current version of the config format. +// This will change whenever we make breaking changes to the config format. +const Version = 1 + +// Dendrite contains all the config used by a dendrite process. +// Relative paths are resolved relative to the current working directory +type Dendrite struct { + // The version of the configuration file. + // If the version in a file doesn't match the current dendrite config + // version then we can give a clear error message telling the user + // to update their config file to the current version. + // The version of the file should only be different if there has + // been a breaking change to the config file format. + Version int `yaml:"version"` + + Global Global `yaml:"global"` + AppServiceAPI AppServiceAPI `yaml:"app_service_api"` + ClientAPI ClientAPI `yaml:"client_api"` + EDUServer EDUServer `yaml:"edu_server"` + FederationAPI FederationAPI `yaml:"federation_api"` + FederationSender FederationSender `yaml:"federation_sender"` + KeyServer KeyServer `yaml:"key_server"` + MediaAPI MediaAPI `yaml:"media_api"` + RoomServer RoomServer `yaml:"room_server"` + SigningKeyServer SigningKeyServer `yaml:"signing_key_server"` + SyncAPI SyncAPI `yaml:"sync_api"` + UserAPI UserAPI `yaml:"user_api"` + + MSCs MSCs `yaml:"mscs"` + + // The config for tracing the dendrite servers. + Tracing struct { + // Set to true to enable tracer hooks. If false, no tracing is set up. + Enabled bool `yaml:"enabled"` + // The config for the jaeger opentracing reporter. + Jaeger jaegerconfig.Configuration `yaml:"jaeger"` + } `yaml:"tracing"` + + // The config for logging informations. Each hook will be added to logrus. + Logging []LogrusHook `yaml:"logging"` + + // Any information derived from the configuration options for later use. + Derived Derived `yaml:"-"` +} + +// TODO: Kill Derived +type Derived struct { + Registration struct { + // Flows is a slice of flows, which represent one possible way that the client can authenticate a request. + // http://matrix.org/docs/spec/HEAD/client_server/r0.3.0.html#user-interactive-authentication-api + // As long as the generated flows only rely on config file options, + // we can generate them on startup and store them until needed + Flows []authtypes.Flow `json:"flows"` + + // Params that need to be returned to the client during + // registration in order to complete registration stages. + Params map[string]interface{} `json:"params"` + } + + // Application services parsed from their config files + // The paths of which were given above in the main config file + ApplicationServices []ApplicationService + + // Meta-regexes compiled from all exclusive application service + // Regexes. + // + // When a user registers, we check that their username does not match any + // exclusive application service namespaces + ExclusiveApplicationServicesUsernameRegexp *regexp.Regexp + // When a user creates a room alias, we check that it isn't already + // reserved by an application service + ExclusiveApplicationServicesAliasRegexp *regexp.Regexp + // Note: An Exclusive Regex for room ID isn't necessary as we aren't blocking + // servers from creating RoomIDs in exclusive application service namespaces +} + +type InternalAPIOptions struct { + Listen HTTPAddress `yaml:"listen"` + Connect HTTPAddress `yaml:"connect"` +} + +type ExternalAPIOptions struct { + Listen HTTPAddress `yaml:"listen"` +} + +// A Path on the filesystem. +type Path string + +// A DataSource for opening a postgresql database using lib/pq. +type DataSource string + +func (d DataSource) IsSQLite() bool { + return strings.HasPrefix(string(d), "file:") +} + +func (d DataSource) IsPostgres() bool { + // commented line may not always be true? + // return strings.HasPrefix(string(d), "postgres:") + return !d.IsSQLite() +} + +// A Topic in kafka. +type Topic string + +// An Address to listen on. +type Address string + +// An HTTPAddress to listen on, starting with either http:// or https://. +type HTTPAddress string + +func (h HTTPAddress) Address() (Address, error) { + url, err := url.Parse(string(h)) + if err != nil { + return "", err + } + return Address(url.Host), nil +} + +// FileSizeBytes is a file size in bytes +type FileSizeBytes int64 + +// ThumbnailSize contains a single thumbnail size configuration +type ThumbnailSize struct { + // Maximum width of the thumbnail image + Width int `yaml:"width"` + // Maximum height of the thumbnail image + Height int `yaml:"height"` + // ResizeMethod is one of crop or scale. + // crop scales to fill the requested dimensions and crops the excess. + // scale scales to fit the requested dimensions and one dimension may be smaller than requested. + ResizeMethod string `yaml:"method,omitempty"` +} + +// LogrusHook represents a single logrus hook. At this point, only parsing and +// verification of the proper values for type and level are done. +// Validity/integrity checks on the parameters are done when configuring logrus. +type LogrusHook struct { + // The type of hook, currently only "file" is supported. + Type string `yaml:"type"` + + // The level of the logs to produce. Will output only this level and above. + Level string `yaml:"level"` + + // The parameters for this hook. + Params map[string]interface{} `yaml:"params"` +} + +// ConfigErrors stores problems encountered when parsing a config file. +// It implements the error interface. +type ConfigErrors []string + +// Load a yaml config file for a server run as multiple processes or as a monolith. +// Checks the config to ensure that it is valid. +func Load(configPath string, monolith bool) (*Dendrite, error) { + configData, err := ioutil.ReadFile(configPath) + if err != nil { + return nil, err + } + basePath, err := filepath.Abs(".") + if err != nil { + return nil, err + } + // Pass the current working directory and ioutil.ReadFile so that they can + // be mocked in the tests + return loadConfig(basePath, configData, ioutil.ReadFile, monolith) +} + +func loadConfig( + basePath string, + configData []byte, + readFile func(string) ([]byte, error), + monolithic bool, +) (*Dendrite, error) { + var c Dendrite + c.Defaults() + + var err error + if err = yaml.Unmarshal(configData, &c); err != nil { + return nil, err + } + + if err = c.check(monolithic); err != nil { + return nil, err + } + + privateKeyPath := absPath(basePath, c.Global.PrivateKeyPath) + privateKeyData, err := readFile(privateKeyPath) + if err != nil { + return nil, err + } + + if c.Global.KeyID, c.Global.PrivateKey, err = readKeyPEM(privateKeyPath, privateKeyData, true); err != nil { + return nil, err + } + + for i, oldPrivateKey := range c.Global.OldVerifyKeys { + var oldPrivateKeyData []byte + + oldPrivateKeyPath := absPath(basePath, oldPrivateKey.PrivateKeyPath) + oldPrivateKeyData, err = readFile(oldPrivateKeyPath) + if err != nil { + return nil, err + } + + // NOTSPEC: Ordinarily we should enforce key ID formatting, but since there are + // a number of private keys out there with non-compatible symbols in them due + // to lack of validation in Synapse, we won't enforce that for old verify keys. + keyID, privateKey, perr := readKeyPEM(oldPrivateKeyPath, oldPrivateKeyData, false) + if perr != nil { + return nil, perr + } + + c.Global.OldVerifyKeys[i].KeyID, c.Global.OldVerifyKeys[i].PrivateKey = keyID, privateKey + } + + c.MediaAPI.AbsBasePath = Path(absPath(basePath, c.MediaAPI.BasePath)) + + // Generate data from config options + err = c.Derive() + if err != nil { + return nil, err + } + + c.Wiring() + return &c, nil +} + +// Derive generates data that is derived from various values provided in +// the config file. +func (config *Dendrite) Derive() error { + // Determine registrations flows based off config values + + config.Derived.Registration.Params = make(map[string]interface{}) + + // TODO: Add email auth type + // TODO: Add MSISDN auth type + + if config.ClientAPI.RecaptchaEnabled { + config.Derived.Registration.Params[authtypes.LoginTypeRecaptcha] = map[string]string{"public_key": config.ClientAPI.RecaptchaPublicKey} + config.Derived.Registration.Flows = append(config.Derived.Registration.Flows, + authtypes.Flow{Stages: []authtypes.LoginType{authtypes.LoginTypeRecaptcha}}) + } else { + config.Derived.Registration.Flows = append(config.Derived.Registration.Flows, + authtypes.Flow{Stages: []authtypes.LoginType{authtypes.LoginTypeDummy}}) + } + + // Load application service configuration files + if err := loadAppServices(&config.AppServiceAPI, &config.Derived); err != nil { + return err + } + + return nil +} + +// SetDefaults sets default config values if they are not explicitly set. +func (c *Dendrite) Defaults() { + c.Version = 1 + + c.Global.Defaults() + c.ClientAPI.Defaults() + c.EDUServer.Defaults() + c.FederationAPI.Defaults() + c.FederationSender.Defaults() + c.KeyServer.Defaults() + c.MediaAPI.Defaults() + c.RoomServer.Defaults() + c.SigningKeyServer.Defaults() + c.SyncAPI.Defaults() + c.UserAPI.Defaults() + c.AppServiceAPI.Defaults() + c.MSCs.Defaults() + + c.Wiring() +} + +func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) { + type verifiable interface { + Verify(configErrs *ConfigErrors, isMonolith bool) + } + for _, c := range []verifiable{ + &c.Global, &c.ClientAPI, + &c.EDUServer, &c.FederationAPI, &c.FederationSender, + &c.KeyServer, &c.MediaAPI, &c.RoomServer, + &c.SigningKeyServer, &c.SyncAPI, &c.UserAPI, + &c.AppServiceAPI, &c.MSCs, + } { + c.Verify(configErrs, isMonolith) + } +} + +func (c *Dendrite) Wiring() { + c.ClientAPI.Matrix = &c.Global + c.EDUServer.Matrix = &c.Global + c.FederationAPI.Matrix = &c.Global + c.FederationSender.Matrix = &c.Global + c.KeyServer.Matrix = &c.Global + c.MediaAPI.Matrix = &c.Global + c.RoomServer.Matrix = &c.Global + c.SigningKeyServer.Matrix = &c.Global + c.SyncAPI.Matrix = &c.Global + c.UserAPI.Matrix = &c.Global + c.AppServiceAPI.Matrix = &c.Global + c.MSCs.Matrix = &c.Global + + c.ClientAPI.Derived = &c.Derived + c.AppServiceAPI.Derived = &c.Derived +} + +// Error returns a string detailing how many errors were contained within a +// configErrors type. +func (errs ConfigErrors) Error() string { + if len(errs) == 1 { + return errs[0] + } + return fmt.Sprintf( + "%s (and %d other problems)", errs[0], len(errs)-1, + ) +} + +// Add appends an error to the list of errors in this configErrors. +// It is a wrapper to the builtin append and hides pointers from +// the client code. +// This method is safe to use with an uninitialized configErrors because +// if it is nil, it will be properly allocated. +func (errs *ConfigErrors) Add(str string) { + *errs = append(*errs, str) +} + +// checkNotEmpty verifies the given value is not empty in the configuration. +// If it is, adds an error to the list. +func checkNotEmpty(configErrs *ConfigErrors, key, value string) { + if value == "" { + configErrs.Add(fmt.Sprintf("missing config key %q", key)) + } +} + +// checkNotZero verifies the given value is not zero in the configuration. +// If it is, adds an error to the list. +func checkNotZero(configErrs *ConfigErrors, key string, value int64) { + if value == 0 { + configErrs.Add(fmt.Sprintf("missing config key %q", key)) + } +} + +// checkPositive verifies the given value is positive (zero included) +// in the configuration. If it is not, adds an error to the list. +func checkPositive(configErrs *ConfigErrors, key string, value int64) { + if value < 0 { + configErrs.Add(fmt.Sprintf("invalid value for config key %q: %d", key, value)) + } +} + +// checkURL verifies that the parameter is a valid URL +func checkURL(configErrs *ConfigErrors, key, value string) { + if value == "" { + configErrs.Add(fmt.Sprintf("missing config key %q", key)) + return + } + url, err := url.Parse(value) + if err != nil { + configErrs.Add(fmt.Sprintf("config key %q contains invalid URL (%s)", key, err.Error())) + return + } + switch url.Scheme { + case "http": + case "https": + default: + configErrs.Add(fmt.Sprintf("config key %q URL should be http:// or https://", key)) + return + } +} + +// checkLogging verifies the parameters logging.* are valid. +func (config *Dendrite) checkLogging(configErrs *ConfigErrors) { + for _, logrusHook := range config.Logging { + checkNotEmpty(configErrs, "logging.type", string(logrusHook.Type)) + checkNotEmpty(configErrs, "logging.level", string(logrusHook.Level)) + } +} + +// check returns an error type containing all errors found within the config +// file. +func (config *Dendrite) check(_ bool) error { // monolithic + var configErrs ConfigErrors + + if config.Version != Version { + configErrs.Add(fmt.Sprintf( + "unknown config version %q, expected %q", config.Version, Version, + )) + return configErrs + } + + config.checkLogging(&configErrs) + + // Due to how Golang manages its interface types, this condition is not redundant. + // In order to get the proper behaviour, it is necessary to return an explicit nil + // and not a nil configErrors. + // This is because the following equalities hold: + // error(nil) == nil + // error(configErrors(nil)) != nil + if configErrs != nil { + return configErrs + } + return nil +} + +// absPath returns the absolute path for a given relative or absolute path. +func absPath(dir string, path Path) string { + if filepath.IsAbs(string(path)) { + // filepath.Join cleans the path so we should clean the absolute paths as well for consistency. + return filepath.Clean(string(path)) + } + return filepath.Join(dir, string(path)) +} + +func readKeyPEM(path string, data []byte, enforceKeyIDFormat bool) (gomatrixserverlib.KeyID, ed25519.PrivateKey, error) { + for { + var keyBlock *pem.Block + keyBlock, data = pem.Decode(data) + if data == nil { + return "", nil, fmt.Errorf("no matrix private key PEM data in %q", path) + } + if keyBlock == nil { + return "", nil, fmt.Errorf("keyBlock is nil %q", path) + } + if keyBlock.Type == "MATRIX PRIVATE KEY" { + keyID := keyBlock.Headers["Key-ID"] + if keyID == "" { + return "", nil, fmt.Errorf("missing key ID in PEM data in %q", path) + } + if !strings.HasPrefix(keyID, "ed25519:") { + return "", nil, fmt.Errorf("key ID %q doesn't start with \"ed25519:\" in %q", keyID, path) + } + if enforceKeyIDFormat && !keyIDRegexp.MatchString(keyID) { + return "", nil, fmt.Errorf("key ID %q in %q contains illegal characters (use a-z, A-Z, 0-9 and _ only)", keyID, path) + } + _, privKey, err := ed25519.GenerateKey(bytes.NewReader(keyBlock.Bytes)) + if err != nil { + return "", nil, err + } + return gomatrixserverlib.KeyID(keyID), privKey, nil + } + } +} + +// AppServiceURL returns a HTTP URL for where the appservice component is listening. +func (config *Dendrite) AppServiceURL() string { + // Hard code the appservice server to talk HTTP for now. + // If we support HTTPS we need to think of a practical way to do certificate validation. + // People setting up servers shouldn't need to get a certificate valid for the public + // internet for an internal API. + return string(config.AppServiceAPI.InternalAPI.Connect) +} + +// RoomServerURL returns an HTTP URL for where the roomserver is listening. +func (config *Dendrite) RoomServerURL() string { + // Hard code the roomserver to talk HTTP for now. + // If we support HTTPS we need to think of a practical way to do certificate validation. + // People setting up servers shouldn't need to get a certificate valid for the public + // internet for an internal API. + return string(config.RoomServer.InternalAPI.Connect) +} + +// UserAPIURL returns an HTTP URL for where the userapi is listening. +func (config *Dendrite) UserAPIURL() string { + // Hard code the userapi to talk HTTP for now. + // If we support HTTPS we need to think of a practical way to do certificate validation. + // People setting up servers shouldn't need to get a certificate valid for the public + // internet for an internal API. + return string(config.UserAPI.InternalAPI.Connect) +} + +// EDUServerURL returns an HTTP URL for where the EDU server is listening. +func (config *Dendrite) EDUServerURL() string { + // Hard code the EDU server to talk HTTP for now. + // If we support HTTPS we need to think of a practical way to do certificate validation. + // People setting up servers shouldn't need to get a certificate valid for the public + // internet for an internal API. + return string(config.EDUServer.InternalAPI.Connect) +} + +// FederationSenderURL returns an HTTP URL for where the federation sender is listening. +func (config *Dendrite) FederationSenderURL() string { + // Hard code the federation sender server to talk HTTP for now. + // If we support HTTPS we need to think of a practical way to do certificate validation. + // People setting up servers shouldn't need to get a certificate valid for the public + // internet for an internal API. + return string(config.FederationSender.InternalAPI.Connect) +} + +// SigningKeyServerURL returns an HTTP URL for where the signing key server is listening. +func (config *Dendrite) SigningKeyServerURL() string { + // Hard code the signing key server to talk HTTP for now. + // If we support HTTPS we need to think of a practical way to do certificate validation. + // People setting up servers shouldn't need to get a certificate valid for the public + // internet for an internal API. + return string(config.SigningKeyServer.InternalAPI.Connect) +} + +// KeyServerURL returns an HTTP URL for where the key server is listening. +func (config *Dendrite) KeyServerURL() string { + // Hard code the key server to talk HTTP for now. + // If we support HTTPS we need to think of a practical way to do certificate validation. + // People setting up servers shouldn't need to get a certificate valid for the public + // internet for an internal API. + return string(config.KeyServer.InternalAPI.Connect) +} + +// SetupTracing configures the opentracing using the supplied configuration. +func (config *Dendrite) SetupTracing(serviceName string) (closer io.Closer, err error) { + if !config.Tracing.Enabled { + return ioutil.NopCloser(bytes.NewReader([]byte{})), nil + } + return config.Tracing.Jaeger.InitGlobalTracer( + serviceName, + jaegerconfig.Logger(logrusLogger{logrus.StandardLogger()}), + jaegerconfig.Metrics(jaegermetrics.NullFactory), + ) +} + +// logrusLogger is a small wrapper that implements jaeger.Logger using logrus. +type logrusLogger struct { + l *logrus.Logger +} + +func (l logrusLogger) Error(msg string) { + l.l.Error(msg) +} + +func (l logrusLogger) Infof(msg string, args ...interface{}) { + l.l.Infof(msg, args...) +} diff --git a/setup/config/config_appservice.go b/setup/config/config_appservice.go new file mode 100644 index 00000000..a042691d --- /dev/null +++ b/setup/config/config_appservice.go @@ -0,0 +1,353 @@ +// Copyright 2017 Andrew Morgan <andrew@amorgan.xyz> +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "fmt" + "io/ioutil" + "path/filepath" + "regexp" + "strings" + + log "github.com/sirupsen/logrus" + yaml "gopkg.in/yaml.v2" +) + +type AppServiceAPI struct { + Matrix *Global `yaml:"-"` + Derived *Derived `yaml:"-"` // TODO: Nuke Derived from orbit + + InternalAPI InternalAPIOptions `yaml:"internal_api"` + + Database DatabaseOptions `yaml:"database"` + + ConfigFiles []string `yaml:"config_files"` +} + +func (c *AppServiceAPI) Defaults() { + c.InternalAPI.Listen = "http://localhost:7777" + c.InternalAPI.Connect = "http://localhost:7777" + c.Database.Defaults() + c.Database.ConnectionString = "file:appservice.db" +} + +func (c *AppServiceAPI) Verify(configErrs *ConfigErrors, isMonolith bool) { + checkURL(configErrs, "app_service_api.internal_api.listen", string(c.InternalAPI.Listen)) + checkURL(configErrs, "app_service_api.internal_api.bind", string(c.InternalAPI.Connect)) + checkNotEmpty(configErrs, "app_service_api.database.connection_string", string(c.Database.ConnectionString)) +} + +// ApplicationServiceNamespace is the namespace that a specific application +// service has management over. +type ApplicationServiceNamespace struct { + // Whether or not the namespace is managed solely by this application service + Exclusive bool `yaml:"exclusive"` + // A regex pattern that represents the namespace + Regex string `yaml:"regex"` + // The ID of an existing group that all users of this application service will + // be added to. This field is only relevant to the `users` namespace. + // Note that users who are joined to this group through an application service + // are not to be listed when querying for the group's members, however the + // group should be listed when querying an application service user's groups. + // This is to prevent making spamming all users of an application service + // trivial. + GroupID string `yaml:"group_id"` + // Regex object representing our pattern. Saves having to recompile every time + RegexpObject *regexp.Regexp +} + +// ApplicationService represents a Matrix application service. +// https://matrix.org/docs/spec/application_service/unstable.html +type ApplicationService struct { + // User-defined, unique, persistent ID of the application service + ID string `yaml:"id"` + // Base URL of the application service + URL string `yaml:"url"` + // Application service token provided in requests to a homeserver + ASToken string `yaml:"as_token"` + // Homeserver token provided in requests to an application service + HSToken string `yaml:"hs_token"` + // Localpart of application service user + SenderLocalpart string `yaml:"sender_localpart"` + // Information about an application service's namespaces. Key is either + // "users", "aliases" or "rooms" + NamespaceMap map[string][]ApplicationServiceNamespace `yaml:"namespaces"` + // Whether rate limiting is applied to each application service user + RateLimited bool `yaml:"rate_limited"` + // Any custom protocols that this application service provides (e.g. IRC) + Protocols []string `yaml:"protocols"` +} + +// IsInterestedInRoomID returns a bool on whether an application service's +// namespace includes the given room ID +func (a *ApplicationService) IsInterestedInRoomID( + roomID string, +) bool { + if namespaceSlice, ok := a.NamespaceMap["rooms"]; ok { + for _, namespace := range namespaceSlice { + if namespace.RegexpObject.MatchString(roomID) { + return true + } + } + } + + return false +} + +// IsInterestedInUserID returns a bool on whether an application service's +// namespace includes the given user ID +func (a *ApplicationService) IsInterestedInUserID( + userID string, +) bool { + if namespaceSlice, ok := a.NamespaceMap["users"]; ok { + for _, namespace := range namespaceSlice { + if namespace.RegexpObject.MatchString(userID) { + return true + } + } + } + + return false +} + +// OwnsNamespaceCoveringUserId returns a bool on whether an application service's +// namespace is exclusive and includes the given user ID +func (a *ApplicationService) OwnsNamespaceCoveringUserId( + userID string, +) bool { + if namespaceSlice, ok := a.NamespaceMap["users"]; ok { + for _, namespace := range namespaceSlice { + if namespace.Exclusive && namespace.RegexpObject.MatchString(userID) { + return true + } + } + } + + return false +} + +// IsInterestedInRoomAlias returns a bool on whether an application service's +// namespace includes the given room alias +func (a *ApplicationService) IsInterestedInRoomAlias( + roomAlias string, +) bool { + if namespaceSlice, ok := a.NamespaceMap["aliases"]; ok { + for _, namespace := range namespaceSlice { + if namespace.RegexpObject.MatchString(roomAlias) { + return true + } + } + } + + return false +} + +// loadAppServices iterates through all application service config files +// and loads their data into the config object for later access. +func loadAppServices(config *AppServiceAPI, derived *Derived) error { + for _, configPath := range config.ConfigFiles { + // Create a new application service with default options + appservice := ApplicationService{ + RateLimited: true, + } + + // Create an absolute path from a potentially relative path + absPath, err := filepath.Abs(configPath) + if err != nil { + return err + } + + // Read the application service's config file + configData, err := ioutil.ReadFile(absPath) + if err != nil { + return err + } + + // Load the config data into our struct + if err = yaml.UnmarshalStrict(configData, &appservice); err != nil { + return err + } + + // Append the parsed application service to the global config + derived.ApplicationServices = append( + derived.ApplicationServices, appservice, + ) + } + + // Check for any errors in the loaded application services + return checkErrors(config, derived) +} + +// setupRegexps will create regex objects for exclusive and non-exclusive +// usernames, aliases and rooms of all application services, so that other +// methods can quickly check if a particular string matches any of them. +func setupRegexps(_ *AppServiceAPI, derived *Derived) (err error) { + // Combine all exclusive namespaces for later string checking + var exclusiveUsernameStrings, exclusiveAliasStrings []string + + // If an application service's regex is marked as exclusive, add + // its contents to the overall exlusive regex string. Room regex + // not necessary as we aren't denying exclusive room ID creation + for _, appservice := range derived.ApplicationServices { + for key, namespaceSlice := range appservice.NamespaceMap { + switch key { + case "users": + appendExclusiveNamespaceRegexs(&exclusiveUsernameStrings, namespaceSlice) + case "aliases": + appendExclusiveNamespaceRegexs(&exclusiveAliasStrings, namespaceSlice) + } + } + } + + // Join the regexes together into one big regex. + // i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)" + // Later we can check if a username or alias matches any exclusive regex and + // deny access if it isn't from an application service + exclusiveUsernames := strings.Join(exclusiveUsernameStrings, "|") + exclusiveAliases := strings.Join(exclusiveAliasStrings, "|") + + // If there are no exclusive regexes, compile string so that it will not match + // any valid usernames/aliases/roomIDs + if exclusiveUsernames == "" { + exclusiveUsernames = "^$" + } + if exclusiveAliases == "" { + exclusiveAliases = "^$" + } + + // Store compiled Regex + if derived.ExclusiveApplicationServicesUsernameRegexp, err = regexp.Compile(exclusiveUsernames); err != nil { + return err + } + if derived.ExclusiveApplicationServicesAliasRegexp, err = regexp.Compile(exclusiveAliases); err != nil { + return err + } + + return nil +} + +// appendExclusiveNamespaceRegexs takes a slice of strings and a slice of +// namespaces and will append the regexes of only the exclusive namespaces +// into the string slice +func appendExclusiveNamespaceRegexs( + exclusiveStrings *[]string, namespaces []ApplicationServiceNamespace, +) { + for index, namespace := range namespaces { + if namespace.Exclusive { + // We append parenthesis to later separate each regex when we compile + // i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)" + *exclusiveStrings = append(*exclusiveStrings, "("+namespace.Regex+")") + } + + // Compile this regex into a Regexp object for later use + namespaces[index].RegexpObject, _ = regexp.Compile(namespace.Regex) + } +} + +// checkErrors checks for any configuration errors amongst the loaded +// application services according to the application service spec. +func checkErrors(config *AppServiceAPI, derived *Derived) (err error) { + var idMap = make(map[string]bool) + var tokenMap = make(map[string]bool) + + // Compile regexp object for checking groupIDs + groupIDRegexp := regexp.MustCompile(`\+.*:.*`) + + // Check each application service for any config errors + for _, appservice := range derived.ApplicationServices { + // Namespace-related checks + for key, namespaceSlice := range appservice.NamespaceMap { + for _, namespace := range namespaceSlice { + if err := validateNamespace(&appservice, key, &namespace, groupIDRegexp); err != nil { + return err + } + } + } + + // Check if the url has trailing /'s. If so, remove them + appservice.URL = strings.TrimRight(appservice.URL, "/") + + // Check if we've already seen this ID. No two application services + // can have the same ID or token. + if idMap[appservice.ID] { + return ConfigErrors([]string{fmt.Sprintf( + "Application service ID %s must be unique", appservice.ID, + )}) + } + // Check if we've already seen this token + if tokenMap[appservice.ASToken] { + return ConfigErrors([]string{fmt.Sprintf( + "Application service Token %s must be unique", appservice.ASToken, + )}) + } + + // Add the id/token to their respective maps if we haven't already + // seen them. + idMap[appservice.ID] = true + tokenMap[appservice.ASToken] = true + + // TODO: Remove once rate_limited is implemented + if appservice.RateLimited { + log.Warn("WARNING: Application service option rate_limited is currently unimplemented") + } + // TODO: Remove once protocols is implemented + if len(appservice.Protocols) > 0 { + log.Warn("WARNING: Application service option protocols is currently unimplemented") + } + } + + return setupRegexps(config, derived) +} + +// validateNamespace returns nil or an error based on whether a given +// application service namespace is valid. A namespace is valid if it has the +// required fields, and its regex is correct. +func validateNamespace( + appservice *ApplicationService, + key string, + namespace *ApplicationServiceNamespace, + groupIDRegexp *regexp.Regexp, +) error { + // Check that namespace(s) are valid regex + if !IsValidRegex(namespace.Regex) { + return ConfigErrors([]string{fmt.Sprintf( + "Invalid regex string for Application Service %s", appservice.ID, + )}) + } + + // Check if GroupID for the users namespace is in the correct format + if key == "users" && namespace.GroupID != "" { + // TODO: Remove once group_id is implemented + log.Warn("WARNING: Application service option group_id is currently unimplemented") + + correctFormat := groupIDRegexp.MatchString(namespace.GroupID) + if !correctFormat { + return ConfigErrors([]string{fmt.Sprintf( + "Invalid user group_id field for application service %s.", + appservice.ID, + )}) + } + } + + return nil +} + +// IsValidRegex returns true or false based on whether the +// given string is valid regex or not +func IsValidRegex(regexString string) bool { + _, err := regexp.Compile(regexString) + + return err == nil +} diff --git a/setup/config/config_clientapi.go b/setup/config/config_clientapi.go new file mode 100644 index 00000000..52115491 --- /dev/null +++ b/setup/config/config_clientapi.go @@ -0,0 +1,123 @@ +package config + +import ( + "fmt" + "time" +) + +type ClientAPI struct { + Matrix *Global `yaml:"-"` + Derived *Derived `yaml:"-"` // TODO: Nuke Derived from orbit + + InternalAPI InternalAPIOptions `yaml:"internal_api"` + ExternalAPI ExternalAPIOptions `yaml:"external_api"` + + // If set disables new users from registering (except via shared + // secrets) + RegistrationDisabled bool `yaml:"registration_disabled"` + // If set, allows registration by anyone who also has the shared + // secret, even if registration is otherwise disabled. + RegistrationSharedSecret string `yaml:"registration_shared_secret"` + + // Boolean stating whether catpcha registration is enabled + // and required + RecaptchaEnabled bool `yaml:"enable_registration_captcha"` + // This Home Server's ReCAPTCHA public key. + RecaptchaPublicKey string `yaml:"recaptcha_public_key"` + // This Home Server's ReCAPTCHA private key. + RecaptchaPrivateKey string `yaml:"recaptcha_private_key"` + // Secret used to bypass the captcha registration entirely + RecaptchaBypassSecret string `yaml:"recaptcha_bypass_secret"` + // HTTP API endpoint used to verify whether the captcha response + // was successful + RecaptchaSiteVerifyAPI string `yaml:"recaptcha_siteverify_api"` + + // TURN options + TURN TURN `yaml:"turn"` + + // Rate-limiting options + RateLimiting RateLimiting `yaml:"rate_limiting"` +} + +func (c *ClientAPI) Defaults() { + c.InternalAPI.Listen = "http://localhost:7771" + c.InternalAPI.Connect = "http://localhost:7771" + c.ExternalAPI.Listen = "http://[::]:8071" + c.RegistrationSharedSecret = "" + c.RecaptchaPublicKey = "" + c.RecaptchaPrivateKey = "" + c.RecaptchaEnabled = false + c.RecaptchaBypassSecret = "" + c.RecaptchaSiteVerifyAPI = "" + c.RegistrationDisabled = false + c.RateLimiting.Defaults() +} + +func (c *ClientAPI) Verify(configErrs *ConfigErrors, isMonolith bool) { + checkURL(configErrs, "client_api.internal_api.listen", string(c.InternalAPI.Listen)) + checkURL(configErrs, "client_api.internal_api.connect", string(c.InternalAPI.Connect)) + if !isMonolith { + checkURL(configErrs, "client_api.external_api.listen", string(c.ExternalAPI.Listen)) + } + if c.RecaptchaEnabled { + checkNotEmpty(configErrs, "client_api.recaptcha_public_key", string(c.RecaptchaPublicKey)) + checkNotEmpty(configErrs, "client_api.recaptcha_private_key", string(c.RecaptchaPrivateKey)) + checkNotEmpty(configErrs, "client_api.recaptcha_siteverify_api", string(c.RecaptchaSiteVerifyAPI)) + } + c.TURN.Verify(configErrs) + c.RateLimiting.Verify(configErrs) +} + +type TURN struct { + // TODO Guest Support + // Whether or not guests can request TURN credentials + // AllowGuests bool `yaml:"turn_allow_guests"` + // How long the authorization should last + UserLifetime string `yaml:"turn_user_lifetime"` + // The list of TURN URIs to pass to clients + URIs []string `yaml:"turn_uris"` + + // Authorization via Shared Secret + // The shared secret from coturn + SharedSecret string `yaml:"turn_shared_secret"` + + // Authorization via Static Username & Password + // Hardcoded Username and Password + Username string `yaml:"turn_username"` + Password string `yaml:"turn_password"` +} + +func (c *TURN) Verify(configErrs *ConfigErrors) { + value := c.UserLifetime + if value != "" { + if _, err := time.ParseDuration(value); err != nil { + configErrs.Add(fmt.Sprintf("invalid duration for config key %q: %s", "client_api.turn.turn_user_lifetime", value)) + } + } +} + +type RateLimiting struct { + // Is rate limiting enabled or disabled? + Enabled bool `yaml:"enabled"` + + // How many "slots" a user can occupy sending requests to a rate-limited + // endpoint before we apply rate-limiting + Threshold int64 `yaml:"threshold"` + + // The cooloff period in milliseconds after a request before the "slot" + // is freed again + CooloffMS int64 `yaml:"cooloff_ms"` +} + +func (r *RateLimiting) Verify(configErrs *ConfigErrors) { + if r.Enabled { + checkPositive(configErrs, "client_api.rate_limiting.threshold", r.Threshold) + checkPositive(configErrs, "client_api.rate_limiting.cooloff_ms", r.CooloffMS) + } +} + +func (r *RateLimiting) Defaults() { + r.Enabled = true + r.Threshold = 5 + r.CooloffMS = 500 +} diff --git a/setup/config/config_eduserver.go b/setup/config/config_eduserver.go new file mode 100644 index 00000000..a2ff3697 --- /dev/null +++ b/setup/config/config_eduserver.go @@ -0,0 +1,17 @@ +package config + +type EDUServer struct { + Matrix *Global `yaml:"-"` + + InternalAPI InternalAPIOptions `yaml:"internal_api"` +} + +func (c *EDUServer) Defaults() { + c.InternalAPI.Listen = "http://localhost:7778" + c.InternalAPI.Connect = "http://localhost:7778" +} + +func (c *EDUServer) Verify(configErrs *ConfigErrors, isMonolith bool) { + checkURL(configErrs, "edu_server.internal_api.listen", string(c.InternalAPI.Listen)) + checkURL(configErrs, "edu_server.internal_api.connect", string(c.InternalAPI.Connect)) +} diff --git a/setup/config/config_federationapi.go b/setup/config/config_federationapi.go new file mode 100644 index 00000000..64803d95 --- /dev/null +++ b/setup/config/config_federationapi.go @@ -0,0 +1,31 @@ +package config + +type FederationAPI struct { + Matrix *Global `yaml:"-"` + + InternalAPI InternalAPIOptions `yaml:"internal_api"` + ExternalAPI ExternalAPIOptions `yaml:"external_api"` + + // List of paths to X509 certificates used by the external federation listeners. + // These are used to calculate the TLS fingerprints to publish for this server. + // Other matrix servers talking to this server will expect the x509 certificate + // to match one of these certificates. + // The certificates should be in PEM format. + FederationCertificatePaths []Path `yaml:"federation_certificates"` +} + +func (c *FederationAPI) Defaults() { + c.InternalAPI.Listen = "http://localhost:7772" + c.InternalAPI.Connect = "http://localhost:7772" + c.ExternalAPI.Listen = "http://[::]:8072" +} + +func (c *FederationAPI) Verify(configErrs *ConfigErrors, isMonolith bool) { + checkURL(configErrs, "federation_api.internal_api.listen", string(c.InternalAPI.Listen)) + checkURL(configErrs, "federation_api.internal_api.connect", string(c.InternalAPI.Connect)) + if !isMonolith { + checkURL(configErrs, "federation_api.external_api.listen", string(c.ExternalAPI.Listen)) + } + // TODO: not applicable always, e.g. in demos + //checkNotZero(configErrs, "federation_api.federation_certificates", int64(len(c.FederationCertificatePaths))) +} diff --git a/setup/config/config_federationsender.go b/setup/config/config_federationsender.go new file mode 100644 index 00000000..84f5b6f4 --- /dev/null +++ b/setup/config/config_federationsender.go @@ -0,0 +1,63 @@ +package config + +type FederationSender struct { + Matrix *Global `yaml:"-"` + + InternalAPI InternalAPIOptions `yaml:"internal_api"` + + // The FederationSender database stores information used by the FederationSender + // It is only accessed by the FederationSender. + Database DatabaseOptions `yaml:"database"` + + // Federation failure threshold. How many consecutive failures that we should + // tolerate when sending federation requests to a specific server. The backoff + // is 2**x seconds, so 1 = 2 seconds, 2 = 4 seconds, 3 = 8 seconds, etc. + // The default value is 16 if not specified, which is circa 18 hours. + FederationMaxRetries uint32 `yaml:"send_max_retries"` + + // FederationDisableTLSValidation disables the validation of X.509 TLS certs + // on remote federation endpoints. This is not recommended in production! + DisableTLSValidation bool `yaml:"disable_tls_validation"` + + Proxy Proxy `yaml:"proxy_outbound"` +} + +func (c *FederationSender) Defaults() { + c.InternalAPI.Listen = "http://localhost:7775" + c.InternalAPI.Connect = "http://localhost:7775" + c.Database.Defaults() + c.Database.ConnectionString = "file:federationsender.db" + + c.FederationMaxRetries = 16 + c.DisableTLSValidation = false + + c.Proxy.Defaults() +} + +func (c *FederationSender) Verify(configErrs *ConfigErrors, isMonolith bool) { + checkURL(configErrs, "federation_sender.internal_api.listen", string(c.InternalAPI.Listen)) + checkURL(configErrs, "federation_sender.internal_api.connect", string(c.InternalAPI.Connect)) + checkNotEmpty(configErrs, "federation_sender.database.connection_string", string(c.Database.ConnectionString)) +} + +// The config for setting a proxy to use for server->server requests +type Proxy struct { + // Is the proxy enabled? + Enabled bool `yaml:"enabled"` + // The protocol for the proxy (http / https / socks5) + Protocol string `yaml:"protocol"` + // The host where the proxy is listening + Host string `yaml:"host"` + // The port on which the proxy is listening + Port uint16 `yaml:"port"` +} + +func (c *Proxy) Defaults() { + c.Enabled = false + c.Protocol = "http" + c.Host = "localhost" + c.Port = 8080 +} + +func (c *Proxy) Verify(configErrs *ConfigErrors) { +} diff --git a/setup/config/config_global.go b/setup/config/config_global.go new file mode 100644 index 00000000..95652217 --- /dev/null +++ b/setup/config/config_global.go @@ -0,0 +1,142 @@ +package config + +import ( + "math/rand" + "time" + + "github.com/matrix-org/gomatrixserverlib" + "golang.org/x/crypto/ed25519" +) + +type Global struct { + // The name of the server. This is usually the domain name, e.g 'matrix.org', 'localhost'. + ServerName gomatrixserverlib.ServerName `yaml:"server_name"` + + // Path to the private key which will be used to sign requests and events. + PrivateKeyPath Path `yaml:"private_key"` + + // The private key which will be used to sign requests and events. + PrivateKey ed25519.PrivateKey `yaml:"-"` + + // An arbitrary string used to uniquely identify the PrivateKey. Must start with the + // prefix "ed25519:". + KeyID gomatrixserverlib.KeyID `yaml:"-"` + + // Information about old private keys that used to be used to sign requests and + // events on this domain. They will not be used but will be advertised to other + // servers that ask for them to help verify old events. + OldVerifyKeys []OldVerifyKeys `yaml:"old_private_keys"` + + // How long a remote server can cache our server key for before requesting it again. + // Increasing this number will reduce the number of requests made by remote servers + // for our key, but increases the period a compromised key will be considered valid + // by remote servers. + // Defaults to 24 hours. + KeyValidityPeriod time.Duration `yaml:"key_validity_period"` + + // Disables federation. Dendrite will not be able to make any outbound HTTP requests + // to other servers and the federation API will not be exposed. + DisableFederation bool `yaml:"disable_federation"` + + // List of domains that the server will trust as identity servers to + // verify third-party identifiers. + // Defaults to an empty array. + TrustedIDServers []string `yaml:"trusted_third_party_id_servers"` + + // Kafka/Naffka configuration + Kafka Kafka `yaml:"kafka"` + + // Metrics configuration + Metrics Metrics `yaml:"metrics"` +} + +func (c *Global) Defaults() { + c.ServerName = "localhost" + c.PrivateKeyPath = "matrix_key.pem" + _, c.PrivateKey, _ = ed25519.GenerateKey(rand.New(rand.NewSource(0))) + c.KeyID = "ed25519:auto" + c.KeyValidityPeriod = time.Hour * 24 * 7 + + c.Kafka.Defaults() + c.Metrics.Defaults() +} + +func (c *Global) Verify(configErrs *ConfigErrors, isMonolith bool) { + checkNotEmpty(configErrs, "global.server_name", string(c.ServerName)) + checkNotEmpty(configErrs, "global.private_key", string(c.PrivateKeyPath)) + + c.Kafka.Verify(configErrs, isMonolith) + c.Metrics.Verify(configErrs, isMonolith) +} + +type OldVerifyKeys struct { + // Path to the private key. + PrivateKeyPath Path `yaml:"private_key"` + + // The private key itself. + PrivateKey ed25519.PrivateKey `yaml:"-"` + + // The key ID of the private key. + KeyID gomatrixserverlib.KeyID `yaml:"-"` + + // When the private key was designed as "expired", as a UNIX timestamp + // in millisecond precision. + ExpiredAt gomatrixserverlib.Timestamp `yaml:"expired_at"` +} + +// The configuration to use for Prometheus metrics +type Metrics struct { + // Whether or not the metrics are enabled + Enabled bool `yaml:"enabled"` + // Use BasicAuth for Authorization + BasicAuth struct { + // Authorization via Static Username & Password + // Hardcoded Username and Password + Username string `yaml:"username"` + Password string `yaml:"password"` + } `yaml:"basic_auth"` +} + +func (c *Metrics) Defaults() { + c.Enabled = false + c.BasicAuth.Username = "metrics" + c.BasicAuth.Password = "metrics" +} + +func (c *Metrics) Verify(configErrs *ConfigErrors, isMonolith bool) { +} + +type DatabaseOptions struct { + // The connection string, file:filename.db or postgres://server.... + ConnectionString DataSource `yaml:"connection_string"` + // Maximum open connections to the DB (0 = use default, negative means unlimited) + MaxOpenConnections int `yaml:"max_open_conns"` + // Maximum idle connections to the DB (0 = use default, negative means unlimited) + MaxIdleConnections int `yaml:"max_idle_conns"` + // maximum amount of time (in seconds) a connection may be reused (<= 0 means unlimited) + ConnMaxLifetimeSeconds int `yaml:"conn_max_lifetime"` +} + +func (c *DatabaseOptions) Defaults() { + c.MaxOpenConnections = 100 + c.MaxIdleConnections = 2 + c.ConnMaxLifetimeSeconds = -1 +} + +func (c *DatabaseOptions) Verify(configErrs *ConfigErrors, isMonolith bool) { +} + +// MaxIdleConns returns maximum idle connections to the DB +func (c DatabaseOptions) MaxIdleConns() int { + return c.MaxIdleConnections +} + +// MaxOpenConns returns maximum open connections to the DB +func (c DatabaseOptions) MaxOpenConns() int { + return c.MaxOpenConnections +} + +// ConnMaxLifetime returns maximum amount of time a connection may be reused +func (c DatabaseOptions) ConnMaxLifetime() time.Duration { + return time.Duration(c.ConnMaxLifetimeSeconds) * time.Second +} diff --git a/setup/config/config_kafka.go b/setup/config/config_kafka.go new file mode 100644 index 00000000..aa91e558 --- /dev/null +++ b/setup/config/config_kafka.go @@ -0,0 +1,61 @@ +package config + +import "fmt" + +// Defined Kafka topics. +const ( + TopicOutputTypingEvent = "OutputTypingEvent" + TopicOutputSendToDeviceEvent = "OutputSendToDeviceEvent" + TopicOutputKeyChangeEvent = "OutputKeyChangeEvent" + TopicOutputRoomEvent = "OutputRoomEvent" + TopicOutputClientData = "OutputClientData" + TopicOutputReceiptEvent = "OutputReceiptEvent" +) + +type Kafka struct { + // A list of kafka addresses to connect to. + Addresses []string `yaml:"addresses"` + // The prefix to use for Kafka topic names for this homeserver - really only + // useful if running more than one Dendrite on the same Kafka deployment. + TopicPrefix string `yaml:"topic_prefix"` + // Whether to use naffka instead of kafka. + // Naffka can only be used when running dendrite as a single monolithic server. + // Kafka can be used both with a monolithic server and when running the + // components as separate servers. + UseNaffka bool `yaml:"use_naffka"` + // The Naffka database is used internally by the naffka library, if used. + Database DatabaseOptions `yaml:"naffka_database"` + // The max size a Kafka message passed between consumer/producer can have + // Equals roughly max.message.bytes / fetch.message.max.bytes in Kafka + MaxMessageBytes *int `yaml:"max_message_bytes"` +} + +func (k *Kafka) TopicFor(name string) string { + return fmt.Sprintf("%s%s", k.TopicPrefix, name) +} + +func (c *Kafka) Defaults() { + c.UseNaffka = true + c.Database.Defaults() + c.Addresses = []string{"localhost:2181"} + c.Database.ConnectionString = DataSource("file:naffka.db") + c.TopicPrefix = "Dendrite" + + maxBytes := 1024 * 1024 * 8 // about 8MB + c.MaxMessageBytes = &maxBytes +} + +func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) { + if c.UseNaffka { + if !isMonolith { + configErrs.Add("naffka can only be used in a monolithic server") + } + checkNotEmpty(configErrs, "global.kafka.database.connection_string", string(c.Database.ConnectionString)) + } else { + // If we aren't using naffka then we need to have at least one kafka + // server to talk to. + checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses))) + } + checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix)) + checkPositive(configErrs, "global.kafka.max_message_bytes", int64(*c.MaxMessageBytes)) +} diff --git a/setup/config/config_keyserver.go b/setup/config/config_keyserver.go new file mode 100644 index 00000000..89162300 --- /dev/null +++ b/setup/config/config_keyserver.go @@ -0,0 +1,22 @@ +package config + +type KeyServer struct { + Matrix *Global `yaml:"-"` + + InternalAPI InternalAPIOptions `yaml:"internal_api"` + + Database DatabaseOptions `yaml:"database"` +} + +func (c *KeyServer) Defaults() { + c.InternalAPI.Listen = "http://localhost:7779" + c.InternalAPI.Connect = "http://localhost:7779" + c.Database.Defaults() + c.Database.ConnectionString = "file:keyserver.db" +} + +func (c *KeyServer) Verify(configErrs *ConfigErrors, isMonolith bool) { + checkURL(configErrs, "key_server.internal_api.listen", string(c.InternalAPI.Listen)) + checkURL(configErrs, "key_server.internal_api.bind", string(c.InternalAPI.Connect)) + checkNotEmpty(configErrs, "key_server.database.connection_string", string(c.Database.ConnectionString)) +} diff --git a/setup/config/config_mediaapi.go b/setup/config/config_mediaapi.go new file mode 100644 index 00000000..a9425b7b --- /dev/null +++ b/setup/config/config_mediaapi.go @@ -0,0 +1,67 @@ +package config + +import ( + "fmt" +) + +type MediaAPI struct { + Matrix *Global `yaml:"-"` + + InternalAPI InternalAPIOptions `yaml:"internal_api"` + ExternalAPI ExternalAPIOptions `yaml:"external_api"` + + // The MediaAPI database stores information about files uploaded and downloaded + // by local users. It is only accessed by the MediaAPI. + Database DatabaseOptions `yaml:"database"` + + // The base path to where the media files will be stored. May be relative or absolute. + BasePath Path `yaml:"base_path"` + + // The absolute base path to where media files will be stored. + AbsBasePath Path `yaml:"-"` + + // The maximum file size in bytes that is allowed to be stored on this server. + // Note: if max_file_size_bytes is set to 0, the size is unlimited. + // Note: if max_file_size_bytes is not set, it will default to 10485760 (10MB) + MaxFileSizeBytes *FileSizeBytes `yaml:"max_file_size_bytes,omitempty"` + + // Whether to dynamically generate thumbnails on-the-fly if the requested resolution is not already generated + DynamicThumbnails bool `yaml:"dynamic_thumbnails"` + + // The maximum number of simultaneous thumbnail generators. default: 10 + MaxThumbnailGenerators int `yaml:"max_thumbnail_generators"` + + // A list of thumbnail sizes to be pre-generated for downloaded remote / uploaded content + ThumbnailSizes []ThumbnailSize `yaml:"thumbnail_sizes"` +} + +func (c *MediaAPI) Defaults() { + c.InternalAPI.Listen = "http://localhost:7774" + c.InternalAPI.Connect = "http://localhost:7774" + c.ExternalAPI.Listen = "http://[::]:8074" + c.Database.Defaults() + c.Database.ConnectionString = "file:mediaapi.db" + + defaultMaxFileSizeBytes := FileSizeBytes(10485760) + c.MaxFileSizeBytes = &defaultMaxFileSizeBytes + c.MaxThumbnailGenerators = 10 + c.BasePath = "./media_store" +} + +func (c *MediaAPI) Verify(configErrs *ConfigErrors, isMonolith bool) { + checkURL(configErrs, "media_api.internal_api.listen", string(c.InternalAPI.Listen)) + checkURL(configErrs, "media_api.internal_api.connect", string(c.InternalAPI.Connect)) + if !isMonolith { + checkURL(configErrs, "media_api.external_api.listen", string(c.ExternalAPI.Listen)) + } + checkNotEmpty(configErrs, "media_api.database.connection_string", string(c.Database.ConnectionString)) + + checkNotEmpty(configErrs, "media_api.base_path", string(c.BasePath)) + checkPositive(configErrs, "media_api.max_file_size_bytes", int64(*c.MaxFileSizeBytes)) + checkPositive(configErrs, "media_api.max_thumbnail_generators", int64(c.MaxThumbnailGenerators)) + + for i, size := range c.ThumbnailSizes { + checkPositive(configErrs, fmt.Sprintf("media_api.thumbnail_sizes[%d].width", i), int64(size.Width)) + checkPositive(configErrs, fmt.Sprintf("media_api.thumbnail_sizes[%d].height", i), int64(size.Height)) + } +} diff --git a/setup/config/config_mscs.go b/setup/config/config_mscs.go new file mode 100644 index 00000000..776d0b64 --- /dev/null +++ b/setup/config/config_mscs.go @@ -0,0 +1,19 @@ +package config + +type MSCs struct { + Matrix *Global `yaml:"-"` + + // The MSCs to enable, currently only `msc2836` is supported. + MSCs []string `yaml:"mscs"` + + Database DatabaseOptions `yaml:"database"` +} + +func (c *MSCs) Defaults() { + c.Database.Defaults() + c.Database.ConnectionString = "file:mscs.db" +} + +func (c *MSCs) Verify(configErrs *ConfigErrors, isMonolith bool) { + checkNotEmpty(configErrs, "mscs.database.connection_string", string(c.Database.ConnectionString)) +} diff --git a/setup/config/config_roomserver.go b/setup/config/config_roomserver.go new file mode 100644 index 00000000..2a1fc38b --- /dev/null +++ b/setup/config/config_roomserver.go @@ -0,0 +1,22 @@ +package config + +type RoomServer struct { + Matrix *Global `yaml:"-"` + + InternalAPI InternalAPIOptions `yaml:"internal_api"` + + Database DatabaseOptions `yaml:"database"` +} + +func (c *RoomServer) Defaults() { + c.InternalAPI.Listen = "http://localhost:7770" + c.InternalAPI.Connect = "http://localhost:7770" + c.Database.Defaults() + c.Database.ConnectionString = "file:roomserver.db" +} + +func (c *RoomServer) Verify(configErrs *ConfigErrors, isMonolith bool) { + checkURL(configErrs, "room_server.internal_api.listen", string(c.InternalAPI.Listen)) + checkURL(configErrs, "room_server.internal_ap.bind", string(c.InternalAPI.Connect)) + checkNotEmpty(configErrs, "room_server.database.connection_string", string(c.Database.ConnectionString)) +} diff --git a/setup/config/config_signingkeyserver.go b/setup/config/config_signingkeyserver.go new file mode 100644 index 00000000..51aca38b --- /dev/null +++ b/setup/config/config_signingkeyserver.go @@ -0,0 +1,52 @@ +package config + +import "github.com/matrix-org/gomatrixserverlib" + +type SigningKeyServer struct { + Matrix *Global `yaml:"-"` + + InternalAPI InternalAPIOptions `yaml:"internal_api"` + + // The SigningKeyServer database caches the public keys of remote servers. + // It may be accessed by the FederationAPI, the ClientAPI, and the MediaAPI. + Database DatabaseOptions `yaml:"database"` + + // Perspective keyservers, to use as a backup when direct key fetch + // requests don't succeed + KeyPerspectives KeyPerspectives `yaml:"key_perspectives"` + + // Should we prefer direct key fetches over perspective ones? + PreferDirectFetch bool `yaml:"prefer_direct_fetch"` +} + +func (c *SigningKeyServer) Defaults() { + c.InternalAPI.Listen = "http://localhost:7780" + c.InternalAPI.Connect = "http://localhost:7780" + c.Database.Defaults() + c.Database.ConnectionString = "file:signingkeyserver.db" +} + +func (c *SigningKeyServer) Verify(configErrs *ConfigErrors, isMonolith bool) { + checkURL(configErrs, "signing_key_server.internal_api.listen", string(c.InternalAPI.Listen)) + checkURL(configErrs, "signing_key_server.internal_api.bind", string(c.InternalAPI.Connect)) + checkNotEmpty(configErrs, "signing_key_server.database.connection_string", string(c.Database.ConnectionString)) +} + +// KeyPerspectives are used to configure perspective key servers for +// retrieving server keys. +type KeyPerspectives []KeyPerspective + +type KeyPerspective struct { + // The server name of the perspective key server + ServerName gomatrixserverlib.ServerName `yaml:"server_name"` + // Server keys for the perspective user, used to verify the + // keys have been signed by the perspective server + Keys []KeyPerspectiveTrustKey `yaml:"keys"` +} + +type KeyPerspectiveTrustKey struct { + // The key ID, e.g. ed25519:auto + KeyID gomatrixserverlib.KeyID `yaml:"key_id"` + // The public key in base64 unpadded format + PublicKey string `yaml:"public_key"` +} diff --git a/setup/config/config_syncapi.go b/setup/config/config_syncapi.go new file mode 100644 index 00000000..fc08f738 --- /dev/null +++ b/setup/config/config_syncapi.go @@ -0,0 +1,29 @@ +package config + +type SyncAPI struct { + Matrix *Global `yaml:"-"` + + InternalAPI InternalAPIOptions `yaml:"internal_api"` + ExternalAPI ExternalAPIOptions `yaml:"external_api"` + + Database DatabaseOptions `yaml:"database"` + + RealIPHeader string `yaml:"real_ip_header"` +} + +func (c *SyncAPI) Defaults() { + c.InternalAPI.Listen = "http://localhost:7773" + c.InternalAPI.Connect = "http://localhost:7773" + c.ExternalAPI.Listen = "http://localhost:8073" + c.Database.Defaults() + c.Database.ConnectionString = "file:syncapi.db" +} + +func (c *SyncAPI) Verify(configErrs *ConfigErrors, isMonolith bool) { + checkURL(configErrs, "sync_api.internal_api.listen", string(c.InternalAPI.Listen)) + checkURL(configErrs, "sync_api.internal_api.bind", string(c.InternalAPI.Connect)) + if !isMonolith { + checkURL(configErrs, "sync_api.external_api.listen", string(c.ExternalAPI.Listen)) + } + checkNotEmpty(configErrs, "sync_api.database", string(c.Database.ConnectionString)) +} diff --git a/setup/config/config_test.go b/setup/config/config_test.go new file mode 100644 index 00000000..4107b684 --- /dev/null +++ b/setup/config/config_test.go @@ -0,0 +1,285 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "fmt" + "testing" +) + +func TestLoadConfigRelative(t *testing.T) { + _, err := loadConfig("/my/config/dir", []byte(testConfig), + mockReadFile{ + "/my/config/dir/matrix_key.pem": testKey, + "/my/config/dir/tls_cert.pem": testCert, + }.readFile, + false, + ) + if err != nil { + t.Error("failed to load config:", err) + } +} + +const testConfig = ` +version: 1 +global: + server_name: localhost + private_key: matrix_key.pem + key_id: ed25519:auto + key_validity_period: 168h0m0s + trusted_third_party_id_servers: + - matrix.org + - vector.im + kafka: + addresses: + - localhost:2181 + topic_prefix: Dendrite + use_naffka: true + naffka_database: + connection_string: file:naffka.db + max_open_conns: 100 + max_idle_conns: 2 + conn_max_lifetime: -1 + metrics: + enabled: false + basic_auth: + username: metrics + password: metrics +app_service_api: + internal_api: + listen: http://localhost:7777 + connect: http://localhost:7777 + database: + connection_string: file:appservice.db + max_open_conns: 100 + max_idle_conns: 2 + conn_max_lifetime: -1 + config_files: [] +client_api: + internal_api: + listen: http://localhost:7771 + connect: http://localhost:7771 + external_api: + listen: http://[::]:8071 + registration_disabled: false + registration_shared_secret: "" + enable_registration_captcha: false + recaptcha_public_key: "" + recaptcha_private_key: "" + recaptcha_bypass_secret: "" + recaptcha_siteverify_api: "" + turn: + turn_user_lifetime: "" + turn_uris: [] + turn_shared_secret: "" + turn_username: "" + turn_password: "" +current_state_server: + internal_api: + listen: http://localhost:7782 + connect: http://localhost:7782 + database: + connection_string: file:currentstate.db + max_open_conns: 100 + max_idle_conns: 2 + conn_max_lifetime: -1 +edu_server: + internal_api: + listen: http://localhost:7778 + connect: http://localhost:7778 +federation_api: + internal_api: + listen: http://localhost:7772 + connect: http://localhost:7772 + external_api: + listen: http://[::]:8072 + federation_certificates: [] +federation_sender: + internal_api: + listen: http://localhost:7775 + connect: http://localhost:7775 + database: + connection_string: file:federationsender.db + max_open_conns: 100 + max_idle_conns: 2 + conn_max_lifetime: -1 + send_max_retries: 16 + disable_tls_validation: false + proxy_outbound: + enabled: false + protocol: http + host: localhost + port: 8080 +key_server: + internal_api: + listen: http://localhost:7779 + connect: http://localhost:7779 + database: + connection_string: file:keyserver.db + max_open_conns: 100 + max_idle_conns: 2 + conn_max_lifetime: -1 +media_api: + internal_api: + listen: http://localhost:7774 + connect: http://localhost:7774 + external_api: + listen: http://[::]:8074 + database: + connection_string: file:mediaapi.db + max_open_conns: 100 + max_idle_conns: 2 + conn_max_lifetime: -1 + base_path: ./media_store + max_file_size_bytes: 10485760 + dynamic_thumbnails: false + max_thumbnail_generators: 10 + thumbnail_sizes: + - width: 32 + height: 32 + method: crop + - width: 96 + height: 96 + method: crop + - width: 640 + height: 480 + method: scale +room_server: + internal_api: + listen: http://localhost:7770 + connect: http://localhost:7770 + database: + connection_string: file:roomserver.db + max_open_conns: 100 + max_idle_conns: 2 + conn_max_lifetime: -1 +server_key_api: + internal_api: + listen: http://localhost:7780 + connect: http://localhost:7780 + database: + connection_string: file:serverkeyapi.db + max_open_conns: 100 + max_idle_conns: 2 + conn_max_lifetime: -1 + key_perspectives: + - server_name: matrix.org + keys: + - key_id: ed25519:auto + public_key: Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw + - key_id: ed25519:a_RXGa + public_key: l8Hft5qXKn1vfHrg3p4+W8gELQVo8N13JkluMfmn2sQ +sync_api: + internal_api: + listen: http://localhost:7773 + connect: http://localhost:7773 + database: + connection_string: file:syncapi.db + max_open_conns: 100 + max_idle_conns: 2 + conn_max_lifetime: -1 +user_api: + internal_api: + listen: http://localhost:7781 + connect: http://localhost:7781 + account_database: + connection_string: file:userapi_accounts.db + max_open_conns: 100 + max_idle_conns: 2 + conn_max_lifetime: -1 + device_database: + connection_string: file:userapi_devices.db + max_open_conns: 100 + max_idle_conns: 2 + conn_max_lifetime: -1 +tracing: + enabled: false + jaeger: + serviceName: "" + disabled: false + rpc_metrics: false + tags: [] + sampler: null + reporter: null + headers: null + baggage_restrictions: null + throttler: null +logging: +- type: file + level: info + params: + path: /var/log/dendrite +` + +type mockReadFile map[string]string + +func (m mockReadFile) readFile(path string) ([]byte, error) { + data, ok := m[path] + if !ok { + return nil, fmt.Errorf("no such file %q", path) + } + return []byte(data), nil +} + +func TestReadKey(t *testing.T) { + keyID, _, err := readKeyPEM("path/to/key", []byte(testKey), true) + if err != nil { + t.Error("failed to load private key:", err) + } + wantKeyID := testKeyID + if wantKeyID != string(keyID) { + t.Errorf("wanted key ID to be %q, got %q", wantKeyID, keyID) + } +} + +const testKeyID = "ed25519:c8NsuQ" + +const testKey = ` +-----BEGIN MATRIX PRIVATE KEY----- +Key-ID: ` + testKeyID + ` +7KRZiZ2sTyRR8uqqUjRwczuwRXXkUMYIUHq4Mc3t4bE= +-----END MATRIX PRIVATE KEY----- +` + +const testCert = ` +-----BEGIN CERTIFICATE----- +MIIE0zCCArugAwIBAgIJAPype3u24LJeMA0GCSqGSIb3DQEBCwUAMAAwHhcNMTcw +NjEzMTQyODU4WhcNMTgwNjEzMTQyODU4WjAAMIICIjANBgkqhkiG9w0BAQEFAAOC +Ag8AMIICCgKCAgEA3vNSr7lCh/alxPFqairp/PYohwdsqPvOD7zf7dJCNhy0gbdC +9/APwIbPAPL9nU+o9ud1ACNCKBCQin/9LnI5vd5pa/Ne+mmRADDLB/BBBoywSJWG +NSfKJ9n3XY1bjgtqi53uUh+RDdQ7sXudDqCUxiiJZmS7oqK/mp88XXAgCbuXUY29 +GmzbbDz37vntuSxDgUOnJ8uPSvRp5YPKogA3JwW1SyrlLt4Z30CQ6nH3Y2Q5SVfJ +NIQyMrnwyjA9bCdXezv1cLXoTYn7U9BRyzXTZeXs3y3ldnRfISXN35CU04Az1F8j +lfj7nXMEqI/qAj/qhxZ8nVBB+rpNOZy9RJko3O+G5Qa/EvzkQYV1rW4TM2Yme88A +QyJspoV/0bXk6gG987PonK2Uk5djxSULhnGVIqswydyH0Nzb+slRp2bSoWbaNlee ++6TIeiyTQYc055pCHOp22gtLrC5LQGchksi02St2ZzRHdnlfqCJ8S9sS7x3trzds +cYueg1sGI+O8szpQ3eUM7OhJOBrx6OlR7+QYnQg1wr/V+JAz1qcyTC1URcwfeqtg +QjxFdBD9LfCtfK+AO51H9ugtsPJqOh33PmvfvUBEM05OHCA0lNaWJHROGpm4T4cc +YQI9JQk/0lB7itF1qK5RG74qgKdjkBkfZxi0OqkUgHk6YHtJlKfET8zfrtcCAwEA +AaNQME4wHQYDVR0OBBYEFGwb0NgH0Zr7Ga23njEJ85Ozf8M9MB8GA1UdIwQYMBaA +FGwb0NgH0Zr7Ga23njEJ85Ozf8M9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEL +BQADggIBAKU3RHXggbq/pLhGinU5q/9QT0TB/0bBnF1wNFkKQC0FrNJ+ZnBNmusy +oqOn7DEohBCCDxT0kgOC05gLEsGLkSXlVyqCsPFfycCFhtu1QzSRtQNRxB3pW3Wq +4/RFVYv0PGBjVBKxImQlEmXJWEDwemGKqDQZPtqR/FTHTbJcaT0xQr5+1oG6lawt +I/2cW6GQ0kYW/Szps8FgNdSNgVqCjjNIzBYbWhRWMx/63qD1ReUbY7/Yw9KKT8nK +zXERpbTM9k+Pnm0g9Gep+9HJ1dBFJeuTPugKeSeyqg2OJbENw1hxGs/HjBXw7580 +ioiMn/kMj6Tg/f3HCfKrdHHBFQw0/fJW6o17QImYIpPOPzc5RjXBrCJWb34kxqEd +NQdKgejWiV/LlVsguIF8hVZH2kRzvoyypkVUtSUYGmjvA5UXoORQZfJ+b41llq1B +GcSF6iaVbAFKnsUyyr1i9uHz/6Muqflphv/SfZxGheIn5u3PnhXrzDagvItjw0NS +n0Xq64k7fc42HXJpF8CGBkSaIhtlzcruO+vqR80B9r62+D0V7VmHOnP135MT6noU +8F0JQfEtP+I8NII5jHSF/khzSgP5g80LS9tEc2ILnIHK1StkInAoRQQ+/HsQsgbz +ANAf5kxmMsM0zlN2hkxl0H6o7wKlBSw3RI3cjfilXiMWRPJrzlc4 +-----END CERTIFICATE----- +` diff --git a/setup/config/config_userapi.go b/setup/config/config_userapi.go new file mode 100644 index 00000000..2cbd8a45 --- /dev/null +++ b/setup/config/config_userapi.go @@ -0,0 +1,30 @@ +package config + +type UserAPI struct { + Matrix *Global `yaml:"-"` + + InternalAPI InternalAPIOptions `yaml:"internal_api"` + + // The Account database stores the login details and account information + // for local users. It is accessed by the UserAPI. + AccountDatabase DatabaseOptions `yaml:"account_database"` + // The Device database stores session information for the devices of logged + // in local users. It is accessed by the UserAPI. + DeviceDatabase DatabaseOptions `yaml:"device_database"` +} + +func (c *UserAPI) Defaults() { + c.InternalAPI.Listen = "http://localhost:7781" + c.InternalAPI.Connect = "http://localhost:7781" + c.AccountDatabase.Defaults() + c.DeviceDatabase.Defaults() + c.AccountDatabase.ConnectionString = "file:userapi_accounts.db" + c.DeviceDatabase.ConnectionString = "file:userapi_devices.db" +} + +func (c *UserAPI) Verify(configErrs *ConfigErrors, isMonolith bool) { + checkURL(configErrs, "user_api.internal_api.listen", string(c.InternalAPI.Listen)) + checkURL(configErrs, "user_api.internal_api.connect", string(c.InternalAPI.Connect)) + checkNotEmpty(configErrs, "user_api.account_database.connection_string", string(c.AccountDatabase.ConnectionString)) + checkNotEmpty(configErrs, "user_api.device_database.connection_string", string(c.DeviceDatabase.ConnectionString)) +} diff --git a/setup/federation.go b/setup/federation.go new file mode 100644 index 00000000..7e9a22b3 --- /dev/null +++ b/setup/federation.go @@ -0,0 +1,32 @@ +package setup + +import ( + "context" + "fmt" + "net" + "net/http" +) + +// noOpHTTPTransport is used to disable federation. +var noOpHTTPTransport = &http.Transport{ + Dial: func(_, _ string) (net.Conn, error) { + return nil, fmt.Errorf("federation prohibited by configuration") + }, + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return nil, fmt.Errorf("federation prohibited by configuration") + }, + DialTLS: func(_, _ string) (net.Conn, error) { + return nil, fmt.Errorf("federation prohibited by configuration") + }, +} + +func init() { + noOpHTTPTransport.RegisterProtocol("matrix", &noOpHTTPRoundTripper{}) +} + +type noOpHTTPRoundTripper struct { +} + +func (y *noOpHTTPRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + return nil, fmt.Errorf("federation prohibited by configuration") +} diff --git a/setup/flags.go b/setup/flags.go new file mode 100644 index 00000000..281cf339 --- /dev/null +++ b/setup/flags.go @@ -0,0 +1,52 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package setup + +import ( + "flag" + "fmt" + "os" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/setup/config" + "github.com/sirupsen/logrus" +) + +var ( + configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") + version = flag.Bool("version", false, "Shows the current version and exits immediately.") +) + +// ParseFlags parses the commandline flags and uses them to create a config. +func ParseFlags(monolith bool) *config.Dendrite { + flag.Parse() + + if *version { + fmt.Println(internal.VersionString()) + os.Exit(0) + } + + if *configPath == "" { + logrus.Fatal("--config must be supplied") + } + + cfg, err := config.Load(*configPath, monolith) + + if err != nil { + logrus.Fatalf("Invalid config file: %s", err) + } + + return cfg +} diff --git a/setup/kafka/kafka.go b/setup/kafka/kafka.go new file mode 100644 index 00000000..a2902c96 --- /dev/null +++ b/setup/kafka/kafka.go @@ -0,0 +1,58 @@ +package kafka + +import ( + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/naffka" + naffkaStorage "github.com/matrix-org/naffka/storage" + "github.com/sirupsen/logrus" +) + +func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + if cfg.UseNaffka { + return setupNaffka(cfg) + } + return setupKafka(cfg) +} + +// setupKafka creates kafka consumer/producer pair from the config. +func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + sCfg := sarama.NewConfig() + sCfg.Producer.MaxMessageBytes = *cfg.MaxMessageBytes + sCfg.Producer.Return.Successes = true + sCfg.Consumer.Fetch.Default = int32(*cfg.MaxMessageBytes) + + consumer, err := sarama.NewConsumer(cfg.Addresses, sCfg) + if err != nil { + logrus.WithError(err).Panic("failed to start kafka consumer") + } + + producer, err := sarama.NewSyncProducer(cfg.Addresses, sCfg) + if err != nil { + logrus.WithError(err).Panic("failed to setup kafka producers") + } + + return consumer, producer +} + +// In monolith mode with Naffka, we don't have the same constraints about +// consuming the same topic from more than one place like we do with Kafka. +// Therefore, we will only open one Naffka connection in case Naffka is +// running on SQLite. +var naffkaInstance *naffka.Naffka + +// setupNaffka creates kafka consumer/producer pair from the config. +func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + if naffkaInstance != nil { + return naffkaInstance, naffkaInstance + } + naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString)) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka database") + } + naffkaInstance, err = naffka.New(naffkaDB) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka") + } + return naffkaInstance, naffkaInstance +} diff --git a/setup/monolith.go b/setup/monolith.go new file mode 100644 index 00000000..2403f57f --- /dev/null +++ b/setup/monolith.go @@ -0,0 +1,76 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package setup + +import ( + "github.com/gorilla/mux" + appserviceAPI "github.com/matrix-org/dendrite/appservice/api" + "github.com/matrix-org/dendrite/clientapi" + "github.com/matrix-org/dendrite/clientapi/api" + eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/federationapi" + federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/internal/transactions" + keyAPI "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/mediaapi" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/config" + serverKeyAPI "github.com/matrix-org/dendrite/signingkeyserver/api" + "github.com/matrix-org/dendrite/syncapi" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/dendrite/userapi/storage/accounts" + "github.com/matrix-org/gomatrixserverlib" +) + +// Monolith represents an instantiation of all dependencies required to build +// all components of Dendrite, for use in monolith mode. +type Monolith struct { + Config *config.Dendrite + AccountDB accounts.Database + KeyRing *gomatrixserverlib.KeyRing + Client *gomatrixserverlib.Client + FedClient *gomatrixserverlib.FederationClient + + AppserviceAPI appserviceAPI.AppServiceQueryAPI + EDUInternalAPI eduServerAPI.EDUServerInputAPI + FederationSenderAPI federationSenderAPI.FederationSenderInternalAPI + RoomserverAPI roomserverAPI.RoomserverInternalAPI + ServerKeyAPI serverKeyAPI.SigningKeyServerAPI + UserAPI userapi.UserInternalAPI + KeyAPI keyAPI.KeyInternalAPI + + // Optional + ExtPublicRoomsProvider api.ExtraPublicRoomsProvider +} + +// AddAllPublicRoutes attaches all public paths to the given router +func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router) { + clientapi.AddPublicRoutes( + csMux, &m.Config.ClientAPI, m.AccountDB, + m.FedClient, m.RoomserverAPI, + m.EDUInternalAPI, m.AppserviceAPI, transactions.New(), + m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider, + ) + federationapi.AddPublicRoutes( + ssMux, keyMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient, + m.KeyRing, m.RoomserverAPI, m.FederationSenderAPI, + m.EDUInternalAPI, m.KeyAPI, + ) + mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client) + syncapi.AddPublicRoutes( + csMux, m.UserAPI, m.RoomserverAPI, + m.KeyAPI, m.FedClient, &m.Config.SyncAPI, + ) +} diff --git a/setup/mscs/msc2836/msc2836.go b/setup/mscs/msc2836/msc2836.go new file mode 100644 index 00000000..33a65c8f --- /dev/null +++ b/setup/mscs/msc2836/msc2836.go @@ -0,0 +1,530 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package msc2836 'Threading' implements https://github.com/matrix-org/matrix-doc/pull/2836 +package msc2836 + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/matrix-org/dendrite/clientapi/jsonerror" + fs "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/internal/hooks" + "github.com/matrix-org/dendrite/internal/httputil" + roomserver "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +const ( + constRelType = "m.reference" + constRoomIDKey = "relationship_room_id" + constRoomServers = "relationship_servers" +) + +type EventRelationshipRequest struct { + EventID string `json:"event_id"` + MaxDepth int `json:"max_depth"` + MaxBreadth int `json:"max_breadth"` + Limit int `json:"limit"` + DepthFirst bool `json:"depth_first"` + RecentFirst bool `json:"recent_first"` + IncludeParent bool `json:"include_parent"` + IncludeChildren bool `json:"include_children"` + Direction string `json:"direction"` + Batch string `json:"batch"` + AutoJoin bool `json:"auto_join"` +} + +func NewEventRelationshipRequest(body io.Reader) (*EventRelationshipRequest, error) { + var relation EventRelationshipRequest + relation.Defaults() + if err := json.NewDecoder(body).Decode(&relation); err != nil { + return nil, err + } + return &relation, nil +} + +func (r *EventRelationshipRequest) Defaults() { + r.Limit = 100 + r.MaxBreadth = 10 + r.MaxDepth = 3 + r.DepthFirst = false + r.RecentFirst = true + r.IncludeParent = false + r.IncludeChildren = false + r.Direction = "down" +} + +type EventRelationshipResponse struct { + Events []gomatrixserverlib.ClientEvent `json:"events"` + NextBatch string `json:"next_batch"` + Limited bool `json:"limited"` +} + +// Enable this MSC +// nolint:gocyclo +func Enable( + base *setup.BaseDendrite, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI, + userAPI userapi.UserInternalAPI, keyRing gomatrixserverlib.JSONVerifier, +) error { + db, err := NewDatabase(&base.Cfg.MSCs.Database) + if err != nil { + return fmt.Errorf("Cannot enable MSC2836: %w", err) + } + hooks.Enable() + hooks.Attach(hooks.KindNewEventPersisted, func(headeredEvent interface{}) { + he := headeredEvent.(*gomatrixserverlib.HeaderedEvent) + hookErr := db.StoreRelation(context.Background(), he) + if hookErr != nil { + util.GetLogger(context.Background()).WithError(hookErr).Error( + "failed to StoreRelation", + ) + } + }) + hooks.Attach(hooks.KindNewEventReceived, func(headeredEvent interface{}) { + he := headeredEvent.(*gomatrixserverlib.HeaderedEvent) + ctx := context.Background() + // we only inject metadata for events our server sends + userID := he.Sender() + _, domain, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return + } + if domain != base.Cfg.Global.ServerName { + return + } + // if this event has an m.relationship, add on the room_id and servers to unsigned + parent, child, relType := parentChildEventIDs(he) + if parent == "" || child == "" || relType == "" { + return + } + event, joinedToRoom := getEventIfVisible(ctx, rsAPI, parent, userID) + if !joinedToRoom { + return + } + err = he.SetUnsignedField(constRoomIDKey, event.RoomID()) + if err != nil { + util.GetLogger(context.Background()).WithError(err).Warn("Failed to SetUnsignedField") + return + } + + var servers []gomatrixserverlib.ServerName + if fsAPI != nil { + var res fs.QueryJoinedHostServerNamesInRoomResponse + err = fsAPI.QueryJoinedHostServerNamesInRoom(ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{ + RoomID: event.RoomID(), + }, &res) + if err != nil { + util.GetLogger(context.Background()).WithError(err).Warn("Failed to QueryJoinedHostServerNamesInRoom") + return + } + servers = res.ServerNames + } else { + servers = []gomatrixserverlib.ServerName{ + base.Cfg.Global.ServerName, + } + } + err = he.SetUnsignedField(constRoomServers, servers) + if err != nil { + util.GetLogger(context.Background()).WithError(err).Warn("Failed to SetUnsignedField") + return + } + }) + + base.PublicClientAPIMux.Handle("/unstable/event_relationships", + httputil.MakeAuthAPI("eventRelationships", userAPI, eventRelationshipHandler(db, rsAPI)), + ).Methods(http.MethodPost, http.MethodOptions) + + base.PublicFederationAPIMux.Handle("/unstable/event_relationships", httputil.MakeExternalAPI( + "msc2836_event_relationships", func(req *http.Request) util.JSONResponse { + fedReq, errResp := gomatrixserverlib.VerifyHTTPRequest( + req, time.Now(), base.Cfg.Global.ServerName, keyRing, + ) + if fedReq == nil { + return errResp + } + return federatedEventRelationship(req.Context(), fedReq, db, rsAPI) + }, + )).Methods(http.MethodPost, http.MethodOptions) + return nil +} + +type reqCtx struct { + ctx context.Context + rsAPI roomserver.RoomserverInternalAPI + db Database + req *EventRelationshipRequest + userID string + isFederatedRequest bool +} + +func eventRelationshipHandler(db Database, rsAPI roomserver.RoomserverInternalAPI) func(*http.Request, *userapi.Device) util.JSONResponse { + return func(req *http.Request, device *userapi.Device) util.JSONResponse { + relation, err := NewEventRelationshipRequest(req.Body) + if err != nil { + util.GetLogger(req.Context()).WithError(err).Error("failed to decode HTTP request as JSON") + return util.JSONResponse{ + Code: 400, + JSON: jsonerror.BadJSON(fmt.Sprintf("invalid json: %s", err)), + } + } + rc := reqCtx{ + ctx: req.Context(), + req: relation, + userID: device.UserID, + rsAPI: rsAPI, + isFederatedRequest: false, + db: db, + } + res, resErr := rc.process() + if resErr != nil { + return *resErr + } + + return util.JSONResponse{ + Code: 200, + JSON: res, + } + } +} + +func federatedEventRelationship(ctx context.Context, fedReq *gomatrixserverlib.FederationRequest, db Database, rsAPI roomserver.RoomserverInternalAPI) util.JSONResponse { + relation, err := NewEventRelationshipRequest(bytes.NewBuffer(fedReq.Content())) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("failed to decode HTTP request as JSON") + return util.JSONResponse{ + Code: 400, + JSON: jsonerror.BadJSON(fmt.Sprintf("invalid json: %s", err)), + } + } + rc := reqCtx{ + ctx: ctx, + req: relation, + userID: "", + rsAPI: rsAPI, + isFederatedRequest: true, + db: db, + } + res, resErr := rc.process() + if resErr != nil { + return *resErr + } + + return util.JSONResponse{ + Code: 200, + JSON: res, + } +} + +func (rc *reqCtx) process() (*EventRelationshipResponse, *util.JSONResponse) { + var res EventRelationshipResponse + var returnEvents []*gomatrixserverlib.HeaderedEvent + // Can the user see (according to history visibility) event_id? If no, reject the request, else continue. + // We should have the event being referenced so don't give any claimed room ID / servers + event := rc.getEventIfVisible(rc.req.EventID, "", nil) + if event == nil { + return nil, &util.JSONResponse{ + Code: 403, + JSON: jsonerror.Forbidden("Event does not exist or you are not authorised to see it"), + } + } + + // Retrieve the event. Add it to response array. + returnEvents = append(returnEvents, event) + + if rc.req.IncludeParent { + if parentEvent := rc.includeParent(event); parentEvent != nil { + returnEvents = append(returnEvents, parentEvent) + } + } + + if rc.req.IncludeChildren { + remaining := rc.req.Limit - len(returnEvents) + if remaining > 0 { + children, resErr := rc.includeChildren(rc.db, event.EventID(), remaining, rc.req.RecentFirst) + if resErr != nil { + return nil, resErr + } + returnEvents = append(returnEvents, children...) + } + } + + remaining := rc.req.Limit - len(returnEvents) + var walkLimited bool + if remaining > 0 { + included := make(map[string]bool, len(returnEvents)) + for _, ev := range returnEvents { + included[ev.EventID()] = true + } + var events []*gomatrixserverlib.HeaderedEvent + events, walkLimited = walkThread( + rc.ctx, rc.db, rc, included, remaining, + ) + returnEvents = append(returnEvents, events...) + } + res.Events = make([]gomatrixserverlib.ClientEvent, len(returnEvents)) + for i, ev := range returnEvents { + res.Events[i] = gomatrixserverlib.HeaderedToClientEvent(ev, gomatrixserverlib.FormatAll) + } + res.Limited = remaining == 0 || walkLimited + return &res, nil +} + +// If include_parent: true and there is a valid m.relationship field in the event, +// retrieve the referenced event. Apply history visibility check to that event and if it passes, add it to the response array. +func (rc *reqCtx) includeParent(event *gomatrixserverlib.HeaderedEvent) (parent *gomatrixserverlib.HeaderedEvent) { + parentID, _, _ := parentChildEventIDs(event) + if parentID == "" { + return nil + } + claimedRoomID, claimedServers := roomIDAndServers(event) + return rc.getEventIfVisible(parentID, claimedRoomID, claimedServers) +} + +// If include_children: true, lookup all events which have event_id as an m.relationship +// Apply history visibility checks to all these events and add the ones which pass into the response array, +// honouring the recent_first flag and the limit. +func (rc *reqCtx) includeChildren(db Database, parentID string, limit int, recentFirst bool) ([]*gomatrixserverlib.HeaderedEvent, *util.JSONResponse) { + children, err := db.ChildrenForParent(rc.ctx, parentID, constRelType, recentFirst) + if err != nil { + util.GetLogger(rc.ctx).WithError(err).Error("failed to get ChildrenForParent") + resErr := jsonerror.InternalServerError() + return nil, &resErr + } + var childEvents []*gomatrixserverlib.HeaderedEvent + for _, child := range children { + // in order for us to even know about the children the server must be joined to those rooms, hence pass no claimed room ID or servers. + childEvent := rc.getEventIfVisible(child.EventID, "", nil) + if childEvent != nil { + childEvents = append(childEvents, childEvent) + } + } + if len(childEvents) > limit { + return childEvents[:limit], nil + } + return childEvents, nil +} + +// Begin to walk the thread DAG in the direction specified, either depth or breadth first according to the depth_first flag, +// honouring the limit, max_depth and max_breadth values according to the following rules +// nolint: unparam +func walkThread( + ctx context.Context, db Database, rc *reqCtx, included map[string]bool, limit int, +) ([]*gomatrixserverlib.HeaderedEvent, bool) { + if rc.req.Direction != "down" { + util.GetLogger(ctx).Error("not implemented: direction=up") + return nil, false + } + var result []*gomatrixserverlib.HeaderedEvent + eventWalker := walker{ + ctx: ctx, + req: rc.req, + db: db, + fn: func(wi *walkInfo) bool { + // If already processed event, skip. + if included[wi.EventID] { + return false + } + + // If the response array is >= limit, stop. + if len(result) >= limit { + return true + } + + // Process the event. + // TODO: Include edge information: room ID and servers + event := rc.getEventIfVisible(wi.EventID, "", nil) + if event != nil { + result = append(result, event) + } + included[wi.EventID] = true + return false + }, + } + limited, err := eventWalker.WalkFrom(rc.req.EventID) + if err != nil { + util.GetLogger(ctx).WithError(err).Errorf("Failed to WalkFrom %s", rc.req.EventID) + } + return result, limited +} + +func (rc *reqCtx) getEventIfVisible(eventID string, claimedRoomID string, claimedServers []string) *gomatrixserverlib.HeaderedEvent { + event, joinedToRoom := getEventIfVisible(rc.ctx, rc.rsAPI, eventID, rc.userID) + if event != nil && joinedToRoom { + return event + } + // either we don't have the event or we aren't joined to the room, regardless we should try joining if auto join is enabled + if !rc.req.AutoJoin { + return nil + } + // if we're doing this on behalf of a random server don't auto-join rooms regardless of what the request says + if rc.isFederatedRequest { + return nil + } + roomID := claimedRoomID + var servers []gomatrixserverlib.ServerName + if event != nil { + roomID = event.RoomID() + } + for _, s := range claimedServers { + servers = append(servers, gomatrixserverlib.ServerName(s)) + } + var joinRes roomserver.PerformJoinResponse + rc.rsAPI.PerformJoin(rc.ctx, &roomserver.PerformJoinRequest{ + UserID: rc.userID, + Content: map[string]interface{}{}, + RoomIDOrAlias: roomID, + ServerNames: servers, + }, &joinRes) + if joinRes.Error != nil { + util.GetLogger(rc.ctx).WithError(joinRes.Error).WithField("room_id", roomID).Error("Failed to auto-join room") + return nil + } + if event != nil { + return event + } + // TODO: hit /event_relationships on the server we joined via + util.GetLogger(rc.ctx).Infof("joined room but need to fetch event TODO") + return nil +} + +func getEventIfVisible(ctx context.Context, rsAPI roomserver.RoomserverInternalAPI, eventID, userID string) (*gomatrixserverlib.HeaderedEvent, bool) { + var queryEventsRes roomserver.QueryEventsByIDResponse + err := rsAPI.QueryEventsByID(ctx, &roomserver.QueryEventsByIDRequest{ + EventIDs: []string{eventID}, + }, &queryEventsRes) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("getEventIfVisible: failed to QueryEventsByID") + return nil, false + } + if len(queryEventsRes.Events) == 0 { + util.GetLogger(ctx).Infof("event does not exist") + return nil, false // event does not exist + } + event := queryEventsRes.Events[0] + + // Allow events if the member is in the room + // TODO: This does not honour history_visibility + // TODO: This does not honour m.room.create content + var queryMembershipRes roomserver.QueryMembershipForUserResponse + err = rsAPI.QueryMembershipForUser(ctx, &roomserver.QueryMembershipForUserRequest{ + RoomID: event.RoomID(), + UserID: userID, + }, &queryMembershipRes) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("getEventIfVisible: failed to QueryMembershipForUser") + return nil, false + } + return event, queryMembershipRes.IsInRoom +} + +type walkInfo struct { + eventInfo + SiblingNumber int + Depth int +} + +type walker struct { + ctx context.Context + req *EventRelationshipRequest + db Database + fn func(wi *walkInfo) bool // callback invoked for each event walked, return true to terminate the walk +} + +// WalkFrom the event ID given +func (w *walker) WalkFrom(eventID string) (limited bool, err error) { + children, err := w.db.ChildrenForParent(w.ctx, eventID, constRelType, w.req.RecentFirst) + if err != nil { + util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() ChildrenForParent failed, cannot walk") + return false, err + } + var next *walkInfo + toWalk := w.addChildren(nil, children, 1) + next, toWalk = w.nextChild(toWalk) + for next != nil { + stop := w.fn(next) + if stop { + return true, nil + } + // find the children's children + children, err = w.db.ChildrenForParent(w.ctx, next.EventID, constRelType, w.req.RecentFirst) + if err != nil { + util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() ChildrenForParent failed, cannot walk") + return false, err + } + toWalk = w.addChildren(toWalk, children, next.Depth+1) + next, toWalk = w.nextChild(toWalk) + } + + return false, nil +} + +// addChildren adds an event's children to the to walk data structure +func (w *walker) addChildren(toWalk []walkInfo, children []eventInfo, depthOfChildren int) []walkInfo { + // Check what number child this event is (ordered by recent_first) compared to its parent, does it exceed (greater than) max_breadth? If yes, skip. + if len(children) > w.req.MaxBreadth { + children = children[:w.req.MaxBreadth] + } + // Check how deep the event is compared to event_id, does it exceed (greater than) max_depth? If yes, skip. + if depthOfChildren > w.req.MaxDepth { + return toWalk + } + + if w.req.DepthFirst { + // the slice is a stack so push them in reverse order so we pop them in the correct order + // e.g [3,2,1] => [3,2] , 1 => [3] , 2 => [] , 3 + for i := len(children) - 1; i >= 0; i-- { + toWalk = append(toWalk, walkInfo{ + eventInfo: children[i], + SiblingNumber: i + 1, // index from 1 + Depth: depthOfChildren, + }) + } + } else { + // the slice is a queue so push them in normal order to we dequeue them in the correct order + // e.g [1,2,3] => 1, [2, 3] => 2 , [3] => 3, [] + for i := range children { + toWalk = append(toWalk, walkInfo{ + eventInfo: children[i], + SiblingNumber: i + 1, // index from 1 + Depth: depthOfChildren, + }) + } + } + return toWalk +} + +func (w *walker) nextChild(toWalk []walkInfo) (*walkInfo, []walkInfo) { + if len(toWalk) == 0 { + return nil, nil + } + var child walkInfo + if w.req.DepthFirst { + // toWalk is a stack so pop the child off + child, toWalk = toWalk[len(toWalk)-1], toWalk[:len(toWalk)-1] + return &child, toWalk + } + // toWalk is a queue so shift the child off + child, toWalk = toWalk[0], toWalk[1:] + return &child, toWalk +} diff --git a/setup/mscs/msc2836/msc2836_test.go b/setup/mscs/msc2836/msc2836_test.go new file mode 100644 index 00000000..996cc79f --- /dev/null +++ b/setup/mscs/msc2836/msc2836_test.go @@ -0,0 +1,577 @@ +package msc2836_test + +import ( + "bytes" + "context" + "crypto/ed25519" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "testing" + "time" + + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/internal/hooks" + "github.com/matrix-org/dendrite/internal/httputil" + roomserver "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/mscs/msc2836" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" +) + +var ( + client = &http.Client{ + Timeout: 10 * time.Second, + } +) + +// Basic sanity check of MSC2836 logic. Injects a thread that looks like: +// A +// | +// B +// / \ +// C D +// /|\ +// E F G +// | +// H +// And makes sure POST /event_relationships works with various parameters +func TestMSC2836(t *testing.T) { + alice := "@alice:localhost" + bob := "@bob:localhost" + charlie := "@charlie:localhost" + roomIDA := "!alice:localhost" + roomIDB := "!bob:localhost" + roomIDC := "!charlie:localhost" + // give access tokens to all three users + nopUserAPI := &testUserAPI{ + accessTokens: make(map[string]userapi.Device), + } + nopUserAPI.accessTokens["alice"] = userapi.Device{ + AccessToken: "alice", + DisplayName: "Alice", + UserID: alice, + } + nopUserAPI.accessTokens["bob"] = userapi.Device{ + AccessToken: "bob", + DisplayName: "Bob", + UserID: bob, + } + nopUserAPI.accessTokens["charlie"] = userapi.Device{ + AccessToken: "charlie", + DisplayName: "Charles", + UserID: charlie, + } + eventA := mustCreateEvent(t, fledglingEvent{ + RoomID: roomIDA, + Sender: alice, + Type: "m.room.message", + Content: map[string]interface{}{ + "body": "[A] Do you know shelties?", + }, + }) + eventB := mustCreateEvent(t, fledglingEvent{ + RoomID: roomIDB, + Sender: bob, + Type: "m.room.message", + Content: map[string]interface{}{ + "body": "[B] I <3 shelties", + "m.relationship": map[string]string{ + "rel_type": "m.reference", + "event_id": eventA.EventID(), + }, + }, + }) + eventC := mustCreateEvent(t, fledglingEvent{ + RoomID: roomIDB, + Sender: bob, + Type: "m.room.message", + Content: map[string]interface{}{ + "body": "[C] like so much", + "m.relationship": map[string]string{ + "rel_type": "m.reference", + "event_id": eventB.EventID(), + }, + }, + }) + eventD := mustCreateEvent(t, fledglingEvent{ + RoomID: roomIDA, + Sender: alice, + Type: "m.room.message", + Content: map[string]interface{}{ + "body": "[D] but what are shelties???", + "m.relationship": map[string]string{ + "rel_type": "m.reference", + "event_id": eventB.EventID(), + }, + }, + }) + eventE := mustCreateEvent(t, fledglingEvent{ + RoomID: roomIDB, + Sender: bob, + Type: "m.room.message", + Content: map[string]interface{}{ + "body": "[E] seriously???", + "m.relationship": map[string]string{ + "rel_type": "m.reference", + "event_id": eventD.EventID(), + }, + }, + }) + eventF := mustCreateEvent(t, fledglingEvent{ + RoomID: roomIDC, + Sender: charlie, + Type: "m.room.message", + Content: map[string]interface{}{ + "body": "[F] omg how do you not know what shelties are", + "m.relationship": map[string]string{ + "rel_type": "m.reference", + "event_id": eventD.EventID(), + }, + }, + }) + eventG := mustCreateEvent(t, fledglingEvent{ + RoomID: roomIDA, + Sender: alice, + Type: "m.room.message", + Content: map[string]interface{}{ + "body": "[G] looked it up, it's a sheltered person?", + "m.relationship": map[string]string{ + "rel_type": "m.reference", + "event_id": eventD.EventID(), + }, + }, + }) + eventH := mustCreateEvent(t, fledglingEvent{ + RoomID: roomIDB, + Sender: bob, + Type: "m.room.message", + Content: map[string]interface{}{ + "body": "[H] it's a dog!!!!!", + "m.relationship": map[string]string{ + "rel_type": "m.reference", + "event_id": eventE.EventID(), + }, + }, + }) + // make everyone joined to each other's rooms + nopRsAPI := &testRoomserverAPI{ + userToJoinedRooms: map[string][]string{ + alice: []string{roomIDA, roomIDB, roomIDC}, + bob: []string{roomIDA, roomIDB, roomIDC}, + charlie: []string{roomIDA, roomIDB, roomIDC}, + }, + events: map[string]*gomatrixserverlib.HeaderedEvent{ + eventA.EventID(): eventA, + eventB.EventID(): eventB, + eventC.EventID(): eventC, + eventD.EventID(): eventD, + eventE.EventID(): eventE, + eventF.EventID(): eventF, + eventG.EventID(): eventG, + eventH.EventID(): eventH, + }, + } + router := injectEvents(t, nopUserAPI, nopRsAPI, []*gomatrixserverlib.HeaderedEvent{ + eventA, eventB, eventC, eventD, eventE, eventF, eventG, eventH, + }) + cancel := runServer(t, router) + defer cancel() + + t.Run("returns 403 on invalid event IDs", func(t *testing.T) { + _ = postRelationships(t, 403, "alice", newReq(t, map[string]interface{}{ + "event_id": "$invalid", + })) + }) + t.Run("returns 403 if not joined to the room of specified event in request", func(t *testing.T) { + nopUserAPI.accessTokens["frank"] = userapi.Device{ + AccessToken: "frank", + DisplayName: "Frank Not In Room", + UserID: "@frank:localhost", + } + _ = postRelationships(t, 403, "frank", newReq(t, map[string]interface{}{ + "event_id": eventB.EventID(), + "limit": 1, + "include_parent": true, + })) + }) + t.Run("omits parent if not joined to the room of parent of event", func(t *testing.T) { + nopUserAPI.accessTokens["frank2"] = userapi.Device{ + AccessToken: "frank2", + DisplayName: "Frank2 Not In Room", + UserID: "@frank2:localhost", + } + // Event B is in roomB, Event A is in roomA, so make frank2 joined to roomB + nopRsAPI.userToJoinedRooms["@frank2:localhost"] = []string{roomIDB} + body := postRelationships(t, 200, "frank2", newReq(t, map[string]interface{}{ + "event_id": eventB.EventID(), + "limit": 1, + "include_parent": true, + })) + assertContains(t, body, []string{eventB.EventID()}) + }) + t.Run("returns the parent if include_parent is true", func(t *testing.T) { + body := postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{ + "event_id": eventB.EventID(), + "include_parent": true, + "limit": 2, + })) + assertContains(t, body, []string{eventB.EventID(), eventA.EventID()}) + }) + t.Run("returns the children in the right order if include_children is true", func(t *testing.T) { + body := postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{ + "event_id": eventD.EventID(), + "include_children": true, + "recent_first": true, + "limit": 4, + })) + assertContains(t, body, []string{eventD.EventID(), eventG.EventID(), eventF.EventID(), eventE.EventID()}) + body = postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{ + "event_id": eventD.EventID(), + "include_children": true, + "recent_first": false, + "limit": 4, + })) + assertContains(t, body, []string{eventD.EventID(), eventE.EventID(), eventF.EventID(), eventG.EventID()}) + }) + t.Run("walks the graph depth first", func(t *testing.T) { + body := postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{ + "event_id": eventB.EventID(), + "recent_first": false, + "depth_first": true, + "limit": 6, + })) + // Oldest first so: + // A + // | + // B1 + // / \ + // C2 D3 + // /| \ + // 4E 6F G + // | + // 5H + assertContains(t, body, []string{eventB.EventID(), eventC.EventID(), eventD.EventID(), eventE.EventID(), eventH.EventID(), eventF.EventID()}) + body = postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{ + "event_id": eventB.EventID(), + "recent_first": true, + "depth_first": true, + "limit": 6, + })) + // Recent first so: + // A + // | + // B1 + // / \ + // C D2 + // /| \ + // E5 F4 G3 + // | + // H6 + assertContains(t, body, []string{eventB.EventID(), eventD.EventID(), eventG.EventID(), eventF.EventID(), eventE.EventID(), eventH.EventID()}) + }) + t.Run("walks the graph breadth first", func(t *testing.T) { + body := postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{ + "event_id": eventB.EventID(), + "recent_first": false, + "depth_first": false, + "limit": 6, + })) + // Oldest first so: + // A + // | + // B1 + // / \ + // C2 D3 + // /| \ + // E4 F5 G6 + // | + // H + assertContains(t, body, []string{eventB.EventID(), eventC.EventID(), eventD.EventID(), eventE.EventID(), eventF.EventID(), eventG.EventID()}) + body = postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{ + "event_id": eventB.EventID(), + "recent_first": true, + "depth_first": false, + "limit": 6, + })) + // Recent first so: + // A + // | + // B1 + // / \ + // C3 D2 + // /| \ + // E6 F5 G4 + // | + // H + assertContains(t, body, []string{eventB.EventID(), eventD.EventID(), eventC.EventID(), eventG.EventID(), eventF.EventID(), eventE.EventID()}) + }) + t.Run("caps via max_breadth", func(t *testing.T) { + body := postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{ + "event_id": eventB.EventID(), + "recent_first": false, + "depth_first": false, + "max_breadth": 2, + "limit": 10, + })) + // Event G gets omitted because of max_breadth + assertContains(t, body, []string{eventB.EventID(), eventC.EventID(), eventD.EventID(), eventE.EventID(), eventF.EventID(), eventH.EventID()}) + }) + t.Run("caps via max_depth", func(t *testing.T) { + body := postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{ + "event_id": eventB.EventID(), + "recent_first": false, + "depth_first": false, + "max_depth": 2, + "limit": 10, + })) + // Event H gets omitted because of max_depth + assertContains(t, body, []string{eventB.EventID(), eventC.EventID(), eventD.EventID(), eventE.EventID(), eventF.EventID(), eventG.EventID()}) + }) + t.Run("terminates when reaching the limit", func(t *testing.T) { + body := postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{ + "event_id": eventB.EventID(), + "recent_first": false, + "depth_first": false, + "limit": 4, + })) + assertContains(t, body, []string{eventB.EventID(), eventC.EventID(), eventD.EventID(), eventE.EventID()}) + }) + t.Run("returns all events with a high enough limit", func(t *testing.T) { + body := postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{ + "event_id": eventB.EventID(), + "recent_first": false, + "depth_first": false, + "limit": 400, + })) + assertContains(t, body, []string{eventB.EventID(), eventC.EventID(), eventD.EventID(), eventE.EventID(), eventF.EventID(), eventG.EventID(), eventH.EventID()}) + }) +} + +// TODO: TestMSC2836TerminatesLoops (short and long) +// TODO: TestMSC2836UnknownEventsSkipped +// TODO: TestMSC2836SkipEventIfNotInRoom + +func newReq(t *testing.T, jsonBody map[string]interface{}) *msc2836.EventRelationshipRequest { + t.Helper() + b, err := json.Marshal(jsonBody) + if err != nil { + t.Fatalf("Failed to marshal request: %s", err) + } + r, err := msc2836.NewEventRelationshipRequest(bytes.NewBuffer(b)) + if err != nil { + t.Fatalf("Failed to NewEventRelationshipRequest: %s", err) + } + return r +} + +func runServer(t *testing.T, router *mux.Router) func() { + t.Helper() + externalServ := &http.Server{ + Addr: string(":8009"), + WriteTimeout: 60 * time.Second, + Handler: router, + } + go func() { + externalServ.ListenAndServe() + }() + // wait to listen on the port + time.Sleep(500 * time.Millisecond) + return func() { + externalServ.Shutdown(context.TODO()) + } +} + +func postRelationships(t *testing.T, expectCode int, accessToken string, req *msc2836.EventRelationshipRequest) *msc2836.EventRelationshipResponse { + t.Helper() + var r msc2836.EventRelationshipRequest + r.Defaults() + data, err := json.Marshal(req) + if err != nil { + t.Fatalf("failed to marshal request: %s", err) + } + httpReq, err := http.NewRequest( + "POST", "http://localhost:8009/_matrix/client/unstable/event_relationships", + bytes.NewBuffer(data), + ) + httpReq.Header.Set("Authorization", "Bearer "+accessToken) + if err != nil { + t.Fatalf("failed to prepare request: %s", err) + } + res, err := client.Do(httpReq) + if err != nil { + t.Fatalf("failed to do request: %s", err) + } + if res.StatusCode != expectCode { + body, _ := ioutil.ReadAll(res.Body) + t.Fatalf("wrong response code, got %d want %d - body: %s", res.StatusCode, expectCode, string(body)) + } + if res.StatusCode == 200 { + var result msc2836.EventRelationshipResponse + if err := json.NewDecoder(res.Body).Decode(&result); err != nil { + t.Fatalf("response 200 OK but failed to deserialise JSON : %s", err) + } + return &result + } + return nil +} + +func assertContains(t *testing.T, result *msc2836.EventRelationshipResponse, wantEventIDs []string) { + t.Helper() + gotEventIDs := make([]string, len(result.Events)) + for i, ev := range result.Events { + gotEventIDs[i] = ev.EventID + } + if len(gotEventIDs) != len(wantEventIDs) { + t.Fatalf("length mismatch: got %v want %v", gotEventIDs, wantEventIDs) + } + for i := range gotEventIDs { + if gotEventIDs[i] != wantEventIDs[i] { + t.Errorf("wrong item in position %d - got %s want %s", i, gotEventIDs[i], wantEventIDs[i]) + } + } +} + +type testUserAPI struct { + accessTokens map[string]userapi.Device +} + +func (u *testUserAPI) InputAccountData(ctx context.Context, req *userapi.InputAccountDataRequest, res *userapi.InputAccountDataResponse) error { + return nil +} +func (u *testUserAPI) PerformAccountCreation(ctx context.Context, req *userapi.PerformAccountCreationRequest, res *userapi.PerformAccountCreationResponse) error { + return nil +} +func (u *testUserAPI) PerformPasswordUpdate(ctx context.Context, req *userapi.PerformPasswordUpdateRequest, res *userapi.PerformPasswordUpdateResponse) error { + return nil +} +func (u *testUserAPI) PerformDeviceCreation(ctx context.Context, req *userapi.PerformDeviceCreationRequest, res *userapi.PerformDeviceCreationResponse) error { + return nil +} +func (u *testUserAPI) PerformDeviceDeletion(ctx context.Context, req *userapi.PerformDeviceDeletionRequest, res *userapi.PerformDeviceDeletionResponse) error { + return nil +} +func (u *testUserAPI) PerformDeviceUpdate(ctx context.Context, req *userapi.PerformDeviceUpdateRequest, res *userapi.PerformDeviceUpdateResponse) error { + return nil +} +func (u *testUserAPI) PerformLastSeenUpdate(ctx context.Context, req *userapi.PerformLastSeenUpdateRequest, res *userapi.PerformLastSeenUpdateResponse) error { + return nil +} +func (u *testUserAPI) PerformAccountDeactivation(ctx context.Context, req *userapi.PerformAccountDeactivationRequest, res *userapi.PerformAccountDeactivationResponse) error { + return nil +} +func (u *testUserAPI) QueryProfile(ctx context.Context, req *userapi.QueryProfileRequest, res *userapi.QueryProfileResponse) error { + return nil +} +func (u *testUserAPI) QueryAccessToken(ctx context.Context, req *userapi.QueryAccessTokenRequest, res *userapi.QueryAccessTokenResponse) error { + dev, ok := u.accessTokens[req.AccessToken] + if !ok { + res.Err = fmt.Errorf("unknown token") + return nil + } + res.Device = &dev + return nil +} +func (u *testUserAPI) QueryDevices(ctx context.Context, req *userapi.QueryDevicesRequest, res *userapi.QueryDevicesResponse) error { + return nil +} +func (u *testUserAPI) QueryAccountData(ctx context.Context, req *userapi.QueryAccountDataRequest, res *userapi.QueryAccountDataResponse) error { + return nil +} +func (u *testUserAPI) QueryDeviceInfos(ctx context.Context, req *userapi.QueryDeviceInfosRequest, res *userapi.QueryDeviceInfosResponse) error { + return nil +} +func (u *testUserAPI) QuerySearchProfiles(ctx context.Context, req *userapi.QuerySearchProfilesRequest, res *userapi.QuerySearchProfilesResponse) error { + return nil +} + +type testRoomserverAPI struct { + // use a trace API as it implements method stubs so we don't need to have them here. + // We'll override the functions we care about. + roomserver.RoomserverInternalAPITrace + userToJoinedRooms map[string][]string + events map[string]*gomatrixserverlib.HeaderedEvent +} + +func (r *testRoomserverAPI) QueryEventsByID(ctx context.Context, req *roomserver.QueryEventsByIDRequest, res *roomserver.QueryEventsByIDResponse) error { + for _, eventID := range req.EventIDs { + ev := r.events[eventID] + if ev != nil { + res.Events = append(res.Events, ev) + } + } + return nil +} + +func (r *testRoomserverAPI) QueryMembershipForUser(ctx context.Context, req *roomserver.QueryMembershipForUserRequest, res *roomserver.QueryMembershipForUserResponse) error { + rooms := r.userToJoinedRooms[req.UserID] + for _, roomID := range rooms { + if roomID == req.RoomID { + res.IsInRoom = true + res.HasBeenInRoom = true + res.Membership = "join" + break + } + } + return nil +} + +func injectEvents(t *testing.T, userAPI userapi.UserInternalAPI, rsAPI roomserver.RoomserverInternalAPI, events []*gomatrixserverlib.HeaderedEvent) *mux.Router { + t.Helper() + cfg := &config.Dendrite{} + cfg.Defaults() + cfg.Global.ServerName = "localhost" + cfg.MSCs.Database.ConnectionString = "file:msc2836_test.db" + cfg.MSCs.MSCs = []string{"msc2836"} + base := &setup.BaseDendrite{ + Cfg: cfg, + PublicClientAPIMux: mux.NewRouter().PathPrefix(httputil.PublicClientPathPrefix).Subrouter(), + PublicFederationAPIMux: mux.NewRouter().PathPrefix(httputil.PublicFederationPathPrefix).Subrouter(), + } + + err := msc2836.Enable(base, rsAPI, nil, userAPI, nil) + if err != nil { + t.Fatalf("failed to enable MSC2836: %s", err) + } + for _, ev := range events { + hooks.Run(hooks.KindNewEventPersisted, ev) + } + return base.PublicClientAPIMux +} + +type fledglingEvent struct { + Type string + StateKey *string + Content interface{} + Sender string + RoomID string +} + +func mustCreateEvent(t *testing.T, ev fledglingEvent) (result *gomatrixserverlib.HeaderedEvent) { + t.Helper() + roomVer := gomatrixserverlib.RoomVersionV6 + seed := make([]byte, ed25519.SeedSize) // zero seed + key := ed25519.NewKeyFromSeed(seed) + eb := gomatrixserverlib.EventBuilder{ + Sender: ev.Sender, + Depth: 999, + Type: ev.Type, + StateKey: ev.StateKey, + RoomID: ev.RoomID, + } + err := eb.SetContent(ev.Content) + if err != nil { + t.Fatalf("mustCreateEvent: failed to marshal event content %+v", ev.Content) + } + // make sure the origin_server_ts changes so we can test recency + time.Sleep(1 * time.Millisecond) + signedEvent, err := eb.Build(time.Now(), gomatrixserverlib.ServerName("localhost"), "ed25519:test", key, roomVer) + if err != nil { + t.Fatalf("mustCreateEvent: failed to sign event: %s", err) + } + h := signedEvent.Headered(roomVer) + return h +} diff --git a/setup/mscs/msc2836/storage.go b/setup/mscs/msc2836/storage.go new file mode 100644 index 00000000..72ea5195 --- /dev/null +++ b/setup/mscs/msc2836/storage.go @@ -0,0 +1,226 @@ +package msc2836 + +import ( + "context" + "database/sql" + "encoding/json" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/gomatrixserverlib" +) + +type eventInfo struct { + EventID string + OriginServerTS gomatrixserverlib.Timestamp + RoomID string + Servers []string +} + +type Database interface { + // StoreRelation stores the parent->child and child->parent relationship for later querying. + // Also stores the event metadata e.g timestamp + StoreRelation(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent) error + // ChildrenForParent returns the events who have the given `eventID` as an m.relationship with the + // provided `relType`. The returned slice is sorted by origin_server_ts according to whether + // `recentFirst` is true or false. + ChildrenForParent(ctx context.Context, eventID, relType string, recentFirst bool) ([]eventInfo, error) +} + +type DB struct { + db *sql.DB + writer sqlutil.Writer + insertEdgeStmt *sql.Stmt + insertNodeStmt *sql.Stmt + selectChildrenForParentOldestFirstStmt *sql.Stmt + selectChildrenForParentRecentFirstStmt *sql.Stmt +} + +// NewDatabase loads the database for msc2836 +func NewDatabase(dbOpts *config.DatabaseOptions) (Database, error) { + if dbOpts.ConnectionString.IsPostgres() { + return newPostgresDatabase(dbOpts) + } + return newSQLiteDatabase(dbOpts) +} + +func newPostgresDatabase(dbOpts *config.DatabaseOptions) (Database, error) { + d := DB{ + writer: sqlutil.NewDummyWriter(), + } + var err error + if d.db, err = sqlutil.Open(dbOpts); err != nil { + return nil, err + } + _, err = d.db.Exec(` + CREATE TABLE IF NOT EXISTS msc2836_edges ( + parent_event_id TEXT NOT NULL, + child_event_id TEXT NOT NULL, + rel_type TEXT NOT NULL, + parent_room_id TEXT NOT NULL, + parent_servers TEXT NOT NULL, + CONSTRAINT msc2836_edges_uniq UNIQUE (parent_event_id, child_event_id, rel_type) + ); + + CREATE TABLE IF NOT EXISTS msc2836_nodes ( + event_id TEXT PRIMARY KEY NOT NULL, + origin_server_ts BIGINT NOT NULL, + room_id TEXT NOT NULL + ); + `) + if err != nil { + return nil, err + } + if d.insertEdgeStmt, err = d.db.Prepare(` + INSERT INTO msc2836_edges(parent_event_id, child_event_id, rel_type, parent_room_id, parent_servers) VALUES($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING + `); err != nil { + return nil, err + } + if d.insertNodeStmt, err = d.db.Prepare(` + INSERT INTO msc2836_nodes(event_id, origin_server_ts, room_id) VALUES($1, $2, $3) ON CONFLICT DO NOTHING + `); err != nil { + return nil, err + } + selectChildrenQuery := ` + SELECT child_event_id, origin_server_ts, room_id FROM msc2836_edges + LEFT JOIN msc2836_nodes ON msc2836_edges.child_event_id = msc2836_nodes.event_id + WHERE parent_event_id = $1 AND rel_type = $2 + ORDER BY origin_server_ts + ` + if d.selectChildrenForParentOldestFirstStmt, err = d.db.Prepare(selectChildrenQuery + "ASC"); err != nil { + return nil, err + } + if d.selectChildrenForParentRecentFirstStmt, err = d.db.Prepare(selectChildrenQuery + "DESC"); err != nil { + return nil, err + } + return &d, err +} + +func newSQLiteDatabase(dbOpts *config.DatabaseOptions) (Database, error) { + d := DB{ + writer: sqlutil.NewExclusiveWriter(), + } + var err error + if d.db, err = sqlutil.Open(dbOpts); err != nil { + return nil, err + } + _, err = d.db.Exec(` + CREATE TABLE IF NOT EXISTS msc2836_edges ( + parent_event_id TEXT NOT NULL, + child_event_id TEXT NOT NULL, + rel_type TEXT NOT NULL, + parent_room_id TEXT NOT NULL, + parent_servers TEXT NOT NULL, + UNIQUE (parent_event_id, child_event_id, rel_type) + ); + + CREATE TABLE IF NOT EXISTS msc2836_nodes ( + event_id TEXT PRIMARY KEY NOT NULL, + origin_server_ts BIGINT NOT NULL, + room_id TEXT NOT NULL + ); + `) + if err != nil { + return nil, err + } + if d.insertEdgeStmt, err = d.db.Prepare(` + INSERT INTO msc2836_edges(parent_event_id, child_event_id, rel_type, parent_room_id, parent_servers) VALUES($1, $2, $3, $4, $5) ON CONFLICT (parent_event_id, child_event_id, rel_type) DO NOTHING + `); err != nil { + return nil, err + } + if d.insertNodeStmt, err = d.db.Prepare(` + INSERT INTO msc2836_nodes(event_id, origin_server_ts, room_id) VALUES($1, $2, $3) ON CONFLICT DO NOTHING + `); err != nil { + return nil, err + } + selectChildrenQuery := ` + SELECT child_event_id, origin_server_ts, room_id FROM msc2836_edges + LEFT JOIN msc2836_nodes ON msc2836_edges.child_event_id = msc2836_nodes.event_id + WHERE parent_event_id = $1 AND rel_type = $2 + ORDER BY origin_server_ts + ` + if d.selectChildrenForParentOldestFirstStmt, err = d.db.Prepare(selectChildrenQuery + "ASC"); err != nil { + return nil, err + } + if d.selectChildrenForParentRecentFirstStmt, err = d.db.Prepare(selectChildrenQuery + "DESC"); err != nil { + return nil, err + } + return &d, nil +} + +func (p *DB) StoreRelation(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent) error { + parent, child, relType := parentChildEventIDs(ev) + if parent == "" || child == "" { + return nil + } + relationRoomID, relationServers := roomIDAndServers(ev) + relationServersJSON, err := json.Marshal(relationServers) + if err != nil { + return err + } + return p.writer.Do(p.db, nil, func(txn *sql.Tx) error { + _, err := txn.Stmt(p.insertEdgeStmt).ExecContext(ctx, parent, child, relType, relationRoomID, string(relationServersJSON)) + if err != nil { + return err + } + _, err = txn.Stmt(p.insertNodeStmt).ExecContext(ctx, ev.EventID(), ev.OriginServerTS(), ev.RoomID()) + return err + }) +} + +func (p *DB) ChildrenForParent(ctx context.Context, eventID, relType string, recentFirst bool) ([]eventInfo, error) { + var rows *sql.Rows + var err error + if recentFirst { + rows, err = p.selectChildrenForParentRecentFirstStmt.QueryContext(ctx, eventID, relType) + } else { + rows, err = p.selectChildrenForParentOldestFirstStmt.QueryContext(ctx, eventID, relType) + } + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + var children []eventInfo + for rows.Next() { + var evInfo eventInfo + if err := rows.Scan(&evInfo.EventID, &evInfo.OriginServerTS, &evInfo.RoomID); err != nil { + return nil, err + } + children = append(children, evInfo) + } + return children, nil +} + +func parentChildEventIDs(ev *gomatrixserverlib.HeaderedEvent) (parent, child, relType string) { + if ev == nil { + return + } + body := struct { + Relationship struct { + RelType string `json:"rel_type"` + EventID string `json:"event_id"` + } `json:"m.relationship"` + }{} + if err := json.Unmarshal(ev.Content(), &body); err != nil { + return + } + if body.Relationship.EventID == "" || body.Relationship.RelType == "" { + return + } + return body.Relationship.EventID, ev.EventID(), body.Relationship.RelType +} + +func roomIDAndServers(ev *gomatrixserverlib.HeaderedEvent) (roomID string, servers []string) { + servers = []string{} + if ev == nil { + return + } + body := struct { + RoomID string `json:"relationship_room_id"` + Servers []string `json:"relationship_servers"` + }{} + if err := json.Unmarshal(ev.Unsigned(), &body); err != nil { + return + } + return body.RoomID, body.Servers +} diff --git a/setup/mscs/mscs.go b/setup/mscs/mscs.go new file mode 100644 index 00000000..8b0498ce --- /dev/null +++ b/setup/mscs/mscs.go @@ -0,0 +1,42 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package mscs implements Matrix Spec Changes from https://github.com/matrix-org/matrix-doc +package mscs + +import ( + "fmt" + + "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/mscs/msc2836" +) + +// Enable MSCs - returns an error on unknown MSCs +func Enable(base *setup.BaseDendrite, monolith *setup.Monolith) error { + for _, msc := range base.Cfg.MSCs.MSCs { + if err := EnableMSC(base, monolith, msc); err != nil { + return err + } + } + return nil +} + +func EnableMSC(base *setup.BaseDendrite, monolith *setup.Monolith, msc string) error { + switch msc { + case "msc2836": + return msc2836.Enable(base, monolith.RoomserverAPI, monolith.FederationSenderAPI, monolith.UserAPI, monolith.KeyRing) + default: + return fmt.Errorf("EnableMSC: unknown msc '%s'", msc) + } +} |