aboutsummaryrefslogtreecommitdiff
path: root/setup
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-12-02 17:41:00 +0000
committerGitHub <noreply@github.com>2020-12-02 17:41:00 +0000
commitb5aa7ca3ab1c91397700637c91d60860a0535f1e (patch)
tree9da277c7b22027f09a7f45b0b0d771e44949e8f0 /setup
parent3ef6187e96ca2d68b3014bbd150e69971b6f7800 (diff)
Top-level setup package (#1605)
* Move config, setup, mscs into "setup" top-level folder * oops, forgot the EDU server * Add setup * goimports
Diffstat (limited to 'setup')
-rw-r--r--setup/base.go359
-rw-r--r--setup/config/config.go572
-rw-r--r--setup/config/config_appservice.go353
-rw-r--r--setup/config/config_clientapi.go123
-rw-r--r--setup/config/config_eduserver.go17
-rw-r--r--setup/config/config_federationapi.go31
-rw-r--r--setup/config/config_federationsender.go63
-rw-r--r--setup/config/config_global.go142
-rw-r--r--setup/config/config_kafka.go61
-rw-r--r--setup/config/config_keyserver.go22
-rw-r--r--setup/config/config_mediaapi.go67
-rw-r--r--setup/config/config_mscs.go19
-rw-r--r--setup/config/config_roomserver.go22
-rw-r--r--setup/config/config_signingkeyserver.go52
-rw-r--r--setup/config/config_syncapi.go29
-rw-r--r--setup/config/config_test.go285
-rw-r--r--setup/config/config_userapi.go30
-rw-r--r--setup/federation.go32
-rw-r--r--setup/flags.go52
-rw-r--r--setup/kafka/kafka.go58
-rw-r--r--setup/monolith.go76
-rw-r--r--setup/mscs/msc2836/msc2836.go530
-rw-r--r--setup/mscs/msc2836/msc2836_test.go577
-rw-r--r--setup/mscs/msc2836/storage.go226
-rw-r--r--setup/mscs/mscs.go42
25 files changed, 3840 insertions, 0 deletions
diff --git a/setup/base.go b/setup/base.go
new file mode 100644
index 00000000..acbf2d35
--- /dev/null
+++ b/setup/base.go
@@ -0,0 +1,359 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package setup
+
+import (
+ "crypto/tls"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "net/url"
+ "time"
+
+ "github.com/matrix-org/dendrite/internal/caching"
+ "github.com/matrix-org/dendrite/internal/httputil"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ "golang.org/x/net/http2"
+ "golang.org/x/net/http2/h2c"
+
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/userapi/storage/accounts"
+
+ "github.com/gorilla/mux"
+
+ appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
+ asinthttp "github.com/matrix-org/dendrite/appservice/inthttp"
+ eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
+ eduinthttp "github.com/matrix-org/dendrite/eduserver/inthttp"
+ federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
+ fsinthttp "github.com/matrix-org/dendrite/federationsender/inthttp"
+ keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
+ keyinthttp "github.com/matrix-org/dendrite/keyserver/inthttp"
+ roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
+ rsinthttp "github.com/matrix-org/dendrite/roomserver/inthttp"
+ "github.com/matrix-org/dendrite/setup/config"
+ skapi "github.com/matrix-org/dendrite/signingkeyserver/api"
+ skinthttp "github.com/matrix-org/dendrite/signingkeyserver/inthttp"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
+ userapiinthttp "github.com/matrix-org/dendrite/userapi/inthttp"
+ "github.com/sirupsen/logrus"
+
+ _ "net/http/pprof"
+)
+
+// BaseDendrite is a base for creating new instances of dendrite. It parses
+// command line flags and config, and exposes methods for creating various
+// resources. All errors are handled by logging then exiting, so all methods
+// should only be used during start up.
+// Must be closed when shutting down.
+type BaseDendrite struct {
+ componentName string
+ tracerCloser io.Closer
+ PublicClientAPIMux *mux.Router
+ PublicFederationAPIMux *mux.Router
+ PublicKeyAPIMux *mux.Router
+ PublicMediaAPIMux *mux.Router
+ InternalAPIMux *mux.Router
+ UseHTTPAPIs bool
+ apiHttpClient *http.Client
+ httpClient *http.Client
+ Cfg *config.Dendrite
+ Caches *caching.Caches
+ // KafkaConsumer sarama.Consumer
+ // KafkaProducer sarama.SyncProducer
+}
+
+const HTTPServerTimeout = time.Minute * 5
+const HTTPClientTimeout = time.Second * 30
+
+const NoListener = ""
+
+// NewBaseDendrite creates a new instance to be used by a component.
+// The componentName is used for logging purposes, and should be a friendly name
+// of the compontent running, e.g. "SyncAPI"
+func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs bool) *BaseDendrite {
+ configErrors := &config.ConfigErrors{}
+ cfg.Verify(configErrors, componentName == "Monolith") // TODO: better way?
+ if len(*configErrors) > 0 {
+ for _, err := range *configErrors {
+ logrus.Errorf("Configuration error: %s", err)
+ }
+ logrus.Fatalf("Failed to start due to configuration errors")
+ }
+
+ internal.SetupStdLogging()
+ internal.SetupHookLogging(cfg.Logging, componentName)
+ internal.SetupPprof()
+
+ logrus.Infof("Dendrite version %s", internal.VersionString())
+
+ closer, err := cfg.SetupTracing("Dendrite" + componentName)
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to start opentracing")
+ }
+
+ cache, err := caching.NewInMemoryLRUCache(true)
+ if err != nil {
+ logrus.WithError(err).Warnf("Failed to create cache")
+ }
+
+ apiClient := http.Client{
+ Timeout: time.Minute * 10,
+ Transport: &http2.Transport{
+ AllowHTTP: true,
+ DialTLS: func(network, addr string, _ *tls.Config) (net.Conn, error) {
+ // Ordinarily HTTP/2 would expect TLS, but the remote listener is
+ // H2C-enabled (HTTP/2 without encryption). Overriding the DialTLS
+ // function with a plain Dial allows us to trick the HTTP client
+ // into establishing a HTTP/2 connection without TLS.
+ // TODO: Eventually we will want to look at authenticating and
+ // encrypting these internal HTTP APIs, at which point we will have
+ // to reconsider H2C and change all this anyway.
+ return net.Dial(network, addr)
+ },
+ },
+ }
+ client := http.Client{Timeout: HTTPClientTimeout}
+ if cfg.FederationSender.Proxy.Enabled {
+ client.Transport = &http.Transport{Proxy: http.ProxyURL(&url.URL{
+ Scheme: cfg.FederationSender.Proxy.Protocol,
+ Host: fmt.Sprintf("%s:%d", cfg.FederationSender.Proxy.Host, cfg.FederationSender.Proxy.Port),
+ })}
+ }
+
+ // Ideally we would only use SkipClean on routes which we know can allow '/' but due to
+ // https://github.com/gorilla/mux/issues/460 we have to attach this at the top router.
+ // When used in conjunction with UseEncodedPath() we get the behaviour we want when parsing
+ // path parameters:
+ // /foo/bar%2Fbaz == [foo, bar%2Fbaz] (from UseEncodedPath)
+ // /foo/bar%2F%2Fbaz == [foo, bar%2F%2Fbaz] (from SkipClean)
+ // In particular, rooms v3 event IDs are not urlsafe and can include '/' and because they
+ // are randomly generated it results in flakey tests.
+ // We need to be careful with media APIs if they read from a filesystem to make sure they
+ // are not inadvertently reading paths without cleaning, else this could introduce a
+ // directory traversal attack e.g /../../../etc/passwd
+ return &BaseDendrite{
+ componentName: componentName,
+ UseHTTPAPIs: useHTTPAPIs,
+ tracerCloser: closer,
+ Cfg: cfg,
+ Caches: cache,
+ PublicClientAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicClientPathPrefix).Subrouter().UseEncodedPath(),
+ PublicFederationAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicFederationPathPrefix).Subrouter().UseEncodedPath(),
+ PublicKeyAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicKeyPathPrefix).Subrouter().UseEncodedPath(),
+ PublicMediaAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicMediaPathPrefix).Subrouter().UseEncodedPath(),
+ InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(),
+ apiHttpClient: &apiClient,
+ httpClient: &client,
+ }
+}
+
+// Close implements io.Closer
+func (b *BaseDendrite) Close() error {
+ return b.tracerCloser.Close()
+}
+
+// AppserviceHTTPClient returns the AppServiceQueryAPI for hitting the appservice component over HTTP.
+func (b *BaseDendrite) AppserviceHTTPClient() appserviceAPI.AppServiceQueryAPI {
+ a, err := asinthttp.NewAppserviceClient(b.Cfg.AppServiceURL(), b.apiHttpClient)
+ if err != nil {
+ logrus.WithError(err).Panic("CreateHTTPAppServiceAPIs failed")
+ }
+ return a
+}
+
+// RoomserverHTTPClient returns RoomserverInternalAPI for hitting the roomserver over HTTP.
+func (b *BaseDendrite) RoomserverHTTPClient() roomserverAPI.RoomserverInternalAPI {
+ rsAPI, err := rsinthttp.NewRoomserverClient(b.Cfg.RoomServerURL(), b.apiHttpClient, b.Caches)
+ if err != nil {
+ logrus.WithError(err).Panic("RoomserverHTTPClient failed", b.apiHttpClient)
+ }
+ return rsAPI
+}
+
+// UserAPIClient returns UserInternalAPI for hitting the userapi over HTTP.
+func (b *BaseDendrite) UserAPIClient() userapi.UserInternalAPI {
+ userAPI, err := userapiinthttp.NewUserAPIClient(b.Cfg.UserAPIURL(), b.apiHttpClient)
+ if err != nil {
+ logrus.WithError(err).Panic("UserAPIClient failed", b.apiHttpClient)
+ }
+ return userAPI
+}
+
+// EDUServerClient returns EDUServerInputAPI for hitting the EDU server over HTTP
+func (b *BaseDendrite) EDUServerClient() eduServerAPI.EDUServerInputAPI {
+ e, err := eduinthttp.NewEDUServerClient(b.Cfg.EDUServerURL(), b.apiHttpClient)
+ if err != nil {
+ logrus.WithError(err).Panic("EDUServerClient failed", b.apiHttpClient)
+ }
+ return e
+}
+
+// FederationSenderHTTPClient returns FederationSenderInternalAPI for hitting
+// the federation sender over HTTP
+func (b *BaseDendrite) FederationSenderHTTPClient() federationSenderAPI.FederationSenderInternalAPI {
+ f, err := fsinthttp.NewFederationSenderClient(b.Cfg.FederationSenderURL(), b.apiHttpClient)
+ if err != nil {
+ logrus.WithError(err).Panic("FederationSenderHTTPClient failed", b.apiHttpClient)
+ }
+ return f
+}
+
+// SigningKeyServerHTTPClient returns SigningKeyServer for hitting the signing key server over HTTP
+func (b *BaseDendrite) SigningKeyServerHTTPClient() skapi.SigningKeyServerAPI {
+ f, err := skinthttp.NewSigningKeyServerClient(
+ b.Cfg.SigningKeyServerURL(),
+ b.apiHttpClient,
+ b.Caches,
+ )
+ if err != nil {
+ logrus.WithError(err).Panic("SigningKeyServerHTTPClient failed", b.httpClient)
+ }
+ return f
+}
+
+// KeyServerHTTPClient returns KeyInternalAPI for hitting the key server over HTTP
+func (b *BaseDendrite) KeyServerHTTPClient() keyserverAPI.KeyInternalAPI {
+ f, err := keyinthttp.NewKeyServerClient(b.Cfg.KeyServerURL(), b.apiHttpClient)
+ if err != nil {
+ logrus.WithError(err).Panic("KeyServerHTTPClient failed", b.apiHttpClient)
+ }
+ return f
+}
+
+// CreateAccountsDB creates a new instance of the accounts database. Should only
+// be called once per component.
+func (b *BaseDendrite) CreateAccountsDB() accounts.Database {
+ db, err := accounts.NewDatabase(&b.Cfg.UserAPI.AccountDatabase, b.Cfg.Global.ServerName)
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to connect to accounts db")
+ }
+
+ return db
+}
+
+// CreateClient creates a new client (normally used for media fetch requests).
+// Should only be called once per component.
+func (b *BaseDendrite) CreateClient() *gomatrixserverlib.Client {
+ if b.Cfg.Global.DisableFederation {
+ return gomatrixserverlib.NewClientWithTransport(noOpHTTPTransport)
+ }
+ client := gomatrixserverlib.NewClient(
+ b.Cfg.FederationSender.DisableTLSValidation,
+ )
+ client.SetUserAgent(fmt.Sprintf("Dendrite/%s", internal.VersionString()))
+ return client
+}
+
+// CreateFederationClient creates a new federation client. Should only be called
+// once per component.
+func (b *BaseDendrite) CreateFederationClient() *gomatrixserverlib.FederationClient {
+ if b.Cfg.Global.DisableFederation {
+ return gomatrixserverlib.NewFederationClientWithTransport(
+ b.Cfg.Global.ServerName, b.Cfg.Global.KeyID, b.Cfg.Global.PrivateKey,
+ b.Cfg.FederationSender.DisableTLSValidation, noOpHTTPTransport,
+ )
+ }
+ client := gomatrixserverlib.NewFederationClientWithTimeout(
+ b.Cfg.Global.ServerName, b.Cfg.Global.KeyID, b.Cfg.Global.PrivateKey,
+ b.Cfg.FederationSender.DisableTLSValidation, time.Minute*5,
+ )
+ client.SetUserAgent(fmt.Sprintf("Dendrite/%s", internal.VersionString()))
+ return client
+}
+
+// SetupAndServeHTTP sets up the HTTP server to serve endpoints registered on
+// ApiMux under /api/ and adds a prometheus handler under /metrics.
+// nolint:gocyclo
+func (b *BaseDendrite) SetupAndServeHTTP(
+ internalHTTPAddr, externalHTTPAddr config.HTTPAddress,
+ certFile, keyFile *string,
+) {
+ internalAddr, _ := internalHTTPAddr.Address()
+ externalAddr, _ := externalHTTPAddr.Address()
+
+ externalRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
+ internalRouter := externalRouter
+
+ externalServ := &http.Server{
+ Addr: string(externalAddr),
+ WriteTimeout: HTTPServerTimeout,
+ Handler: externalRouter,
+ }
+ internalServ := externalServ
+
+ if internalAddr != NoListener && externalAddr != internalAddr {
+ // H2C allows us to accept HTTP/2 connections without TLS
+ // encryption. Since we don't currently require any form of
+ // authentication or encryption on these internal HTTP APIs,
+ // H2C gives us all of the advantages of HTTP/2 (such as
+ // stream multiplexing and avoiding head-of-line blocking)
+ // without enabling TLS.
+ internalH2S := &http2.Server{}
+ internalRouter = mux.NewRouter().SkipClean(true).UseEncodedPath()
+ internalServ = &http.Server{
+ Addr: string(internalAddr),
+ Handler: h2c.NewHandler(internalRouter, internalH2S),
+ }
+ }
+
+ internalRouter.PathPrefix(httputil.InternalPathPrefix).Handler(b.InternalAPIMux)
+ if b.Cfg.Global.Metrics.Enabled {
+ internalRouter.Handle("/metrics", httputil.WrapHandlerInBasicAuth(promhttp.Handler(), b.Cfg.Global.Metrics.BasicAuth))
+ }
+
+ externalRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(b.PublicClientAPIMux)
+ if !b.Cfg.Global.DisableFederation {
+ externalRouter.PathPrefix(httputil.PublicKeyPathPrefix).Handler(b.PublicKeyAPIMux)
+ externalRouter.PathPrefix(httputil.PublicFederationPathPrefix).Handler(b.PublicFederationAPIMux)
+ }
+ externalRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(b.PublicMediaAPIMux)
+
+ if internalAddr != NoListener && internalAddr != externalAddr {
+ go func() {
+ logrus.Infof("Starting internal %s listener on %s", b.componentName, internalServ.Addr)
+ if certFile != nil && keyFile != nil {
+ if err := internalServ.ListenAndServeTLS(*certFile, *keyFile); err != nil {
+ logrus.WithError(err).Fatal("failed to serve HTTPS")
+ }
+ } else {
+ if err := internalServ.ListenAndServe(); err != nil {
+ logrus.WithError(err).Fatal("failed to serve HTTP")
+ }
+ }
+ logrus.Infof("Stopped internal %s listener on %s", b.componentName, internalServ.Addr)
+ }()
+ }
+
+ if externalAddr != NoListener {
+ go func() {
+ logrus.Infof("Starting external %s listener on %s", b.componentName, externalServ.Addr)
+ if certFile != nil && keyFile != nil {
+ if err := externalServ.ListenAndServeTLS(*certFile, *keyFile); err != nil {
+ logrus.WithError(err).Fatal("failed to serve HTTPS")
+ }
+ } else {
+ if err := externalServ.ListenAndServe(); err != nil {
+ logrus.WithError(err).Fatal("failed to serve HTTP")
+ }
+ }
+ logrus.Infof("Stopped external %s listener on %s", b.componentName, externalServ.Addr)
+ }()
+ }
+
+ select {}
+}
diff --git a/setup/config/config.go b/setup/config/config.go
new file mode 100644
index 00000000..b8b12d0c
--- /dev/null
+++ b/setup/config/config.go
@@ -0,0 +1,572 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package config
+
+import (
+ "bytes"
+ "encoding/pem"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/url"
+ "path/filepath"
+ "regexp"
+ "strings"
+
+ "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ed25519"
+ yaml "gopkg.in/yaml.v2"
+
+ jaegerconfig "github.com/uber/jaeger-client-go/config"
+ jaegermetrics "github.com/uber/jaeger-lib/metrics"
+)
+
+// keyIDRegexp defines allowable characters in Key IDs.
+var keyIDRegexp = regexp.MustCompile("^ed25519:[a-zA-Z0-9_]+$")
+
+// Version is the current version of the config format.
+// This will change whenever we make breaking changes to the config format.
+const Version = 1
+
+// Dendrite contains all the config used by a dendrite process.
+// Relative paths are resolved relative to the current working directory
+type Dendrite struct {
+ // The version of the configuration file.
+ // If the version in a file doesn't match the current dendrite config
+ // version then we can give a clear error message telling the user
+ // to update their config file to the current version.
+ // The version of the file should only be different if there has
+ // been a breaking change to the config file format.
+ Version int `yaml:"version"`
+
+ Global Global `yaml:"global"`
+ AppServiceAPI AppServiceAPI `yaml:"app_service_api"`
+ ClientAPI ClientAPI `yaml:"client_api"`
+ EDUServer EDUServer `yaml:"edu_server"`
+ FederationAPI FederationAPI `yaml:"federation_api"`
+ FederationSender FederationSender `yaml:"federation_sender"`
+ KeyServer KeyServer `yaml:"key_server"`
+ MediaAPI MediaAPI `yaml:"media_api"`
+ RoomServer RoomServer `yaml:"room_server"`
+ SigningKeyServer SigningKeyServer `yaml:"signing_key_server"`
+ SyncAPI SyncAPI `yaml:"sync_api"`
+ UserAPI UserAPI `yaml:"user_api"`
+
+ MSCs MSCs `yaml:"mscs"`
+
+ // The config for tracing the dendrite servers.
+ Tracing struct {
+ // Set to true to enable tracer hooks. If false, no tracing is set up.
+ Enabled bool `yaml:"enabled"`
+ // The config for the jaeger opentracing reporter.
+ Jaeger jaegerconfig.Configuration `yaml:"jaeger"`
+ } `yaml:"tracing"`
+
+ // The config for logging informations. Each hook will be added to logrus.
+ Logging []LogrusHook `yaml:"logging"`
+
+ // Any information derived from the configuration options for later use.
+ Derived Derived `yaml:"-"`
+}
+
+// TODO: Kill Derived
+type Derived struct {
+ Registration struct {
+ // Flows is a slice of flows, which represent one possible way that the client can authenticate a request.
+ // http://matrix.org/docs/spec/HEAD/client_server/r0.3.0.html#user-interactive-authentication-api
+ // As long as the generated flows only rely on config file options,
+ // we can generate them on startup and store them until needed
+ Flows []authtypes.Flow `json:"flows"`
+
+ // Params that need to be returned to the client during
+ // registration in order to complete registration stages.
+ Params map[string]interface{} `json:"params"`
+ }
+
+ // Application services parsed from their config files
+ // The paths of which were given above in the main config file
+ ApplicationServices []ApplicationService
+
+ // Meta-regexes compiled from all exclusive application service
+ // Regexes.
+ //
+ // When a user registers, we check that their username does not match any
+ // exclusive application service namespaces
+ ExclusiveApplicationServicesUsernameRegexp *regexp.Regexp
+ // When a user creates a room alias, we check that it isn't already
+ // reserved by an application service
+ ExclusiveApplicationServicesAliasRegexp *regexp.Regexp
+ // Note: An Exclusive Regex for room ID isn't necessary as we aren't blocking
+ // servers from creating RoomIDs in exclusive application service namespaces
+}
+
+type InternalAPIOptions struct {
+ Listen HTTPAddress `yaml:"listen"`
+ Connect HTTPAddress `yaml:"connect"`
+}
+
+type ExternalAPIOptions struct {
+ Listen HTTPAddress `yaml:"listen"`
+}
+
+// A Path on the filesystem.
+type Path string
+
+// A DataSource for opening a postgresql database using lib/pq.
+type DataSource string
+
+func (d DataSource) IsSQLite() bool {
+ return strings.HasPrefix(string(d), "file:")
+}
+
+func (d DataSource) IsPostgres() bool {
+ // commented line may not always be true?
+ // return strings.HasPrefix(string(d), "postgres:")
+ return !d.IsSQLite()
+}
+
+// A Topic in kafka.
+type Topic string
+
+// An Address to listen on.
+type Address string
+
+// An HTTPAddress to listen on, starting with either http:// or https://.
+type HTTPAddress string
+
+func (h HTTPAddress) Address() (Address, error) {
+ url, err := url.Parse(string(h))
+ if err != nil {
+ return "", err
+ }
+ return Address(url.Host), nil
+}
+
+// FileSizeBytes is a file size in bytes
+type FileSizeBytes int64
+
+// ThumbnailSize contains a single thumbnail size configuration
+type ThumbnailSize struct {
+ // Maximum width of the thumbnail image
+ Width int `yaml:"width"`
+ // Maximum height of the thumbnail image
+ Height int `yaml:"height"`
+ // ResizeMethod is one of crop or scale.
+ // crop scales to fill the requested dimensions and crops the excess.
+ // scale scales to fit the requested dimensions and one dimension may be smaller than requested.
+ ResizeMethod string `yaml:"method,omitempty"`
+}
+
+// LogrusHook represents a single logrus hook. At this point, only parsing and
+// verification of the proper values for type and level are done.
+// Validity/integrity checks on the parameters are done when configuring logrus.
+type LogrusHook struct {
+ // The type of hook, currently only "file" is supported.
+ Type string `yaml:"type"`
+
+ // The level of the logs to produce. Will output only this level and above.
+ Level string `yaml:"level"`
+
+ // The parameters for this hook.
+ Params map[string]interface{} `yaml:"params"`
+}
+
+// ConfigErrors stores problems encountered when parsing a config file.
+// It implements the error interface.
+type ConfigErrors []string
+
+// Load a yaml config file for a server run as multiple processes or as a monolith.
+// Checks the config to ensure that it is valid.
+func Load(configPath string, monolith bool) (*Dendrite, error) {
+ configData, err := ioutil.ReadFile(configPath)
+ if err != nil {
+ return nil, err
+ }
+ basePath, err := filepath.Abs(".")
+ if err != nil {
+ return nil, err
+ }
+ // Pass the current working directory and ioutil.ReadFile so that they can
+ // be mocked in the tests
+ return loadConfig(basePath, configData, ioutil.ReadFile, monolith)
+}
+
+func loadConfig(
+ basePath string,
+ configData []byte,
+ readFile func(string) ([]byte, error),
+ monolithic bool,
+) (*Dendrite, error) {
+ var c Dendrite
+ c.Defaults()
+
+ var err error
+ if err = yaml.Unmarshal(configData, &c); err != nil {
+ return nil, err
+ }
+
+ if err = c.check(monolithic); err != nil {
+ return nil, err
+ }
+
+ privateKeyPath := absPath(basePath, c.Global.PrivateKeyPath)
+ privateKeyData, err := readFile(privateKeyPath)
+ if err != nil {
+ return nil, err
+ }
+
+ if c.Global.KeyID, c.Global.PrivateKey, err = readKeyPEM(privateKeyPath, privateKeyData, true); err != nil {
+ return nil, err
+ }
+
+ for i, oldPrivateKey := range c.Global.OldVerifyKeys {
+ var oldPrivateKeyData []byte
+
+ oldPrivateKeyPath := absPath(basePath, oldPrivateKey.PrivateKeyPath)
+ oldPrivateKeyData, err = readFile(oldPrivateKeyPath)
+ if err != nil {
+ return nil, err
+ }
+
+ // NOTSPEC: Ordinarily we should enforce key ID formatting, but since there are
+ // a number of private keys out there with non-compatible symbols in them due
+ // to lack of validation in Synapse, we won't enforce that for old verify keys.
+ keyID, privateKey, perr := readKeyPEM(oldPrivateKeyPath, oldPrivateKeyData, false)
+ if perr != nil {
+ return nil, perr
+ }
+
+ c.Global.OldVerifyKeys[i].KeyID, c.Global.OldVerifyKeys[i].PrivateKey = keyID, privateKey
+ }
+
+ c.MediaAPI.AbsBasePath = Path(absPath(basePath, c.MediaAPI.BasePath))
+
+ // Generate data from config options
+ err = c.Derive()
+ if err != nil {
+ return nil, err
+ }
+
+ c.Wiring()
+ return &c, nil
+}
+
+// Derive generates data that is derived from various values provided in
+// the config file.
+func (config *Dendrite) Derive() error {
+ // Determine registrations flows based off config values
+
+ config.Derived.Registration.Params = make(map[string]interface{})
+
+ // TODO: Add email auth type
+ // TODO: Add MSISDN auth type
+
+ if config.ClientAPI.RecaptchaEnabled {
+ config.Derived.Registration.Params[authtypes.LoginTypeRecaptcha] = map[string]string{"public_key": config.ClientAPI.RecaptchaPublicKey}
+ config.Derived.Registration.Flows = append(config.Derived.Registration.Flows,
+ authtypes.Flow{Stages: []authtypes.LoginType{authtypes.LoginTypeRecaptcha}})
+ } else {
+ config.Derived.Registration.Flows = append(config.Derived.Registration.Flows,
+ authtypes.Flow{Stages: []authtypes.LoginType{authtypes.LoginTypeDummy}})
+ }
+
+ // Load application service configuration files
+ if err := loadAppServices(&config.AppServiceAPI, &config.Derived); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// SetDefaults sets default config values if they are not explicitly set.
+func (c *Dendrite) Defaults() {
+ c.Version = 1
+
+ c.Global.Defaults()
+ c.ClientAPI.Defaults()
+ c.EDUServer.Defaults()
+ c.FederationAPI.Defaults()
+ c.FederationSender.Defaults()
+ c.KeyServer.Defaults()
+ c.MediaAPI.Defaults()
+ c.RoomServer.Defaults()
+ c.SigningKeyServer.Defaults()
+ c.SyncAPI.Defaults()
+ c.UserAPI.Defaults()
+ c.AppServiceAPI.Defaults()
+ c.MSCs.Defaults()
+
+ c.Wiring()
+}
+
+func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) {
+ type verifiable interface {
+ Verify(configErrs *ConfigErrors, isMonolith bool)
+ }
+ for _, c := range []verifiable{
+ &c.Global, &c.ClientAPI,
+ &c.EDUServer, &c.FederationAPI, &c.FederationSender,
+ &c.KeyServer, &c.MediaAPI, &c.RoomServer,
+ &c.SigningKeyServer, &c.SyncAPI, &c.UserAPI,
+ &c.AppServiceAPI, &c.MSCs,
+ } {
+ c.Verify(configErrs, isMonolith)
+ }
+}
+
+func (c *Dendrite) Wiring() {
+ c.ClientAPI.Matrix = &c.Global
+ c.EDUServer.Matrix = &c.Global
+ c.FederationAPI.Matrix = &c.Global
+ c.FederationSender.Matrix = &c.Global
+ c.KeyServer.Matrix = &c.Global
+ c.MediaAPI.Matrix = &c.Global
+ c.RoomServer.Matrix = &c.Global
+ c.SigningKeyServer.Matrix = &c.Global
+ c.SyncAPI.Matrix = &c.Global
+ c.UserAPI.Matrix = &c.Global
+ c.AppServiceAPI.Matrix = &c.Global
+ c.MSCs.Matrix = &c.Global
+
+ c.ClientAPI.Derived = &c.Derived
+ c.AppServiceAPI.Derived = &c.Derived
+}
+
+// Error returns a string detailing how many errors were contained within a
+// configErrors type.
+func (errs ConfigErrors) Error() string {
+ if len(errs) == 1 {
+ return errs[0]
+ }
+ return fmt.Sprintf(
+ "%s (and %d other problems)", errs[0], len(errs)-1,
+ )
+}
+
+// Add appends an error to the list of errors in this configErrors.
+// It is a wrapper to the builtin append and hides pointers from
+// the client code.
+// This method is safe to use with an uninitialized configErrors because
+// if it is nil, it will be properly allocated.
+func (errs *ConfigErrors) Add(str string) {
+ *errs = append(*errs, str)
+}
+
+// checkNotEmpty verifies the given value is not empty in the configuration.
+// If it is, adds an error to the list.
+func checkNotEmpty(configErrs *ConfigErrors, key, value string) {
+ if value == "" {
+ configErrs.Add(fmt.Sprintf("missing config key %q", key))
+ }
+}
+
+// checkNotZero verifies the given value is not zero in the configuration.
+// If it is, adds an error to the list.
+func checkNotZero(configErrs *ConfigErrors, key string, value int64) {
+ if value == 0 {
+ configErrs.Add(fmt.Sprintf("missing config key %q", key))
+ }
+}
+
+// checkPositive verifies the given value is positive (zero included)
+// in the configuration. If it is not, adds an error to the list.
+func checkPositive(configErrs *ConfigErrors, key string, value int64) {
+ if value < 0 {
+ configErrs.Add(fmt.Sprintf("invalid value for config key %q: %d", key, value))
+ }
+}
+
+// checkURL verifies that the parameter is a valid URL
+func checkURL(configErrs *ConfigErrors, key, value string) {
+ if value == "" {
+ configErrs.Add(fmt.Sprintf("missing config key %q", key))
+ return
+ }
+ url, err := url.Parse(value)
+ if err != nil {
+ configErrs.Add(fmt.Sprintf("config key %q contains invalid URL (%s)", key, err.Error()))
+ return
+ }
+ switch url.Scheme {
+ case "http":
+ case "https":
+ default:
+ configErrs.Add(fmt.Sprintf("config key %q URL should be http:// or https://", key))
+ return
+ }
+}
+
+// checkLogging verifies the parameters logging.* are valid.
+func (config *Dendrite) checkLogging(configErrs *ConfigErrors) {
+ for _, logrusHook := range config.Logging {
+ checkNotEmpty(configErrs, "logging.type", string(logrusHook.Type))
+ checkNotEmpty(configErrs, "logging.level", string(logrusHook.Level))
+ }
+}
+
+// check returns an error type containing all errors found within the config
+// file.
+func (config *Dendrite) check(_ bool) error { // monolithic
+ var configErrs ConfigErrors
+
+ if config.Version != Version {
+ configErrs.Add(fmt.Sprintf(
+ "unknown config version %q, expected %q", config.Version, Version,
+ ))
+ return configErrs
+ }
+
+ config.checkLogging(&configErrs)
+
+ // Due to how Golang manages its interface types, this condition is not redundant.
+ // In order to get the proper behaviour, it is necessary to return an explicit nil
+ // and not a nil configErrors.
+ // This is because the following equalities hold:
+ // error(nil) == nil
+ // error(configErrors(nil)) != nil
+ if configErrs != nil {
+ return configErrs
+ }
+ return nil
+}
+
+// absPath returns the absolute path for a given relative or absolute path.
+func absPath(dir string, path Path) string {
+ if filepath.IsAbs(string(path)) {
+ // filepath.Join cleans the path so we should clean the absolute paths as well for consistency.
+ return filepath.Clean(string(path))
+ }
+ return filepath.Join(dir, string(path))
+}
+
+func readKeyPEM(path string, data []byte, enforceKeyIDFormat bool) (gomatrixserverlib.KeyID, ed25519.PrivateKey, error) {
+ for {
+ var keyBlock *pem.Block
+ keyBlock, data = pem.Decode(data)
+ if data == nil {
+ return "", nil, fmt.Errorf("no matrix private key PEM data in %q", path)
+ }
+ if keyBlock == nil {
+ return "", nil, fmt.Errorf("keyBlock is nil %q", path)
+ }
+ if keyBlock.Type == "MATRIX PRIVATE KEY" {
+ keyID := keyBlock.Headers["Key-ID"]
+ if keyID == "" {
+ return "", nil, fmt.Errorf("missing key ID in PEM data in %q", path)
+ }
+ if !strings.HasPrefix(keyID, "ed25519:") {
+ return "", nil, fmt.Errorf("key ID %q doesn't start with \"ed25519:\" in %q", keyID, path)
+ }
+ if enforceKeyIDFormat && !keyIDRegexp.MatchString(keyID) {
+ return "", nil, fmt.Errorf("key ID %q in %q contains illegal characters (use a-z, A-Z, 0-9 and _ only)", keyID, path)
+ }
+ _, privKey, err := ed25519.GenerateKey(bytes.NewReader(keyBlock.Bytes))
+ if err != nil {
+ return "", nil, err
+ }
+ return gomatrixserverlib.KeyID(keyID), privKey, nil
+ }
+ }
+}
+
+// AppServiceURL returns a HTTP URL for where the appservice component is listening.
+func (config *Dendrite) AppServiceURL() string {
+ // Hard code the appservice server to talk HTTP for now.
+ // If we support HTTPS we need to think of a practical way to do certificate validation.
+ // People setting up servers shouldn't need to get a certificate valid for the public
+ // internet for an internal API.
+ return string(config.AppServiceAPI.InternalAPI.Connect)
+}
+
+// RoomServerURL returns an HTTP URL for where the roomserver is listening.
+func (config *Dendrite) RoomServerURL() string {
+ // Hard code the roomserver to talk HTTP for now.
+ // If we support HTTPS we need to think of a practical way to do certificate validation.
+ // People setting up servers shouldn't need to get a certificate valid for the public
+ // internet for an internal API.
+ return string(config.RoomServer.InternalAPI.Connect)
+}
+
+// UserAPIURL returns an HTTP URL for where the userapi is listening.
+func (config *Dendrite) UserAPIURL() string {
+ // Hard code the userapi to talk HTTP for now.
+ // If we support HTTPS we need to think of a practical way to do certificate validation.
+ // People setting up servers shouldn't need to get a certificate valid for the public
+ // internet for an internal API.
+ return string(config.UserAPI.InternalAPI.Connect)
+}
+
+// EDUServerURL returns an HTTP URL for where the EDU server is listening.
+func (config *Dendrite) EDUServerURL() string {
+ // Hard code the EDU server to talk HTTP for now.
+ // If we support HTTPS we need to think of a practical way to do certificate validation.
+ // People setting up servers shouldn't need to get a certificate valid for the public
+ // internet for an internal API.
+ return string(config.EDUServer.InternalAPI.Connect)
+}
+
+// FederationSenderURL returns an HTTP URL for where the federation sender is listening.
+func (config *Dendrite) FederationSenderURL() string {
+ // Hard code the federation sender server to talk HTTP for now.
+ // If we support HTTPS we need to think of a practical way to do certificate validation.
+ // People setting up servers shouldn't need to get a certificate valid for the public
+ // internet for an internal API.
+ return string(config.FederationSender.InternalAPI.Connect)
+}
+
+// SigningKeyServerURL returns an HTTP URL for where the signing key server is listening.
+func (config *Dendrite) SigningKeyServerURL() string {
+ // Hard code the signing key server to talk HTTP for now.
+ // If we support HTTPS we need to think of a practical way to do certificate validation.
+ // People setting up servers shouldn't need to get a certificate valid for the public
+ // internet for an internal API.
+ return string(config.SigningKeyServer.InternalAPI.Connect)
+}
+
+// KeyServerURL returns an HTTP URL for where the key server is listening.
+func (config *Dendrite) KeyServerURL() string {
+ // Hard code the key server to talk HTTP for now.
+ // If we support HTTPS we need to think of a practical way to do certificate validation.
+ // People setting up servers shouldn't need to get a certificate valid for the public
+ // internet for an internal API.
+ return string(config.KeyServer.InternalAPI.Connect)
+}
+
+// SetupTracing configures the opentracing using the supplied configuration.
+func (config *Dendrite) SetupTracing(serviceName string) (closer io.Closer, err error) {
+ if !config.Tracing.Enabled {
+ return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
+ }
+ return config.Tracing.Jaeger.InitGlobalTracer(
+ serviceName,
+ jaegerconfig.Logger(logrusLogger{logrus.StandardLogger()}),
+ jaegerconfig.Metrics(jaegermetrics.NullFactory),
+ )
+}
+
+// logrusLogger is a small wrapper that implements jaeger.Logger using logrus.
+type logrusLogger struct {
+ l *logrus.Logger
+}
+
+func (l logrusLogger) Error(msg string) {
+ l.l.Error(msg)
+}
+
+func (l logrusLogger) Infof(msg string, args ...interface{}) {
+ l.l.Infof(msg, args...)
+}
diff --git a/setup/config/config_appservice.go b/setup/config/config_appservice.go
new file mode 100644
index 00000000..a042691d
--- /dev/null
+++ b/setup/config/config_appservice.go
@@ -0,0 +1,353 @@
+// Copyright 2017 Andrew Morgan <andrew@amorgan.xyz>
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package config
+
+import (
+ "fmt"
+ "io/ioutil"
+ "path/filepath"
+ "regexp"
+ "strings"
+
+ log "github.com/sirupsen/logrus"
+ yaml "gopkg.in/yaml.v2"
+)
+
+type AppServiceAPI struct {
+ Matrix *Global `yaml:"-"`
+ Derived *Derived `yaml:"-"` // TODO: Nuke Derived from orbit
+
+ InternalAPI InternalAPIOptions `yaml:"internal_api"`
+
+ Database DatabaseOptions `yaml:"database"`
+
+ ConfigFiles []string `yaml:"config_files"`
+}
+
+func (c *AppServiceAPI) Defaults() {
+ c.InternalAPI.Listen = "http://localhost:7777"
+ c.InternalAPI.Connect = "http://localhost:7777"
+ c.Database.Defaults()
+ c.Database.ConnectionString = "file:appservice.db"
+}
+
+func (c *AppServiceAPI) Verify(configErrs *ConfigErrors, isMonolith bool) {
+ checkURL(configErrs, "app_service_api.internal_api.listen", string(c.InternalAPI.Listen))
+ checkURL(configErrs, "app_service_api.internal_api.bind", string(c.InternalAPI.Connect))
+ checkNotEmpty(configErrs, "app_service_api.database.connection_string", string(c.Database.ConnectionString))
+}
+
+// ApplicationServiceNamespace is the namespace that a specific application
+// service has management over.
+type ApplicationServiceNamespace struct {
+ // Whether or not the namespace is managed solely by this application service
+ Exclusive bool `yaml:"exclusive"`
+ // A regex pattern that represents the namespace
+ Regex string `yaml:"regex"`
+ // The ID of an existing group that all users of this application service will
+ // be added to. This field is only relevant to the `users` namespace.
+ // Note that users who are joined to this group through an application service
+ // are not to be listed when querying for the group's members, however the
+ // group should be listed when querying an application service user's groups.
+ // This is to prevent making spamming all users of an application service
+ // trivial.
+ GroupID string `yaml:"group_id"`
+ // Regex object representing our pattern. Saves having to recompile every time
+ RegexpObject *regexp.Regexp
+}
+
+// ApplicationService represents a Matrix application service.
+// https://matrix.org/docs/spec/application_service/unstable.html
+type ApplicationService struct {
+ // User-defined, unique, persistent ID of the application service
+ ID string `yaml:"id"`
+ // Base URL of the application service
+ URL string `yaml:"url"`
+ // Application service token provided in requests to a homeserver
+ ASToken string `yaml:"as_token"`
+ // Homeserver token provided in requests to an application service
+ HSToken string `yaml:"hs_token"`
+ // Localpart of application service user
+ SenderLocalpart string `yaml:"sender_localpart"`
+ // Information about an application service's namespaces. Key is either
+ // "users", "aliases" or "rooms"
+ NamespaceMap map[string][]ApplicationServiceNamespace `yaml:"namespaces"`
+ // Whether rate limiting is applied to each application service user
+ RateLimited bool `yaml:"rate_limited"`
+ // Any custom protocols that this application service provides (e.g. IRC)
+ Protocols []string `yaml:"protocols"`
+}
+
+// IsInterestedInRoomID returns a bool on whether an application service's
+// namespace includes the given room ID
+func (a *ApplicationService) IsInterestedInRoomID(
+ roomID string,
+) bool {
+ if namespaceSlice, ok := a.NamespaceMap["rooms"]; ok {
+ for _, namespace := range namespaceSlice {
+ if namespace.RegexpObject.MatchString(roomID) {
+ return true
+ }
+ }
+ }
+
+ return false
+}
+
+// IsInterestedInUserID returns a bool on whether an application service's
+// namespace includes the given user ID
+func (a *ApplicationService) IsInterestedInUserID(
+ userID string,
+) bool {
+ if namespaceSlice, ok := a.NamespaceMap["users"]; ok {
+ for _, namespace := range namespaceSlice {
+ if namespace.RegexpObject.MatchString(userID) {
+ return true
+ }
+ }
+ }
+
+ return false
+}
+
+// OwnsNamespaceCoveringUserId returns a bool on whether an application service's
+// namespace is exclusive and includes the given user ID
+func (a *ApplicationService) OwnsNamespaceCoveringUserId(
+ userID string,
+) bool {
+ if namespaceSlice, ok := a.NamespaceMap["users"]; ok {
+ for _, namespace := range namespaceSlice {
+ if namespace.Exclusive && namespace.RegexpObject.MatchString(userID) {
+ return true
+ }
+ }
+ }
+
+ return false
+}
+
+// IsInterestedInRoomAlias returns a bool on whether an application service's
+// namespace includes the given room alias
+func (a *ApplicationService) IsInterestedInRoomAlias(
+ roomAlias string,
+) bool {
+ if namespaceSlice, ok := a.NamespaceMap["aliases"]; ok {
+ for _, namespace := range namespaceSlice {
+ if namespace.RegexpObject.MatchString(roomAlias) {
+ return true
+ }
+ }
+ }
+
+ return false
+}
+
+// loadAppServices iterates through all application service config files
+// and loads their data into the config object for later access.
+func loadAppServices(config *AppServiceAPI, derived *Derived) error {
+ for _, configPath := range config.ConfigFiles {
+ // Create a new application service with default options
+ appservice := ApplicationService{
+ RateLimited: true,
+ }
+
+ // Create an absolute path from a potentially relative path
+ absPath, err := filepath.Abs(configPath)
+ if err != nil {
+ return err
+ }
+
+ // Read the application service's config file
+ configData, err := ioutil.ReadFile(absPath)
+ if err != nil {
+ return err
+ }
+
+ // Load the config data into our struct
+ if err = yaml.UnmarshalStrict(configData, &appservice); err != nil {
+ return err
+ }
+
+ // Append the parsed application service to the global config
+ derived.ApplicationServices = append(
+ derived.ApplicationServices, appservice,
+ )
+ }
+
+ // Check for any errors in the loaded application services
+ return checkErrors(config, derived)
+}
+
+// setupRegexps will create regex objects for exclusive and non-exclusive
+// usernames, aliases and rooms of all application services, so that other
+// methods can quickly check if a particular string matches any of them.
+func setupRegexps(_ *AppServiceAPI, derived *Derived) (err error) {
+ // Combine all exclusive namespaces for later string checking
+ var exclusiveUsernameStrings, exclusiveAliasStrings []string
+
+ // If an application service's regex is marked as exclusive, add
+ // its contents to the overall exlusive regex string. Room regex
+ // not necessary as we aren't denying exclusive room ID creation
+ for _, appservice := range derived.ApplicationServices {
+ for key, namespaceSlice := range appservice.NamespaceMap {
+ switch key {
+ case "users":
+ appendExclusiveNamespaceRegexs(&exclusiveUsernameStrings, namespaceSlice)
+ case "aliases":
+ appendExclusiveNamespaceRegexs(&exclusiveAliasStrings, namespaceSlice)
+ }
+ }
+ }
+
+ // Join the regexes together into one big regex.
+ // i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)"
+ // Later we can check if a username or alias matches any exclusive regex and
+ // deny access if it isn't from an application service
+ exclusiveUsernames := strings.Join(exclusiveUsernameStrings, "|")
+ exclusiveAliases := strings.Join(exclusiveAliasStrings, "|")
+
+ // If there are no exclusive regexes, compile string so that it will not match
+ // any valid usernames/aliases/roomIDs
+ if exclusiveUsernames == "" {
+ exclusiveUsernames = "^$"
+ }
+ if exclusiveAliases == "" {
+ exclusiveAliases = "^$"
+ }
+
+ // Store compiled Regex
+ if derived.ExclusiveApplicationServicesUsernameRegexp, err = regexp.Compile(exclusiveUsernames); err != nil {
+ return err
+ }
+ if derived.ExclusiveApplicationServicesAliasRegexp, err = regexp.Compile(exclusiveAliases); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// appendExclusiveNamespaceRegexs takes a slice of strings and a slice of
+// namespaces and will append the regexes of only the exclusive namespaces
+// into the string slice
+func appendExclusiveNamespaceRegexs(
+ exclusiveStrings *[]string, namespaces []ApplicationServiceNamespace,
+) {
+ for index, namespace := range namespaces {
+ if namespace.Exclusive {
+ // We append parenthesis to later separate each regex when we compile
+ // i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)"
+ *exclusiveStrings = append(*exclusiveStrings, "("+namespace.Regex+")")
+ }
+
+ // Compile this regex into a Regexp object for later use
+ namespaces[index].RegexpObject, _ = regexp.Compile(namespace.Regex)
+ }
+}
+
+// checkErrors checks for any configuration errors amongst the loaded
+// application services according to the application service spec.
+func checkErrors(config *AppServiceAPI, derived *Derived) (err error) {
+ var idMap = make(map[string]bool)
+ var tokenMap = make(map[string]bool)
+
+ // Compile regexp object for checking groupIDs
+ groupIDRegexp := regexp.MustCompile(`\+.*:.*`)
+
+ // Check each application service for any config errors
+ for _, appservice := range derived.ApplicationServices {
+ // Namespace-related checks
+ for key, namespaceSlice := range appservice.NamespaceMap {
+ for _, namespace := range namespaceSlice {
+ if err := validateNamespace(&appservice, key, &namespace, groupIDRegexp); err != nil {
+ return err
+ }
+ }
+ }
+
+ // Check if the url has trailing /'s. If so, remove them
+ appservice.URL = strings.TrimRight(appservice.URL, "/")
+
+ // Check if we've already seen this ID. No two application services
+ // can have the same ID or token.
+ if idMap[appservice.ID] {
+ return ConfigErrors([]string{fmt.Sprintf(
+ "Application service ID %s must be unique", appservice.ID,
+ )})
+ }
+ // Check if we've already seen this token
+ if tokenMap[appservice.ASToken] {
+ return ConfigErrors([]string{fmt.Sprintf(
+ "Application service Token %s must be unique", appservice.ASToken,
+ )})
+ }
+
+ // Add the id/token to their respective maps if we haven't already
+ // seen them.
+ idMap[appservice.ID] = true
+ tokenMap[appservice.ASToken] = true
+
+ // TODO: Remove once rate_limited is implemented
+ if appservice.RateLimited {
+ log.Warn("WARNING: Application service option rate_limited is currently unimplemented")
+ }
+ // TODO: Remove once protocols is implemented
+ if len(appservice.Protocols) > 0 {
+ log.Warn("WARNING: Application service option protocols is currently unimplemented")
+ }
+ }
+
+ return setupRegexps(config, derived)
+}
+
+// validateNamespace returns nil or an error based on whether a given
+// application service namespace is valid. A namespace is valid if it has the
+// required fields, and its regex is correct.
+func validateNamespace(
+ appservice *ApplicationService,
+ key string,
+ namespace *ApplicationServiceNamespace,
+ groupIDRegexp *regexp.Regexp,
+) error {
+ // Check that namespace(s) are valid regex
+ if !IsValidRegex(namespace.Regex) {
+ return ConfigErrors([]string{fmt.Sprintf(
+ "Invalid regex string for Application Service %s", appservice.ID,
+ )})
+ }
+
+ // Check if GroupID for the users namespace is in the correct format
+ if key == "users" && namespace.GroupID != "" {
+ // TODO: Remove once group_id is implemented
+ log.Warn("WARNING: Application service option group_id is currently unimplemented")
+
+ correctFormat := groupIDRegexp.MatchString(namespace.GroupID)
+ if !correctFormat {
+ return ConfigErrors([]string{fmt.Sprintf(
+ "Invalid user group_id field for application service %s.",
+ appservice.ID,
+ )})
+ }
+ }
+
+ return nil
+}
+
+// IsValidRegex returns true or false based on whether the
+// given string is valid regex or not
+func IsValidRegex(regexString string) bool {
+ _, err := regexp.Compile(regexString)
+
+ return err == nil
+}
diff --git a/setup/config/config_clientapi.go b/setup/config/config_clientapi.go
new file mode 100644
index 00000000..52115491
--- /dev/null
+++ b/setup/config/config_clientapi.go
@@ -0,0 +1,123 @@
+package config
+
+import (
+ "fmt"
+ "time"
+)
+
+type ClientAPI struct {
+ Matrix *Global `yaml:"-"`
+ Derived *Derived `yaml:"-"` // TODO: Nuke Derived from orbit
+
+ InternalAPI InternalAPIOptions `yaml:"internal_api"`
+ ExternalAPI ExternalAPIOptions `yaml:"external_api"`
+
+ // If set disables new users from registering (except via shared
+ // secrets)
+ RegistrationDisabled bool `yaml:"registration_disabled"`
+ // If set, allows registration by anyone who also has the shared
+ // secret, even if registration is otherwise disabled.
+ RegistrationSharedSecret string `yaml:"registration_shared_secret"`
+
+ // Boolean stating whether catpcha registration is enabled
+ // and required
+ RecaptchaEnabled bool `yaml:"enable_registration_captcha"`
+ // This Home Server's ReCAPTCHA public key.
+ RecaptchaPublicKey string `yaml:"recaptcha_public_key"`
+ // This Home Server's ReCAPTCHA private key.
+ RecaptchaPrivateKey string `yaml:"recaptcha_private_key"`
+ // Secret used to bypass the captcha registration entirely
+ RecaptchaBypassSecret string `yaml:"recaptcha_bypass_secret"`
+ // HTTP API endpoint used to verify whether the captcha response
+ // was successful
+ RecaptchaSiteVerifyAPI string `yaml:"recaptcha_siteverify_api"`
+
+ // TURN options
+ TURN TURN `yaml:"turn"`
+
+ // Rate-limiting options
+ RateLimiting RateLimiting `yaml:"rate_limiting"`
+}
+
+func (c *ClientAPI) Defaults() {
+ c.InternalAPI.Listen = "http://localhost:7771"
+ c.InternalAPI.Connect = "http://localhost:7771"
+ c.ExternalAPI.Listen = "http://[::]:8071"
+ c.RegistrationSharedSecret = ""
+ c.RecaptchaPublicKey = ""
+ c.RecaptchaPrivateKey = ""
+ c.RecaptchaEnabled = false
+ c.RecaptchaBypassSecret = ""
+ c.RecaptchaSiteVerifyAPI = ""
+ c.RegistrationDisabled = false
+ c.RateLimiting.Defaults()
+}
+
+func (c *ClientAPI) Verify(configErrs *ConfigErrors, isMonolith bool) {
+ checkURL(configErrs, "client_api.internal_api.listen", string(c.InternalAPI.Listen))
+ checkURL(configErrs, "client_api.internal_api.connect", string(c.InternalAPI.Connect))
+ if !isMonolith {
+ checkURL(configErrs, "client_api.external_api.listen", string(c.ExternalAPI.Listen))
+ }
+ if c.RecaptchaEnabled {
+ checkNotEmpty(configErrs, "client_api.recaptcha_public_key", string(c.RecaptchaPublicKey))
+ checkNotEmpty(configErrs, "client_api.recaptcha_private_key", string(c.RecaptchaPrivateKey))
+ checkNotEmpty(configErrs, "client_api.recaptcha_siteverify_api", string(c.RecaptchaSiteVerifyAPI))
+ }
+ c.TURN.Verify(configErrs)
+ c.RateLimiting.Verify(configErrs)
+}
+
+type TURN struct {
+ // TODO Guest Support
+ // Whether or not guests can request TURN credentials
+ // AllowGuests bool `yaml:"turn_allow_guests"`
+ // How long the authorization should last
+ UserLifetime string `yaml:"turn_user_lifetime"`
+ // The list of TURN URIs to pass to clients
+ URIs []string `yaml:"turn_uris"`
+
+ // Authorization via Shared Secret
+ // The shared secret from coturn
+ SharedSecret string `yaml:"turn_shared_secret"`
+
+ // Authorization via Static Username & Password
+ // Hardcoded Username and Password
+ Username string `yaml:"turn_username"`
+ Password string `yaml:"turn_password"`
+}
+
+func (c *TURN) Verify(configErrs *ConfigErrors) {
+ value := c.UserLifetime
+ if value != "" {
+ if _, err := time.ParseDuration(value); err != nil {
+ configErrs.Add(fmt.Sprintf("invalid duration for config key %q: %s", "client_api.turn.turn_user_lifetime", value))
+ }
+ }
+}
+
+type RateLimiting struct {
+ // Is rate limiting enabled or disabled?
+ Enabled bool `yaml:"enabled"`
+
+ // How many "slots" a user can occupy sending requests to a rate-limited
+ // endpoint before we apply rate-limiting
+ Threshold int64 `yaml:"threshold"`
+
+ // The cooloff period in milliseconds after a request before the "slot"
+ // is freed again
+ CooloffMS int64 `yaml:"cooloff_ms"`
+}
+
+func (r *RateLimiting) Verify(configErrs *ConfigErrors) {
+ if r.Enabled {
+ checkPositive(configErrs, "client_api.rate_limiting.threshold", r.Threshold)
+ checkPositive(configErrs, "client_api.rate_limiting.cooloff_ms", r.CooloffMS)
+ }
+}
+
+func (r *RateLimiting) Defaults() {
+ r.Enabled = true
+ r.Threshold = 5
+ r.CooloffMS = 500
+}
diff --git a/setup/config/config_eduserver.go b/setup/config/config_eduserver.go
new file mode 100644
index 00000000..a2ff3697
--- /dev/null
+++ b/setup/config/config_eduserver.go
@@ -0,0 +1,17 @@
+package config
+
+type EDUServer struct {
+ Matrix *Global `yaml:"-"`
+
+ InternalAPI InternalAPIOptions `yaml:"internal_api"`
+}
+
+func (c *EDUServer) Defaults() {
+ c.InternalAPI.Listen = "http://localhost:7778"
+ c.InternalAPI.Connect = "http://localhost:7778"
+}
+
+func (c *EDUServer) Verify(configErrs *ConfigErrors, isMonolith bool) {
+ checkURL(configErrs, "edu_server.internal_api.listen", string(c.InternalAPI.Listen))
+ checkURL(configErrs, "edu_server.internal_api.connect", string(c.InternalAPI.Connect))
+}
diff --git a/setup/config/config_federationapi.go b/setup/config/config_federationapi.go
new file mode 100644
index 00000000..64803d95
--- /dev/null
+++ b/setup/config/config_federationapi.go
@@ -0,0 +1,31 @@
+package config
+
+type FederationAPI struct {
+ Matrix *Global `yaml:"-"`
+
+ InternalAPI InternalAPIOptions `yaml:"internal_api"`
+ ExternalAPI ExternalAPIOptions `yaml:"external_api"`
+
+ // List of paths to X509 certificates used by the external federation listeners.
+ // These are used to calculate the TLS fingerprints to publish for this server.
+ // Other matrix servers talking to this server will expect the x509 certificate
+ // to match one of these certificates.
+ // The certificates should be in PEM format.
+ FederationCertificatePaths []Path `yaml:"federation_certificates"`
+}
+
+func (c *FederationAPI) Defaults() {
+ c.InternalAPI.Listen = "http://localhost:7772"
+ c.InternalAPI.Connect = "http://localhost:7772"
+ c.ExternalAPI.Listen = "http://[::]:8072"
+}
+
+func (c *FederationAPI) Verify(configErrs *ConfigErrors, isMonolith bool) {
+ checkURL(configErrs, "federation_api.internal_api.listen", string(c.InternalAPI.Listen))
+ checkURL(configErrs, "federation_api.internal_api.connect", string(c.InternalAPI.Connect))
+ if !isMonolith {
+ checkURL(configErrs, "federation_api.external_api.listen", string(c.ExternalAPI.Listen))
+ }
+ // TODO: not applicable always, e.g. in demos
+ //checkNotZero(configErrs, "federation_api.federation_certificates", int64(len(c.FederationCertificatePaths)))
+}
diff --git a/setup/config/config_federationsender.go b/setup/config/config_federationsender.go
new file mode 100644
index 00000000..84f5b6f4
--- /dev/null
+++ b/setup/config/config_federationsender.go
@@ -0,0 +1,63 @@
+package config
+
+type FederationSender struct {
+ Matrix *Global `yaml:"-"`
+
+ InternalAPI InternalAPIOptions `yaml:"internal_api"`
+
+ // The FederationSender database stores information used by the FederationSender
+ // It is only accessed by the FederationSender.
+ Database DatabaseOptions `yaml:"database"`
+
+ // Federation failure threshold. How many consecutive failures that we should
+ // tolerate when sending federation requests to a specific server. The backoff
+ // is 2**x seconds, so 1 = 2 seconds, 2 = 4 seconds, 3 = 8 seconds, etc.
+ // The default value is 16 if not specified, which is circa 18 hours.
+ FederationMaxRetries uint32 `yaml:"send_max_retries"`
+
+ // FederationDisableTLSValidation disables the validation of X.509 TLS certs
+ // on remote federation endpoints. This is not recommended in production!
+ DisableTLSValidation bool `yaml:"disable_tls_validation"`
+
+ Proxy Proxy `yaml:"proxy_outbound"`
+}
+
+func (c *FederationSender) Defaults() {
+ c.InternalAPI.Listen = "http://localhost:7775"
+ c.InternalAPI.Connect = "http://localhost:7775"
+ c.Database.Defaults()
+ c.Database.ConnectionString = "file:federationsender.db"
+
+ c.FederationMaxRetries = 16
+ c.DisableTLSValidation = false
+
+ c.Proxy.Defaults()
+}
+
+func (c *FederationSender) Verify(configErrs *ConfigErrors, isMonolith bool) {
+ checkURL(configErrs, "federation_sender.internal_api.listen", string(c.InternalAPI.Listen))
+ checkURL(configErrs, "federation_sender.internal_api.connect", string(c.InternalAPI.Connect))
+ checkNotEmpty(configErrs, "federation_sender.database.connection_string", string(c.Database.ConnectionString))
+}
+
+// The config for setting a proxy to use for server->server requests
+type Proxy struct {
+ // Is the proxy enabled?
+ Enabled bool `yaml:"enabled"`
+ // The protocol for the proxy (http / https / socks5)
+ Protocol string `yaml:"protocol"`
+ // The host where the proxy is listening
+ Host string `yaml:"host"`
+ // The port on which the proxy is listening
+ Port uint16 `yaml:"port"`
+}
+
+func (c *Proxy) Defaults() {
+ c.Enabled = false
+ c.Protocol = "http"
+ c.Host = "localhost"
+ c.Port = 8080
+}
+
+func (c *Proxy) Verify(configErrs *ConfigErrors) {
+}
diff --git a/setup/config/config_global.go b/setup/config/config_global.go
new file mode 100644
index 00000000..95652217
--- /dev/null
+++ b/setup/config/config_global.go
@@ -0,0 +1,142 @@
+package config
+
+import (
+ "math/rand"
+ "time"
+
+ "github.com/matrix-org/gomatrixserverlib"
+ "golang.org/x/crypto/ed25519"
+)
+
+type Global struct {
+ // The name of the server. This is usually the domain name, e.g 'matrix.org', 'localhost'.
+ ServerName gomatrixserverlib.ServerName `yaml:"server_name"`
+
+ // Path to the private key which will be used to sign requests and events.
+ PrivateKeyPath Path `yaml:"private_key"`
+
+ // The private key which will be used to sign requests and events.
+ PrivateKey ed25519.PrivateKey `yaml:"-"`
+
+ // An arbitrary string used to uniquely identify the PrivateKey. Must start with the
+ // prefix "ed25519:".
+ KeyID gomatrixserverlib.KeyID `yaml:"-"`
+
+ // Information about old private keys that used to be used to sign requests and
+ // events on this domain. They will not be used but will be advertised to other
+ // servers that ask for them to help verify old events.
+ OldVerifyKeys []OldVerifyKeys `yaml:"old_private_keys"`
+
+ // How long a remote server can cache our server key for before requesting it again.
+ // Increasing this number will reduce the number of requests made by remote servers
+ // for our key, but increases the period a compromised key will be considered valid
+ // by remote servers.
+ // Defaults to 24 hours.
+ KeyValidityPeriod time.Duration `yaml:"key_validity_period"`
+
+ // Disables federation. Dendrite will not be able to make any outbound HTTP requests
+ // to other servers and the federation API will not be exposed.
+ DisableFederation bool `yaml:"disable_federation"`
+
+ // List of domains that the server will trust as identity servers to
+ // verify third-party identifiers.
+ // Defaults to an empty array.
+ TrustedIDServers []string `yaml:"trusted_third_party_id_servers"`
+
+ // Kafka/Naffka configuration
+ Kafka Kafka `yaml:"kafka"`
+
+ // Metrics configuration
+ Metrics Metrics `yaml:"metrics"`
+}
+
+func (c *Global) Defaults() {
+ c.ServerName = "localhost"
+ c.PrivateKeyPath = "matrix_key.pem"
+ _, c.PrivateKey, _ = ed25519.GenerateKey(rand.New(rand.NewSource(0)))
+ c.KeyID = "ed25519:auto"
+ c.KeyValidityPeriod = time.Hour * 24 * 7
+
+ c.Kafka.Defaults()
+ c.Metrics.Defaults()
+}
+
+func (c *Global) Verify(configErrs *ConfigErrors, isMonolith bool) {
+ checkNotEmpty(configErrs, "global.server_name", string(c.ServerName))
+ checkNotEmpty(configErrs, "global.private_key", string(c.PrivateKeyPath))
+
+ c.Kafka.Verify(configErrs, isMonolith)
+ c.Metrics.Verify(configErrs, isMonolith)
+}
+
+type OldVerifyKeys struct {
+ // Path to the private key.
+ PrivateKeyPath Path `yaml:"private_key"`
+
+ // The private key itself.
+ PrivateKey ed25519.PrivateKey `yaml:"-"`
+
+ // The key ID of the private key.
+ KeyID gomatrixserverlib.KeyID `yaml:"-"`
+
+ // When the private key was designed as "expired", as a UNIX timestamp
+ // in millisecond precision.
+ ExpiredAt gomatrixserverlib.Timestamp `yaml:"expired_at"`
+}
+
+// The configuration to use for Prometheus metrics
+type Metrics struct {
+ // Whether or not the metrics are enabled
+ Enabled bool `yaml:"enabled"`
+ // Use BasicAuth for Authorization
+ BasicAuth struct {
+ // Authorization via Static Username & Password
+ // Hardcoded Username and Password
+ Username string `yaml:"username"`
+ Password string `yaml:"password"`
+ } `yaml:"basic_auth"`
+}
+
+func (c *Metrics) Defaults() {
+ c.Enabled = false
+ c.BasicAuth.Username = "metrics"
+ c.BasicAuth.Password = "metrics"
+}
+
+func (c *Metrics) Verify(configErrs *ConfigErrors, isMonolith bool) {
+}
+
+type DatabaseOptions struct {
+ // The connection string, file:filename.db or postgres://server....
+ ConnectionString DataSource `yaml:"connection_string"`
+ // Maximum open connections to the DB (0 = use default, negative means unlimited)
+ MaxOpenConnections int `yaml:"max_open_conns"`
+ // Maximum idle connections to the DB (0 = use default, negative means unlimited)
+ MaxIdleConnections int `yaml:"max_idle_conns"`
+ // maximum amount of time (in seconds) a connection may be reused (<= 0 means unlimited)
+ ConnMaxLifetimeSeconds int `yaml:"conn_max_lifetime"`
+}
+
+func (c *DatabaseOptions) Defaults() {
+ c.MaxOpenConnections = 100
+ c.MaxIdleConnections = 2
+ c.ConnMaxLifetimeSeconds = -1
+}
+
+func (c *DatabaseOptions) Verify(configErrs *ConfigErrors, isMonolith bool) {
+}
+
+// MaxIdleConns returns maximum idle connections to the DB
+func (c DatabaseOptions) MaxIdleConns() int {
+ return c.MaxIdleConnections
+}
+
+// MaxOpenConns returns maximum open connections to the DB
+func (c DatabaseOptions) MaxOpenConns() int {
+ return c.MaxOpenConnections
+}
+
+// ConnMaxLifetime returns maximum amount of time a connection may be reused
+func (c DatabaseOptions) ConnMaxLifetime() time.Duration {
+ return time.Duration(c.ConnMaxLifetimeSeconds) * time.Second
+}
diff --git a/setup/config/config_kafka.go b/setup/config/config_kafka.go
new file mode 100644
index 00000000..aa91e558
--- /dev/null
+++ b/setup/config/config_kafka.go
@@ -0,0 +1,61 @@
+package config
+
+import "fmt"
+
+// Defined Kafka topics.
+const (
+ TopicOutputTypingEvent = "OutputTypingEvent"
+ TopicOutputSendToDeviceEvent = "OutputSendToDeviceEvent"
+ TopicOutputKeyChangeEvent = "OutputKeyChangeEvent"
+ TopicOutputRoomEvent = "OutputRoomEvent"
+ TopicOutputClientData = "OutputClientData"
+ TopicOutputReceiptEvent = "OutputReceiptEvent"
+)
+
+type Kafka struct {
+ // A list of kafka addresses to connect to.
+ Addresses []string `yaml:"addresses"`
+ // The prefix to use for Kafka topic names for this homeserver - really only
+ // useful if running more than one Dendrite on the same Kafka deployment.
+ TopicPrefix string `yaml:"topic_prefix"`
+ // Whether to use naffka instead of kafka.
+ // Naffka can only be used when running dendrite as a single monolithic server.
+ // Kafka can be used both with a monolithic server and when running the
+ // components as separate servers.
+ UseNaffka bool `yaml:"use_naffka"`
+ // The Naffka database is used internally by the naffka library, if used.
+ Database DatabaseOptions `yaml:"naffka_database"`
+ // The max size a Kafka message passed between consumer/producer can have
+ // Equals roughly max.message.bytes / fetch.message.max.bytes in Kafka
+ MaxMessageBytes *int `yaml:"max_message_bytes"`
+}
+
+func (k *Kafka) TopicFor(name string) string {
+ return fmt.Sprintf("%s%s", k.TopicPrefix, name)
+}
+
+func (c *Kafka) Defaults() {
+ c.UseNaffka = true
+ c.Database.Defaults()
+ c.Addresses = []string{"localhost:2181"}
+ c.Database.ConnectionString = DataSource("file:naffka.db")
+ c.TopicPrefix = "Dendrite"
+
+ maxBytes := 1024 * 1024 * 8 // about 8MB
+ c.MaxMessageBytes = &maxBytes
+}
+
+func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
+ if c.UseNaffka {
+ if !isMonolith {
+ configErrs.Add("naffka can only be used in a monolithic server")
+ }
+ checkNotEmpty(configErrs, "global.kafka.database.connection_string", string(c.Database.ConnectionString))
+ } else {
+ // If we aren't using naffka then we need to have at least one kafka
+ // server to talk to.
+ checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses)))
+ }
+ checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix))
+ checkPositive(configErrs, "global.kafka.max_message_bytes", int64(*c.MaxMessageBytes))
+}
diff --git a/setup/config/config_keyserver.go b/setup/config/config_keyserver.go
new file mode 100644
index 00000000..89162300
--- /dev/null
+++ b/setup/config/config_keyserver.go
@@ -0,0 +1,22 @@
+package config
+
+type KeyServer struct {
+ Matrix *Global `yaml:"-"`
+
+ InternalAPI InternalAPIOptions `yaml:"internal_api"`
+
+ Database DatabaseOptions `yaml:"database"`
+}
+
+func (c *KeyServer) Defaults() {
+ c.InternalAPI.Listen = "http://localhost:7779"
+ c.InternalAPI.Connect = "http://localhost:7779"
+ c.Database.Defaults()
+ c.Database.ConnectionString = "file:keyserver.db"
+}
+
+func (c *KeyServer) Verify(configErrs *ConfigErrors, isMonolith bool) {
+ checkURL(configErrs, "key_server.internal_api.listen", string(c.InternalAPI.Listen))
+ checkURL(configErrs, "key_server.internal_api.bind", string(c.InternalAPI.Connect))
+ checkNotEmpty(configErrs, "key_server.database.connection_string", string(c.Database.ConnectionString))
+}
diff --git a/setup/config/config_mediaapi.go b/setup/config/config_mediaapi.go
new file mode 100644
index 00000000..a9425b7b
--- /dev/null
+++ b/setup/config/config_mediaapi.go
@@ -0,0 +1,67 @@
+package config
+
+import (
+ "fmt"
+)
+
+type MediaAPI struct {
+ Matrix *Global `yaml:"-"`
+
+ InternalAPI InternalAPIOptions `yaml:"internal_api"`
+ ExternalAPI ExternalAPIOptions `yaml:"external_api"`
+
+ // The MediaAPI database stores information about files uploaded and downloaded
+ // by local users. It is only accessed by the MediaAPI.
+ Database DatabaseOptions `yaml:"database"`
+
+ // The base path to where the media files will be stored. May be relative or absolute.
+ BasePath Path `yaml:"base_path"`
+
+ // The absolute base path to where media files will be stored.
+ AbsBasePath Path `yaml:"-"`
+
+ // The maximum file size in bytes that is allowed to be stored on this server.
+ // Note: if max_file_size_bytes is set to 0, the size is unlimited.
+ // Note: if max_file_size_bytes is not set, it will default to 10485760 (10MB)
+ MaxFileSizeBytes *FileSizeBytes `yaml:"max_file_size_bytes,omitempty"`
+
+ // Whether to dynamically generate thumbnails on-the-fly if the requested resolution is not already generated
+ DynamicThumbnails bool `yaml:"dynamic_thumbnails"`
+
+ // The maximum number of simultaneous thumbnail generators. default: 10
+ MaxThumbnailGenerators int `yaml:"max_thumbnail_generators"`
+
+ // A list of thumbnail sizes to be pre-generated for downloaded remote / uploaded content
+ ThumbnailSizes []ThumbnailSize `yaml:"thumbnail_sizes"`
+}
+
+func (c *MediaAPI) Defaults() {
+ c.InternalAPI.Listen = "http://localhost:7774"
+ c.InternalAPI.Connect = "http://localhost:7774"
+ c.ExternalAPI.Listen = "http://[::]:8074"
+ c.Database.Defaults()
+ c.Database.ConnectionString = "file:mediaapi.db"
+
+ defaultMaxFileSizeBytes := FileSizeBytes(10485760)
+ c.MaxFileSizeBytes = &defaultMaxFileSizeBytes
+ c.MaxThumbnailGenerators = 10
+ c.BasePath = "./media_store"
+}
+
+func (c *MediaAPI) Verify(configErrs *ConfigErrors, isMonolith bool) {
+ checkURL(configErrs, "media_api.internal_api.listen", string(c.InternalAPI.Listen))
+ checkURL(configErrs, "media_api.internal_api.connect", string(c.InternalAPI.Connect))
+ if !isMonolith {
+ checkURL(configErrs, "media_api.external_api.listen", string(c.ExternalAPI.Listen))
+ }
+ checkNotEmpty(configErrs, "media_api.database.connection_string", string(c.Database.ConnectionString))
+
+ checkNotEmpty(configErrs, "media_api.base_path", string(c.BasePath))
+ checkPositive(configErrs, "media_api.max_file_size_bytes", int64(*c.MaxFileSizeBytes))
+ checkPositive(configErrs, "media_api.max_thumbnail_generators", int64(c.MaxThumbnailGenerators))
+
+ for i, size := range c.ThumbnailSizes {
+ checkPositive(configErrs, fmt.Sprintf("media_api.thumbnail_sizes[%d].width", i), int64(size.Width))
+ checkPositive(configErrs, fmt.Sprintf("media_api.thumbnail_sizes[%d].height", i), int64(size.Height))
+ }
+}
diff --git a/setup/config/config_mscs.go b/setup/config/config_mscs.go
new file mode 100644
index 00000000..776d0b64
--- /dev/null
+++ b/setup/config/config_mscs.go
@@ -0,0 +1,19 @@
+package config
+
+type MSCs struct {
+ Matrix *Global `yaml:"-"`
+
+ // The MSCs to enable, currently only `msc2836` is supported.
+ MSCs []string `yaml:"mscs"`
+
+ Database DatabaseOptions `yaml:"database"`
+}
+
+func (c *MSCs) Defaults() {
+ c.Database.Defaults()
+ c.Database.ConnectionString = "file:mscs.db"
+}
+
+func (c *MSCs) Verify(configErrs *ConfigErrors, isMonolith bool) {
+ checkNotEmpty(configErrs, "mscs.database.connection_string", string(c.Database.ConnectionString))
+}
diff --git a/setup/config/config_roomserver.go b/setup/config/config_roomserver.go
new file mode 100644
index 00000000..2a1fc38b
--- /dev/null
+++ b/setup/config/config_roomserver.go
@@ -0,0 +1,22 @@
+package config
+
+type RoomServer struct {
+ Matrix *Global `yaml:"-"`
+
+ InternalAPI InternalAPIOptions `yaml:"internal_api"`
+
+ Database DatabaseOptions `yaml:"database"`
+}
+
+func (c *RoomServer) Defaults() {
+ c.InternalAPI.Listen = "http://localhost:7770"
+ c.InternalAPI.Connect = "http://localhost:7770"
+ c.Database.Defaults()
+ c.Database.ConnectionString = "file:roomserver.db"
+}
+
+func (c *RoomServer) Verify(configErrs *ConfigErrors, isMonolith bool) {
+ checkURL(configErrs, "room_server.internal_api.listen", string(c.InternalAPI.Listen))
+ checkURL(configErrs, "room_server.internal_ap.bind", string(c.InternalAPI.Connect))
+ checkNotEmpty(configErrs, "room_server.database.connection_string", string(c.Database.ConnectionString))
+}
diff --git a/setup/config/config_signingkeyserver.go b/setup/config/config_signingkeyserver.go
new file mode 100644
index 00000000..51aca38b
--- /dev/null
+++ b/setup/config/config_signingkeyserver.go
@@ -0,0 +1,52 @@
+package config
+
+import "github.com/matrix-org/gomatrixserverlib"
+
+type SigningKeyServer struct {
+ Matrix *Global `yaml:"-"`
+
+ InternalAPI InternalAPIOptions `yaml:"internal_api"`
+
+ // The SigningKeyServer database caches the public keys of remote servers.
+ // It may be accessed by the FederationAPI, the ClientAPI, and the MediaAPI.
+ Database DatabaseOptions `yaml:"database"`
+
+ // Perspective keyservers, to use as a backup when direct key fetch
+ // requests don't succeed
+ KeyPerspectives KeyPerspectives `yaml:"key_perspectives"`
+
+ // Should we prefer direct key fetches over perspective ones?
+ PreferDirectFetch bool `yaml:"prefer_direct_fetch"`
+}
+
+func (c *SigningKeyServer) Defaults() {
+ c.InternalAPI.Listen = "http://localhost:7780"
+ c.InternalAPI.Connect = "http://localhost:7780"
+ c.Database.Defaults()
+ c.Database.ConnectionString = "file:signingkeyserver.db"
+}
+
+func (c *SigningKeyServer) Verify(configErrs *ConfigErrors, isMonolith bool) {
+ checkURL(configErrs, "signing_key_server.internal_api.listen", string(c.InternalAPI.Listen))
+ checkURL(configErrs, "signing_key_server.internal_api.bind", string(c.InternalAPI.Connect))
+ checkNotEmpty(configErrs, "signing_key_server.database.connection_string", string(c.Database.ConnectionString))
+}
+
+// KeyPerspectives are used to configure perspective key servers for
+// retrieving server keys.
+type KeyPerspectives []KeyPerspective
+
+type KeyPerspective struct {
+ // The server name of the perspective key server
+ ServerName gomatrixserverlib.ServerName `yaml:"server_name"`
+ // Server keys for the perspective user, used to verify the
+ // keys have been signed by the perspective server
+ Keys []KeyPerspectiveTrustKey `yaml:"keys"`
+}
+
+type KeyPerspectiveTrustKey struct {
+ // The key ID, e.g. ed25519:auto
+ KeyID gomatrixserverlib.KeyID `yaml:"key_id"`
+ // The public key in base64 unpadded format
+ PublicKey string `yaml:"public_key"`
+}
diff --git a/setup/config/config_syncapi.go b/setup/config/config_syncapi.go
new file mode 100644
index 00000000..fc08f738
--- /dev/null
+++ b/setup/config/config_syncapi.go
@@ -0,0 +1,29 @@
+package config
+
+type SyncAPI struct {
+ Matrix *Global `yaml:"-"`
+
+ InternalAPI InternalAPIOptions `yaml:"internal_api"`
+ ExternalAPI ExternalAPIOptions `yaml:"external_api"`
+
+ Database DatabaseOptions `yaml:"database"`
+
+ RealIPHeader string `yaml:"real_ip_header"`
+}
+
+func (c *SyncAPI) Defaults() {
+ c.InternalAPI.Listen = "http://localhost:7773"
+ c.InternalAPI.Connect = "http://localhost:7773"
+ c.ExternalAPI.Listen = "http://localhost:8073"
+ c.Database.Defaults()
+ c.Database.ConnectionString = "file:syncapi.db"
+}
+
+func (c *SyncAPI) Verify(configErrs *ConfigErrors, isMonolith bool) {
+ checkURL(configErrs, "sync_api.internal_api.listen", string(c.InternalAPI.Listen))
+ checkURL(configErrs, "sync_api.internal_api.bind", string(c.InternalAPI.Connect))
+ if !isMonolith {
+ checkURL(configErrs, "sync_api.external_api.listen", string(c.ExternalAPI.Listen))
+ }
+ checkNotEmpty(configErrs, "sync_api.database", string(c.Database.ConnectionString))
+}
diff --git a/setup/config/config_test.go b/setup/config/config_test.go
new file mode 100644
index 00000000..4107b684
--- /dev/null
+++ b/setup/config/config_test.go
@@ -0,0 +1,285 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package config
+
+import (
+ "fmt"
+ "testing"
+)
+
+func TestLoadConfigRelative(t *testing.T) {
+ _, err := loadConfig("/my/config/dir", []byte(testConfig),
+ mockReadFile{
+ "/my/config/dir/matrix_key.pem": testKey,
+ "/my/config/dir/tls_cert.pem": testCert,
+ }.readFile,
+ false,
+ )
+ if err != nil {
+ t.Error("failed to load config:", err)
+ }
+}
+
+const testConfig = `
+version: 1
+global:
+ server_name: localhost
+ private_key: matrix_key.pem
+ key_id: ed25519:auto
+ key_validity_period: 168h0m0s
+ trusted_third_party_id_servers:
+ - matrix.org
+ - vector.im
+ kafka:
+ addresses:
+ - localhost:2181
+ topic_prefix: Dendrite
+ use_naffka: true
+ naffka_database:
+ connection_string: file:naffka.db
+ max_open_conns: 100
+ max_idle_conns: 2
+ conn_max_lifetime: -1
+ metrics:
+ enabled: false
+ basic_auth:
+ username: metrics
+ password: metrics
+app_service_api:
+ internal_api:
+ listen: http://localhost:7777
+ connect: http://localhost:7777
+ database:
+ connection_string: file:appservice.db
+ max_open_conns: 100
+ max_idle_conns: 2
+ conn_max_lifetime: -1
+ config_files: []
+client_api:
+ internal_api:
+ listen: http://localhost:7771
+ connect: http://localhost:7771
+ external_api:
+ listen: http://[::]:8071
+ registration_disabled: false
+ registration_shared_secret: ""
+ enable_registration_captcha: false
+ recaptcha_public_key: ""
+ recaptcha_private_key: ""
+ recaptcha_bypass_secret: ""
+ recaptcha_siteverify_api: ""
+ turn:
+ turn_user_lifetime: ""
+ turn_uris: []
+ turn_shared_secret: ""
+ turn_username: ""
+ turn_password: ""
+current_state_server:
+ internal_api:
+ listen: http://localhost:7782
+ connect: http://localhost:7782
+ database:
+ connection_string: file:currentstate.db
+ max_open_conns: 100
+ max_idle_conns: 2
+ conn_max_lifetime: -1
+edu_server:
+ internal_api:
+ listen: http://localhost:7778
+ connect: http://localhost:7778
+federation_api:
+ internal_api:
+ listen: http://localhost:7772
+ connect: http://localhost:7772
+ external_api:
+ listen: http://[::]:8072
+ federation_certificates: []
+federation_sender:
+ internal_api:
+ listen: http://localhost:7775
+ connect: http://localhost:7775
+ database:
+ connection_string: file:federationsender.db
+ max_open_conns: 100
+ max_idle_conns: 2
+ conn_max_lifetime: -1
+ send_max_retries: 16
+ disable_tls_validation: false
+ proxy_outbound:
+ enabled: false
+ protocol: http
+ host: localhost
+ port: 8080
+key_server:
+ internal_api:
+ listen: http://localhost:7779
+ connect: http://localhost:7779
+ database:
+ connection_string: file:keyserver.db
+ max_open_conns: 100
+ max_idle_conns: 2
+ conn_max_lifetime: -1
+media_api:
+ internal_api:
+ listen: http://localhost:7774
+ connect: http://localhost:7774
+ external_api:
+ listen: http://[::]:8074
+ database:
+ connection_string: file:mediaapi.db
+ max_open_conns: 100
+ max_idle_conns: 2
+ conn_max_lifetime: -1
+ base_path: ./media_store
+ max_file_size_bytes: 10485760
+ dynamic_thumbnails: false
+ max_thumbnail_generators: 10
+ thumbnail_sizes:
+ - width: 32
+ height: 32
+ method: crop
+ - width: 96
+ height: 96
+ method: crop
+ - width: 640
+ height: 480
+ method: scale
+room_server:
+ internal_api:
+ listen: http://localhost:7770
+ connect: http://localhost:7770
+ database:
+ connection_string: file:roomserver.db
+ max_open_conns: 100
+ max_idle_conns: 2
+ conn_max_lifetime: -1
+server_key_api:
+ internal_api:
+ listen: http://localhost:7780
+ connect: http://localhost:7780
+ database:
+ connection_string: file:serverkeyapi.db
+ max_open_conns: 100
+ max_idle_conns: 2
+ conn_max_lifetime: -1
+ key_perspectives:
+ - server_name: matrix.org
+ keys:
+ - key_id: ed25519:auto
+ public_key: Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw
+ - key_id: ed25519:a_RXGa
+ public_key: l8Hft5qXKn1vfHrg3p4+W8gELQVo8N13JkluMfmn2sQ
+sync_api:
+ internal_api:
+ listen: http://localhost:7773
+ connect: http://localhost:7773
+ database:
+ connection_string: file:syncapi.db
+ max_open_conns: 100
+ max_idle_conns: 2
+ conn_max_lifetime: -1
+user_api:
+ internal_api:
+ listen: http://localhost:7781
+ connect: http://localhost:7781
+ account_database:
+ connection_string: file:userapi_accounts.db
+ max_open_conns: 100
+ max_idle_conns: 2
+ conn_max_lifetime: -1
+ device_database:
+ connection_string: file:userapi_devices.db
+ max_open_conns: 100
+ max_idle_conns: 2
+ conn_max_lifetime: -1
+tracing:
+ enabled: false
+ jaeger:
+ serviceName: ""
+ disabled: false
+ rpc_metrics: false
+ tags: []
+ sampler: null
+ reporter: null
+ headers: null
+ baggage_restrictions: null
+ throttler: null
+logging:
+- type: file
+ level: info
+ params:
+ path: /var/log/dendrite
+`
+
+type mockReadFile map[string]string
+
+func (m mockReadFile) readFile(path string) ([]byte, error) {
+ data, ok := m[path]
+ if !ok {
+ return nil, fmt.Errorf("no such file %q", path)
+ }
+ return []byte(data), nil
+}
+
+func TestReadKey(t *testing.T) {
+ keyID, _, err := readKeyPEM("path/to/key", []byte(testKey), true)
+ if err != nil {
+ t.Error("failed to load private key:", err)
+ }
+ wantKeyID := testKeyID
+ if wantKeyID != string(keyID) {
+ t.Errorf("wanted key ID to be %q, got %q", wantKeyID, keyID)
+ }
+}
+
+const testKeyID = "ed25519:c8NsuQ"
+
+const testKey = `
+-----BEGIN MATRIX PRIVATE KEY-----
+Key-ID: ` + testKeyID + `
+7KRZiZ2sTyRR8uqqUjRwczuwRXXkUMYIUHq4Mc3t4bE=
+-----END MATRIX PRIVATE KEY-----
+`
+
+const testCert = `
+-----BEGIN CERTIFICATE-----
+MIIE0zCCArugAwIBAgIJAPype3u24LJeMA0GCSqGSIb3DQEBCwUAMAAwHhcNMTcw
+NjEzMTQyODU4WhcNMTgwNjEzMTQyODU4WjAAMIICIjANBgkqhkiG9w0BAQEFAAOC
+Ag8AMIICCgKCAgEA3vNSr7lCh/alxPFqairp/PYohwdsqPvOD7zf7dJCNhy0gbdC
+9/APwIbPAPL9nU+o9ud1ACNCKBCQin/9LnI5vd5pa/Ne+mmRADDLB/BBBoywSJWG
+NSfKJ9n3XY1bjgtqi53uUh+RDdQ7sXudDqCUxiiJZmS7oqK/mp88XXAgCbuXUY29
+GmzbbDz37vntuSxDgUOnJ8uPSvRp5YPKogA3JwW1SyrlLt4Z30CQ6nH3Y2Q5SVfJ
+NIQyMrnwyjA9bCdXezv1cLXoTYn7U9BRyzXTZeXs3y3ldnRfISXN35CU04Az1F8j
+lfj7nXMEqI/qAj/qhxZ8nVBB+rpNOZy9RJko3O+G5Qa/EvzkQYV1rW4TM2Yme88A
+QyJspoV/0bXk6gG987PonK2Uk5djxSULhnGVIqswydyH0Nzb+slRp2bSoWbaNlee
++6TIeiyTQYc055pCHOp22gtLrC5LQGchksi02St2ZzRHdnlfqCJ8S9sS7x3trzds
+cYueg1sGI+O8szpQ3eUM7OhJOBrx6OlR7+QYnQg1wr/V+JAz1qcyTC1URcwfeqtg
+QjxFdBD9LfCtfK+AO51H9ugtsPJqOh33PmvfvUBEM05OHCA0lNaWJHROGpm4T4cc
+YQI9JQk/0lB7itF1qK5RG74qgKdjkBkfZxi0OqkUgHk6YHtJlKfET8zfrtcCAwEA
+AaNQME4wHQYDVR0OBBYEFGwb0NgH0Zr7Ga23njEJ85Ozf8M9MB8GA1UdIwQYMBaA
+FGwb0NgH0Zr7Ga23njEJ85Ozf8M9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEL
+BQADggIBAKU3RHXggbq/pLhGinU5q/9QT0TB/0bBnF1wNFkKQC0FrNJ+ZnBNmusy
+oqOn7DEohBCCDxT0kgOC05gLEsGLkSXlVyqCsPFfycCFhtu1QzSRtQNRxB3pW3Wq
+4/RFVYv0PGBjVBKxImQlEmXJWEDwemGKqDQZPtqR/FTHTbJcaT0xQr5+1oG6lawt
+I/2cW6GQ0kYW/Szps8FgNdSNgVqCjjNIzBYbWhRWMx/63qD1ReUbY7/Yw9KKT8nK
+zXERpbTM9k+Pnm0g9Gep+9HJ1dBFJeuTPugKeSeyqg2OJbENw1hxGs/HjBXw7580
+ioiMn/kMj6Tg/f3HCfKrdHHBFQw0/fJW6o17QImYIpPOPzc5RjXBrCJWb34kxqEd
+NQdKgejWiV/LlVsguIF8hVZH2kRzvoyypkVUtSUYGmjvA5UXoORQZfJ+b41llq1B
+GcSF6iaVbAFKnsUyyr1i9uHz/6Muqflphv/SfZxGheIn5u3PnhXrzDagvItjw0NS
+n0Xq64k7fc42HXJpF8CGBkSaIhtlzcruO+vqR80B9r62+D0V7VmHOnP135MT6noU
+8F0JQfEtP+I8NII5jHSF/khzSgP5g80LS9tEc2ILnIHK1StkInAoRQQ+/HsQsgbz
+ANAf5kxmMsM0zlN2hkxl0H6o7wKlBSw3RI3cjfilXiMWRPJrzlc4
+-----END CERTIFICATE-----
+`
diff --git a/setup/config/config_userapi.go b/setup/config/config_userapi.go
new file mode 100644
index 00000000..2cbd8a45
--- /dev/null
+++ b/setup/config/config_userapi.go
@@ -0,0 +1,30 @@
+package config
+
+type UserAPI struct {
+ Matrix *Global `yaml:"-"`
+
+ InternalAPI InternalAPIOptions `yaml:"internal_api"`
+
+ // The Account database stores the login details and account information
+ // for local users. It is accessed by the UserAPI.
+ AccountDatabase DatabaseOptions `yaml:"account_database"`
+ // The Device database stores session information for the devices of logged
+ // in local users. It is accessed by the UserAPI.
+ DeviceDatabase DatabaseOptions `yaml:"device_database"`
+}
+
+func (c *UserAPI) Defaults() {
+ c.InternalAPI.Listen = "http://localhost:7781"
+ c.InternalAPI.Connect = "http://localhost:7781"
+ c.AccountDatabase.Defaults()
+ c.DeviceDatabase.Defaults()
+ c.AccountDatabase.ConnectionString = "file:userapi_accounts.db"
+ c.DeviceDatabase.ConnectionString = "file:userapi_devices.db"
+}
+
+func (c *UserAPI) Verify(configErrs *ConfigErrors, isMonolith bool) {
+ checkURL(configErrs, "user_api.internal_api.listen", string(c.InternalAPI.Listen))
+ checkURL(configErrs, "user_api.internal_api.connect", string(c.InternalAPI.Connect))
+ checkNotEmpty(configErrs, "user_api.account_database.connection_string", string(c.AccountDatabase.ConnectionString))
+ checkNotEmpty(configErrs, "user_api.device_database.connection_string", string(c.DeviceDatabase.ConnectionString))
+}
diff --git a/setup/federation.go b/setup/federation.go
new file mode 100644
index 00000000..7e9a22b3
--- /dev/null
+++ b/setup/federation.go
@@ -0,0 +1,32 @@
+package setup
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "net/http"
+)
+
+// noOpHTTPTransport is used to disable federation.
+var noOpHTTPTransport = &http.Transport{
+ Dial: func(_, _ string) (net.Conn, error) {
+ return nil, fmt.Errorf("federation prohibited by configuration")
+ },
+ DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
+ return nil, fmt.Errorf("federation prohibited by configuration")
+ },
+ DialTLS: func(_, _ string) (net.Conn, error) {
+ return nil, fmt.Errorf("federation prohibited by configuration")
+ },
+}
+
+func init() {
+ noOpHTTPTransport.RegisterProtocol("matrix", &noOpHTTPRoundTripper{})
+}
+
+type noOpHTTPRoundTripper struct {
+}
+
+func (y *noOpHTTPRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
+ return nil, fmt.Errorf("federation prohibited by configuration")
+}
diff --git a/setup/flags.go b/setup/flags.go
new file mode 100644
index 00000000..281cf339
--- /dev/null
+++ b/setup/flags.go
@@ -0,0 +1,52 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package setup
+
+import (
+ "flag"
+ "fmt"
+ "os"
+
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/sirupsen/logrus"
+)
+
+var (
+ configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
+ version = flag.Bool("version", false, "Shows the current version and exits immediately.")
+)
+
+// ParseFlags parses the commandline flags and uses them to create a config.
+func ParseFlags(monolith bool) *config.Dendrite {
+ flag.Parse()
+
+ if *version {
+ fmt.Println(internal.VersionString())
+ os.Exit(0)
+ }
+
+ if *configPath == "" {
+ logrus.Fatal("--config must be supplied")
+ }
+
+ cfg, err := config.Load(*configPath, monolith)
+
+ if err != nil {
+ logrus.Fatalf("Invalid config file: %s", err)
+ }
+
+ return cfg
+}
diff --git a/setup/kafka/kafka.go b/setup/kafka/kafka.go
new file mode 100644
index 00000000..a2902c96
--- /dev/null
+++ b/setup/kafka/kafka.go
@@ -0,0 +1,58 @@
+package kafka
+
+import (
+ "github.com/Shopify/sarama"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/naffka"
+ naffkaStorage "github.com/matrix-org/naffka/storage"
+ "github.com/sirupsen/logrus"
+)
+
+func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
+ if cfg.UseNaffka {
+ return setupNaffka(cfg)
+ }
+ return setupKafka(cfg)
+}
+
+// setupKafka creates kafka consumer/producer pair from the config.
+func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
+ sCfg := sarama.NewConfig()
+ sCfg.Producer.MaxMessageBytes = *cfg.MaxMessageBytes
+ sCfg.Producer.Return.Successes = true
+ sCfg.Consumer.Fetch.Default = int32(*cfg.MaxMessageBytes)
+
+ consumer, err := sarama.NewConsumer(cfg.Addresses, sCfg)
+ if err != nil {
+ logrus.WithError(err).Panic("failed to start kafka consumer")
+ }
+
+ producer, err := sarama.NewSyncProducer(cfg.Addresses, sCfg)
+ if err != nil {
+ logrus.WithError(err).Panic("failed to setup kafka producers")
+ }
+
+ return consumer, producer
+}
+
+// In monolith mode with Naffka, we don't have the same constraints about
+// consuming the same topic from more than one place like we do with Kafka.
+// Therefore, we will only open one Naffka connection in case Naffka is
+// running on SQLite.
+var naffkaInstance *naffka.Naffka
+
+// setupNaffka creates kafka consumer/producer pair from the config.
+func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
+ if naffkaInstance != nil {
+ return naffkaInstance, naffkaInstance
+ }
+ naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString))
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to setup naffka database")
+ }
+ naffkaInstance, err = naffka.New(naffkaDB)
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to setup naffka")
+ }
+ return naffkaInstance, naffkaInstance
+}
diff --git a/setup/monolith.go b/setup/monolith.go
new file mode 100644
index 00000000..2403f57f
--- /dev/null
+++ b/setup/monolith.go
@@ -0,0 +1,76 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package setup
+
+import (
+ "github.com/gorilla/mux"
+ appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
+ "github.com/matrix-org/dendrite/clientapi"
+ "github.com/matrix-org/dendrite/clientapi/api"
+ eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
+ "github.com/matrix-org/dendrite/federationapi"
+ federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
+ "github.com/matrix-org/dendrite/internal/transactions"
+ keyAPI "github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/matrix-org/dendrite/mediaapi"
+ roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/config"
+ serverKeyAPI "github.com/matrix-org/dendrite/signingkeyserver/api"
+ "github.com/matrix-org/dendrite/syncapi"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/dendrite/userapi/storage/accounts"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// Monolith represents an instantiation of all dependencies required to build
+// all components of Dendrite, for use in monolith mode.
+type Monolith struct {
+ Config *config.Dendrite
+ AccountDB accounts.Database
+ KeyRing *gomatrixserverlib.KeyRing
+ Client *gomatrixserverlib.Client
+ FedClient *gomatrixserverlib.FederationClient
+
+ AppserviceAPI appserviceAPI.AppServiceQueryAPI
+ EDUInternalAPI eduServerAPI.EDUServerInputAPI
+ FederationSenderAPI federationSenderAPI.FederationSenderInternalAPI
+ RoomserverAPI roomserverAPI.RoomserverInternalAPI
+ ServerKeyAPI serverKeyAPI.SigningKeyServerAPI
+ UserAPI userapi.UserInternalAPI
+ KeyAPI keyAPI.KeyInternalAPI
+
+ // Optional
+ ExtPublicRoomsProvider api.ExtraPublicRoomsProvider
+}
+
+// AddAllPublicRoutes attaches all public paths to the given router
+func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router) {
+ clientapi.AddPublicRoutes(
+ csMux, &m.Config.ClientAPI, m.AccountDB,
+ m.FedClient, m.RoomserverAPI,
+ m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
+ m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider,
+ )
+ federationapi.AddPublicRoutes(
+ ssMux, keyMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient,
+ m.KeyRing, m.RoomserverAPI, m.FederationSenderAPI,
+ m.EDUInternalAPI, m.KeyAPI,
+ )
+ mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client)
+ syncapi.AddPublicRoutes(
+ csMux, m.UserAPI, m.RoomserverAPI,
+ m.KeyAPI, m.FedClient, &m.Config.SyncAPI,
+ )
+}
diff --git a/setup/mscs/msc2836/msc2836.go b/setup/mscs/msc2836/msc2836.go
new file mode 100644
index 00000000..33a65c8f
--- /dev/null
+++ b/setup/mscs/msc2836/msc2836.go
@@ -0,0 +1,530 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package msc2836 'Threading' implements https://github.com/matrix-org/matrix-doc/pull/2836
+package msc2836
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "time"
+
+ "github.com/matrix-org/dendrite/clientapi/jsonerror"
+ fs "github.com/matrix-org/dendrite/federationsender/api"
+ "github.com/matrix-org/dendrite/internal/hooks"
+ "github.com/matrix-org/dendrite/internal/httputil"
+ roomserver "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+)
+
+const (
+ constRelType = "m.reference"
+ constRoomIDKey = "relationship_room_id"
+ constRoomServers = "relationship_servers"
+)
+
+type EventRelationshipRequest struct {
+ EventID string `json:"event_id"`
+ MaxDepth int `json:"max_depth"`
+ MaxBreadth int `json:"max_breadth"`
+ Limit int `json:"limit"`
+ DepthFirst bool `json:"depth_first"`
+ RecentFirst bool `json:"recent_first"`
+ IncludeParent bool `json:"include_parent"`
+ IncludeChildren bool `json:"include_children"`
+ Direction string `json:"direction"`
+ Batch string `json:"batch"`
+ AutoJoin bool `json:"auto_join"`
+}
+
+func NewEventRelationshipRequest(body io.Reader) (*EventRelationshipRequest, error) {
+ var relation EventRelationshipRequest
+ relation.Defaults()
+ if err := json.NewDecoder(body).Decode(&relation); err != nil {
+ return nil, err
+ }
+ return &relation, nil
+}
+
+func (r *EventRelationshipRequest) Defaults() {
+ r.Limit = 100
+ r.MaxBreadth = 10
+ r.MaxDepth = 3
+ r.DepthFirst = false
+ r.RecentFirst = true
+ r.IncludeParent = false
+ r.IncludeChildren = false
+ r.Direction = "down"
+}
+
+type EventRelationshipResponse struct {
+ Events []gomatrixserverlib.ClientEvent `json:"events"`
+ NextBatch string `json:"next_batch"`
+ Limited bool `json:"limited"`
+}
+
+// Enable this MSC
+// nolint:gocyclo
+func Enable(
+ base *setup.BaseDendrite, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI,
+ userAPI userapi.UserInternalAPI, keyRing gomatrixserverlib.JSONVerifier,
+) error {
+ db, err := NewDatabase(&base.Cfg.MSCs.Database)
+ if err != nil {
+ return fmt.Errorf("Cannot enable MSC2836: %w", err)
+ }
+ hooks.Enable()
+ hooks.Attach(hooks.KindNewEventPersisted, func(headeredEvent interface{}) {
+ he := headeredEvent.(*gomatrixserverlib.HeaderedEvent)
+ hookErr := db.StoreRelation(context.Background(), he)
+ if hookErr != nil {
+ util.GetLogger(context.Background()).WithError(hookErr).Error(
+ "failed to StoreRelation",
+ )
+ }
+ })
+ hooks.Attach(hooks.KindNewEventReceived, func(headeredEvent interface{}) {
+ he := headeredEvent.(*gomatrixserverlib.HeaderedEvent)
+ ctx := context.Background()
+ // we only inject metadata for events our server sends
+ userID := he.Sender()
+ _, domain, err := gomatrixserverlib.SplitID('@', userID)
+ if err != nil {
+ return
+ }
+ if domain != base.Cfg.Global.ServerName {
+ return
+ }
+ // if this event has an m.relationship, add on the room_id and servers to unsigned
+ parent, child, relType := parentChildEventIDs(he)
+ if parent == "" || child == "" || relType == "" {
+ return
+ }
+ event, joinedToRoom := getEventIfVisible(ctx, rsAPI, parent, userID)
+ if !joinedToRoom {
+ return
+ }
+ err = he.SetUnsignedField(constRoomIDKey, event.RoomID())
+ if err != nil {
+ util.GetLogger(context.Background()).WithError(err).Warn("Failed to SetUnsignedField")
+ return
+ }
+
+ var servers []gomatrixserverlib.ServerName
+ if fsAPI != nil {
+ var res fs.QueryJoinedHostServerNamesInRoomResponse
+ err = fsAPI.QueryJoinedHostServerNamesInRoom(ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{
+ RoomID: event.RoomID(),
+ }, &res)
+ if err != nil {
+ util.GetLogger(context.Background()).WithError(err).Warn("Failed to QueryJoinedHostServerNamesInRoom")
+ return
+ }
+ servers = res.ServerNames
+ } else {
+ servers = []gomatrixserverlib.ServerName{
+ base.Cfg.Global.ServerName,
+ }
+ }
+ err = he.SetUnsignedField(constRoomServers, servers)
+ if err != nil {
+ util.GetLogger(context.Background()).WithError(err).Warn("Failed to SetUnsignedField")
+ return
+ }
+ })
+
+ base.PublicClientAPIMux.Handle("/unstable/event_relationships",
+ httputil.MakeAuthAPI("eventRelationships", userAPI, eventRelationshipHandler(db, rsAPI)),
+ ).Methods(http.MethodPost, http.MethodOptions)
+
+ base.PublicFederationAPIMux.Handle("/unstable/event_relationships", httputil.MakeExternalAPI(
+ "msc2836_event_relationships", func(req *http.Request) util.JSONResponse {
+ fedReq, errResp := gomatrixserverlib.VerifyHTTPRequest(
+ req, time.Now(), base.Cfg.Global.ServerName, keyRing,
+ )
+ if fedReq == nil {
+ return errResp
+ }
+ return federatedEventRelationship(req.Context(), fedReq, db, rsAPI)
+ },
+ )).Methods(http.MethodPost, http.MethodOptions)
+ return nil
+}
+
+type reqCtx struct {
+ ctx context.Context
+ rsAPI roomserver.RoomserverInternalAPI
+ db Database
+ req *EventRelationshipRequest
+ userID string
+ isFederatedRequest bool
+}
+
+func eventRelationshipHandler(db Database, rsAPI roomserver.RoomserverInternalAPI) func(*http.Request, *userapi.Device) util.JSONResponse {
+ return func(req *http.Request, device *userapi.Device) util.JSONResponse {
+ relation, err := NewEventRelationshipRequest(req.Body)
+ if err != nil {
+ util.GetLogger(req.Context()).WithError(err).Error("failed to decode HTTP request as JSON")
+ return util.JSONResponse{
+ Code: 400,
+ JSON: jsonerror.BadJSON(fmt.Sprintf("invalid json: %s", err)),
+ }
+ }
+ rc := reqCtx{
+ ctx: req.Context(),
+ req: relation,
+ userID: device.UserID,
+ rsAPI: rsAPI,
+ isFederatedRequest: false,
+ db: db,
+ }
+ res, resErr := rc.process()
+ if resErr != nil {
+ return *resErr
+ }
+
+ return util.JSONResponse{
+ Code: 200,
+ JSON: res,
+ }
+ }
+}
+
+func federatedEventRelationship(ctx context.Context, fedReq *gomatrixserverlib.FederationRequest, db Database, rsAPI roomserver.RoomserverInternalAPI) util.JSONResponse {
+ relation, err := NewEventRelationshipRequest(bytes.NewBuffer(fedReq.Content()))
+ if err != nil {
+ util.GetLogger(ctx).WithError(err).Error("failed to decode HTTP request as JSON")
+ return util.JSONResponse{
+ Code: 400,
+ JSON: jsonerror.BadJSON(fmt.Sprintf("invalid json: %s", err)),
+ }
+ }
+ rc := reqCtx{
+ ctx: ctx,
+ req: relation,
+ userID: "",
+ rsAPI: rsAPI,
+ isFederatedRequest: true,
+ db: db,
+ }
+ res, resErr := rc.process()
+ if resErr != nil {
+ return *resErr
+ }
+
+ return util.JSONResponse{
+ Code: 200,
+ JSON: res,
+ }
+}
+
+func (rc *reqCtx) process() (*EventRelationshipResponse, *util.JSONResponse) {
+ var res EventRelationshipResponse
+ var returnEvents []*gomatrixserverlib.HeaderedEvent
+ // Can the user see (according to history visibility) event_id? If no, reject the request, else continue.
+ // We should have the event being referenced so don't give any claimed room ID / servers
+ event := rc.getEventIfVisible(rc.req.EventID, "", nil)
+ if event == nil {
+ return nil, &util.JSONResponse{
+ Code: 403,
+ JSON: jsonerror.Forbidden("Event does not exist or you are not authorised to see it"),
+ }
+ }
+
+ // Retrieve the event. Add it to response array.
+ returnEvents = append(returnEvents, event)
+
+ if rc.req.IncludeParent {
+ if parentEvent := rc.includeParent(event); parentEvent != nil {
+ returnEvents = append(returnEvents, parentEvent)
+ }
+ }
+
+ if rc.req.IncludeChildren {
+ remaining := rc.req.Limit - len(returnEvents)
+ if remaining > 0 {
+ children, resErr := rc.includeChildren(rc.db, event.EventID(), remaining, rc.req.RecentFirst)
+ if resErr != nil {
+ return nil, resErr
+ }
+ returnEvents = append(returnEvents, children...)
+ }
+ }
+
+ remaining := rc.req.Limit - len(returnEvents)
+ var walkLimited bool
+ if remaining > 0 {
+ included := make(map[string]bool, len(returnEvents))
+ for _, ev := range returnEvents {
+ included[ev.EventID()] = true
+ }
+ var events []*gomatrixserverlib.HeaderedEvent
+ events, walkLimited = walkThread(
+ rc.ctx, rc.db, rc, included, remaining,
+ )
+ returnEvents = append(returnEvents, events...)
+ }
+ res.Events = make([]gomatrixserverlib.ClientEvent, len(returnEvents))
+ for i, ev := range returnEvents {
+ res.Events[i] = gomatrixserverlib.HeaderedToClientEvent(ev, gomatrixserverlib.FormatAll)
+ }
+ res.Limited = remaining == 0 || walkLimited
+ return &res, nil
+}
+
+// If include_parent: true and there is a valid m.relationship field in the event,
+// retrieve the referenced event. Apply history visibility check to that event and if it passes, add it to the response array.
+func (rc *reqCtx) includeParent(event *gomatrixserverlib.HeaderedEvent) (parent *gomatrixserverlib.HeaderedEvent) {
+ parentID, _, _ := parentChildEventIDs(event)
+ if parentID == "" {
+ return nil
+ }
+ claimedRoomID, claimedServers := roomIDAndServers(event)
+ return rc.getEventIfVisible(parentID, claimedRoomID, claimedServers)
+}
+
+// If include_children: true, lookup all events which have event_id as an m.relationship
+// Apply history visibility checks to all these events and add the ones which pass into the response array,
+// honouring the recent_first flag and the limit.
+func (rc *reqCtx) includeChildren(db Database, parentID string, limit int, recentFirst bool) ([]*gomatrixserverlib.HeaderedEvent, *util.JSONResponse) {
+ children, err := db.ChildrenForParent(rc.ctx, parentID, constRelType, recentFirst)
+ if err != nil {
+ util.GetLogger(rc.ctx).WithError(err).Error("failed to get ChildrenForParent")
+ resErr := jsonerror.InternalServerError()
+ return nil, &resErr
+ }
+ var childEvents []*gomatrixserverlib.HeaderedEvent
+ for _, child := range children {
+ // in order for us to even know about the children the server must be joined to those rooms, hence pass no claimed room ID or servers.
+ childEvent := rc.getEventIfVisible(child.EventID, "", nil)
+ if childEvent != nil {
+ childEvents = append(childEvents, childEvent)
+ }
+ }
+ if len(childEvents) > limit {
+ return childEvents[:limit], nil
+ }
+ return childEvents, nil
+}
+
+// Begin to walk the thread DAG in the direction specified, either depth or breadth first according to the depth_first flag,
+// honouring the limit, max_depth and max_breadth values according to the following rules
+// nolint: unparam
+func walkThread(
+ ctx context.Context, db Database, rc *reqCtx, included map[string]bool, limit int,
+) ([]*gomatrixserverlib.HeaderedEvent, bool) {
+ if rc.req.Direction != "down" {
+ util.GetLogger(ctx).Error("not implemented: direction=up")
+ return nil, false
+ }
+ var result []*gomatrixserverlib.HeaderedEvent
+ eventWalker := walker{
+ ctx: ctx,
+ req: rc.req,
+ db: db,
+ fn: func(wi *walkInfo) bool {
+ // If already processed event, skip.
+ if included[wi.EventID] {
+ return false
+ }
+
+ // If the response array is >= limit, stop.
+ if len(result) >= limit {
+ return true
+ }
+
+ // Process the event.
+ // TODO: Include edge information: room ID and servers
+ event := rc.getEventIfVisible(wi.EventID, "", nil)
+ if event != nil {
+ result = append(result, event)
+ }
+ included[wi.EventID] = true
+ return false
+ },
+ }
+ limited, err := eventWalker.WalkFrom(rc.req.EventID)
+ if err != nil {
+ util.GetLogger(ctx).WithError(err).Errorf("Failed to WalkFrom %s", rc.req.EventID)
+ }
+ return result, limited
+}
+
+func (rc *reqCtx) getEventIfVisible(eventID string, claimedRoomID string, claimedServers []string) *gomatrixserverlib.HeaderedEvent {
+ event, joinedToRoom := getEventIfVisible(rc.ctx, rc.rsAPI, eventID, rc.userID)
+ if event != nil && joinedToRoom {
+ return event
+ }
+ // either we don't have the event or we aren't joined to the room, regardless we should try joining if auto join is enabled
+ if !rc.req.AutoJoin {
+ return nil
+ }
+ // if we're doing this on behalf of a random server don't auto-join rooms regardless of what the request says
+ if rc.isFederatedRequest {
+ return nil
+ }
+ roomID := claimedRoomID
+ var servers []gomatrixserverlib.ServerName
+ if event != nil {
+ roomID = event.RoomID()
+ }
+ for _, s := range claimedServers {
+ servers = append(servers, gomatrixserverlib.ServerName(s))
+ }
+ var joinRes roomserver.PerformJoinResponse
+ rc.rsAPI.PerformJoin(rc.ctx, &roomserver.PerformJoinRequest{
+ UserID: rc.userID,
+ Content: map[string]interface{}{},
+ RoomIDOrAlias: roomID,
+ ServerNames: servers,
+ }, &joinRes)
+ if joinRes.Error != nil {
+ util.GetLogger(rc.ctx).WithError(joinRes.Error).WithField("room_id", roomID).Error("Failed to auto-join room")
+ return nil
+ }
+ if event != nil {
+ return event
+ }
+ // TODO: hit /event_relationships on the server we joined via
+ util.GetLogger(rc.ctx).Infof("joined room but need to fetch event TODO")
+ return nil
+}
+
+func getEventIfVisible(ctx context.Context, rsAPI roomserver.RoomserverInternalAPI, eventID, userID string) (*gomatrixserverlib.HeaderedEvent, bool) {
+ var queryEventsRes roomserver.QueryEventsByIDResponse
+ err := rsAPI.QueryEventsByID(ctx, &roomserver.QueryEventsByIDRequest{
+ EventIDs: []string{eventID},
+ }, &queryEventsRes)
+ if err != nil {
+ util.GetLogger(ctx).WithError(err).Error("getEventIfVisible: failed to QueryEventsByID")
+ return nil, false
+ }
+ if len(queryEventsRes.Events) == 0 {
+ util.GetLogger(ctx).Infof("event does not exist")
+ return nil, false // event does not exist
+ }
+ event := queryEventsRes.Events[0]
+
+ // Allow events if the member is in the room
+ // TODO: This does not honour history_visibility
+ // TODO: This does not honour m.room.create content
+ var queryMembershipRes roomserver.QueryMembershipForUserResponse
+ err = rsAPI.QueryMembershipForUser(ctx, &roomserver.QueryMembershipForUserRequest{
+ RoomID: event.RoomID(),
+ UserID: userID,
+ }, &queryMembershipRes)
+ if err != nil {
+ util.GetLogger(ctx).WithError(err).Error("getEventIfVisible: failed to QueryMembershipForUser")
+ return nil, false
+ }
+ return event, queryMembershipRes.IsInRoom
+}
+
+type walkInfo struct {
+ eventInfo
+ SiblingNumber int
+ Depth int
+}
+
+type walker struct {
+ ctx context.Context
+ req *EventRelationshipRequest
+ db Database
+ fn func(wi *walkInfo) bool // callback invoked for each event walked, return true to terminate the walk
+}
+
+// WalkFrom the event ID given
+func (w *walker) WalkFrom(eventID string) (limited bool, err error) {
+ children, err := w.db.ChildrenForParent(w.ctx, eventID, constRelType, w.req.RecentFirst)
+ if err != nil {
+ util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() ChildrenForParent failed, cannot walk")
+ return false, err
+ }
+ var next *walkInfo
+ toWalk := w.addChildren(nil, children, 1)
+ next, toWalk = w.nextChild(toWalk)
+ for next != nil {
+ stop := w.fn(next)
+ if stop {
+ return true, nil
+ }
+ // find the children's children
+ children, err = w.db.ChildrenForParent(w.ctx, next.EventID, constRelType, w.req.RecentFirst)
+ if err != nil {
+ util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() ChildrenForParent failed, cannot walk")
+ return false, err
+ }
+ toWalk = w.addChildren(toWalk, children, next.Depth+1)
+ next, toWalk = w.nextChild(toWalk)
+ }
+
+ return false, nil
+}
+
+// addChildren adds an event's children to the to walk data structure
+func (w *walker) addChildren(toWalk []walkInfo, children []eventInfo, depthOfChildren int) []walkInfo {
+ // Check what number child this event is (ordered by recent_first) compared to its parent, does it exceed (greater than) max_breadth? If yes, skip.
+ if len(children) > w.req.MaxBreadth {
+ children = children[:w.req.MaxBreadth]
+ }
+ // Check how deep the event is compared to event_id, does it exceed (greater than) max_depth? If yes, skip.
+ if depthOfChildren > w.req.MaxDepth {
+ return toWalk
+ }
+
+ if w.req.DepthFirst {
+ // the slice is a stack so push them in reverse order so we pop them in the correct order
+ // e.g [3,2,1] => [3,2] , 1 => [3] , 2 => [] , 3
+ for i := len(children) - 1; i >= 0; i-- {
+ toWalk = append(toWalk, walkInfo{
+ eventInfo: children[i],
+ SiblingNumber: i + 1, // index from 1
+ Depth: depthOfChildren,
+ })
+ }
+ } else {
+ // the slice is a queue so push them in normal order to we dequeue them in the correct order
+ // e.g [1,2,3] => 1, [2, 3] => 2 , [3] => 3, []
+ for i := range children {
+ toWalk = append(toWalk, walkInfo{
+ eventInfo: children[i],
+ SiblingNumber: i + 1, // index from 1
+ Depth: depthOfChildren,
+ })
+ }
+ }
+ return toWalk
+}
+
+func (w *walker) nextChild(toWalk []walkInfo) (*walkInfo, []walkInfo) {
+ if len(toWalk) == 0 {
+ return nil, nil
+ }
+ var child walkInfo
+ if w.req.DepthFirst {
+ // toWalk is a stack so pop the child off
+ child, toWalk = toWalk[len(toWalk)-1], toWalk[:len(toWalk)-1]
+ return &child, toWalk
+ }
+ // toWalk is a queue so shift the child off
+ child, toWalk = toWalk[0], toWalk[1:]
+ return &child, toWalk
+}
diff --git a/setup/mscs/msc2836/msc2836_test.go b/setup/mscs/msc2836/msc2836_test.go
new file mode 100644
index 00000000..996cc79f
--- /dev/null
+++ b/setup/mscs/msc2836/msc2836_test.go
@@ -0,0 +1,577 @@
+package msc2836_test
+
+import (
+ "bytes"
+ "context"
+ "crypto/ed25519"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "testing"
+ "time"
+
+ "github.com/gorilla/mux"
+ "github.com/matrix-org/dendrite/internal/hooks"
+ "github.com/matrix-org/dendrite/internal/httputil"
+ roomserver "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/mscs/msc2836"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+var (
+ client = &http.Client{
+ Timeout: 10 * time.Second,
+ }
+)
+
+// Basic sanity check of MSC2836 logic. Injects a thread that looks like:
+// A
+// |
+// B
+// / \
+// C D
+// /|\
+// E F G
+// |
+// H
+// And makes sure POST /event_relationships works with various parameters
+func TestMSC2836(t *testing.T) {
+ alice := "@alice:localhost"
+ bob := "@bob:localhost"
+ charlie := "@charlie:localhost"
+ roomIDA := "!alice:localhost"
+ roomIDB := "!bob:localhost"
+ roomIDC := "!charlie:localhost"
+ // give access tokens to all three users
+ nopUserAPI := &testUserAPI{
+ accessTokens: make(map[string]userapi.Device),
+ }
+ nopUserAPI.accessTokens["alice"] = userapi.Device{
+ AccessToken: "alice",
+ DisplayName: "Alice",
+ UserID: alice,
+ }
+ nopUserAPI.accessTokens["bob"] = userapi.Device{
+ AccessToken: "bob",
+ DisplayName: "Bob",
+ UserID: bob,
+ }
+ nopUserAPI.accessTokens["charlie"] = userapi.Device{
+ AccessToken: "charlie",
+ DisplayName: "Charles",
+ UserID: charlie,
+ }
+ eventA := mustCreateEvent(t, fledglingEvent{
+ RoomID: roomIDA,
+ Sender: alice,
+ Type: "m.room.message",
+ Content: map[string]interface{}{
+ "body": "[A] Do you know shelties?",
+ },
+ })
+ eventB := mustCreateEvent(t, fledglingEvent{
+ RoomID: roomIDB,
+ Sender: bob,
+ Type: "m.room.message",
+ Content: map[string]interface{}{
+ "body": "[B] I <3 shelties",
+ "m.relationship": map[string]string{
+ "rel_type": "m.reference",
+ "event_id": eventA.EventID(),
+ },
+ },
+ })
+ eventC := mustCreateEvent(t, fledglingEvent{
+ RoomID: roomIDB,
+ Sender: bob,
+ Type: "m.room.message",
+ Content: map[string]interface{}{
+ "body": "[C] like so much",
+ "m.relationship": map[string]string{
+ "rel_type": "m.reference",
+ "event_id": eventB.EventID(),
+ },
+ },
+ })
+ eventD := mustCreateEvent(t, fledglingEvent{
+ RoomID: roomIDA,
+ Sender: alice,
+ Type: "m.room.message",
+ Content: map[string]interface{}{
+ "body": "[D] but what are shelties???",
+ "m.relationship": map[string]string{
+ "rel_type": "m.reference",
+ "event_id": eventB.EventID(),
+ },
+ },
+ })
+ eventE := mustCreateEvent(t, fledglingEvent{
+ RoomID: roomIDB,
+ Sender: bob,
+ Type: "m.room.message",
+ Content: map[string]interface{}{
+ "body": "[E] seriously???",
+ "m.relationship": map[string]string{
+ "rel_type": "m.reference",
+ "event_id": eventD.EventID(),
+ },
+ },
+ })
+ eventF := mustCreateEvent(t, fledglingEvent{
+ RoomID: roomIDC,
+ Sender: charlie,
+ Type: "m.room.message",
+ Content: map[string]interface{}{
+ "body": "[F] omg how do you not know what shelties are",
+ "m.relationship": map[string]string{
+ "rel_type": "m.reference",
+ "event_id": eventD.EventID(),
+ },
+ },
+ })
+ eventG := mustCreateEvent(t, fledglingEvent{
+ RoomID: roomIDA,
+ Sender: alice,
+ Type: "m.room.message",
+ Content: map[string]interface{}{
+ "body": "[G] looked it up, it's a sheltered person?",
+ "m.relationship": map[string]string{
+ "rel_type": "m.reference",
+ "event_id": eventD.EventID(),
+ },
+ },
+ })
+ eventH := mustCreateEvent(t, fledglingEvent{
+ RoomID: roomIDB,
+ Sender: bob,
+ Type: "m.room.message",
+ Content: map[string]interface{}{
+ "body": "[H] it's a dog!!!!!",
+ "m.relationship": map[string]string{
+ "rel_type": "m.reference",
+ "event_id": eventE.EventID(),
+ },
+ },
+ })
+ // make everyone joined to each other's rooms
+ nopRsAPI := &testRoomserverAPI{
+ userToJoinedRooms: map[string][]string{
+ alice: []string{roomIDA, roomIDB, roomIDC},
+ bob: []string{roomIDA, roomIDB, roomIDC},
+ charlie: []string{roomIDA, roomIDB, roomIDC},
+ },
+ events: map[string]*gomatrixserverlib.HeaderedEvent{
+ eventA.EventID(): eventA,
+ eventB.EventID(): eventB,
+ eventC.EventID(): eventC,
+ eventD.EventID(): eventD,
+ eventE.EventID(): eventE,
+ eventF.EventID(): eventF,
+ eventG.EventID(): eventG,
+ eventH.EventID(): eventH,
+ },
+ }
+ router := injectEvents(t, nopUserAPI, nopRsAPI, []*gomatrixserverlib.HeaderedEvent{
+ eventA, eventB, eventC, eventD, eventE, eventF, eventG, eventH,
+ })
+ cancel := runServer(t, router)
+ defer cancel()
+
+ t.Run("returns 403 on invalid event IDs", func(t *testing.T) {
+ _ = postRelationships(t, 403, "alice", newReq(t, map[string]interface{}{
+ "event_id": "$invalid",
+ }))
+ })
+ t.Run("returns 403 if not joined to the room of specified event in request", func(t *testing.T) {
+ nopUserAPI.accessTokens["frank"] = userapi.Device{
+ AccessToken: "frank",
+ DisplayName: "Frank Not In Room",
+ UserID: "@frank:localhost",
+ }
+ _ = postRelationships(t, 403, "frank", newReq(t, map[string]interface{}{
+ "event_id": eventB.EventID(),
+ "limit": 1,
+ "include_parent": true,
+ }))
+ })
+ t.Run("omits parent if not joined to the room of parent of event", func(t *testing.T) {
+ nopUserAPI.accessTokens["frank2"] = userapi.Device{
+ AccessToken: "frank2",
+ DisplayName: "Frank2 Not In Room",
+ UserID: "@frank2:localhost",
+ }
+ // Event B is in roomB, Event A is in roomA, so make frank2 joined to roomB
+ nopRsAPI.userToJoinedRooms["@frank2:localhost"] = []string{roomIDB}
+ body := postRelationships(t, 200, "frank2", newReq(t, map[string]interface{}{
+ "event_id": eventB.EventID(),
+ "limit": 1,
+ "include_parent": true,
+ }))
+ assertContains(t, body, []string{eventB.EventID()})
+ })
+ t.Run("returns the parent if include_parent is true", func(t *testing.T) {
+ body := postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{
+ "event_id": eventB.EventID(),
+ "include_parent": true,
+ "limit": 2,
+ }))
+ assertContains(t, body, []string{eventB.EventID(), eventA.EventID()})
+ })
+ t.Run("returns the children in the right order if include_children is true", func(t *testing.T) {
+ body := postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{
+ "event_id": eventD.EventID(),
+ "include_children": true,
+ "recent_first": true,
+ "limit": 4,
+ }))
+ assertContains(t, body, []string{eventD.EventID(), eventG.EventID(), eventF.EventID(), eventE.EventID()})
+ body = postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{
+ "event_id": eventD.EventID(),
+ "include_children": true,
+ "recent_first": false,
+ "limit": 4,
+ }))
+ assertContains(t, body, []string{eventD.EventID(), eventE.EventID(), eventF.EventID(), eventG.EventID()})
+ })
+ t.Run("walks the graph depth first", func(t *testing.T) {
+ body := postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{
+ "event_id": eventB.EventID(),
+ "recent_first": false,
+ "depth_first": true,
+ "limit": 6,
+ }))
+ // Oldest first so:
+ // A
+ // |
+ // B1
+ // / \
+ // C2 D3
+ // /| \
+ // 4E 6F G
+ // |
+ // 5H
+ assertContains(t, body, []string{eventB.EventID(), eventC.EventID(), eventD.EventID(), eventE.EventID(), eventH.EventID(), eventF.EventID()})
+ body = postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{
+ "event_id": eventB.EventID(),
+ "recent_first": true,
+ "depth_first": true,
+ "limit": 6,
+ }))
+ // Recent first so:
+ // A
+ // |
+ // B1
+ // / \
+ // C D2
+ // /| \
+ // E5 F4 G3
+ // |
+ // H6
+ assertContains(t, body, []string{eventB.EventID(), eventD.EventID(), eventG.EventID(), eventF.EventID(), eventE.EventID(), eventH.EventID()})
+ })
+ t.Run("walks the graph breadth first", func(t *testing.T) {
+ body := postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{
+ "event_id": eventB.EventID(),
+ "recent_first": false,
+ "depth_first": false,
+ "limit": 6,
+ }))
+ // Oldest first so:
+ // A
+ // |
+ // B1
+ // / \
+ // C2 D3
+ // /| \
+ // E4 F5 G6
+ // |
+ // H
+ assertContains(t, body, []string{eventB.EventID(), eventC.EventID(), eventD.EventID(), eventE.EventID(), eventF.EventID(), eventG.EventID()})
+ body = postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{
+ "event_id": eventB.EventID(),
+ "recent_first": true,
+ "depth_first": false,
+ "limit": 6,
+ }))
+ // Recent first so:
+ // A
+ // |
+ // B1
+ // / \
+ // C3 D2
+ // /| \
+ // E6 F5 G4
+ // |
+ // H
+ assertContains(t, body, []string{eventB.EventID(), eventD.EventID(), eventC.EventID(), eventG.EventID(), eventF.EventID(), eventE.EventID()})
+ })
+ t.Run("caps via max_breadth", func(t *testing.T) {
+ body := postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{
+ "event_id": eventB.EventID(),
+ "recent_first": false,
+ "depth_first": false,
+ "max_breadth": 2,
+ "limit": 10,
+ }))
+ // Event G gets omitted because of max_breadth
+ assertContains(t, body, []string{eventB.EventID(), eventC.EventID(), eventD.EventID(), eventE.EventID(), eventF.EventID(), eventH.EventID()})
+ })
+ t.Run("caps via max_depth", func(t *testing.T) {
+ body := postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{
+ "event_id": eventB.EventID(),
+ "recent_first": false,
+ "depth_first": false,
+ "max_depth": 2,
+ "limit": 10,
+ }))
+ // Event H gets omitted because of max_depth
+ assertContains(t, body, []string{eventB.EventID(), eventC.EventID(), eventD.EventID(), eventE.EventID(), eventF.EventID(), eventG.EventID()})
+ })
+ t.Run("terminates when reaching the limit", func(t *testing.T) {
+ body := postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{
+ "event_id": eventB.EventID(),
+ "recent_first": false,
+ "depth_first": false,
+ "limit": 4,
+ }))
+ assertContains(t, body, []string{eventB.EventID(), eventC.EventID(), eventD.EventID(), eventE.EventID()})
+ })
+ t.Run("returns all events with a high enough limit", func(t *testing.T) {
+ body := postRelationships(t, 200, "alice", newReq(t, map[string]interface{}{
+ "event_id": eventB.EventID(),
+ "recent_first": false,
+ "depth_first": false,
+ "limit": 400,
+ }))
+ assertContains(t, body, []string{eventB.EventID(), eventC.EventID(), eventD.EventID(), eventE.EventID(), eventF.EventID(), eventG.EventID(), eventH.EventID()})
+ })
+}
+
+// TODO: TestMSC2836TerminatesLoops (short and long)
+// TODO: TestMSC2836UnknownEventsSkipped
+// TODO: TestMSC2836SkipEventIfNotInRoom
+
+func newReq(t *testing.T, jsonBody map[string]interface{}) *msc2836.EventRelationshipRequest {
+ t.Helper()
+ b, err := json.Marshal(jsonBody)
+ if err != nil {
+ t.Fatalf("Failed to marshal request: %s", err)
+ }
+ r, err := msc2836.NewEventRelationshipRequest(bytes.NewBuffer(b))
+ if err != nil {
+ t.Fatalf("Failed to NewEventRelationshipRequest: %s", err)
+ }
+ return r
+}
+
+func runServer(t *testing.T, router *mux.Router) func() {
+ t.Helper()
+ externalServ := &http.Server{
+ Addr: string(":8009"),
+ WriteTimeout: 60 * time.Second,
+ Handler: router,
+ }
+ go func() {
+ externalServ.ListenAndServe()
+ }()
+ // wait to listen on the port
+ time.Sleep(500 * time.Millisecond)
+ return func() {
+ externalServ.Shutdown(context.TODO())
+ }
+}
+
+func postRelationships(t *testing.T, expectCode int, accessToken string, req *msc2836.EventRelationshipRequest) *msc2836.EventRelationshipResponse {
+ t.Helper()
+ var r msc2836.EventRelationshipRequest
+ r.Defaults()
+ data, err := json.Marshal(req)
+ if err != nil {
+ t.Fatalf("failed to marshal request: %s", err)
+ }
+ httpReq, err := http.NewRequest(
+ "POST", "http://localhost:8009/_matrix/client/unstable/event_relationships",
+ bytes.NewBuffer(data),
+ )
+ httpReq.Header.Set("Authorization", "Bearer "+accessToken)
+ if err != nil {
+ t.Fatalf("failed to prepare request: %s", err)
+ }
+ res, err := client.Do(httpReq)
+ if err != nil {
+ t.Fatalf("failed to do request: %s", err)
+ }
+ if res.StatusCode != expectCode {
+ body, _ := ioutil.ReadAll(res.Body)
+ t.Fatalf("wrong response code, got %d want %d - body: %s", res.StatusCode, expectCode, string(body))
+ }
+ if res.StatusCode == 200 {
+ var result msc2836.EventRelationshipResponse
+ if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
+ t.Fatalf("response 200 OK but failed to deserialise JSON : %s", err)
+ }
+ return &result
+ }
+ return nil
+}
+
+func assertContains(t *testing.T, result *msc2836.EventRelationshipResponse, wantEventIDs []string) {
+ t.Helper()
+ gotEventIDs := make([]string, len(result.Events))
+ for i, ev := range result.Events {
+ gotEventIDs[i] = ev.EventID
+ }
+ if len(gotEventIDs) != len(wantEventIDs) {
+ t.Fatalf("length mismatch: got %v want %v", gotEventIDs, wantEventIDs)
+ }
+ for i := range gotEventIDs {
+ if gotEventIDs[i] != wantEventIDs[i] {
+ t.Errorf("wrong item in position %d - got %s want %s", i, gotEventIDs[i], wantEventIDs[i])
+ }
+ }
+}
+
+type testUserAPI struct {
+ accessTokens map[string]userapi.Device
+}
+
+func (u *testUserAPI) InputAccountData(ctx context.Context, req *userapi.InputAccountDataRequest, res *userapi.InputAccountDataResponse) error {
+ return nil
+}
+func (u *testUserAPI) PerformAccountCreation(ctx context.Context, req *userapi.PerformAccountCreationRequest, res *userapi.PerformAccountCreationResponse) error {
+ return nil
+}
+func (u *testUserAPI) PerformPasswordUpdate(ctx context.Context, req *userapi.PerformPasswordUpdateRequest, res *userapi.PerformPasswordUpdateResponse) error {
+ return nil
+}
+func (u *testUserAPI) PerformDeviceCreation(ctx context.Context, req *userapi.PerformDeviceCreationRequest, res *userapi.PerformDeviceCreationResponse) error {
+ return nil
+}
+func (u *testUserAPI) PerformDeviceDeletion(ctx context.Context, req *userapi.PerformDeviceDeletionRequest, res *userapi.PerformDeviceDeletionResponse) error {
+ return nil
+}
+func (u *testUserAPI) PerformDeviceUpdate(ctx context.Context, req *userapi.PerformDeviceUpdateRequest, res *userapi.PerformDeviceUpdateResponse) error {
+ return nil
+}
+func (u *testUserAPI) PerformLastSeenUpdate(ctx context.Context, req *userapi.PerformLastSeenUpdateRequest, res *userapi.PerformLastSeenUpdateResponse) error {
+ return nil
+}
+func (u *testUserAPI) PerformAccountDeactivation(ctx context.Context, req *userapi.PerformAccountDeactivationRequest, res *userapi.PerformAccountDeactivationResponse) error {
+ return nil
+}
+func (u *testUserAPI) QueryProfile(ctx context.Context, req *userapi.QueryProfileRequest, res *userapi.QueryProfileResponse) error {
+ return nil
+}
+func (u *testUserAPI) QueryAccessToken(ctx context.Context, req *userapi.QueryAccessTokenRequest, res *userapi.QueryAccessTokenResponse) error {
+ dev, ok := u.accessTokens[req.AccessToken]
+ if !ok {
+ res.Err = fmt.Errorf("unknown token")
+ return nil
+ }
+ res.Device = &dev
+ return nil
+}
+func (u *testUserAPI) QueryDevices(ctx context.Context, req *userapi.QueryDevicesRequest, res *userapi.QueryDevicesResponse) error {
+ return nil
+}
+func (u *testUserAPI) QueryAccountData(ctx context.Context, req *userapi.QueryAccountDataRequest, res *userapi.QueryAccountDataResponse) error {
+ return nil
+}
+func (u *testUserAPI) QueryDeviceInfos(ctx context.Context, req *userapi.QueryDeviceInfosRequest, res *userapi.QueryDeviceInfosResponse) error {
+ return nil
+}
+func (u *testUserAPI) QuerySearchProfiles(ctx context.Context, req *userapi.QuerySearchProfilesRequest, res *userapi.QuerySearchProfilesResponse) error {
+ return nil
+}
+
+type testRoomserverAPI struct {
+ // use a trace API as it implements method stubs so we don't need to have them here.
+ // We'll override the functions we care about.
+ roomserver.RoomserverInternalAPITrace
+ userToJoinedRooms map[string][]string
+ events map[string]*gomatrixserverlib.HeaderedEvent
+}
+
+func (r *testRoomserverAPI) QueryEventsByID(ctx context.Context, req *roomserver.QueryEventsByIDRequest, res *roomserver.QueryEventsByIDResponse) error {
+ for _, eventID := range req.EventIDs {
+ ev := r.events[eventID]
+ if ev != nil {
+ res.Events = append(res.Events, ev)
+ }
+ }
+ return nil
+}
+
+func (r *testRoomserverAPI) QueryMembershipForUser(ctx context.Context, req *roomserver.QueryMembershipForUserRequest, res *roomserver.QueryMembershipForUserResponse) error {
+ rooms := r.userToJoinedRooms[req.UserID]
+ for _, roomID := range rooms {
+ if roomID == req.RoomID {
+ res.IsInRoom = true
+ res.HasBeenInRoom = true
+ res.Membership = "join"
+ break
+ }
+ }
+ return nil
+}
+
+func injectEvents(t *testing.T, userAPI userapi.UserInternalAPI, rsAPI roomserver.RoomserverInternalAPI, events []*gomatrixserverlib.HeaderedEvent) *mux.Router {
+ t.Helper()
+ cfg := &config.Dendrite{}
+ cfg.Defaults()
+ cfg.Global.ServerName = "localhost"
+ cfg.MSCs.Database.ConnectionString = "file:msc2836_test.db"
+ cfg.MSCs.MSCs = []string{"msc2836"}
+ base := &setup.BaseDendrite{
+ Cfg: cfg,
+ PublicClientAPIMux: mux.NewRouter().PathPrefix(httputil.PublicClientPathPrefix).Subrouter(),
+ PublicFederationAPIMux: mux.NewRouter().PathPrefix(httputil.PublicFederationPathPrefix).Subrouter(),
+ }
+
+ err := msc2836.Enable(base, rsAPI, nil, userAPI, nil)
+ if err != nil {
+ t.Fatalf("failed to enable MSC2836: %s", err)
+ }
+ for _, ev := range events {
+ hooks.Run(hooks.KindNewEventPersisted, ev)
+ }
+ return base.PublicClientAPIMux
+}
+
+type fledglingEvent struct {
+ Type string
+ StateKey *string
+ Content interface{}
+ Sender string
+ RoomID string
+}
+
+func mustCreateEvent(t *testing.T, ev fledglingEvent) (result *gomatrixserverlib.HeaderedEvent) {
+ t.Helper()
+ roomVer := gomatrixserverlib.RoomVersionV6
+ seed := make([]byte, ed25519.SeedSize) // zero seed
+ key := ed25519.NewKeyFromSeed(seed)
+ eb := gomatrixserverlib.EventBuilder{
+ Sender: ev.Sender,
+ Depth: 999,
+ Type: ev.Type,
+ StateKey: ev.StateKey,
+ RoomID: ev.RoomID,
+ }
+ err := eb.SetContent(ev.Content)
+ if err != nil {
+ t.Fatalf("mustCreateEvent: failed to marshal event content %+v", ev.Content)
+ }
+ // make sure the origin_server_ts changes so we can test recency
+ time.Sleep(1 * time.Millisecond)
+ signedEvent, err := eb.Build(time.Now(), gomatrixserverlib.ServerName("localhost"), "ed25519:test", key, roomVer)
+ if err != nil {
+ t.Fatalf("mustCreateEvent: failed to sign event: %s", err)
+ }
+ h := signedEvent.Headered(roomVer)
+ return h
+}
diff --git a/setup/mscs/msc2836/storage.go b/setup/mscs/msc2836/storage.go
new file mode 100644
index 00000000..72ea5195
--- /dev/null
+++ b/setup/mscs/msc2836/storage.go
@@ -0,0 +1,226 @@
+package msc2836
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type eventInfo struct {
+ EventID string
+ OriginServerTS gomatrixserverlib.Timestamp
+ RoomID string
+ Servers []string
+}
+
+type Database interface {
+ // StoreRelation stores the parent->child and child->parent relationship for later querying.
+ // Also stores the event metadata e.g timestamp
+ StoreRelation(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent) error
+ // ChildrenForParent returns the events who have the given `eventID` as an m.relationship with the
+ // provided `relType`. The returned slice is sorted by origin_server_ts according to whether
+ // `recentFirst` is true or false.
+ ChildrenForParent(ctx context.Context, eventID, relType string, recentFirst bool) ([]eventInfo, error)
+}
+
+type DB struct {
+ db *sql.DB
+ writer sqlutil.Writer
+ insertEdgeStmt *sql.Stmt
+ insertNodeStmt *sql.Stmt
+ selectChildrenForParentOldestFirstStmt *sql.Stmt
+ selectChildrenForParentRecentFirstStmt *sql.Stmt
+}
+
+// NewDatabase loads the database for msc2836
+func NewDatabase(dbOpts *config.DatabaseOptions) (Database, error) {
+ if dbOpts.ConnectionString.IsPostgres() {
+ return newPostgresDatabase(dbOpts)
+ }
+ return newSQLiteDatabase(dbOpts)
+}
+
+func newPostgresDatabase(dbOpts *config.DatabaseOptions) (Database, error) {
+ d := DB{
+ writer: sqlutil.NewDummyWriter(),
+ }
+ var err error
+ if d.db, err = sqlutil.Open(dbOpts); err != nil {
+ return nil, err
+ }
+ _, err = d.db.Exec(`
+ CREATE TABLE IF NOT EXISTS msc2836_edges (
+ parent_event_id TEXT NOT NULL,
+ child_event_id TEXT NOT NULL,
+ rel_type TEXT NOT NULL,
+ parent_room_id TEXT NOT NULL,
+ parent_servers TEXT NOT NULL,
+ CONSTRAINT msc2836_edges_uniq UNIQUE (parent_event_id, child_event_id, rel_type)
+ );
+
+ CREATE TABLE IF NOT EXISTS msc2836_nodes (
+ event_id TEXT PRIMARY KEY NOT NULL,
+ origin_server_ts BIGINT NOT NULL,
+ room_id TEXT NOT NULL
+ );
+ `)
+ if err != nil {
+ return nil, err
+ }
+ if d.insertEdgeStmt, err = d.db.Prepare(`
+ INSERT INTO msc2836_edges(parent_event_id, child_event_id, rel_type, parent_room_id, parent_servers) VALUES($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING
+ `); err != nil {
+ return nil, err
+ }
+ if d.insertNodeStmt, err = d.db.Prepare(`
+ INSERT INTO msc2836_nodes(event_id, origin_server_ts, room_id) VALUES($1, $2, $3) ON CONFLICT DO NOTHING
+ `); err != nil {
+ return nil, err
+ }
+ selectChildrenQuery := `
+ SELECT child_event_id, origin_server_ts, room_id FROM msc2836_edges
+ LEFT JOIN msc2836_nodes ON msc2836_edges.child_event_id = msc2836_nodes.event_id
+ WHERE parent_event_id = $1 AND rel_type = $2
+ ORDER BY origin_server_ts
+ `
+ if d.selectChildrenForParentOldestFirstStmt, err = d.db.Prepare(selectChildrenQuery + "ASC"); err != nil {
+ return nil, err
+ }
+ if d.selectChildrenForParentRecentFirstStmt, err = d.db.Prepare(selectChildrenQuery + "DESC"); err != nil {
+ return nil, err
+ }
+ return &d, err
+}
+
+func newSQLiteDatabase(dbOpts *config.DatabaseOptions) (Database, error) {
+ d := DB{
+ writer: sqlutil.NewExclusiveWriter(),
+ }
+ var err error
+ if d.db, err = sqlutil.Open(dbOpts); err != nil {
+ return nil, err
+ }
+ _, err = d.db.Exec(`
+ CREATE TABLE IF NOT EXISTS msc2836_edges (
+ parent_event_id TEXT NOT NULL,
+ child_event_id TEXT NOT NULL,
+ rel_type TEXT NOT NULL,
+ parent_room_id TEXT NOT NULL,
+ parent_servers TEXT NOT NULL,
+ UNIQUE (parent_event_id, child_event_id, rel_type)
+ );
+
+ CREATE TABLE IF NOT EXISTS msc2836_nodes (
+ event_id TEXT PRIMARY KEY NOT NULL,
+ origin_server_ts BIGINT NOT NULL,
+ room_id TEXT NOT NULL
+ );
+ `)
+ if err != nil {
+ return nil, err
+ }
+ if d.insertEdgeStmt, err = d.db.Prepare(`
+ INSERT INTO msc2836_edges(parent_event_id, child_event_id, rel_type, parent_room_id, parent_servers) VALUES($1, $2, $3, $4, $5) ON CONFLICT (parent_event_id, child_event_id, rel_type) DO NOTHING
+ `); err != nil {
+ return nil, err
+ }
+ if d.insertNodeStmt, err = d.db.Prepare(`
+ INSERT INTO msc2836_nodes(event_id, origin_server_ts, room_id) VALUES($1, $2, $3) ON CONFLICT DO NOTHING
+ `); err != nil {
+ return nil, err
+ }
+ selectChildrenQuery := `
+ SELECT child_event_id, origin_server_ts, room_id FROM msc2836_edges
+ LEFT JOIN msc2836_nodes ON msc2836_edges.child_event_id = msc2836_nodes.event_id
+ WHERE parent_event_id = $1 AND rel_type = $2
+ ORDER BY origin_server_ts
+ `
+ if d.selectChildrenForParentOldestFirstStmt, err = d.db.Prepare(selectChildrenQuery + "ASC"); err != nil {
+ return nil, err
+ }
+ if d.selectChildrenForParentRecentFirstStmt, err = d.db.Prepare(selectChildrenQuery + "DESC"); err != nil {
+ return nil, err
+ }
+ return &d, nil
+}
+
+func (p *DB) StoreRelation(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent) error {
+ parent, child, relType := parentChildEventIDs(ev)
+ if parent == "" || child == "" {
+ return nil
+ }
+ relationRoomID, relationServers := roomIDAndServers(ev)
+ relationServersJSON, err := json.Marshal(relationServers)
+ if err != nil {
+ return err
+ }
+ return p.writer.Do(p.db, nil, func(txn *sql.Tx) error {
+ _, err := txn.Stmt(p.insertEdgeStmt).ExecContext(ctx, parent, child, relType, relationRoomID, string(relationServersJSON))
+ if err != nil {
+ return err
+ }
+ _, err = txn.Stmt(p.insertNodeStmt).ExecContext(ctx, ev.EventID(), ev.OriginServerTS(), ev.RoomID())
+ return err
+ })
+}
+
+func (p *DB) ChildrenForParent(ctx context.Context, eventID, relType string, recentFirst bool) ([]eventInfo, error) {
+ var rows *sql.Rows
+ var err error
+ if recentFirst {
+ rows, err = p.selectChildrenForParentRecentFirstStmt.QueryContext(ctx, eventID, relType)
+ } else {
+ rows, err = p.selectChildrenForParentOldestFirstStmt.QueryContext(ctx, eventID, relType)
+ }
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+ var children []eventInfo
+ for rows.Next() {
+ var evInfo eventInfo
+ if err := rows.Scan(&evInfo.EventID, &evInfo.OriginServerTS, &evInfo.RoomID); err != nil {
+ return nil, err
+ }
+ children = append(children, evInfo)
+ }
+ return children, nil
+}
+
+func parentChildEventIDs(ev *gomatrixserverlib.HeaderedEvent) (parent, child, relType string) {
+ if ev == nil {
+ return
+ }
+ body := struct {
+ Relationship struct {
+ RelType string `json:"rel_type"`
+ EventID string `json:"event_id"`
+ } `json:"m.relationship"`
+ }{}
+ if err := json.Unmarshal(ev.Content(), &body); err != nil {
+ return
+ }
+ if body.Relationship.EventID == "" || body.Relationship.RelType == "" {
+ return
+ }
+ return body.Relationship.EventID, ev.EventID(), body.Relationship.RelType
+}
+
+func roomIDAndServers(ev *gomatrixserverlib.HeaderedEvent) (roomID string, servers []string) {
+ servers = []string{}
+ if ev == nil {
+ return
+ }
+ body := struct {
+ RoomID string `json:"relationship_room_id"`
+ Servers []string `json:"relationship_servers"`
+ }{}
+ if err := json.Unmarshal(ev.Unsigned(), &body); err != nil {
+ return
+ }
+ return body.RoomID, body.Servers
+}
diff --git a/setup/mscs/mscs.go b/setup/mscs/mscs.go
new file mode 100644
index 00000000..8b0498ce
--- /dev/null
+++ b/setup/mscs/mscs.go
@@ -0,0 +1,42 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package mscs implements Matrix Spec Changes from https://github.com/matrix-org/matrix-doc
+package mscs
+
+import (
+ "fmt"
+
+ "github.com/matrix-org/dendrite/setup"
+ "github.com/matrix-org/dendrite/setup/mscs/msc2836"
+)
+
+// Enable MSCs - returns an error on unknown MSCs
+func Enable(base *setup.BaseDendrite, monolith *setup.Monolith) error {
+ for _, msc := range base.Cfg.MSCs.MSCs {
+ if err := EnableMSC(base, monolith, msc); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func EnableMSC(base *setup.BaseDendrite, monolith *setup.Monolith, msc string) error {
+ switch msc {
+ case "msc2836":
+ return msc2836.Enable(base, monolith.RoomserverAPI, monolith.FederationSenderAPI, monolith.UserAPI, monolith.KeyRing)
+ default:
+ return fmt.Errorf("EnableMSC: unknown msc '%s'", msc)
+ }
+}