diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-08-10 14:18:04 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-10 14:18:04 +0100 |
commit | 4b09f445c992fd0a389efc34d75aaa7e5bd50e9c (patch) | |
tree | 18d6168718ac06e569eb271f25ed4dc064010b50 /federationsender | |
parent | fdabba1851c489d801ea4029bce9dec7d415b2df (diff) |
Configuration format v1 (#1230)
* Initial pass at refactoring config (not finished)
* Don't forget current state and EDU servers
* More shifting around
* Update server key API tests
* Fix roomserver test
* Fix more tests
* Further tweaks
* Fix current state server test (sort of)
* Maybe fix appservices
* Fix client API test
* Include database connection string in database options
* Fix sync API build
* Update config test
* Fix unit tests
* Fix federation sender build
* Fix gobind build
* Set Listen address for all services in HTTP monolith mode
* Validate config, reinstate appservice derived in directory, tweaks
* Tweak federation API test
* Set MaxOpenConnections/MaxIdleConnections to previous values
* Update generate-config
Diffstat (limited to 'federationsender')
-rw-r--r-- | federationsender/consumers/eduserver.go | 10 | ||||
-rw-r--r-- | federationsender/consumers/keychange.go | 4 | ||||
-rw-r--r-- | federationsender/consumers/roomserver.go | 6 | ||||
-rw-r--r-- | federationsender/federationsender.go | 22 | ||||
-rw-r--r-- | federationsender/internal/api.go | 4 | ||||
-rw-r--r-- | federationsender/storage/postgres/storage.go | 5 | ||||
-rw-r--r-- | federationsender/storage/sqlite3/storage.go | 9 | ||||
-rw-r--r-- | federationsender/storage/storage.go | 22 | ||||
-rw-r--r-- | federationsender/storage/storage_wasm.go | 24 |
9 files changed, 47 insertions, 59 deletions
diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go index 7b539530..74ce65db 100644 --- a/federationsender/consumers/eduserver.go +++ b/federationsender/consumers/eduserver.go @@ -43,27 +43,27 @@ type OutputEDUConsumer struct { // NewOutputEDUConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers. func NewOutputEDUConsumer( - cfg *config.Dendrite, + cfg *config.FederationSender, kafkaConsumer sarama.Consumer, queues *queue.OutgoingQueues, store storage.Database, ) *OutputEDUConsumer { c := &OutputEDUConsumer{ typingConsumer: &internal.ContinualConsumer{ - Topic: string(cfg.Kafka.Topics.OutputTypingEvent), + Topic: string(cfg.Matrix.Kafka.Topics.OutputTypingEvent), Consumer: kafkaConsumer, PartitionStore: store, }, sendToDeviceConsumer: &internal.ContinualConsumer{ - Topic: string(cfg.Kafka.Topics.OutputSendToDeviceEvent), + Topic: string(cfg.Matrix.Kafka.Topics.OutputSendToDeviceEvent), Consumer: kafkaConsumer, PartitionStore: store, }, queues: queues, db: store, ServerName: cfg.Matrix.ServerName, - TypingTopic: string(cfg.Kafka.Topics.OutputTypingEvent), - SendToDeviceTopic: string(cfg.Kafka.Topics.OutputSendToDeviceEvent), + TypingTopic: string(cfg.Matrix.Kafka.Topics.OutputTypingEvent), + SendToDeviceTopic: string(cfg.Matrix.Kafka.Topics.OutputSendToDeviceEvent), } c.typingConsumer.ProcessMessage = c.onTypingEvent c.sendToDeviceConsumer.ProcessMessage = c.onSendToDeviceEvent diff --git a/federationsender/consumers/keychange.go b/federationsender/consumers/keychange.go index d33a6e07..8060125e 100644 --- a/federationsender/consumers/keychange.go +++ b/federationsender/consumers/keychange.go @@ -41,7 +41,7 @@ type KeyChangeConsumer struct { // NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers. func NewKeyChangeConsumer( - cfg *config.Dendrite, + cfg *config.KeyServer, kafkaConsumer sarama.Consumer, queues *queue.OutgoingQueues, store storage.Database, @@ -49,7 +49,7 @@ func NewKeyChangeConsumer( ) *KeyChangeConsumer { c := &KeyChangeConsumer{ consumer: &internal.ContinualConsumer{ - Topic: string(cfg.Kafka.Topics.OutputKeyChangeEvent), + Topic: string(cfg.Matrix.Kafka.Topics.OutputKeyChangeEvent), Consumer: kafkaConsumer, PartitionStore: store, }, diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 299c7b37..b3a4cde3 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -33,7 +33,7 @@ import ( // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { - cfg *config.Dendrite + cfg *config.FederationSender rsAPI api.RoomserverInternalAPI rsConsumer *internal.ContinualConsumer db storage.Database @@ -42,14 +42,14 @@ type OutputRoomEventConsumer struct { // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. func NewOutputRoomEventConsumer( - cfg *config.Dendrite, + cfg *config.FederationSender, kafkaConsumer sarama.Consumer, queues *queue.OutgoingQueues, store storage.Database, rsAPI api.RoomserverInternalAPI, ) *OutputRoomEventConsumer { consumer := internal.ContinualConsumer{ - Topic: string(cfg.Kafka.Topics.OutputRoomEvent), + Topic: string(cfg.Matrix.Kafka.Topics.OutputRoomEvent), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index fbf506aa..b02686fe 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -45,27 +45,29 @@ func NewInternalAPI( stateAPI stateapi.CurrentStateInternalAPI, keyRing *gomatrixserverlib.KeyRing, ) api.FederationSenderInternalAPI { - federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender), base.Cfg.DbProperties()) + cfg := &base.Cfg.FederationSender + + federationSenderDB, err := storage.NewDatabase(&cfg.Database) if err != nil { logrus.WithError(err).Panic("failed to connect to federation sender db") } stats := &statistics.Statistics{ DB: federationSenderDB, - FailuresUntilBlacklist: base.Cfg.Matrix.FederationMaxRetries, + FailuresUntilBlacklist: cfg.FederationMaxRetries, } queues := queue.NewOutgoingQueues( - federationSenderDB, base.Cfg.Matrix.ServerName, federation, rsAPI, stats, + federationSenderDB, cfg.Matrix.ServerName, federation, rsAPI, stats, &queue.SigningInfo{ - KeyID: base.Cfg.Matrix.KeyID, - PrivateKey: base.Cfg.Matrix.PrivateKey, - ServerName: base.Cfg.Matrix.ServerName, + KeyID: cfg.Matrix.KeyID, + PrivateKey: cfg.Matrix.PrivateKey, + ServerName: cfg.Matrix.ServerName, }, ) rsConsumer := consumers.NewOutputRoomEventConsumer( - base.Cfg, base.KafkaConsumer, queues, + cfg, base.KafkaConsumer, queues, federationSenderDB, rsAPI, ) if err = rsConsumer.Start(); err != nil { @@ -73,17 +75,17 @@ func NewInternalAPI( } tsConsumer := consumers.NewOutputEDUConsumer( - base.Cfg, base.KafkaConsumer, queues, federationSenderDB, + cfg, base.KafkaConsumer, queues, federationSenderDB, ) if err := tsConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start typing server consumer") } keyConsumer := consumers.NewKeyChangeConsumer( - base.Cfg, base.KafkaConsumer, queues, federationSenderDB, stateAPI, + &base.Cfg.KeyServer, base.KafkaConsumer, queues, federationSenderDB, stateAPI, ) if err := keyConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start key server consumer") } - return internal.NewFederationSenderInternalAPI(federationSenderDB, base.Cfg, rsAPI, federation, keyRing, stats, queues) + return internal.NewFederationSenderInternalAPI(federationSenderDB, cfg, rsAPI, federation, keyRing, stats, queues) } diff --git a/federationsender/internal/api.go b/federationsender/internal/api.go index 9a9880ce..647e3fcb 100644 --- a/federationsender/internal/api.go +++ b/federationsender/internal/api.go @@ -12,7 +12,7 @@ import ( // FederationSenderInternalAPI is an implementation of api.FederationSenderInternalAPI type FederationSenderInternalAPI struct { db storage.Database - cfg *config.Dendrite + cfg *config.FederationSender statistics *statistics.Statistics rsAPI api.RoomserverInternalAPI federation *gomatrixserverlib.FederationClient @@ -21,7 +21,7 @@ type FederationSenderInternalAPI struct { } func NewFederationSenderInternalAPI( - db storage.Database, cfg *config.Dendrite, + db storage.Database, cfg *config.FederationSender, rsAPI api.RoomserverInternalAPI, federation *gomatrixserverlib.FederationClient, keyRing *gomatrixserverlib.KeyRing, diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go index a3094bda..b65ff0b6 100644 --- a/federationsender/storage/postgres/storage.go +++ b/federationsender/storage/postgres/storage.go @@ -19,6 +19,7 @@ import ( "database/sql" "github.com/matrix-org/dendrite/federationsender/storage/shared" + "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/sqlutil" ) @@ -30,10 +31,10 @@ type Database struct { } // NewDatabase opens a new database -func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties) (*Database, error) { +func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { var d Database var err error - if d.db, err = sqlutil.Open("postgres", dataSourceName, dbProperties); err != nil { + if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } joinedHosts, err := NewPostgresJoinedHostsTable(d.db) diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index c303d094..41b91871 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -21,6 +21,7 @@ import ( _ "github.com/mattn/go-sqlite3" "github.com/matrix-org/dendrite/federationsender/storage/shared" + "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/sqlutil" ) @@ -32,14 +33,10 @@ type Database struct { } // NewDatabase opens a new database -func NewDatabase(dataSourceName string) (*Database, error) { +func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { var d Database var err error - cs, err := sqlutil.ParseFileURI(dataSourceName) - if err != nil { - return nil, err - } - if d.db, err = sqlutil.Open(sqlutil.SQLiteDriverName(), cs, nil); err != nil { + if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } joinedHosts, err := NewSQLiteJoinedHostsTable(d.db) diff --git a/federationsender/storage/storage.go b/federationsender/storage/storage.go index d3736005..1380fefd 100644 --- a/federationsender/storage/storage.go +++ b/federationsender/storage/storage.go @@ -17,25 +17,21 @@ package storage import ( - "net/url" + "fmt" "github.com/matrix-org/dendrite/federationsender/storage/postgres" "github.com/matrix-org/dendrite/federationsender/storage/sqlite3" - "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/internal/config" ) // NewDatabase opens a new database -func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties) (Database, error) { - uri, err := url.Parse(dataSourceName) - if err != nil { - return postgres.NewDatabase(dataSourceName, dbProperties) - } - switch uri.Scheme { - case "file": - return sqlite3.NewDatabase(dataSourceName) - case "postgres": - return postgres.NewDatabase(dataSourceName, dbProperties) +func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) { + switch { + case dbProperties.ConnectionString.IsSQLite(): + return sqlite3.NewDatabase(dbProperties) + case dbProperties.ConnectionString.IsPostgres(): + return postgres.NewDatabase(dbProperties) default: - return postgres.NewDatabase(dataSourceName, dbProperties) + return nil, fmt.Errorf("unexpected database type") } } diff --git a/federationsender/storage/storage_wasm.go b/federationsender/storage/storage_wasm.go index e5c8f293..459329e9 100644 --- a/federationsender/storage/storage_wasm.go +++ b/federationsender/storage/storage_wasm.go @@ -16,27 +16,19 @@ package storage import ( "fmt" - "net/url" "github.com/matrix-org/dendrite/federationsender/storage/sqlite3" - "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/internal/config" ) // NewDatabase opens a new database -func NewDatabase( - dataSourceName string, - dbProperties sqlutil.DbProperties, // nolint:unparam -) (Database, error) { - uri, err := url.Parse(dataSourceName) - if err != nil { - return nil, fmt.Errorf("Cannot use postgres implementation") - } - switch uri.Scheme { - case "file": - return sqlite3.NewDatabase(dataSourceName) - case "postgres": - return nil, fmt.Errorf("Cannot use postgres implementation") +func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) { + switch { + case dbProperties.ConnectionString.IsSQLite(): + return sqlite3.NewDatabase(dbProperties) + case dbProperties.ConnectionString.IsPostgres(): + return nil, fmt.Errorf("can't use Postgres implementation") default: - return nil, fmt.Errorf("Cannot use postgres implementation") + return nil, fmt.Errorf("unexpected database type") } } |