diff options
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/consumers/clientapi.go | 4 | ||||
-rw-r--r-- | syncapi/consumers/eduserver_sendtodevice.go | 4 | ||||
-rw-r--r-- | syncapi/consumers/eduserver_typing.go | 4 | ||||
-rw-r--r-- | syncapi/consumers/roomserver.go | 4 | ||||
-rw-r--r-- | syncapi/routing/messages.go | 4 | ||||
-rw-r--r-- | syncapi/routing/routing.go | 2 | ||||
-rw-r--r-- | syncapi/storage/postgres/syncserver.go | 5 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/syncserver.go | 10 | ||||
-rw-r--r-- | syncapi/storage/storage.go | 22 | ||||
-rw-r--r-- | syncapi/storage/storage_test.go | 5 | ||||
-rw-r--r-- | syncapi/storage/storage_wasm.go | 24 | ||||
-rw-r--r-- | syncapi/syncapi.go | 6 |
12 files changed, 42 insertions, 52 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index f7cf96d9..ceaa735a 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -37,14 +37,14 @@ type OutputClientDataConsumer struct { // NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. func NewOutputClientDataConsumer( - cfg *config.Dendrite, + cfg *config.SyncAPI, kafkaConsumer sarama.Consumer, n *sync.Notifier, store storage.Database, ) *OutputClientDataConsumer { consumer := internal.ContinualConsumer{ - Topic: string(cfg.Kafka.Topics.OutputClientData), + Topic: string(cfg.Matrix.Kafka.Topics.OutputClientData), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 06a8928d..20dd1756 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -41,14 +41,14 @@ type OutputSendToDeviceEventConsumer struct { // NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer. // Call Start() to begin consuming from the EDU server. func NewOutputSendToDeviceEventConsumer( - cfg *config.Dendrite, + cfg *config.SyncAPI, kafkaConsumer sarama.Consumer, n *sync.Notifier, store storage.Database, ) *OutputSendToDeviceEventConsumer { consumer := internal.ContinualConsumer{ - Topic: string(cfg.Kafka.Topics.OutputSendToDeviceEvent), + Topic: string(cfg.Matrix.Kafka.Topics.OutputSendToDeviceEvent), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 0a9a9c0c..fc5703d3 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -37,14 +37,14 @@ type OutputTypingEventConsumer struct { // NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. // Call Start() to begin consuming from the EDU server. func NewOutputTypingEventConsumer( - cfg *config.Dendrite, + cfg *config.SyncAPI, kafkaConsumer sarama.Consumer, n *sync.Notifier, store storage.Database, ) *OutputTypingEventConsumer { consumer := internal.ContinualConsumer{ - Topic: string(cfg.Kafka.Topics.OutputTypingEvent), + Topic: string(cfg.Matrix.Kafka.Topics.OutputTypingEvent), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index f8cdcd5c..06c904c3 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -40,7 +40,7 @@ type OutputRoomEventConsumer struct { // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. func NewOutputRoomEventConsumer( - cfg *config.Dendrite, + cfg *config.SyncAPI, kafkaConsumer sarama.Consumer, n *sync.Notifier, store storage.Database, @@ -49,7 +49,7 @@ func NewOutputRoomEventConsumer( ) *OutputRoomEventConsumer { consumer := internal.ContinualConsumer{ - Topic: string(cfg.Kafka.Topics.OutputRoomEvent), + Topic: string(cfg.Matrix.Kafka.Topics.OutputRoomEvent), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 15add1b4..0999d3e8 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -36,7 +36,7 @@ type messagesReq struct { db storage.Database rsAPI api.RoomserverInternalAPI federation *gomatrixserverlib.FederationClient - cfg *config.Dendrite + cfg *config.SyncAPI roomID string from *types.TopologyToken to *types.TopologyToken @@ -61,7 +61,7 @@ func OnIncomingMessagesRequest( req *http.Request, db storage.Database, roomID string, federation *gomatrixserverlib.FederationClient, rsAPI api.RoomserverInternalAPI, - cfg *config.Dendrite, + cfg *config.SyncAPI, ) util.JSONResponse { var err error diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go index ed0f872e..ec21c1b4 100644 --- a/syncapi/routing/routing.go +++ b/syncapi/routing/routing.go @@ -39,7 +39,7 @@ func Setup( publicAPIMux *mux.Router, srp *sync.RequestPool, syncDB storage.Database, userAPI userapi.UserInternalAPI, federation *gomatrixserverlib.FederationClient, rsAPI api.RoomserverInternalAPI, - cfg *config.Dendrite, + cfg *config.SyncAPI, ) { r0mux := publicAPIMux.PathPrefix(pathPrefixR0).Subrouter() diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 10c1b37c..26ef082f 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -21,6 +21,7 @@ import ( // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/syncapi/storage/shared" ) @@ -34,10 +35,10 @@ type SyncServerDatasource struct { } // NewDatabase creates a new sync server database -func NewDatabase(dbDataSourceName string, dbProperties sqlutil.DbProperties) (*SyncServerDatasource, error) { +func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { var d SyncServerDatasource var err error - if d.db, err = sqlutil.Open("postgres", dbDataSourceName, dbProperties); err != nil { + if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil { diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index c85db5a4..9564a23a 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -22,6 +22,7 @@ import ( _ "github.com/mattn/go-sqlite3" "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/syncapi/storage/shared" ) @@ -37,13 +38,10 @@ type SyncServerDatasource struct { // NewDatabase creates a new sync server database // nolint: gocyclo -func NewDatabase(dataSourceName string) (*SyncServerDatasource, error) { +func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { var d SyncServerDatasource - cs, err := sqlutil.ParseFileURI(dataSourceName) - if err != nil { - return nil, err - } - if d.db, err = sqlutil.Open(sqlutil.SQLiteDriverName(), cs, nil); err != nil { + var err error + if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } if err = d.prepare(); err != nil { diff --git a/syncapi/storage/storage.go b/syncapi/storage/storage.go index ea69da3b..c16dcd81 100644 --- a/syncapi/storage/storage.go +++ b/syncapi/storage/storage.go @@ -17,25 +17,21 @@ package storage import ( - "net/url" + "fmt" - "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/syncapi/storage/postgres" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3" ) // NewSyncServerDatasource opens a database connection. -func NewSyncServerDatasource(dataSourceName string, dbProperties sqlutil.DbProperties) (Database, error) { - uri, err := url.Parse(dataSourceName) - if err != nil { - return postgres.NewDatabase(dataSourceName, dbProperties) - } - switch uri.Scheme { - case "postgres": - return postgres.NewDatabase(dataSourceName, dbProperties) - case "file": - return sqlite3.NewDatabase(dataSourceName) +func NewSyncServerDatasource(dbProperties *config.DatabaseOptions) (Database, error) { + switch { + case dbProperties.ConnectionString.IsSQLite(): + return sqlite3.NewDatabase(dbProperties) + case dbProperties.ConnectionString.IsPostgres(): + return postgres.NewDatabase(dbProperties) default: - return postgres.NewDatabase(dataSourceName, dbProperties) + return nil, fmt.Errorf("unexpected database type") } } diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 1f679def..0e827c95 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3" "github.com/matrix-org/dendrite/syncapi/types" @@ -59,7 +60,9 @@ func MustCreateDatabase(t *testing.T) storage.Database { t.Fatalf("tried to delete stale test database but failed: %s", err) } } - db, err := sqlite3.NewDatabase(fmt.Sprintf("file:%s", dbname)) + db, err := sqlite3.NewDatabase(&config.DatabaseOptions{ + ConnectionString: config.DataSource(fmt.Sprintf("file:%s", dbname)), + }) if err != nil { t.Fatalf("NewSyncServerDatasource returned %s", err) } diff --git a/syncapi/storage/storage_wasm.go b/syncapi/storage/storage_wasm.go index 0886b8c2..43b7bbea 100644 --- a/syncapi/storage/storage_wasm.go +++ b/syncapi/storage/storage_wasm.go @@ -16,27 +16,19 @@ package storage import ( "fmt" - "net/url" - "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3" ) // NewPublicRoomsServerDatabase opens a database connection. -func NewSyncServerDatasource( - dataSourceName string, - dbProperties sqlutil.DbProperties, // nolint:unparam -) (Database, error) { - uri, err := url.Parse(dataSourceName) - if err != nil { - return nil, fmt.Errorf("Cannot use postgres implementation") - } - switch uri.Scheme { - case "postgres": - return nil, fmt.Errorf("Cannot use postgres implementation") - case "file": - return sqlite3.NewDatabase(dataSourceName) +func NewSyncServerDatasource(dbProperties *config.DatabaseOptions) (Database, error) { + switch { + case dbProperties.ConnectionString.IsSQLite(): + return sqlite3.NewDatabase(dbProperties) + case dbProperties.ConnectionString.IsPostgres(): + return nil, fmt.Errorf("can't use Postgres implementation") default: - return nil, fmt.Errorf("Cannot use postgres implementation") + return nil, fmt.Errorf("unexpected database type") } } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 5198d59b..9caed7be 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -44,9 +44,9 @@ func AddPublicRoutes( keyAPI keyapi.KeyInternalAPI, currentStateAPI currentstateapi.CurrentStateInternalAPI, federation *gomatrixserverlib.FederationClient, - cfg *config.Dendrite, + cfg *config.SyncAPI, ) { - syncDB, err := storage.NewSyncServerDatasource(string(cfg.Database.SyncAPI), cfg.DbProperties()) + syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to sync db") } @@ -65,7 +65,7 @@ func AddPublicRoutes( requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI) keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( - cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent), + cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.Topics.OutputKeyChangeEvent), consumer, notifier, keyAPI, currentStateAPI, syncDB, ) if err = keyChangeConsumer.Start(); err != nil { |