aboutsummaryrefslogtreecommitdiff
path: root/internal/setup/base.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/setup/base.go')
-rw-r--r--internal/setup/base.go316
1 files changed, 316 insertions, 0 deletions
diff --git a/internal/setup/base.go b/internal/setup/base.go
new file mode 100644
index 00000000..fb304893
--- /dev/null
+++ b/internal/setup/base.go
@@ -0,0 +1,316 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package setup
+
+import (
+ "database/sql"
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+ "time"
+
+ "github.com/matrix-org/dendrite/internal/caching"
+ "github.com/matrix-org/dendrite/internal/httputil"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/naffka"
+
+ "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
+ "github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
+ "github.com/matrix-org/dendrite/internal"
+
+ "github.com/Shopify/sarama"
+ "github.com/gorilla/mux"
+
+ appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
+ asinthttp "github.com/matrix-org/dendrite/appservice/inthttp"
+ eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
+ eduinthttp "github.com/matrix-org/dendrite/eduserver/inthttp"
+ federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
+ fsinthttp "github.com/matrix-org/dendrite/federationsender/inthttp"
+ "github.com/matrix-org/dendrite/internal/config"
+ roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
+ rsinthttp "github.com/matrix-org/dendrite/roomserver/inthttp"
+ serverKeyAPI "github.com/matrix-org/dendrite/serverkeyapi/api"
+ skinthttp "github.com/matrix-org/dendrite/serverkeyapi/inthttp"
+ "github.com/sirupsen/logrus"
+
+ _ "net/http/pprof"
+)
+
+// BaseDendrite is a base for creating new instances of dendrite. It parses
+// command line flags and config, and exposes methods for creating various
+// resources. All errors are handled by logging then exiting, so all methods
+// should only be used during start up.
+// Must be closed when shutting down.
+type BaseDendrite struct {
+ componentName string
+ tracerCloser io.Closer
+
+ // PublicAPIMux should be used to register new public matrix api endpoints
+ PublicAPIMux *mux.Router
+ InternalAPIMux *mux.Router
+ UseHTTPAPIs bool
+ httpClient *http.Client
+ Cfg *config.Dendrite
+ Caches *caching.Caches
+ KafkaConsumer sarama.Consumer
+ KafkaProducer sarama.SyncProducer
+}
+
+const HTTPServerTimeout = time.Minute * 5
+const HTTPClientTimeout = time.Second * 30
+
+// NewBaseDendrite creates a new instance to be used by a component.
+// The componentName is used for logging purposes, and should be a friendly name
+// of the compontent running, e.g. "SyncAPI"
+func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs bool) *BaseDendrite {
+ internal.SetupStdLogging()
+ internal.SetupHookLogging(cfg.Logging, componentName)
+ internal.SetupPprof()
+
+ closer, err := cfg.SetupTracing("Dendrite" + componentName)
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to start opentracing")
+ }
+
+ var kafkaConsumer sarama.Consumer
+ var kafkaProducer sarama.SyncProducer
+ if cfg.Kafka.UseNaffka {
+ kafkaConsumer, kafkaProducer = setupNaffka(cfg)
+ } else {
+ kafkaConsumer, kafkaProducer = setupKafka(cfg)
+ }
+
+ cache, err := caching.NewInMemoryLRUCache()
+ if err != nil {
+ logrus.WithError(err).Warnf("Failed to create cache")
+ }
+
+ client := http.Client{Timeout: HTTPClientTimeout}
+ if cfg.Proxy != nil {
+ client.Transport = &http.Transport{Proxy: http.ProxyURL(&url.URL{
+ Scheme: cfg.Proxy.Protocol,
+ Host: fmt.Sprintf("%s:%d", cfg.Proxy.Host, cfg.Proxy.Port),
+ })}
+ }
+
+ // Ideally we would only use SkipClean on routes which we know can allow '/' but due to
+ // https://github.com/gorilla/mux/issues/460 we have to attach this at the top router.
+ // When used in conjunction with UseEncodedPath() we get the behaviour we want when parsing
+ // path parameters:
+ // /foo/bar%2Fbaz == [foo, bar%2Fbaz] (from UseEncodedPath)
+ // /foo/bar%2F%2Fbaz == [foo, bar%2F%2Fbaz] (from SkipClean)
+ // In particular, rooms v3 event IDs are not urlsafe and can include '/' and because they
+ // are randomly generated it results in flakey tests.
+ // We need to be careful with media APIs if they read from a filesystem to make sure they
+ // are not inadvertently reading paths without cleaning, else this could introduce a
+ // directory traversal attack e.g /../../../etc/passwd
+ httpmux := mux.NewRouter().SkipClean(true)
+
+ return &BaseDendrite{
+ componentName: componentName,
+ UseHTTPAPIs: useHTTPAPIs,
+ tracerCloser: closer,
+ Cfg: cfg,
+ Caches: cache,
+ PublicAPIMux: httpmux.PathPrefix(httputil.PublicPathPrefix).Subrouter().UseEncodedPath(),
+ InternalAPIMux: httpmux.PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(),
+ httpClient: &client,
+ KafkaConsumer: kafkaConsumer,
+ KafkaProducer: kafkaProducer,
+ }
+}
+
+// Close implements io.Closer
+func (b *BaseDendrite) Close() error {
+ return b.tracerCloser.Close()
+}
+
+// AppserviceHTTPClient returns the AppServiceQueryAPI for hitting the appservice component over HTTP.
+func (b *BaseDendrite) AppserviceHTTPClient() appserviceAPI.AppServiceQueryAPI {
+ a, err := asinthttp.NewAppserviceClient(b.Cfg.AppServiceURL(), b.httpClient)
+ if err != nil {
+ logrus.WithError(err).Panic("CreateHTTPAppServiceAPIs failed")
+ }
+ return a
+}
+
+// RoomserverHTTPClient returns RoomserverInternalAPI for hitting the roomserver over HTTP.
+func (b *BaseDendrite) RoomserverHTTPClient() roomserverAPI.RoomserverInternalAPI {
+ rsAPI, err := rsinthttp.NewRoomserverClient(b.Cfg.RoomServerURL(), b.httpClient, b.Caches)
+ if err != nil {
+ logrus.WithError(err).Panic("RoomserverHTTPClient failed", b.httpClient)
+ }
+ return rsAPI
+}
+
+// EDUServerClient returns EDUServerInputAPI for hitting the EDU server over HTTP
+func (b *BaseDendrite) EDUServerClient() eduServerAPI.EDUServerInputAPI {
+ e, err := eduinthttp.NewEDUServerClient(b.Cfg.EDUServerURL(), b.httpClient)
+ if err != nil {
+ logrus.WithError(err).Panic("EDUServerClient failed", b.httpClient)
+ }
+ return e
+}
+
+// FederationSenderHTTPClient returns FederationSenderInternalAPI for hitting
+// the federation sender over HTTP
+func (b *BaseDendrite) FederationSenderHTTPClient() federationSenderAPI.FederationSenderInternalAPI {
+ f, err := fsinthttp.NewFederationSenderClient(b.Cfg.FederationSenderURL(), b.httpClient)
+ if err != nil {
+ logrus.WithError(err).Panic("FederationSenderHTTPClient failed", b.httpClient)
+ }
+ return f
+}
+
+// ServerKeyAPIClient returns ServerKeyInternalAPI for hitting the server key API over HTTP
+func (b *BaseDendrite) ServerKeyAPIClient() serverKeyAPI.ServerKeyInternalAPI {
+ f, err := skinthttp.NewServerKeyClient(
+ b.Cfg.ServerKeyAPIURL(),
+ b.httpClient,
+ b.Caches,
+ )
+ if err != nil {
+ logrus.WithError(err).Panic("NewServerKeyInternalAPIHTTP failed", b.httpClient)
+ }
+ return f
+}
+
+// CreateDeviceDB creates a new instance of the device database. Should only be
+// called once per component.
+func (b *BaseDendrite) CreateDeviceDB() devices.Database {
+ db, err := devices.NewDatabase(string(b.Cfg.Database.Device), b.Cfg.DbProperties(), b.Cfg.Matrix.ServerName)
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to connect to devices db")
+ }
+
+ return db
+}
+
+// CreateAccountsDB creates a new instance of the accounts database. Should only
+// be called once per component.
+func (b *BaseDendrite) CreateAccountsDB() accounts.Database {
+ db, err := accounts.NewDatabase(string(b.Cfg.Database.Account), b.Cfg.DbProperties(), b.Cfg.Matrix.ServerName)
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to connect to accounts db")
+ }
+
+ return db
+}
+
+// CreateFederationClient creates a new federation client. Should only be called
+// once per component.
+func (b *BaseDendrite) CreateFederationClient() *gomatrixserverlib.FederationClient {
+ return gomatrixserverlib.NewFederationClient(
+ b.Cfg.Matrix.ServerName, b.Cfg.Matrix.KeyID, b.Cfg.Matrix.PrivateKey,
+ )
+}
+
+// SetupAndServeHTTP sets up the HTTP server to serve endpoints registered on
+// ApiMux under /api/ and adds a prometheus handler under /metrics.
+func (b *BaseDendrite) SetupAndServeHTTP(bindaddr string, listenaddr string) {
+ // If a separate bind address is defined, listen on that. Otherwise use
+ // the listen address
+ var addr string
+ if bindaddr != "" {
+ addr = bindaddr
+ } else {
+ addr = listenaddr
+ }
+
+ serv := http.Server{
+ Addr: addr,
+ WriteTimeout: HTTPServerTimeout,
+ }
+
+ httputil.SetupHTTPAPI(
+ http.DefaultServeMux,
+ b.PublicAPIMux,
+ b.InternalAPIMux,
+ b.Cfg,
+ b.UseHTTPAPIs,
+ )
+ logrus.Infof("Starting %s server on %s", b.componentName, serv.Addr)
+
+ err := serv.ListenAndServe()
+ if err != nil {
+ logrus.WithError(err).Fatal("failed to serve http")
+ }
+
+ logrus.Infof("Stopped %s server on %s", b.componentName, serv.Addr)
+}
+
+// setupKafka creates kafka consumer/producer pair from the config.
+func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
+ consumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
+ if err != nil {
+ logrus.WithError(err).Panic("failed to start kafka consumer")
+ }
+
+ producer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil)
+ if err != nil {
+ logrus.WithError(err).Panic("failed to setup kafka producers")
+ }
+
+ return consumer, producer
+}
+
+// setupNaffka creates kafka consumer/producer pair from the config.
+func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
+ var err error
+ var db *sql.DB
+ var naffkaDB *naffka.DatabaseImpl
+
+ uri, err := url.Parse(string(cfg.Database.Naffka))
+ if err != nil || uri.Scheme == "file" {
+ var cs string
+ cs, err = sqlutil.ParseFileURI(string(cfg.Database.Naffka))
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to parse naffka database file URI")
+ }
+ db, err = sqlutil.Open(sqlutil.SQLiteDriverName(), cs, nil)
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to open naffka database")
+ }
+
+ naffkaDB, err = naffka.NewSqliteDatabase(db)
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to setup naffka database")
+ }
+ } else {
+ db, err = sqlutil.Open("postgres", string(cfg.Database.Naffka), nil)
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to open naffka database")
+ }
+
+ naffkaDB, err = naffka.NewPostgresqlDatabase(db)
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to setup naffka database")
+ }
+ }
+
+ if naffkaDB == nil {
+ panic("naffka connection string not understood")
+ }
+
+ naff, err := naffka.New(naffkaDB)
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to setup naffka")
+ }
+
+ return naff, naff
+}