From ecd7accbad724f26248498a9035a1fbc69e2f08d Mon Sep 17 00:00:00 2001 From: Kegsay Date: Fri, 12 Jun 2020 14:55:57 +0100 Subject: Rehuffle where things are in the internal package (#1122) renamed: internal/eventcontent.go -> internal/eventutil/eventcontent.go renamed: internal/events.go -> internal/eventutil/events.go renamed: internal/types.go -> internal/eventutil/types.go renamed: internal/http/http.go -> internal/httputil/http.go renamed: internal/httpapi.go -> internal/httputil/httpapi.go renamed: internal/httpapi_test.go -> internal/httputil/httpapi_test.go renamed: internal/httpapis/paths.go -> internal/httputil/paths.go renamed: internal/routing.go -> internal/httputil/routing.go renamed: internal/basecomponent/base.go -> internal/setup/base.go renamed: internal/basecomponent/flags.go -> internal/setup/flags.go renamed: internal/partition_offset_table.go -> internal/sqlutil/partition_offset_table.go renamed: internal/postgres.go -> internal/sqlutil/postgres.go renamed: internal/postgres_wasm.go -> internal/sqlutil/postgres_wasm.go renamed: internal/sql.go -> internal/sqlutil/sql.go --- internal/basecomponent/base.go | 316 ----------------------------- internal/basecomponent/flags.go | 42 ---- internal/consumers.go | 11 +- internal/eventcontent.go | 86 -------- internal/events.go | 150 -------------- internal/eventutil/eventcontent.go | 86 ++++++++ internal/eventutil/events.go | 150 ++++++++++++++ internal/eventutil/types.go | 66 ++++++ internal/http/http.go | 68 ------- internal/httpapi.go | 275 ------------------------- internal/httpapi_test.go | 95 --------- internal/httpapis/paths.go | 6 - internal/httputil/http.go | 81 ++++++++ internal/httputil/httpapi.go | 288 ++++++++++++++++++++++++++ internal/httputil/httpapi_test.go | 109 ++++++++++ internal/httputil/paths.go | 20 ++ internal/httputil/routing.go | 35 ++++ internal/partition_offset_table.go | 111 ---------- internal/postgres.go | 25 --- internal/postgres_wasm.go | 22 -- internal/routing.go | 35 ---- internal/setup/base.go | 316 +++++++++++++++++++++++++++++ internal/setup/flags.go | 42 ++++ internal/setup/monolith.go | 14 ++ internal/sql.go | 174 ---------------- internal/sqlutil/partition_offset_table.go | 126 ++++++++++++ internal/sqlutil/postgres.go | 25 +++ internal/sqlutil/postgres_wasm.go | 22 ++ internal/sqlutil/sql.go | 172 ++++++++++++++++ internal/sqlutil/trace.go | 5 +- internal/sqlutil/uri.go | 14 ++ internal/types.go | 66 ------ 32 files changed, 1570 insertions(+), 1483 deletions(-) delete mode 100644 internal/basecomponent/base.go delete mode 100644 internal/basecomponent/flags.go delete mode 100644 internal/eventcontent.go delete mode 100644 internal/events.go create mode 100644 internal/eventutil/eventcontent.go create mode 100644 internal/eventutil/events.go create mode 100644 internal/eventutil/types.go delete mode 100644 internal/http/http.go delete mode 100644 internal/httpapi.go delete mode 100644 internal/httpapi_test.go delete mode 100644 internal/httpapis/paths.go create mode 100644 internal/httputil/http.go create mode 100644 internal/httputil/httpapi.go create mode 100644 internal/httputil/httpapi_test.go create mode 100644 internal/httputil/paths.go create mode 100644 internal/httputil/routing.go delete mode 100644 internal/partition_offset_table.go delete mode 100644 internal/postgres.go delete mode 100644 internal/postgres_wasm.go delete mode 100644 internal/routing.go create mode 100644 internal/setup/base.go create mode 100644 internal/setup/flags.go delete mode 100644 internal/sql.go create mode 100644 internal/sqlutil/partition_offset_table.go create mode 100644 internal/sqlutil/postgres.go create mode 100644 internal/sqlutil/postgres_wasm.go create mode 100644 internal/sqlutil/sql.go delete mode 100644 internal/types.go (limited to 'internal') diff --git a/internal/basecomponent/base.go b/internal/basecomponent/base.go deleted file mode 100644 index 3ad1e4af..00000000 --- a/internal/basecomponent/base.go +++ /dev/null @@ -1,316 +0,0 @@ -// Copyright 2017 New Vector Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package basecomponent - -import ( - "database/sql" - "fmt" - "io" - "net/http" - "net/url" - "time" - - "github.com/matrix-org/dendrite/internal/caching" - "github.com/matrix-org/dendrite/internal/httpapis" - "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/naffka" - - "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" - "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" - "github.com/matrix-org/dendrite/internal" - - "github.com/Shopify/sarama" - "github.com/gorilla/mux" - - appserviceAPI "github.com/matrix-org/dendrite/appservice/api" - asinthttp "github.com/matrix-org/dendrite/appservice/inthttp" - eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" - eduinthttp "github.com/matrix-org/dendrite/eduserver/inthttp" - federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" - fsinthttp "github.com/matrix-org/dendrite/federationsender/inthttp" - "github.com/matrix-org/dendrite/internal/config" - roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - rsinthttp "github.com/matrix-org/dendrite/roomserver/inthttp" - serverKeyAPI "github.com/matrix-org/dendrite/serverkeyapi/api" - skinthttp "github.com/matrix-org/dendrite/serverkeyapi/inthttp" - "github.com/sirupsen/logrus" - - _ "net/http/pprof" -) - -// BaseDendrite is a base for creating new instances of dendrite. It parses -// command line flags and config, and exposes methods for creating various -// resources. All errors are handled by logging then exiting, so all methods -// should only be used during start up. -// Must be closed when shutting down. -type BaseDendrite struct { - componentName string - tracerCloser io.Closer - - // PublicAPIMux should be used to register new public matrix api endpoints - PublicAPIMux *mux.Router - InternalAPIMux *mux.Router - UseHTTPAPIs bool - httpClient *http.Client - Cfg *config.Dendrite - Caches *caching.Caches - KafkaConsumer sarama.Consumer - KafkaProducer sarama.SyncProducer -} - -const HTTPServerTimeout = time.Minute * 5 -const HTTPClientTimeout = time.Second * 30 - -// NewBaseDendrite creates a new instance to be used by a component. -// The componentName is used for logging purposes, and should be a friendly name -// of the compontent running, e.g. "SyncAPI" -func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs bool) *BaseDendrite { - internal.SetupStdLogging() - internal.SetupHookLogging(cfg.Logging, componentName) - internal.SetupPprof() - - closer, err := cfg.SetupTracing("Dendrite" + componentName) - if err != nil { - logrus.WithError(err).Panicf("failed to start opentracing") - } - - var kafkaConsumer sarama.Consumer - var kafkaProducer sarama.SyncProducer - if cfg.Kafka.UseNaffka { - kafkaConsumer, kafkaProducer = setupNaffka(cfg) - } else { - kafkaConsumer, kafkaProducer = setupKafka(cfg) - } - - cache, err := caching.NewInMemoryLRUCache() - if err != nil { - logrus.WithError(err).Warnf("Failed to create cache") - } - - client := http.Client{Timeout: HTTPClientTimeout} - if cfg.Proxy != nil { - client.Transport = &http.Transport{Proxy: http.ProxyURL(&url.URL{ - Scheme: cfg.Proxy.Protocol, - Host: fmt.Sprintf("%s:%d", cfg.Proxy.Host, cfg.Proxy.Port), - })} - } - - // Ideally we would only use SkipClean on routes which we know can allow '/' but due to - // https://github.com/gorilla/mux/issues/460 we have to attach this at the top router. - // When used in conjunction with UseEncodedPath() we get the behaviour we want when parsing - // path parameters: - // /foo/bar%2Fbaz == [foo, bar%2Fbaz] (from UseEncodedPath) - // /foo/bar%2F%2Fbaz == [foo, bar%2F%2Fbaz] (from SkipClean) - // In particular, rooms v3 event IDs are not urlsafe and can include '/' and because they - // are randomly generated it results in flakey tests. - // We need to be careful with media APIs if they read from a filesystem to make sure they - // are not inadvertently reading paths without cleaning, else this could introduce a - // directory traversal attack e.g /../../../etc/passwd - httpmux := mux.NewRouter().SkipClean(true) - - return &BaseDendrite{ - componentName: componentName, - UseHTTPAPIs: useHTTPAPIs, - tracerCloser: closer, - Cfg: cfg, - Caches: cache, - PublicAPIMux: httpmux.PathPrefix(httpapis.PublicPathPrefix).Subrouter().UseEncodedPath(), - InternalAPIMux: httpmux.PathPrefix(httpapis.InternalPathPrefix).Subrouter().UseEncodedPath(), - httpClient: &client, - KafkaConsumer: kafkaConsumer, - KafkaProducer: kafkaProducer, - } -} - -// Close implements io.Closer -func (b *BaseDendrite) Close() error { - return b.tracerCloser.Close() -} - -// AppserviceHTTPClient returns the AppServiceQueryAPI for hitting the appservice component over HTTP. -func (b *BaseDendrite) AppserviceHTTPClient() appserviceAPI.AppServiceQueryAPI { - a, err := asinthttp.NewAppserviceClient(b.Cfg.AppServiceURL(), b.httpClient) - if err != nil { - logrus.WithError(err).Panic("CreateHTTPAppServiceAPIs failed") - } - return a -} - -// RoomserverHTTPClient returns RoomserverInternalAPI for hitting the roomserver over HTTP. -func (b *BaseDendrite) RoomserverHTTPClient() roomserverAPI.RoomserverInternalAPI { - rsAPI, err := rsinthttp.NewRoomserverClient(b.Cfg.RoomServerURL(), b.httpClient, b.Caches) - if err != nil { - logrus.WithError(err).Panic("RoomserverHTTPClient failed", b.httpClient) - } - return rsAPI -} - -// EDUServerClient returns EDUServerInputAPI for hitting the EDU server over HTTP -func (b *BaseDendrite) EDUServerClient() eduServerAPI.EDUServerInputAPI { - e, err := eduinthttp.NewEDUServerClient(b.Cfg.EDUServerURL(), b.httpClient) - if err != nil { - logrus.WithError(err).Panic("EDUServerClient failed", b.httpClient) - } - return e -} - -// FederationSenderHTTPClient returns FederationSenderInternalAPI for hitting -// the federation sender over HTTP -func (b *BaseDendrite) FederationSenderHTTPClient() federationSenderAPI.FederationSenderInternalAPI { - f, err := fsinthttp.NewFederationSenderClient(b.Cfg.FederationSenderURL(), b.httpClient) - if err != nil { - logrus.WithError(err).Panic("FederationSenderHTTPClient failed", b.httpClient) - } - return f -} - -// ServerKeyAPIClient returns ServerKeyInternalAPI for hitting the server key API over HTTP -func (b *BaseDendrite) ServerKeyAPIClient() serverKeyAPI.ServerKeyInternalAPI { - f, err := skinthttp.NewServerKeyClient( - b.Cfg.ServerKeyAPIURL(), - b.httpClient, - b.Caches, - ) - if err != nil { - logrus.WithError(err).Panic("NewServerKeyInternalAPIHTTP failed", b.httpClient) - } - return f -} - -// CreateDeviceDB creates a new instance of the device database. Should only be -// called once per component. -func (b *BaseDendrite) CreateDeviceDB() devices.Database { - db, err := devices.NewDatabase(string(b.Cfg.Database.Device), b.Cfg.DbProperties(), b.Cfg.Matrix.ServerName) - if err != nil { - logrus.WithError(err).Panicf("failed to connect to devices db") - } - - return db -} - -// CreateAccountsDB creates a new instance of the accounts database. Should only -// be called once per component. -func (b *BaseDendrite) CreateAccountsDB() accounts.Database { - db, err := accounts.NewDatabase(string(b.Cfg.Database.Account), b.Cfg.DbProperties(), b.Cfg.Matrix.ServerName) - if err != nil { - logrus.WithError(err).Panicf("failed to connect to accounts db") - } - - return db -} - -// CreateFederationClient creates a new federation client. Should only be called -// once per component. -func (b *BaseDendrite) CreateFederationClient() *gomatrixserverlib.FederationClient { - return gomatrixserverlib.NewFederationClient( - b.Cfg.Matrix.ServerName, b.Cfg.Matrix.KeyID, b.Cfg.Matrix.PrivateKey, - ) -} - -// SetupAndServeHTTP sets up the HTTP server to serve endpoints registered on -// ApiMux under /api/ and adds a prometheus handler under /metrics. -func (b *BaseDendrite) SetupAndServeHTTP(bindaddr string, listenaddr string) { - // If a separate bind address is defined, listen on that. Otherwise use - // the listen address - var addr string - if bindaddr != "" { - addr = bindaddr - } else { - addr = listenaddr - } - - serv := http.Server{ - Addr: addr, - WriteTimeout: HTTPServerTimeout, - } - - internal.SetupHTTPAPI( - http.DefaultServeMux, - b.PublicAPIMux, - b.InternalAPIMux, - b.Cfg, - b.UseHTTPAPIs, - ) - logrus.Infof("Starting %s server on %s", b.componentName, serv.Addr) - - err := serv.ListenAndServe() - if err != nil { - logrus.WithError(err).Fatal("failed to serve http") - } - - logrus.Infof("Stopped %s server on %s", b.componentName, serv.Addr) -} - -// setupKafka creates kafka consumer/producer pair from the config. -func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { - consumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) - if err != nil { - logrus.WithError(err).Panic("failed to start kafka consumer") - } - - producer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil) - if err != nil { - logrus.WithError(err).Panic("failed to setup kafka producers") - } - - return consumer, producer -} - -// setupNaffka creates kafka consumer/producer pair from the config. -func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { - var err error - var db *sql.DB - var naffkaDB *naffka.DatabaseImpl - - uri, err := url.Parse(string(cfg.Database.Naffka)) - if err != nil || uri.Scheme == "file" { - var cs string - cs, err = sqlutil.ParseFileURI(string(cfg.Database.Naffka)) - if err != nil { - logrus.WithError(err).Panic("Failed to parse naffka database file URI") - } - db, err = sqlutil.Open(internal.SQLiteDriverName(), cs, nil) - if err != nil { - logrus.WithError(err).Panic("Failed to open naffka database") - } - - naffkaDB, err = naffka.NewSqliteDatabase(db) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } - } else { - db, err = sqlutil.Open("postgres", string(cfg.Database.Naffka), nil) - if err != nil { - logrus.WithError(err).Panic("Failed to open naffka database") - } - - naffkaDB, err = naffka.NewPostgresqlDatabase(db) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } - } - - if naffkaDB == nil { - panic("naffka connection string not understood") - } - - naff, err := naffka.New(naffkaDB) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka") - } - - return naff, naff -} diff --git a/internal/basecomponent/flags.go b/internal/basecomponent/flags.go deleted file mode 100644 index 117df079..00000000 --- a/internal/basecomponent/flags.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2017 New Vector Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package basecomponent - -import ( - "flag" - - "github.com/matrix-org/dendrite/internal/config" - - "github.com/sirupsen/logrus" -) - -var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") - -// ParseFlags parses the commandline flags and uses them to create a config. -func ParseFlags(monolith bool) *config.Dendrite { - flag.Parse() - - if *configPath == "" { - logrus.Fatal("--config must be supplied") - } - - cfg, err := config.Load(*configPath, monolith) - - if err != nil { - logrus.Fatalf("Invalid config file: %s", err) - } - - return cfg -} diff --git a/internal/consumers.go b/internal/consumers.go index df68cbfa..d7917f23 100644 --- a/internal/consumers.go +++ b/internal/consumers.go @@ -19,20 +19,13 @@ import ( "fmt" "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/internal/sqlutil" ) -// A PartitionOffset is the offset into a partition of the input log. -type PartitionOffset struct { - // The ID of the partition. - Partition int32 - // The offset into the partition. - Offset int64 -} - // A PartitionStorer has the storage APIs needed by the consumer. type PartitionStorer interface { // PartitionOffsets returns the offsets the consumer has reached for each partition. - PartitionOffsets(ctx context.Context, topic string) ([]PartitionOffset, error) + PartitionOffsets(ctx context.Context, topic string) ([]sqlutil.PartitionOffset, error) // SetPartitionOffset records where the consumer has reached for a partition. SetPartitionOffset(ctx context.Context, topic string, partition int32, offset int64) error } diff --git a/internal/eventcontent.go b/internal/eventcontent.go deleted file mode 100644 index 64512836..00000000 --- a/internal/eventcontent.go +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package internal - -import "github.com/matrix-org/gomatrixserverlib" - -// NameContent is the event content for https://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-name -type NameContent struct { - Name string `json:"name"` -} - -// TopicContent is the event content for https://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-topic -type TopicContent struct { - Topic string `json:"topic"` -} - -// GuestAccessContent is the event content for https://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-guest-access -type GuestAccessContent struct { - GuestAccess string `json:"guest_access"` -} - -// HistoryVisibilityContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-history-visibility -type HistoryVisibilityContent struct { - HistoryVisibility string `json:"history_visibility"` -} - -// CanonicalAlias is the event content for https://matrix.org/docs/spec/client_server/r0.6.0#m-room-canonical-alias -type CanonicalAlias struct { - Alias string `json:"alias"` -} - -// InitialPowerLevelsContent returns the initial values for m.room.power_levels on room creation -// if they have not been specified. -// http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-power-levels -// https://github.com/matrix-org/synapse/blob/v0.19.2/synapse/handlers/room.py#L294 -func InitialPowerLevelsContent(roomCreator string) (c gomatrixserverlib.PowerLevelContent) { - c.Defaults() - c.Events = map[string]int64{ - "m.room.name": 50, - "m.room.power_levels": 100, - "m.room.history_visibility": 100, - "m.room.canonical_alias": 50, - "m.room.avatar": 50, - "m.room.aliases": 0, // anyone can publish aliases by default. Has to be 0 else state_default is used. - } - c.Users = map[string]int64{roomCreator: 100} - return c -} - -// AliasesContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-aliases -type AliasesContent struct { - Aliases []string `json:"aliases"` -} - -// CanonicalAliasContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-canonical-alias -type CanonicalAliasContent struct { - Alias string `json:"alias"` -} - -// AvatarContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-avatar -type AvatarContent struct { - Info ImageInfo `json:"info,omitempty"` - URL string `json:"url"` - ThumbnailURL string `json:"thumbnail_url,omitempty"` - ThumbnailInfo ImageInfo `json:"thumbnail_info,omitempty"` -} - -// ImageInfo implements the ImageInfo structure from http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-avatar -type ImageInfo struct { - Mimetype string `json:"mimetype"` - Height int64 `json:"h"` - Width int64 `json:"w"` - Size int64 `json:"size"` -} diff --git a/internal/events.go b/internal/events.go deleted file mode 100644 index 89c82e03..00000000 --- a/internal/events.go +++ /dev/null @@ -1,150 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package internal - -import ( - "context" - "errors" - "fmt" - "time" - - "github.com/matrix-org/dendrite/internal/config" - "github.com/matrix-org/dendrite/roomserver/api" - - "github.com/matrix-org/gomatrixserverlib" -) - -// ErrRoomNoExists is returned when trying to lookup the state of a room that -// doesn't exist -var ErrRoomNoExists = errors.New("Room does not exist") - -// BuildEvent builds a Matrix event using the event builder and roomserver query -// API client provided. If also fills roomserver query API response (if provided) -// in case the function calling FillBuilder needs to use it. -// Returns ErrRoomNoExists if the state of the room could not be retrieved because -// the room doesn't exist -// Returns an error if something else went wrong -func BuildEvent( - ctx context.Context, - builder *gomatrixserverlib.EventBuilder, cfg *config.Dendrite, evTime time.Time, - rsAPI api.RoomserverInternalAPI, queryRes *api.QueryLatestEventsAndStateResponse, -) (*gomatrixserverlib.Event, error) { - if queryRes == nil { - queryRes = &api.QueryLatestEventsAndStateResponse{} - } - - err := AddPrevEventsToEvent(ctx, builder, rsAPI, queryRes) - if err != nil { - // This can pass through a ErrRoomNoExists to the caller - return nil, err - } - - event, err := builder.Build( - evTime, cfg.Matrix.ServerName, cfg.Matrix.KeyID, - cfg.Matrix.PrivateKey, queryRes.RoomVersion, - ) - if err != nil { - return nil, err - } - - return &event, nil -} - -// AddPrevEventsToEvent fills out the prev_events and auth_events fields in builder -func AddPrevEventsToEvent( - ctx context.Context, - builder *gomatrixserverlib.EventBuilder, - rsAPI api.RoomserverInternalAPI, queryRes *api.QueryLatestEventsAndStateResponse, -) error { - eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder) - if err != nil { - return fmt.Errorf("gomatrixserverlib.StateNeededForEventBuilder: %w", err) - } - - if len(eventsNeeded.Tuples()) == 0 { - return errors.New("expecting state tuples for event builder, got none") - } - - // Ask the roomserver for information about this room - queryReq := api.QueryLatestEventsAndStateRequest{ - RoomID: builder.RoomID, - StateToFetch: eventsNeeded.Tuples(), - } - if err = rsAPI.QueryLatestEventsAndState(ctx, &queryReq, queryRes); err != nil { - return fmt.Errorf("rsAPI.QueryLatestEventsAndState: %w", err) - } - - if !queryRes.RoomExists { - return ErrRoomNoExists - } - - eventFormat, err := queryRes.RoomVersion.EventFormat() - if err != nil { - return fmt.Errorf("queryRes.RoomVersion.EventFormat: %w", err) - } - - builder.Depth = queryRes.Depth - - authEvents := gomatrixserverlib.NewAuthEvents(nil) - - for i := range queryRes.StateEvents { - err = authEvents.AddEvent(&queryRes.StateEvents[i].Event) - if err != nil { - return fmt.Errorf("authEvents.AddEvent: %w", err) - } - } - - refs, err := eventsNeeded.AuthEventReferences(&authEvents) - if err != nil { - return fmt.Errorf("eventsNeeded.AuthEventReferences: %w", err) - } - - truncAuth, truncPrev := truncateAuthAndPrevEvents(refs, queryRes.LatestEvents) - switch eventFormat { - case gomatrixserverlib.EventFormatV1: - builder.AuthEvents = truncAuth - builder.PrevEvents = truncPrev - case gomatrixserverlib.EventFormatV2: - v2AuthRefs, v2PrevRefs := []string{}, []string{} - for _, ref := range truncAuth { - v2AuthRefs = append(v2AuthRefs, ref.EventID) - } - for _, ref := range truncPrev { - v2PrevRefs = append(v2PrevRefs, ref.EventID) - } - builder.AuthEvents = v2AuthRefs - builder.PrevEvents = v2PrevRefs - } - - return nil -} - -// truncateAuthAndPrevEvents limits the number of events we add into -// an event as prev_events or auth_events. -// NOTSPEC: The limits here feel a bit arbitrary but they are currently -// here because of https://github.com/matrix-org/matrix-doc/issues/2307 -// and because Synapse will just drop events that don't comply. -func truncateAuthAndPrevEvents(auth, prev []gomatrixserverlib.EventReference) ( - truncAuth, truncPrev []gomatrixserverlib.EventReference, -) { - truncAuth, truncPrev = auth, prev - if len(truncAuth) > 10 { - truncAuth = truncAuth[:10] - } - if len(truncPrev) > 20 { - truncPrev = truncPrev[:20] - } - return -} diff --git a/internal/eventutil/eventcontent.go b/internal/eventutil/eventcontent.go new file mode 100644 index 00000000..873e20a8 --- /dev/null +++ b/internal/eventutil/eventcontent.go @@ -0,0 +1,86 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventutil + +import "github.com/matrix-org/gomatrixserverlib" + +// NameContent is the event content for https://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-name +type NameContent struct { + Name string `json:"name"` +} + +// TopicContent is the event content for https://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-topic +type TopicContent struct { + Topic string `json:"topic"` +} + +// GuestAccessContent is the event content for https://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-guest-access +type GuestAccessContent struct { + GuestAccess string `json:"guest_access"` +} + +// HistoryVisibilityContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-history-visibility +type HistoryVisibilityContent struct { + HistoryVisibility string `json:"history_visibility"` +} + +// CanonicalAlias is the event content for https://matrix.org/docs/spec/client_server/r0.6.0#m-room-canonical-alias +type CanonicalAlias struct { + Alias string `json:"alias"` +} + +// InitialPowerLevelsContent returns the initial values for m.room.power_levels on room creation +// if they have not been specified. +// http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-power-levels +// https://github.com/matrix-org/synapse/blob/v0.19.2/synapse/handlers/room.py#L294 +func InitialPowerLevelsContent(roomCreator string) (c gomatrixserverlib.PowerLevelContent) { + c.Defaults() + c.Events = map[string]int64{ + "m.room.name": 50, + "m.room.power_levels": 100, + "m.room.history_visibility": 100, + "m.room.canonical_alias": 50, + "m.room.avatar": 50, + "m.room.aliases": 0, // anyone can publish aliases by default. Has to be 0 else state_default is used. + } + c.Users = map[string]int64{roomCreator: 100} + return c +} + +// AliasesContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-aliases +type AliasesContent struct { + Aliases []string `json:"aliases"` +} + +// CanonicalAliasContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-canonical-alias +type CanonicalAliasContent struct { + Alias string `json:"alias"` +} + +// AvatarContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-avatar +type AvatarContent struct { + Info ImageInfo `json:"info,omitempty"` + URL string `json:"url"` + ThumbnailURL string `json:"thumbnail_url,omitempty"` + ThumbnailInfo ImageInfo `json:"thumbnail_info,omitempty"` +} + +// ImageInfo implements the ImageInfo structure from http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-avatar +type ImageInfo struct { + Mimetype string `json:"mimetype"` + Height int64 `json:"h"` + Width int64 `json:"w"` + Size int64 `json:"size"` +} diff --git a/internal/eventutil/events.go b/internal/eventutil/events.go new file mode 100644 index 00000000..e6c7a4ff --- /dev/null +++ b/internal/eventutil/events.go @@ -0,0 +1,150 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventutil + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/roomserver/api" + + "github.com/matrix-org/gomatrixserverlib" +) + +// ErrRoomNoExists is returned when trying to lookup the state of a room that +// doesn't exist +var ErrRoomNoExists = errors.New("Room does not exist") + +// BuildEvent builds a Matrix event using the event builder and roomserver query +// API client provided. If also fills roomserver query API response (if provided) +// in case the function calling FillBuilder needs to use it. +// Returns ErrRoomNoExists if the state of the room could not be retrieved because +// the room doesn't exist +// Returns an error if something else went wrong +func BuildEvent( + ctx context.Context, + builder *gomatrixserverlib.EventBuilder, cfg *config.Dendrite, evTime time.Time, + rsAPI api.RoomserverInternalAPI, queryRes *api.QueryLatestEventsAndStateResponse, +) (*gomatrixserverlib.Event, error) { + if queryRes == nil { + queryRes = &api.QueryLatestEventsAndStateResponse{} + } + + err := AddPrevEventsToEvent(ctx, builder, rsAPI, queryRes) + if err != nil { + // This can pass through a ErrRoomNoExists to the caller + return nil, err + } + + event, err := builder.Build( + evTime, cfg.Matrix.ServerName, cfg.Matrix.KeyID, + cfg.Matrix.PrivateKey, queryRes.RoomVersion, + ) + if err != nil { + return nil, err + } + + return &event, nil +} + +// AddPrevEventsToEvent fills out the prev_events and auth_events fields in builder +func AddPrevEventsToEvent( + ctx context.Context, + builder *gomatrixserverlib.EventBuilder, + rsAPI api.RoomserverInternalAPI, queryRes *api.QueryLatestEventsAndStateResponse, +) error { + eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder) + if err != nil { + return fmt.Errorf("gomatrixserverlib.StateNeededForEventBuilder: %w", err) + } + + if len(eventsNeeded.Tuples()) == 0 { + return errors.New("expecting state tuples for event builder, got none") + } + + // Ask the roomserver for information about this room + queryReq := api.QueryLatestEventsAndStateRequest{ + RoomID: builder.RoomID, + StateToFetch: eventsNeeded.Tuples(), + } + if err = rsAPI.QueryLatestEventsAndState(ctx, &queryReq, queryRes); err != nil { + return fmt.Errorf("rsAPI.QueryLatestEventsAndState: %w", err) + } + + if !queryRes.RoomExists { + return ErrRoomNoExists + } + + eventFormat, err := queryRes.RoomVersion.EventFormat() + if err != nil { + return fmt.Errorf("queryRes.RoomVersion.EventFormat: %w", err) + } + + builder.Depth = queryRes.Depth + + authEvents := gomatrixserverlib.NewAuthEvents(nil) + + for i := range queryRes.StateEvents { + err = authEvents.AddEvent(&queryRes.StateEvents[i].Event) + if err != nil { + return fmt.Errorf("authEvents.AddEvent: %w", err) + } + } + + refs, err := eventsNeeded.AuthEventReferences(&authEvents) + if err != nil { + return fmt.Errorf("eventsNeeded.AuthEventReferences: %w", err) + } + + truncAuth, truncPrev := truncateAuthAndPrevEvents(refs, queryRes.LatestEvents) + switch eventFormat { + case gomatrixserverlib.EventFormatV1: + builder.AuthEvents = truncAuth + builder.PrevEvents = truncPrev + case gomatrixserverlib.EventFormatV2: + v2AuthRefs, v2PrevRefs := []string{}, []string{} + for _, ref := range truncAuth { + v2AuthRefs = append(v2AuthRefs, ref.EventID) + } + for _, ref := range truncPrev { + v2PrevRefs = append(v2PrevRefs, ref.EventID) + } + builder.AuthEvents = v2AuthRefs + builder.PrevEvents = v2PrevRefs + } + + return nil +} + +// truncateAuthAndPrevEvents limits the number of events we add into +// an event as prev_events or auth_events. +// NOTSPEC: The limits here feel a bit arbitrary but they are currently +// here because of https://github.com/matrix-org/matrix-doc/issues/2307 +// and because Synapse will just drop events that don't comply. +func truncateAuthAndPrevEvents(auth, prev []gomatrixserverlib.EventReference) ( + truncAuth, truncPrev []gomatrixserverlib.EventReference, +) { + truncAuth, truncPrev = auth, prev + if len(truncAuth) > 10 { + truncAuth = truncAuth[:10] + } + if len(truncPrev) > 20 { + truncPrev = truncPrev[:20] + } + return +} diff --git a/internal/eventutil/types.go b/internal/eventutil/types.go new file mode 100644 index 00000000..6d119ce6 --- /dev/null +++ b/internal/eventutil/types.go @@ -0,0 +1,66 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventutil + +import ( + "errors" + "strconv" +) + +// ErrProfileNoExists is returned when trying to lookup a user's profile that +// doesn't exist locally. +var ErrProfileNoExists = errors.New("no known profile for given user ID") + +// AccountData represents account data sent from the client API server to the +// sync API server +type AccountData struct { + RoomID string `json:"room_id"` + Type string `json:"type"` +} + +// ProfileResponse is a struct containing all known user profile data +type ProfileResponse struct { + AvatarURL string `json:"avatar_url"` + DisplayName string `json:"displayname"` +} + +// AvatarURL is a struct containing only the URL to a user's avatar +type AvatarURL struct { + AvatarURL string `json:"avatar_url"` +} + +// DisplayName is a struct containing only a user's display name +type DisplayName struct { + DisplayName string `json:"displayname"` +} + +// WeakBoolean is a type that will Unmarshal to true or false even if the encoded +// representation is "true"/1 or "false"/0, as well as whatever other forms are +// recognised by strconv.ParseBool +type WeakBoolean bool + +// UnmarshalJSON is overridden here to allow strings vaguely representing a true +// or false boolean to be set as their closest counterpart +func (b *WeakBoolean) UnmarshalJSON(data []byte) error { + result, err := strconv.ParseBool(string(data)) + if err != nil { + return err + } + + // Set boolean value based on string input + *b = WeakBoolean(result) + + return nil +} diff --git a/internal/http/http.go b/internal/http/http.go deleted file mode 100644 index 2b189140..00000000 --- a/internal/http/http.go +++ /dev/null @@ -1,68 +0,0 @@ -package http - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "net/http" - "net/url" - "strings" - - "github.com/matrix-org/dendrite/internal/httpapis" - opentracing "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" -) - -// PostJSON performs a POST request with JSON on an internal HTTP API -func PostJSON( - ctx context.Context, span opentracing.Span, httpClient *http.Client, - apiURL string, request, response interface{}, -) error { - jsonBytes, err := json.Marshal(request) - if err != nil { - return err - } - - parsedAPIURL, err := url.Parse(apiURL) - if err != nil { - return err - } - - parsedAPIURL.Path = httpapis.InternalPathPrefix + strings.TrimLeft(parsedAPIURL.Path, "/") - apiURL = parsedAPIURL.String() - - req, err := http.NewRequest(http.MethodPost, apiURL, bytes.NewReader(jsonBytes)) - if err != nil { - return err - } - - // Mark the span as being an RPC client. - ext.SpanKindRPCClient.Set(span) - carrier := opentracing.HTTPHeadersCarrier(req.Header) - tracer := opentracing.GlobalTracer() - - if err = tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier); err != nil { - return err - } - - req.Header.Set("Content-Type", "application/json") - - res, err := httpClient.Do(req.WithContext(ctx)) - if res != nil { - defer (func() { err = res.Body.Close() })() - } - if err != nil { - return err - } - if res.StatusCode != http.StatusOK { - var errorBody struct { - Message string `json:"message"` - } - if msgerr := json.NewDecoder(res.Body).Decode(&errorBody); msgerr == nil { - return fmt.Errorf("Internal API: %d from %s: %s", res.StatusCode, apiURL, errorBody.Message) - } - return fmt.Errorf("Internal API: %d from %s", res.StatusCode, apiURL) - } - return json.NewDecoder(res.Body).Decode(response) -} diff --git a/internal/httpapi.go b/internal/httpapi.go deleted file mode 100644 index 991a9861..00000000 --- a/internal/httpapi.go +++ /dev/null @@ -1,275 +0,0 @@ -package internal - -import ( - "context" - "io" - "net/http" - "net/http/httptest" - "net/http/httputil" - "os" - "strings" - "sync" - "time" - - "github.com/gorilla/mux" - "github.com/matrix-org/dendrite/clientapi/auth" - "github.com/matrix-org/dendrite/clientapi/auth/authtypes" - federationsenderAPI "github.com/matrix-org/dendrite/federationsender/api" - "github.com/matrix-org/dendrite/internal/config" - "github.com/matrix-org/dendrite/internal/httpapis" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" - opentracing "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/sirupsen/logrus" -) - -// BasicAuth is used for authorization on /metrics handlers -type BasicAuth struct { - Username string `yaml:"username"` - Password string `yaml:"password"` -} - -// MakeAuthAPI turns a util.JSONRequestHandler function into an http.Handler which authenticates the request. -func MakeAuthAPI( - metricsName string, data auth.Data, - f func(*http.Request, *authtypes.Device) util.JSONResponse, -) http.Handler { - h := func(req *http.Request) util.JSONResponse { - device, err := auth.VerifyUserFromRequest(req, data) - if err != nil { - return *err - } - // add the user ID to the logger - logger := util.GetLogger((req.Context())) - logger = logger.WithField("user_id", device.UserID) - req = req.WithContext(util.ContextWithLogger(req.Context(), logger)) - - return f(req, device) - } - return MakeExternalAPI(metricsName, h) -} - -// MakeExternalAPI turns a util.JSONRequestHandler function into an http.Handler. -// This is used for APIs that are called from the internet. -func MakeExternalAPI(metricsName string, f func(*http.Request) util.JSONResponse) http.Handler { - // TODO: We shouldn't be directly reading env vars here, inject it in instead. - // Refactor this when we split out config structs. - verbose := false - if os.Getenv("DENDRITE_TRACE_HTTP") == "1" { - verbose = true - } - h := util.MakeJSONAPI(util.NewJSONRequestHandler(f)) - withSpan := func(w http.ResponseWriter, req *http.Request) { - nextWriter := w - if verbose { - logger := logrus.NewEntry(logrus.StandardLogger()) - // Log outgoing response - rec := httptest.NewRecorder() - nextWriter = rec - defer func() { - resp := rec.Result() - dump, err := httputil.DumpResponse(resp, true) - if err != nil { - logger.Debugf("Failed to dump outgoing response: %s", err) - } else { - strSlice := strings.Split(string(dump), "\n") - for _, s := range strSlice { - logger.Debug(s) - } - } - // copy the response to the client - for hdr, vals := range resp.Header { - for _, val := range vals { - w.Header().Add(hdr, val) - } - } - w.WriteHeader(resp.StatusCode) - // discard errors as this is for debugging - _, _ = io.Copy(w, resp.Body) - _ = resp.Body.Close() - }() - - // Log incoming request - dump, err := httputil.DumpRequest(req, true) - if err != nil { - logger.Debugf("Failed to dump incoming request: %s", err) - } else { - strSlice := strings.Split(string(dump), "\n") - for _, s := range strSlice { - logger.Debug(s) - } - } - } - - span := opentracing.StartSpan(metricsName) - defer span.Finish() - req = req.WithContext(opentracing.ContextWithSpan(req.Context(), span)) - h.ServeHTTP(nextWriter, req) - - } - - return http.HandlerFunc(withSpan) -} - -// MakeHTMLAPI adds Span metrics to the HTML Handler function -// This is used to serve HTML alongside JSON error messages -func MakeHTMLAPI(metricsName string, f func(http.ResponseWriter, *http.Request) *util.JSONResponse) http.Handler { - withSpan := func(w http.ResponseWriter, req *http.Request) { - span := opentracing.StartSpan(metricsName) - defer span.Finish() - req = req.WithContext(opentracing.ContextWithSpan(req.Context(), span)) - if err := f(w, req); err != nil { - h := util.MakeJSONAPI(util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { - return *err - })) - h.ServeHTTP(w, req) - } - } - - return promhttp.InstrumentHandlerCounter( - promauto.NewCounterVec( - prometheus.CounterOpts{ - Name: metricsName, - Help: "Total number of http requests for HTML resources", - }, - []string{"code"}, - ), - http.HandlerFunc(withSpan), - ) -} - -// MakeInternalAPI turns a util.JSONRequestHandler function into an http.Handler. -// This is used for APIs that are internal to dendrite. -// If we are passed a tracing context in the request headers then we use that -// as the parent of any tracing spans we create. -func MakeInternalAPI(metricsName string, f func(*http.Request) util.JSONResponse) http.Handler { - h := util.MakeJSONAPI(util.NewJSONRequestHandler(f)) - withSpan := func(w http.ResponseWriter, req *http.Request) { - carrier := opentracing.HTTPHeadersCarrier(req.Header) - tracer := opentracing.GlobalTracer() - clientContext, err := tracer.Extract(opentracing.HTTPHeaders, carrier) - var span opentracing.Span - if err == nil { - // Default to a span without RPC context. - span = tracer.StartSpan(metricsName) - } else { - // Set the RPC context. - span = tracer.StartSpan(metricsName, ext.RPCServerOption(clientContext)) - } - defer span.Finish() - req = req.WithContext(opentracing.ContextWithSpan(req.Context(), span)) - h.ServeHTTP(w, req) - } - - return http.HandlerFunc(withSpan) -} - -// MakeFedAPI makes an http.Handler that checks matrix federation authentication. -func MakeFedAPI( - metricsName string, - serverName gomatrixserverlib.ServerName, - keyRing gomatrixserverlib.KeyRing, - wakeup *FederationWakeups, - f func(*http.Request, *gomatrixserverlib.FederationRequest, map[string]string) util.JSONResponse, -) http.Handler { - h := func(req *http.Request) util.JSONResponse { - fedReq, errResp := gomatrixserverlib.VerifyHTTPRequest( - req, time.Now(), serverName, keyRing, - ) - if fedReq == nil { - return errResp - } - go wakeup.Wakeup(req.Context(), fedReq.Origin()) - vars, err := URLDecodeMapValues(mux.Vars(req)) - if err != nil { - return util.ErrorResponse(err) - } - - return f(req, fedReq, vars) - } - return MakeExternalAPI(metricsName, h) -} - -type FederationWakeups struct { - FsAPI federationsenderAPI.FederationSenderInternalAPI - origins sync.Map -} - -func (f *FederationWakeups) Wakeup(ctx context.Context, origin gomatrixserverlib.ServerName) { - key, keyok := f.origins.Load(origin) - if keyok { - lastTime, ok := key.(time.Time) - if ok && time.Since(lastTime) < time.Minute { - return - } - } - aliveReq := federationsenderAPI.PerformServersAliveRequest{ - Servers: []gomatrixserverlib.ServerName{origin}, - } - aliveRes := federationsenderAPI.PerformServersAliveResponse{} - if err := f.FsAPI.PerformServersAlive(ctx, &aliveReq, &aliveRes); err != nil { - util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ - "origin": origin, - }).Warn("incoming federation request failed to notify server alive") - } else { - f.origins.Store(origin, time.Now()) - } -} - -// SetupHTTPAPI registers an HTTP API mux under /api and sets up a metrics -// listener. -func SetupHTTPAPI(servMux *http.ServeMux, publicApiMux *mux.Router, internalApiMux *mux.Router, cfg *config.Dendrite, enableHTTPAPIs bool) { - if cfg.Metrics.Enabled { - servMux.Handle("/metrics", WrapHandlerInBasicAuth(promhttp.Handler(), cfg.Metrics.BasicAuth)) - } - if enableHTTPAPIs { - servMux.Handle(httpapis.InternalPathPrefix, internalApiMux) - } - servMux.Handle(httpapis.PublicPathPrefix, WrapHandlerInCORS(publicApiMux)) -} - -// WrapHandlerInBasicAuth adds basic auth to a handler. Only used for /metrics -func WrapHandlerInBasicAuth(h http.Handler, b BasicAuth) http.HandlerFunc { - if b.Username == "" || b.Password == "" { - logrus.Warn("Metrics are exposed without protection. Make sure you set up protection at proxy level.") - } - return func(w http.ResponseWriter, r *http.Request) { - // Serve without authorization if either Username or Password is unset - if b.Username == "" || b.Password == "" { - h.ServeHTTP(w, r) - return - } - user, pass, ok := r.BasicAuth() - - if !ok || user != b.Username || pass != b.Password { - http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden) - return - } - h.ServeHTTP(w, r) - } -} - -// WrapHandlerInCORS adds CORS headers to all responses, including all error -// responses. -// Handles OPTIONS requests directly. -func WrapHandlerInCORS(h http.Handler) http.HandlerFunc { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") - w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept, Authorization") - - if r.Method == http.MethodOptions && r.Header.Get("Access-Control-Request-Method") != "" { - // Its easiest just to always return a 200 OK for everything. Whether - // this is technically correct or not is a question, but in the end this - // is what a lot of other people do (including synapse) and the clients - // are perfectly happy with it. - w.WriteHeader(http.StatusOK) - } else { - h.ServeHTTP(w, r) - } - }) -} diff --git a/internal/httpapi_test.go b/internal/httpapi_test.go deleted file mode 100644 index 6f159c8d..00000000 --- a/internal/httpapi_test.go +++ /dev/null @@ -1,95 +0,0 @@ -package internal - -import ( - "net/http" - "net/http/httptest" - "testing" -) - -func TestWrapHandlerInBasicAuth(t *testing.T) { - type args struct { - h http.Handler - b BasicAuth - } - - dummyHandler := http.HandlerFunc(func(h http.ResponseWriter, r *http.Request) { - h.WriteHeader(http.StatusOK) - }) - - tests := []struct { - name string - args args - want int - reqAuth bool - }{ - { - name: "no user or password setup", - args: args{h: dummyHandler}, - want: http.StatusOK, - reqAuth: false, - }, - { - name: "only user set", - args: args{ - h: dummyHandler, - b: BasicAuth{Username: "test"}, // no basic auth - }, - want: http.StatusOK, - reqAuth: false, - }, - { - name: "only pass set", - args: args{ - h: dummyHandler, - b: BasicAuth{Password: "test"}, // no basic auth - }, - want: http.StatusOK, - reqAuth: false, - }, - { - name: "credentials correct", - args: args{ - h: dummyHandler, - b: BasicAuth{Username: "test", Password: "test"}, // basic auth enabled - }, - want: http.StatusOK, - reqAuth: true, - }, - { - name: "credentials wrong", - args: args{ - h: dummyHandler, - b: BasicAuth{Username: "test1", Password: "test"}, // basic auth enabled - }, - want: http.StatusForbidden, - reqAuth: true, - }, - { - name: "no basic auth in request", - args: args{ - h: dummyHandler, - b: BasicAuth{Username: "test", Password: "test"}, // basic auth enabled - }, - want: http.StatusForbidden, - reqAuth: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - baHandler := WrapHandlerInBasicAuth(tt.args.h, tt.args.b) - - req := httptest.NewRequest("GET", "http://localhost/metrics", nil) - if tt.reqAuth { - req.SetBasicAuth("test", "test") - } - - w := httptest.NewRecorder() - baHandler(w, req) - resp := w.Result() - - if resp.StatusCode != tt.want { - t.Errorf("Expected status code %d, got %d", resp.StatusCode, tt.want) - } - }) - } -} diff --git a/internal/httpapis/paths.go b/internal/httpapis/paths.go deleted file mode 100644 index 8adec2df..00000000 --- a/internal/httpapis/paths.go +++ /dev/null @@ -1,6 +0,0 @@ -package httpapis - -const ( - PublicPathPrefix = "/_matrix/" - InternalPathPrefix = "/api/" -) diff --git a/internal/httputil/http.go b/internal/httputil/http.go new file mode 100644 index 00000000..9197371a --- /dev/null +++ b/internal/httputil/http.go @@ -0,0 +1,81 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package httputil + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + + opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" +) + +// PostJSON performs a POST request with JSON on an internal HTTP API +func PostJSON( + ctx context.Context, span opentracing.Span, httpClient *http.Client, + apiURL string, request, response interface{}, +) error { + jsonBytes, err := json.Marshal(request) + if err != nil { + return err + } + + parsedAPIURL, err := url.Parse(apiURL) + if err != nil { + return err + } + + parsedAPIURL.Path = InternalPathPrefix + strings.TrimLeft(parsedAPIURL.Path, "/") + apiURL = parsedAPIURL.String() + + req, err := http.NewRequest(http.MethodPost, apiURL, bytes.NewReader(jsonBytes)) + if err != nil { + return err + } + + // Mark the span as being an RPC client. + ext.SpanKindRPCClient.Set(span) + carrier := opentracing.HTTPHeadersCarrier(req.Header) + tracer := opentracing.GlobalTracer() + + if err = tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier); err != nil { + return err + } + + req.Header.Set("Content-Type", "application/json") + + res, err := httpClient.Do(req.WithContext(ctx)) + if res != nil { + defer (func() { err = res.Body.Close() })() + } + if err != nil { + return err + } + if res.StatusCode != http.StatusOK { + var errorBody struct { + Message string `json:"message"` + } + if msgerr := json.NewDecoder(res.Body).Decode(&errorBody); msgerr == nil { + return fmt.Errorf("Internal API: %d from %s: %s", res.StatusCode, apiURL, errorBody.Message) + } + return fmt.Errorf("Internal API: %d from %s", res.StatusCode, apiURL) + } + return json.NewDecoder(res.Body).Decode(response) +} diff --git a/internal/httputil/httpapi.go b/internal/httputil/httpapi.go new file mode 100644 index 00000000..0a37f06c --- /dev/null +++ b/internal/httputil/httpapi.go @@ -0,0 +1,288 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package httputil + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "net/http/httputil" + "os" + "strings" + "sync" + "time" + + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/clientapi/auth" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + federationsenderAPI "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" +) + +// BasicAuth is used for authorization on /metrics handlers +type BasicAuth struct { + Username string `yaml:"username"` + Password string `yaml:"password"` +} + +// MakeAuthAPI turns a util.JSONRequestHandler function into an http.Handler which authenticates the request. +func MakeAuthAPI( + metricsName string, data auth.Data, + f func(*http.Request, *authtypes.Device) util.JSONResponse, +) http.Handler { + h := func(req *http.Request) util.JSONResponse { + device, err := auth.VerifyUserFromRequest(req, data) + if err != nil { + return *err + } + // add the user ID to the logger + logger := util.GetLogger((req.Context())) + logger = logger.WithField("user_id", device.UserID) + req = req.WithContext(util.ContextWithLogger(req.Context(), logger)) + + return f(req, device) + } + return MakeExternalAPI(metricsName, h) +} + +// MakeExternalAPI turns a util.JSONRequestHandler function into an http.Handler. +// This is used for APIs that are called from the internet. +func MakeExternalAPI(metricsName string, f func(*http.Request) util.JSONResponse) http.Handler { + // TODO: We shouldn't be directly reading env vars here, inject it in instead. + // Refactor this when we split out config structs. + verbose := false + if os.Getenv("DENDRITE_TRACE_HTTP") == "1" { + verbose = true + } + h := util.MakeJSONAPI(util.NewJSONRequestHandler(f)) + withSpan := func(w http.ResponseWriter, req *http.Request) { + nextWriter := w + if verbose { + logger := logrus.NewEntry(logrus.StandardLogger()) + // Log outgoing response + rec := httptest.NewRecorder() + nextWriter = rec + defer func() { + resp := rec.Result() + dump, err := httputil.DumpResponse(resp, true) + if err != nil { + logger.Debugf("Failed to dump outgoing response: %s", err) + } else { + strSlice := strings.Split(string(dump), "\n") + for _, s := range strSlice { + logger.Debug(s) + } + } + // copy the response to the client + for hdr, vals := range resp.Header { + for _, val := range vals { + w.Header().Add(hdr, val) + } + } + w.WriteHeader(resp.StatusCode) + // discard errors as this is for debugging + _, _ = io.Copy(w, resp.Body) + _ = resp.Body.Close() + }() + + // Log incoming request + dump, err := httputil.DumpRequest(req, true) + if err != nil { + logger.Debugf("Failed to dump incoming request: %s", err) + } else { + strSlice := strings.Split(string(dump), "\n") + for _, s := range strSlice { + logger.Debug(s) + } + } + } + + span := opentracing.StartSpan(metricsName) + defer span.Finish() + req = req.WithContext(opentracing.ContextWithSpan(req.Context(), span)) + h.ServeHTTP(nextWriter, req) + + } + + return http.HandlerFunc(withSpan) +} + +// MakeHTMLAPI adds Span metrics to the HTML Handler function +// This is used to serve HTML alongside JSON error messages +func MakeHTMLAPI(metricsName string, f func(http.ResponseWriter, *http.Request) *util.JSONResponse) http.Handler { + withSpan := func(w http.ResponseWriter, req *http.Request) { + span := opentracing.StartSpan(metricsName) + defer span.Finish() + req = req.WithContext(opentracing.ContextWithSpan(req.Context(), span)) + if err := f(w, req); err != nil { + h := util.MakeJSONAPI(util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { + return *err + })) + h.ServeHTTP(w, req) + } + } + + return promhttp.InstrumentHandlerCounter( + promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: metricsName, + Help: "Total number of http requests for HTML resources", + }, + []string{"code"}, + ), + http.HandlerFunc(withSpan), + ) +} + +// MakeInternalAPI turns a util.JSONRequestHandler function into an http.Handler. +// This is used for APIs that are internal to dendrite. +// If we are passed a tracing context in the request headers then we use that +// as the parent of any tracing spans we create. +func MakeInternalAPI(metricsName string, f func(*http.Request) util.JSONResponse) http.Handler { + h := util.MakeJSONAPI(util.NewJSONRequestHandler(f)) + withSpan := func(w http.ResponseWriter, req *http.Request) { + carrier := opentracing.HTTPHeadersCarrier(req.Header) + tracer := opentracing.GlobalTracer() + clientContext, err := tracer.Extract(opentracing.HTTPHeaders, carrier) + var span opentracing.Span + if err == nil { + // Default to a span without RPC context. + span = tracer.StartSpan(metricsName) + } else { + // Set the RPC context. + span = tracer.StartSpan(metricsName, ext.RPCServerOption(clientContext)) + } + defer span.Finish() + req = req.WithContext(opentracing.ContextWithSpan(req.Context(), span)) + h.ServeHTTP(w, req) + } + + return http.HandlerFunc(withSpan) +} + +// MakeFedAPI makes an http.Handler that checks matrix federation authentication. +func MakeFedAPI( + metricsName string, + serverName gomatrixserverlib.ServerName, + keyRing gomatrixserverlib.KeyRing, + wakeup *FederationWakeups, + f func(*http.Request, *gomatrixserverlib.FederationRequest, map[string]string) util.JSONResponse, +) http.Handler { + h := func(req *http.Request) util.JSONResponse { + fedReq, errResp := gomatrixserverlib.VerifyHTTPRequest( + req, time.Now(), serverName, keyRing, + ) + if fedReq == nil { + return errResp + } + go wakeup.Wakeup(req.Context(), fedReq.Origin()) + vars, err := URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + + return f(req, fedReq, vars) + } + return MakeExternalAPI(metricsName, h) +} + +type FederationWakeups struct { + FsAPI federationsenderAPI.FederationSenderInternalAPI + origins sync.Map +} + +func (f *FederationWakeups) Wakeup(ctx context.Context, origin gomatrixserverlib.ServerName) { + key, keyok := f.origins.Load(origin) + if keyok { + lastTime, ok := key.(time.Time) + if ok && time.Since(lastTime) < time.Minute { + return + } + } + aliveReq := federationsenderAPI.PerformServersAliveRequest{ + Servers: []gomatrixserverlib.ServerName{origin}, + } + aliveRes := federationsenderAPI.PerformServersAliveResponse{} + if err := f.FsAPI.PerformServersAlive(ctx, &aliveReq, &aliveRes); err != nil { + util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ + "origin": origin, + }).Warn("incoming federation request failed to notify server alive") + } else { + f.origins.Store(origin, time.Now()) + } +} + +// SetupHTTPAPI registers an HTTP API mux under /api and sets up a metrics +// listener. +func SetupHTTPAPI(servMux *http.ServeMux, publicApiMux *mux.Router, internalApiMux *mux.Router, cfg *config.Dendrite, enableHTTPAPIs bool) { + if cfg.Metrics.Enabled { + servMux.Handle("/metrics", WrapHandlerInBasicAuth(promhttp.Handler(), cfg.Metrics.BasicAuth)) + } + if enableHTTPAPIs { + servMux.Handle(InternalPathPrefix, internalApiMux) + } + servMux.Handle(PublicPathPrefix, WrapHandlerInCORS(publicApiMux)) +} + +// WrapHandlerInBasicAuth adds basic auth to a handler. Only used for /metrics +func WrapHandlerInBasicAuth(h http.Handler, b BasicAuth) http.HandlerFunc { + if b.Username == "" || b.Password == "" { + logrus.Warn("Metrics are exposed without protection. Make sure you set up protection at proxy level.") + } + return func(w http.ResponseWriter, r *http.Request) { + // Serve without authorization if either Username or Password is unset + if b.Username == "" || b.Password == "" { + h.ServeHTTP(w, r) + return + } + user, pass, ok := r.BasicAuth() + + if !ok || user != b.Username || pass != b.Password { + http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden) + return + } + h.ServeHTTP(w, r) + } +} + +// WrapHandlerInCORS adds CORS headers to all responses, including all error +// responses. +// Handles OPTIONS requests directly. +func WrapHandlerInCORS(h http.Handler) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept, Authorization") + + if r.Method == http.MethodOptions && r.Header.Get("Access-Control-Request-Method") != "" { + // Its easiest just to always return a 200 OK for everything. Whether + // this is technically correct or not is a question, but in the end this + // is what a lot of other people do (including synapse) and the clients + // are perfectly happy with it. + w.WriteHeader(http.StatusOK) + } else { + h.ServeHTTP(w, r) + } + }) +} diff --git a/internal/httputil/httpapi_test.go b/internal/httputil/httpapi_test.go new file mode 100644 index 00000000..de6ccf9b --- /dev/null +++ b/internal/httputil/httpapi_test.go @@ -0,0 +1,109 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package httputil + +import ( + "net/http" + "net/http/httptest" + "testing" +) + +func TestWrapHandlerInBasicAuth(t *testing.T) { + type args struct { + h http.Handler + b BasicAuth + } + + dummyHandler := http.HandlerFunc(func(h http.ResponseWriter, r *http.Request) { + h.WriteHeader(http.StatusOK) + }) + + tests := []struct { + name string + args args + want int + reqAuth bool + }{ + { + name: "no user or password setup", + args: args{h: dummyHandler}, + want: http.StatusOK, + reqAuth: false, + }, + { + name: "only user set", + args: args{ + h: dummyHandler, + b: BasicAuth{Username: "test"}, // no basic auth + }, + want: http.StatusOK, + reqAuth: false, + }, + { + name: "only pass set", + args: args{ + h: dummyHandler, + b: BasicAuth{Password: "test"}, // no basic auth + }, + want: http.StatusOK, + reqAuth: false, + }, + { + name: "credentials correct", + args: args{ + h: dummyHandler, + b: BasicAuth{Username: "test", Password: "test"}, // basic auth enabled + }, + want: http.StatusOK, + reqAuth: true, + }, + { + name: "credentials wrong", + args: args{ + h: dummyHandler, + b: BasicAuth{Username: "test1", Password: "test"}, // basic auth enabled + }, + want: http.StatusForbidden, + reqAuth: true, + }, + { + name: "no basic auth in request", + args: args{ + h: dummyHandler, + b: BasicAuth{Username: "test", Password: "test"}, // basic auth enabled + }, + want: http.StatusForbidden, + reqAuth: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + baHandler := WrapHandlerInBasicAuth(tt.args.h, tt.args.b) + + req := httptest.NewRequest("GET", "http://localhost/metrics", nil) + if tt.reqAuth { + req.SetBasicAuth("test", "test") + } + + w := httptest.NewRecorder() + baHandler(w, req) + resp := w.Result() + + if resp.StatusCode != tt.want { + t.Errorf("Expected status code %d, got %d", resp.StatusCode, tt.want) + } + }) + } +} diff --git a/internal/httputil/paths.go b/internal/httputil/paths.go new file mode 100644 index 00000000..728b5a87 --- /dev/null +++ b/internal/httputil/paths.go @@ -0,0 +1,20 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package httputil + +const ( + PublicPathPrefix = "/_matrix/" + InternalPathPrefix = "/api/" +) diff --git a/internal/httputil/routing.go b/internal/httputil/routing.go new file mode 100644 index 00000000..0bd3655e --- /dev/null +++ b/internal/httputil/routing.go @@ -0,0 +1,35 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package httputil + +import ( + "net/url" +) + +// URLDecodeMapValues is a function that iterates through each of the items in a +// map, URL decodes the value, and returns a new map with the decoded values +// under the same key names +func URLDecodeMapValues(vmap map[string]string) (map[string]string, error) { + decoded := make(map[string]string, len(vmap)) + for key, value := range vmap { + decodedVal, err := url.PathUnescape(value) + if err != nil { + return make(map[string]string), err + } + decoded[key] = decodedVal + } + + return decoded, nil +} diff --git a/internal/partition_offset_table.go b/internal/partition_offset_table.go deleted file mode 100644 index 8b72819b..00000000 --- a/internal/partition_offset_table.go +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package internal - -import ( - "context" - "database/sql" - "strings" -) - -const partitionOffsetsSchema = ` --- The offsets that the server has processed up to. -CREATE TABLE IF NOT EXISTS ${prefix}_partition_offsets ( - -- The name of the topic. - topic TEXT NOT NULL, - -- The 32-bit partition ID - partition INTEGER NOT NULL, - -- The 64-bit offset. - partition_offset BIGINT NOT NULL, - UNIQUE (topic, partition) -); -` - -const selectPartitionOffsetsSQL = "" + - "SELECT partition, partition_offset FROM ${prefix}_partition_offsets WHERE topic = $1" - -const upsertPartitionOffsetsSQL = "" + - "INSERT INTO ${prefix}_partition_offsets (topic, partition, partition_offset) VALUES ($1, $2, $3)" + - " ON CONFLICT (topic, partition)" + - " DO UPDATE SET partition_offset = $3" - -// PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table. -type PartitionOffsetStatements struct { - selectPartitionOffsetsStmt *sql.Stmt - upsertPartitionOffsetStmt *sql.Stmt -} - -// Prepare converts the raw SQL statements into prepared statements. -// Takes a prefix to prepend to the table name used to store the partition offsets. -// This allows multiple components to share the same database schema. -func (s *PartitionOffsetStatements) Prepare(db *sql.DB, prefix string) (err error) { - _, err = db.Exec(strings.Replace(partitionOffsetsSchema, "${prefix}", prefix, -1)) - if err != nil { - return - } - if s.selectPartitionOffsetsStmt, err = db.Prepare( - strings.Replace(selectPartitionOffsetsSQL, "${prefix}", prefix, -1), - ); err != nil { - return - } - if s.upsertPartitionOffsetStmt, err = db.Prepare( - strings.Replace(upsertPartitionOffsetsSQL, "${prefix}", prefix, -1), - ); err != nil { - return - } - return -} - -// PartitionOffsets implements PartitionStorer -func (s *PartitionOffsetStatements) PartitionOffsets( - ctx context.Context, topic string, -) ([]PartitionOffset, error) { - return s.selectPartitionOffsets(ctx, topic) -} - -// SetPartitionOffset implements PartitionStorer -func (s *PartitionOffsetStatements) SetPartitionOffset( - ctx context.Context, topic string, partition int32, offset int64, -) error { - return s.upsertPartitionOffset(ctx, topic, partition, offset) -} - -// selectPartitionOffsets returns all the partition offsets for the given topic. -func (s *PartitionOffsetStatements) selectPartitionOffsets( - ctx context.Context, topic string, -) ([]PartitionOffset, error) { - rows, err := s.selectPartitionOffsetsStmt.QueryContext(ctx, topic) - if err != nil { - return nil, err - } - defer CloseAndLogIfError(ctx, rows, "selectPartitionOffsets: rows.close() failed") - var results []PartitionOffset - for rows.Next() { - var offset PartitionOffset - if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil { - return nil, err - } - results = append(results, offset) - } - return results, rows.Err() -} - -// UpsertPartitionOffset updates or inserts the partition offset for the given topic. -func (s *PartitionOffsetStatements) upsertPartitionOffset( - ctx context.Context, topic string, partition int32, offset int64, -) error { - _, err := s.upsertPartitionOffsetStmt.ExecContext(ctx, topic, partition, offset) - return err -} diff --git a/internal/postgres.go b/internal/postgres.go deleted file mode 100644 index 7ae40d8f..00000000 --- a/internal/postgres.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// +build !wasm - -package internal - -import "github.com/lib/pq" - -// IsUniqueConstraintViolationErr returns true if the error is a postgresql unique_violation error -func IsUniqueConstraintViolationErr(err error) bool { - pqErr, ok := err.(*pq.Error) - return ok && pqErr.Code == "23505" -} diff --git a/internal/postgres_wasm.go b/internal/postgres_wasm.go deleted file mode 100644 index 64d32829..00000000 --- a/internal/postgres_wasm.go +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// +build wasm - -package internal - -// IsUniqueConstraintViolationErr no-ops for this architecture -func IsUniqueConstraintViolationErr(err error) bool { - return false -} diff --git a/internal/routing.go b/internal/routing.go deleted file mode 100644 index 4462c70c..00000000 --- a/internal/routing.go +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2019 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package internal - -import ( - "net/url" -) - -// URLDecodeMapValues is a function that iterates through each of the items in a -// map, URL decodes the value, and returns a new map with the decoded values -// under the same key names -func URLDecodeMapValues(vmap map[string]string) (map[string]string, error) { - decoded := make(map[string]string, len(vmap)) - for key, value := range vmap { - decodedVal, err := url.PathUnescape(value) - if err != nil { - return make(map[string]string), err - } - decoded[key] = decodedVal - } - - return decoded, nil -} diff --git a/internal/setup/base.go b/internal/setup/base.go new file mode 100644 index 00000000..fb304893 --- /dev/null +++ b/internal/setup/base.go @@ -0,0 +1,316 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package setup + +import ( + "database/sql" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/naffka" + + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" + "github.com/matrix-org/dendrite/internal" + + "github.com/Shopify/sarama" + "github.com/gorilla/mux" + + appserviceAPI "github.com/matrix-org/dendrite/appservice/api" + asinthttp "github.com/matrix-org/dendrite/appservice/inthttp" + eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" + eduinthttp "github.com/matrix-org/dendrite/eduserver/inthttp" + federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" + fsinthttp "github.com/matrix-org/dendrite/federationsender/inthttp" + "github.com/matrix-org/dendrite/internal/config" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + rsinthttp "github.com/matrix-org/dendrite/roomserver/inthttp" + serverKeyAPI "github.com/matrix-org/dendrite/serverkeyapi/api" + skinthttp "github.com/matrix-org/dendrite/serverkeyapi/inthttp" + "github.com/sirupsen/logrus" + + _ "net/http/pprof" +) + +// BaseDendrite is a base for creating new instances of dendrite. It parses +// command line flags and config, and exposes methods for creating various +// resources. All errors are handled by logging then exiting, so all methods +// should only be used during start up. +// Must be closed when shutting down. +type BaseDendrite struct { + componentName string + tracerCloser io.Closer + + // PublicAPIMux should be used to register new public matrix api endpoints + PublicAPIMux *mux.Router + InternalAPIMux *mux.Router + UseHTTPAPIs bool + httpClient *http.Client + Cfg *config.Dendrite + Caches *caching.Caches + KafkaConsumer sarama.Consumer + KafkaProducer sarama.SyncProducer +} + +const HTTPServerTimeout = time.Minute * 5 +const HTTPClientTimeout = time.Second * 30 + +// NewBaseDendrite creates a new instance to be used by a component. +// The componentName is used for logging purposes, and should be a friendly name +// of the compontent running, e.g. "SyncAPI" +func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs bool) *BaseDendrite { + internal.SetupStdLogging() + internal.SetupHookLogging(cfg.Logging, componentName) + internal.SetupPprof() + + closer, err := cfg.SetupTracing("Dendrite" + componentName) + if err != nil { + logrus.WithError(err).Panicf("failed to start opentracing") + } + + var kafkaConsumer sarama.Consumer + var kafkaProducer sarama.SyncProducer + if cfg.Kafka.UseNaffka { + kafkaConsumer, kafkaProducer = setupNaffka(cfg) + } else { + kafkaConsumer, kafkaProducer = setupKafka(cfg) + } + + cache, err := caching.NewInMemoryLRUCache() + if err != nil { + logrus.WithError(err).Warnf("Failed to create cache") + } + + client := http.Client{Timeout: HTTPClientTimeout} + if cfg.Proxy != nil { + client.Transport = &http.Transport{Proxy: http.ProxyURL(&url.URL{ + Scheme: cfg.Proxy.Protocol, + Host: fmt.Sprintf("%s:%d", cfg.Proxy.Host, cfg.Proxy.Port), + })} + } + + // Ideally we would only use SkipClean on routes which we know can allow '/' but due to + // https://github.com/gorilla/mux/issues/460 we have to attach this at the top router. + // When used in conjunction with UseEncodedPath() we get the behaviour we want when parsing + // path parameters: + // /foo/bar%2Fbaz == [foo, bar%2Fbaz] (from UseEncodedPath) + // /foo/bar%2F%2Fbaz == [foo, bar%2F%2Fbaz] (from SkipClean) + // In particular, rooms v3 event IDs are not urlsafe and can include '/' and because they + // are randomly generated it results in flakey tests. + // We need to be careful with media APIs if they read from a filesystem to make sure they + // are not inadvertently reading paths without cleaning, else this could introduce a + // directory traversal attack e.g /../../../etc/passwd + httpmux := mux.NewRouter().SkipClean(true) + + return &BaseDendrite{ + componentName: componentName, + UseHTTPAPIs: useHTTPAPIs, + tracerCloser: closer, + Cfg: cfg, + Caches: cache, + PublicAPIMux: httpmux.PathPrefix(httputil.PublicPathPrefix).Subrouter().UseEncodedPath(), + InternalAPIMux: httpmux.PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(), + httpClient: &client, + KafkaConsumer: kafkaConsumer, + KafkaProducer: kafkaProducer, + } +} + +// Close implements io.Closer +func (b *BaseDendrite) Close() error { + return b.tracerCloser.Close() +} + +// AppserviceHTTPClient returns the AppServiceQueryAPI for hitting the appservice component over HTTP. +func (b *BaseDendrite) AppserviceHTTPClient() appserviceAPI.AppServiceQueryAPI { + a, err := asinthttp.NewAppserviceClient(b.Cfg.AppServiceURL(), b.httpClient) + if err != nil { + logrus.WithError(err).Panic("CreateHTTPAppServiceAPIs failed") + } + return a +} + +// RoomserverHTTPClient returns RoomserverInternalAPI for hitting the roomserver over HTTP. +func (b *BaseDendrite) RoomserverHTTPClient() roomserverAPI.RoomserverInternalAPI { + rsAPI, err := rsinthttp.NewRoomserverClient(b.Cfg.RoomServerURL(), b.httpClient, b.Caches) + if err != nil { + logrus.WithError(err).Panic("RoomserverHTTPClient failed", b.httpClient) + } + return rsAPI +} + +// EDUServerClient returns EDUServerInputAPI for hitting the EDU server over HTTP +func (b *BaseDendrite) EDUServerClient() eduServerAPI.EDUServerInputAPI { + e, err := eduinthttp.NewEDUServerClient(b.Cfg.EDUServerURL(), b.httpClient) + if err != nil { + logrus.WithError(err).Panic("EDUServerClient failed", b.httpClient) + } + return e +} + +// FederationSenderHTTPClient returns FederationSenderInternalAPI for hitting +// the federation sender over HTTP +func (b *BaseDendrite) FederationSenderHTTPClient() federationSenderAPI.FederationSenderInternalAPI { + f, err := fsinthttp.NewFederationSenderClient(b.Cfg.FederationSenderURL(), b.httpClient) + if err != nil { + logrus.WithError(err).Panic("FederationSenderHTTPClient failed", b.httpClient) + } + return f +} + +// ServerKeyAPIClient returns ServerKeyInternalAPI for hitting the server key API over HTTP +func (b *BaseDendrite) ServerKeyAPIClient() serverKeyAPI.ServerKeyInternalAPI { + f, err := skinthttp.NewServerKeyClient( + b.Cfg.ServerKeyAPIURL(), + b.httpClient, + b.Caches, + ) + if err != nil { + logrus.WithError(err).Panic("NewServerKeyInternalAPIHTTP failed", b.httpClient) + } + return f +} + +// CreateDeviceDB creates a new instance of the device database. Should only be +// called once per component. +func (b *BaseDendrite) CreateDeviceDB() devices.Database { + db, err := devices.NewDatabase(string(b.Cfg.Database.Device), b.Cfg.DbProperties(), b.Cfg.Matrix.ServerName) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to devices db") + } + + return db +} + +// CreateAccountsDB creates a new instance of the accounts database. Should only +// be called once per component. +func (b *BaseDendrite) CreateAccountsDB() accounts.Database { + db, err := accounts.NewDatabase(string(b.Cfg.Database.Account), b.Cfg.DbProperties(), b.Cfg.Matrix.ServerName) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to accounts db") + } + + return db +} + +// CreateFederationClient creates a new federation client. Should only be called +// once per component. +func (b *BaseDendrite) CreateFederationClient() *gomatrixserverlib.FederationClient { + return gomatrixserverlib.NewFederationClient( + b.Cfg.Matrix.ServerName, b.Cfg.Matrix.KeyID, b.Cfg.Matrix.PrivateKey, + ) +} + +// SetupAndServeHTTP sets up the HTTP server to serve endpoints registered on +// ApiMux under /api/ and adds a prometheus handler under /metrics. +func (b *BaseDendrite) SetupAndServeHTTP(bindaddr string, listenaddr string) { + // If a separate bind address is defined, listen on that. Otherwise use + // the listen address + var addr string + if bindaddr != "" { + addr = bindaddr + } else { + addr = listenaddr + } + + serv := http.Server{ + Addr: addr, + WriteTimeout: HTTPServerTimeout, + } + + httputil.SetupHTTPAPI( + http.DefaultServeMux, + b.PublicAPIMux, + b.InternalAPIMux, + b.Cfg, + b.UseHTTPAPIs, + ) + logrus.Infof("Starting %s server on %s", b.componentName, serv.Addr) + + err := serv.ListenAndServe() + if err != nil { + logrus.WithError(err).Fatal("failed to serve http") + } + + logrus.Infof("Stopped %s server on %s", b.componentName, serv.Addr) +} + +// setupKafka creates kafka consumer/producer pair from the config. +func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { + consumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) + if err != nil { + logrus.WithError(err).Panic("failed to start kafka consumer") + } + + producer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil) + if err != nil { + logrus.WithError(err).Panic("failed to setup kafka producers") + } + + return consumer, producer +} + +// setupNaffka creates kafka consumer/producer pair from the config. +func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { + var err error + var db *sql.DB + var naffkaDB *naffka.DatabaseImpl + + uri, err := url.Parse(string(cfg.Database.Naffka)) + if err != nil || uri.Scheme == "file" { + var cs string + cs, err = sqlutil.ParseFileURI(string(cfg.Database.Naffka)) + if err != nil { + logrus.WithError(err).Panic("Failed to parse naffka database file URI") + } + db, err = sqlutil.Open(sqlutil.SQLiteDriverName(), cs, nil) + if err != nil { + logrus.WithError(err).Panic("Failed to open naffka database") + } + + naffkaDB, err = naffka.NewSqliteDatabase(db) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka database") + } + } else { + db, err = sqlutil.Open("postgres", string(cfg.Database.Naffka), nil) + if err != nil { + logrus.WithError(err).Panic("Failed to open naffka database") + } + + naffkaDB, err = naffka.NewPostgresqlDatabase(db) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka database") + } + } + + if naffkaDB == nil { + panic("naffka connection string not understood") + } + + naff, err := naffka.New(naffkaDB) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka") + } + + return naff, naff +} diff --git a/internal/setup/flags.go b/internal/setup/flags.go new file mode 100644 index 00000000..e4fc58d6 --- /dev/null +++ b/internal/setup/flags.go @@ -0,0 +1,42 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package setup + +import ( + "flag" + + "github.com/matrix-org/dendrite/internal/config" + + "github.com/sirupsen/logrus" +) + +var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") + +// ParseFlags parses the commandline flags and uses them to create a config. +func ParseFlags(monolith bool) *config.Dendrite { + flag.Parse() + + if *configPath == "" { + logrus.Fatal("--config must be supplied") + } + + cfg, err := config.Load(*configPath, monolith) + + if err != nil { + logrus.Fatalf("Invalid config file: %s", err) + } + + return cfg +} diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go index 35fcd311..55ceffd6 100644 --- a/internal/setup/monolith.go +++ b/internal/setup/monolith.go @@ -1,3 +1,17 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package setup import ( diff --git a/internal/sql.go b/internal/sql.go deleted file mode 100644 index e3c10afc..00000000 --- a/internal/sql.go +++ /dev/null @@ -1,174 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// Copyright 2017-2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package internal - -import ( - "database/sql" - "errors" - "fmt" - "runtime" - "time" - - "go.uber.org/atomic" -) - -// ErrUserExists is returned if a username already exists in the database. -var ErrUserExists = errors.New("Username already exists") - -// A Transaction is something that can be committed or rolledback. -type Transaction interface { - // Commit the transaction - Commit() error - // Rollback the transaction. - Rollback() error -} - -// EndTransaction ends a transaction. -// If the transaction succeeded then it is committed, otherwise it is rolledback. -// You MUST check the error returned from this function to be sure that the transaction -// was applied correctly. For example, 'database is locked' errors in sqlite will happen here. -func EndTransaction(txn Transaction, succeeded *bool) error { - if *succeeded { - return txn.Commit() // nolint: errcheck - } else { - return txn.Rollback() // nolint: errcheck - } -} - -// WithTransaction runs a block of code passing in an SQL transaction -// If the code returns an error or panics then the transactions is rolledback -// Otherwise the transaction is committed. -func WithTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { - txn, err := db.Begin() - if err != nil { - return - } - succeeded := false - defer func() { - err2 := EndTransaction(txn, &succeeded) - if err == nil && err2 != nil { // failed to commit/rollback - err = err2 - } - }() - - err = fn(txn) - if err != nil { - return - } - - succeeded = true - return -} - -// TxStmt wraps an SQL stmt inside an optional transaction. -// If the transaction is nil then it returns the original statement that will -// run outside of a transaction. -// Otherwise returns a copy of the statement that will run inside the transaction. -func TxStmt(transaction *sql.Tx, statement *sql.Stmt) *sql.Stmt { - if transaction != nil { - statement = transaction.Stmt(statement) - } - return statement -} - -// Hack of the century -func QueryVariadic(count int) string { - return QueryVariadicOffset(count, 0) -} - -func QueryVariadicOffset(count, offset int) string { - str := "(" - for i := 0; i < count; i++ { - str += fmt.Sprintf("$%d", i+offset+1) - if i < (count - 1) { - str += ", " - } - } - str += ")" - return str -} - -func SQLiteDriverName() string { - if runtime.GOOS == "js" { - return "sqlite3_js" - } - return "sqlite3" -} - -// DbProperties functions return properties used by database/sql/DB -type DbProperties interface { - MaxIdleConns() int - MaxOpenConns() int - ConnMaxLifetime() time.Duration -} - -// TransactionWriter allows queuing database writes so that you don't -// contend on database locks in, e.g. SQLite. Only one task will run -// at a time on a given TransactionWriter. -type TransactionWriter struct { - running atomic.Bool - todo chan transactionWriterTask -} - -func NewTransactionWriter() *TransactionWriter { - return &TransactionWriter{ - todo: make(chan transactionWriterTask), - } -} - -// transactionWriterTask represents a specific task. -type transactionWriterTask struct { - db *sql.DB - f func(txn *sql.Tx) error - wait chan error -} - -// Do queues a task to be run by a TransactionWriter. The function -// provided will be ran within a transaction as supplied by the -// database parameter. This will block until the task is finished. -func (w *TransactionWriter) Do(db *sql.DB, f func(txn *sql.Tx) error) error { - if w.todo == nil { - return errors.New("not initialised") - } - if !w.running.Load() { - go w.run() - } - task := transactionWriterTask{ - db: db, - f: f, - wait: make(chan error, 1), - } - w.todo <- task - return <-task.wait -} - -// run processes the tasks for a given transaction writer. Only one -// of these goroutines will run at a time. A transaction will be -// opened using the database object from the task and then this will -// be passed as a parameter to the task function. -func (w *TransactionWriter) run() { - if !w.running.CAS(false, true) { - return - } - defer w.running.Store(false) - for task := range w.todo { - task.wait <- WithTransaction(task.db, func(txn *sql.Tx) error { - return task.f(txn) - }) - close(task.wait) - } -} diff --git a/internal/sqlutil/partition_offset_table.go b/internal/sqlutil/partition_offset_table.go new file mode 100644 index 00000000..34882902 --- /dev/null +++ b/internal/sqlutil/partition_offset_table.go @@ -0,0 +1,126 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlutil + +import ( + "context" + "database/sql" + "strings" + + "github.com/matrix-org/util" +) + +// A PartitionOffset is the offset into a partition of the input log. +type PartitionOffset struct { + // The ID of the partition. + Partition int32 + // The offset into the partition. + Offset int64 +} + +const partitionOffsetsSchema = ` +-- The offsets that the server has processed up to. +CREATE TABLE IF NOT EXISTS ${prefix}_partition_offsets ( + -- The name of the topic. + topic TEXT NOT NULL, + -- The 32-bit partition ID + partition INTEGER NOT NULL, + -- The 64-bit offset. + partition_offset BIGINT NOT NULL, + UNIQUE (topic, partition) +); +` + +const selectPartitionOffsetsSQL = "" + + "SELECT partition, partition_offset FROM ${prefix}_partition_offsets WHERE topic = $1" + +const upsertPartitionOffsetsSQL = "" + + "INSERT INTO ${prefix}_partition_offsets (topic, partition, partition_offset) VALUES ($1, $2, $3)" + + " ON CONFLICT (topic, partition)" + + " DO UPDATE SET partition_offset = $3" + +// PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table. +type PartitionOffsetStatements struct { + selectPartitionOffsetsStmt *sql.Stmt + upsertPartitionOffsetStmt *sql.Stmt +} + +// Prepare converts the raw SQL statements into prepared statements. +// Takes a prefix to prepend to the table name used to store the partition offsets. +// This allows multiple components to share the same database schema. +func (s *PartitionOffsetStatements) Prepare(db *sql.DB, prefix string) (err error) { + _, err = db.Exec(strings.Replace(partitionOffsetsSchema, "${prefix}", prefix, -1)) + if err != nil { + return + } + if s.selectPartitionOffsetsStmt, err = db.Prepare( + strings.Replace(selectPartitionOffsetsSQL, "${prefix}", prefix, -1), + ); err != nil { + return + } + if s.upsertPartitionOffsetStmt, err = db.Prepare( + strings.Replace(upsertPartitionOffsetsSQL, "${prefix}", prefix, -1), + ); err != nil { + return + } + return +} + +// PartitionOffsets implements PartitionStorer +func (s *PartitionOffsetStatements) PartitionOffsets( + ctx context.Context, topic string, +) ([]PartitionOffset, error) { + return s.selectPartitionOffsets(ctx, topic) +} + +// SetPartitionOffset implements PartitionStorer +func (s *PartitionOffsetStatements) SetPartitionOffset( + ctx context.Context, topic string, partition int32, offset int64, +) error { + return s.upsertPartitionOffset(ctx, topic, partition, offset) +} + +// selectPartitionOffsets returns all the partition offsets for the given topic. +func (s *PartitionOffsetStatements) selectPartitionOffsets( + ctx context.Context, topic string, +) ([]PartitionOffset, error) { + rows, err := s.selectPartitionOffsetsStmt.QueryContext(ctx, topic) + if err != nil { + return nil, err + } + defer func() { + err2 := rows.Close() + if err2 != nil { + util.GetLogger(ctx).WithError(err2).Error("selectPartitionOffsets: rows.close() failed") + } + }() + var results []PartitionOffset + for rows.Next() { + var offset PartitionOffset + if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil { + return nil, err + } + results = append(results, offset) + } + return results, rows.Err() +} + +// UpsertPartitionOffset updates or inserts the partition offset for the given topic. +func (s *PartitionOffsetStatements) upsertPartitionOffset( + ctx context.Context, topic string, partition int32, offset int64, +) error { + _, err := s.upsertPartitionOffsetStmt.ExecContext(ctx, topic, partition, offset) + return err +} diff --git a/internal/sqlutil/postgres.go b/internal/sqlutil/postgres.go new file mode 100644 index 00000000..41a5508a --- /dev/null +++ b/internal/sqlutil/postgres.go @@ -0,0 +1,25 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !wasm + +package sqlutil + +import "github.com/lib/pq" + +// IsUniqueConstraintViolationErr returns true if the error is a postgresql unique_violation error +func IsUniqueConstraintViolationErr(err error) bool { + pqErr, ok := err.(*pq.Error) + return ok && pqErr.Code == "23505" +} diff --git a/internal/sqlutil/postgres_wasm.go b/internal/sqlutil/postgres_wasm.go new file mode 100644 index 00000000..c45842f0 --- /dev/null +++ b/internal/sqlutil/postgres_wasm.go @@ -0,0 +1,22 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build wasm + +package sqlutil + +// IsUniqueConstraintViolationErr no-ops for this architecture +func IsUniqueConstraintViolationErr(err error) bool { + return false +} diff --git a/internal/sqlutil/sql.go b/internal/sqlutil/sql.go new file mode 100644 index 00000000..a25a4a5b --- /dev/null +++ b/internal/sqlutil/sql.go @@ -0,0 +1,172 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlutil + +import ( + "database/sql" + "errors" + "fmt" + "runtime" + "time" + + "go.uber.org/atomic" +) + +// ErrUserExists is returned if a username already exists in the database. +var ErrUserExists = errors.New("Username already exists") + +// A Transaction is something that can be committed or rolledback. +type Transaction interface { + // Commit the transaction + Commit() error + // Rollback the transaction. + Rollback() error +} + +// EndTransaction ends a transaction. +// If the transaction succeeded then it is committed, otherwise it is rolledback. +// You MUST check the error returned from this function to be sure that the transaction +// was applied correctly. For example, 'database is locked' errors in sqlite will happen here. +func EndTransaction(txn Transaction, succeeded *bool) error { + if *succeeded { + return txn.Commit() // nolint: errcheck + } else { + return txn.Rollback() // nolint: errcheck + } +} + +// WithTransaction runs a block of code passing in an SQL transaction +// If the code returns an error or panics then the transactions is rolledback +// Otherwise the transaction is committed. +func WithTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { + txn, err := db.Begin() + if err != nil { + return + } + succeeded := false + defer func() { + err2 := EndTransaction(txn, &succeeded) + if err == nil && err2 != nil { // failed to commit/rollback + err = err2 + } + }() + + err = fn(txn) + if err != nil { + return + } + + succeeded = true + return +} + +// TxStmt wraps an SQL stmt inside an optional transaction. +// If the transaction is nil then it returns the original statement that will +// run outside of a transaction. +// Otherwise returns a copy of the statement that will run inside the transaction. +func TxStmt(transaction *sql.Tx, statement *sql.Stmt) *sql.Stmt { + if transaction != nil { + statement = transaction.Stmt(statement) + } + return statement +} + +// Hack of the century +func QueryVariadic(count int) string { + return QueryVariadicOffset(count, 0) +} + +func QueryVariadicOffset(count, offset int) string { + str := "(" + for i := 0; i < count; i++ { + str += fmt.Sprintf("$%d", i+offset+1) + if i < (count - 1) { + str += ", " + } + } + str += ")" + return str +} + +func SQLiteDriverName() string { + if runtime.GOOS == "js" { + return "sqlite3_js" + } + return "sqlite3" +} + +// DbProperties functions return properties used by database/sql/DB +type DbProperties interface { + MaxIdleConns() int + MaxOpenConns() int + ConnMaxLifetime() time.Duration +} + +// TransactionWriter allows queuing database writes so that you don't +// contend on database locks in, e.g. SQLite. Only one task will run +// at a time on a given TransactionWriter. +type TransactionWriter struct { + running atomic.Bool + todo chan transactionWriterTask +} + +func NewTransactionWriter() *TransactionWriter { + return &TransactionWriter{ + todo: make(chan transactionWriterTask), + } +} + +// transactionWriterTask represents a specific task. +type transactionWriterTask struct { + db *sql.DB + f func(txn *sql.Tx) error + wait chan error +} + +// Do queues a task to be run by a TransactionWriter. The function +// provided will be ran within a transaction as supplied by the +// database parameter. This will block until the task is finished. +func (w *TransactionWriter) Do(db *sql.DB, f func(txn *sql.Tx) error) error { + if w.todo == nil { + return errors.New("not initialised") + } + if !w.running.Load() { + go w.run() + } + task := transactionWriterTask{ + db: db, + f: f, + wait: make(chan error, 1), + } + w.todo <- task + return <-task.wait +} + +// run processes the tasks for a given transaction writer. Only one +// of these goroutines will run at a time. A transaction will be +// opened using the database object from the task and then this will +// be passed as a parameter to the task function. +func (w *TransactionWriter) run() { + if !w.running.CAS(false, true) { + return + } + defer w.running.Store(false) + for task := range w.todo { + task.wait <- WithTransaction(task.db, func(txn *sql.Tx) error { + return task.f(txn) + }) + close(task.wait) + } +} diff --git a/internal/sqlutil/trace.go b/internal/sqlutil/trace.go index 1b008e1b..f6644d59 100644 --- a/internal/sqlutil/trace.go +++ b/internal/sqlutil/trace.go @@ -25,7 +25,6 @@ import ( "strings" "time" - "github.com/matrix-org/dendrite/internal" "github.com/ngrok/sqlmw" "github.com/sirupsen/logrus" ) @@ -78,7 +77,7 @@ func (in *traceInterceptor) RowsNext(c context.Context, rows driver.Rows, dest [ // Open opens a database specified by its database driver name and a driver-specific data source name, // usually consisting of at least a database name and connection information. Includes tracing driver // if DENDRITE_TRACE_SQL=1 -func Open(driverName, dsn string, dbProperties internal.DbProperties) (*sql.DB, error) { +func Open(driverName, dsn string, dbProperties DbProperties) (*sql.DB, error) { if tracingEnabled { // install the wrapped driver driverName += "-trace" @@ -87,7 +86,7 @@ func Open(driverName, dsn string, dbProperties internal.DbProperties) (*sql.DB, if err != nil { return nil, err } - if driverName != internal.SQLiteDriverName() && dbProperties != nil { + if driverName != SQLiteDriverName() && dbProperties != nil { logrus.WithFields(logrus.Fields{ "MaxOpenConns": dbProperties.MaxOpenConns(), "MaxIdleConns": dbProperties.MaxIdleConns(), diff --git a/internal/sqlutil/uri.go b/internal/sqlutil/uri.go index f72e0242..703258e6 100644 --- a/internal/sqlutil/uri.go +++ b/internal/sqlutil/uri.go @@ -1,3 +1,17 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package sqlutil import ( diff --git a/internal/types.go b/internal/types.go deleted file mode 100644 index be2717f3..00000000 --- a/internal/types.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package internal - -import ( - "errors" - "strconv" -) - -// ErrProfileNoExists is returned when trying to lookup a user's profile that -// doesn't exist locally. -var ErrProfileNoExists = errors.New("no known profile for given user ID") - -// AccountData represents account data sent from the client API server to the -// sync API server -type AccountData struct { - RoomID string `json:"room_id"` - Type string `json:"type"` -} - -// ProfileResponse is a struct containing all known user profile data -type ProfileResponse struct { - AvatarURL string `json:"avatar_url"` - DisplayName string `json:"displayname"` -} - -// AvatarURL is a struct containing only the URL to a user's avatar -type AvatarURL struct { - AvatarURL string `json:"avatar_url"` -} - -// DisplayName is a struct containing only a user's display name -type DisplayName struct { - DisplayName string `json:"displayname"` -} - -// WeakBoolean is a type that will Unmarshal to true or false even if the encoded -// representation is "true"/1 or "false"/0, as well as whatever other forms are -// recognised by strconv.ParseBool -type WeakBoolean bool - -// UnmarshalJSON is overridden here to allow strings vaguely representing a true -// or false boolean to be set as their closest counterpart -func (b *WeakBoolean) UnmarshalJSON(data []byte) error { - result, err := strconv.ParseBool(string(data)) - if err != nil { - return err - } - - // Set boolean value based on string input - *b = WeakBoolean(result) - - return nil -} -- cgit v1.2.3