aboutsummaryrefslogtreecommitdiff
path: root/federationsender
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-08-10 14:18:04 +0100
committerGitHub <noreply@github.com>2020-08-10 14:18:04 +0100
commit4b09f445c992fd0a389efc34d75aaa7e5bd50e9c (patch)
tree18d6168718ac06e569eb271f25ed4dc064010b50 /federationsender
parentfdabba1851c489d801ea4029bce9dec7d415b2df (diff)
Configuration format v1 (#1230)
* Initial pass at refactoring config (not finished) * Don't forget current state and EDU servers * More shifting around * Update server key API tests * Fix roomserver test * Fix more tests * Further tweaks * Fix current state server test (sort of) * Maybe fix appservices * Fix client API test * Include database connection string in database options * Fix sync API build * Update config test * Fix unit tests * Fix federation sender build * Fix gobind build * Set Listen address for all services in HTTP monolith mode * Validate config, reinstate appservice derived in directory, tweaks * Tweak federation API test * Set MaxOpenConnections/MaxIdleConnections to previous values * Update generate-config
Diffstat (limited to 'federationsender')
-rw-r--r--federationsender/consumers/eduserver.go10
-rw-r--r--federationsender/consumers/keychange.go4
-rw-r--r--federationsender/consumers/roomserver.go6
-rw-r--r--federationsender/federationsender.go22
-rw-r--r--federationsender/internal/api.go4
-rw-r--r--federationsender/storage/postgres/storage.go5
-rw-r--r--federationsender/storage/sqlite3/storage.go9
-rw-r--r--federationsender/storage/storage.go22
-rw-r--r--federationsender/storage/storage_wasm.go24
9 files changed, 47 insertions, 59 deletions
diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go
index 7b539530..74ce65db 100644
--- a/federationsender/consumers/eduserver.go
+++ b/federationsender/consumers/eduserver.go
@@ -43,27 +43,27 @@ type OutputEDUConsumer struct {
// NewOutputEDUConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers.
func NewOutputEDUConsumer(
- cfg *config.Dendrite,
+ cfg *config.FederationSender,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store storage.Database,
) *OutputEDUConsumer {
c := &OutputEDUConsumer{
typingConsumer: &internal.ContinualConsumer{
- Topic: string(cfg.Kafka.Topics.OutputTypingEvent),
+ Topic: string(cfg.Matrix.Kafka.Topics.OutputTypingEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
},
sendToDeviceConsumer: &internal.ContinualConsumer{
- Topic: string(cfg.Kafka.Topics.OutputSendToDeviceEvent),
+ Topic: string(cfg.Matrix.Kafka.Topics.OutputSendToDeviceEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
},
queues: queues,
db: store,
ServerName: cfg.Matrix.ServerName,
- TypingTopic: string(cfg.Kafka.Topics.OutputTypingEvent),
- SendToDeviceTopic: string(cfg.Kafka.Topics.OutputSendToDeviceEvent),
+ TypingTopic: string(cfg.Matrix.Kafka.Topics.OutputTypingEvent),
+ SendToDeviceTopic: string(cfg.Matrix.Kafka.Topics.OutputSendToDeviceEvent),
}
c.typingConsumer.ProcessMessage = c.onTypingEvent
c.sendToDeviceConsumer.ProcessMessage = c.onSendToDeviceEvent
diff --git a/federationsender/consumers/keychange.go b/federationsender/consumers/keychange.go
index d33a6e07..8060125e 100644
--- a/federationsender/consumers/keychange.go
+++ b/federationsender/consumers/keychange.go
@@ -41,7 +41,7 @@ type KeyChangeConsumer struct {
// NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers.
func NewKeyChangeConsumer(
- cfg *config.Dendrite,
+ cfg *config.KeyServer,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store storage.Database,
@@ -49,7 +49,7 @@ func NewKeyChangeConsumer(
) *KeyChangeConsumer {
c := &KeyChangeConsumer{
consumer: &internal.ContinualConsumer{
- Topic: string(cfg.Kafka.Topics.OutputKeyChangeEvent),
+ Topic: string(cfg.Matrix.Kafka.Topics.OutputKeyChangeEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
},
diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go
index 299c7b37..b3a4cde3 100644
--- a/federationsender/consumers/roomserver.go
+++ b/federationsender/consumers/roomserver.go
@@ -33,7 +33,7 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
- cfg *config.Dendrite
+ cfg *config.FederationSender
rsAPI api.RoomserverInternalAPI
rsConsumer *internal.ContinualConsumer
db storage.Database
@@ -42,14 +42,14 @@ type OutputRoomEventConsumer struct {
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
- cfg *config.Dendrite,
+ cfg *config.FederationSender,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store storage.Database,
rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
- Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
+ Topic: string(cfg.Matrix.Kafka.Topics.OutputRoomEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}
diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go
index fbf506aa..b02686fe 100644
--- a/federationsender/federationsender.go
+++ b/federationsender/federationsender.go
@@ -45,27 +45,29 @@ func NewInternalAPI(
stateAPI stateapi.CurrentStateInternalAPI,
keyRing *gomatrixserverlib.KeyRing,
) api.FederationSenderInternalAPI {
- federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender), base.Cfg.DbProperties())
+ cfg := &base.Cfg.FederationSender
+
+ federationSenderDB, err := storage.NewDatabase(&cfg.Database)
if err != nil {
logrus.WithError(err).Panic("failed to connect to federation sender db")
}
stats := &statistics.Statistics{
DB: federationSenderDB,
- FailuresUntilBlacklist: base.Cfg.Matrix.FederationMaxRetries,
+ FailuresUntilBlacklist: cfg.FederationMaxRetries,
}
queues := queue.NewOutgoingQueues(
- federationSenderDB, base.Cfg.Matrix.ServerName, federation, rsAPI, stats,
+ federationSenderDB, cfg.Matrix.ServerName, federation, rsAPI, stats,
&queue.SigningInfo{
- KeyID: base.Cfg.Matrix.KeyID,
- PrivateKey: base.Cfg.Matrix.PrivateKey,
- ServerName: base.Cfg.Matrix.ServerName,
+ KeyID: cfg.Matrix.KeyID,
+ PrivateKey: cfg.Matrix.PrivateKey,
+ ServerName: cfg.Matrix.ServerName,
},
)
rsConsumer := consumers.NewOutputRoomEventConsumer(
- base.Cfg, base.KafkaConsumer, queues,
+ cfg, base.KafkaConsumer, queues,
federationSenderDB, rsAPI,
)
if err = rsConsumer.Start(); err != nil {
@@ -73,17 +75,17 @@ func NewInternalAPI(
}
tsConsumer := consumers.NewOutputEDUConsumer(
- base.Cfg, base.KafkaConsumer, queues, federationSenderDB,
+ cfg, base.KafkaConsumer, queues, federationSenderDB,
)
if err := tsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start typing server consumer")
}
keyConsumer := consumers.NewKeyChangeConsumer(
- base.Cfg, base.KafkaConsumer, queues, federationSenderDB, stateAPI,
+ &base.Cfg.KeyServer, base.KafkaConsumer, queues, federationSenderDB, stateAPI,
)
if err := keyConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start key server consumer")
}
- return internal.NewFederationSenderInternalAPI(federationSenderDB, base.Cfg, rsAPI, federation, keyRing, stats, queues)
+ return internal.NewFederationSenderInternalAPI(federationSenderDB, cfg, rsAPI, federation, keyRing, stats, queues)
}
diff --git a/federationsender/internal/api.go b/federationsender/internal/api.go
index 9a9880ce..647e3fcb 100644
--- a/federationsender/internal/api.go
+++ b/federationsender/internal/api.go
@@ -12,7 +12,7 @@ import (
// FederationSenderInternalAPI is an implementation of api.FederationSenderInternalAPI
type FederationSenderInternalAPI struct {
db storage.Database
- cfg *config.Dendrite
+ cfg *config.FederationSender
statistics *statistics.Statistics
rsAPI api.RoomserverInternalAPI
federation *gomatrixserverlib.FederationClient
@@ -21,7 +21,7 @@ type FederationSenderInternalAPI struct {
}
func NewFederationSenderInternalAPI(
- db storage.Database, cfg *config.Dendrite,
+ db storage.Database, cfg *config.FederationSender,
rsAPI api.RoomserverInternalAPI,
federation *gomatrixserverlib.FederationClient,
keyRing *gomatrixserverlib.KeyRing,
diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go
index a3094bda..b65ff0b6 100644
--- a/federationsender/storage/postgres/storage.go
+++ b/federationsender/storage/postgres/storage.go
@@ -19,6 +19,7 @@ import (
"database/sql"
"github.com/matrix-org/dendrite/federationsender/storage/shared"
+ "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
@@ -30,10 +31,10 @@ type Database struct {
}
// NewDatabase opens a new database
-func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties) (*Database, error) {
+func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
var d Database
var err error
- if d.db, err = sqlutil.Open("postgres", dataSourceName, dbProperties); err != nil {
+ if d.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err
}
joinedHosts, err := NewPostgresJoinedHostsTable(d.db)
diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go
index c303d094..41b91871 100644
--- a/federationsender/storage/sqlite3/storage.go
+++ b/federationsender/storage/sqlite3/storage.go
@@ -21,6 +21,7 @@ import (
_ "github.com/mattn/go-sqlite3"
"github.com/matrix-org/dendrite/federationsender/storage/shared"
+ "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
@@ -32,14 +33,10 @@ type Database struct {
}
// NewDatabase opens a new database
-func NewDatabase(dataSourceName string) (*Database, error) {
+func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
var d Database
var err error
- cs, err := sqlutil.ParseFileURI(dataSourceName)
- if err != nil {
- return nil, err
- }
- if d.db, err = sqlutil.Open(sqlutil.SQLiteDriverName(), cs, nil); err != nil {
+ if d.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err
}
joinedHosts, err := NewSQLiteJoinedHostsTable(d.db)
diff --git a/federationsender/storage/storage.go b/federationsender/storage/storage.go
index d3736005..1380fefd 100644
--- a/federationsender/storage/storage.go
+++ b/federationsender/storage/storage.go
@@ -17,25 +17,21 @@
package storage
import (
- "net/url"
+ "fmt"
"github.com/matrix-org/dendrite/federationsender/storage/postgres"
"github.com/matrix-org/dendrite/federationsender/storage/sqlite3"
- "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/internal/config"
)
// NewDatabase opens a new database
-func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties) (Database, error) {
- uri, err := url.Parse(dataSourceName)
- if err != nil {
- return postgres.NewDatabase(dataSourceName, dbProperties)
- }
- switch uri.Scheme {
- case "file":
- return sqlite3.NewDatabase(dataSourceName)
- case "postgres":
- return postgres.NewDatabase(dataSourceName, dbProperties)
+func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) {
+ switch {
+ case dbProperties.ConnectionString.IsSQLite():
+ return sqlite3.NewDatabase(dbProperties)
+ case dbProperties.ConnectionString.IsPostgres():
+ return postgres.NewDatabase(dbProperties)
default:
- return postgres.NewDatabase(dataSourceName, dbProperties)
+ return nil, fmt.Errorf("unexpected database type")
}
}
diff --git a/federationsender/storage/storage_wasm.go b/federationsender/storage/storage_wasm.go
index e5c8f293..459329e9 100644
--- a/federationsender/storage/storage_wasm.go
+++ b/federationsender/storage/storage_wasm.go
@@ -16,27 +16,19 @@ package storage
import (
"fmt"
- "net/url"
"github.com/matrix-org/dendrite/federationsender/storage/sqlite3"
- "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/internal/config"
)
// NewDatabase opens a new database
-func NewDatabase(
- dataSourceName string,
- dbProperties sqlutil.DbProperties, // nolint:unparam
-) (Database, error) {
- uri, err := url.Parse(dataSourceName)
- if err != nil {
- return nil, fmt.Errorf("Cannot use postgres implementation")
- }
- switch uri.Scheme {
- case "file":
- return sqlite3.NewDatabase(dataSourceName)
- case "postgres":
- return nil, fmt.Errorf("Cannot use postgres implementation")
+func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) {
+ switch {
+ case dbProperties.ConnectionString.IsSQLite():
+ return sqlite3.NewDatabase(dbProperties)
+ case dbProperties.ConnectionString.IsPostgres():
+ return nil, fmt.Errorf("can't use Postgres implementation")
default:
- return nil, fmt.Errorf("Cannot use postgres implementation")
+ return nil, fmt.Errorf("unexpected database type")
}
}