diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-05-03 16:35:06 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-03 16:35:06 +0100 |
commit | 4ad5f9c982fe5dc9e306a9269621ead8c31248cf (patch) | |
tree | 9eac975c1d7232b35ce4d0c7f658db3c2289f0ab /syncapi | |
parent | 979a551f1e2aeb9f3417df5e52a7279230b7a3ba (diff) |
Global database connection pool (for monolith mode) (#2411)
* Allow monolith components to share a single database pool
* Don't yell about missing connection strings
* Rename field
* Setup tweaks
* Fix panic
* Improve configuration checks
* Update config
* Fix lint errors
* Update comments
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/storage/postgres/syncserver.go | 6 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/syncserver.go | 6 | ||||
-rw-r--r-- | syncapi/storage/storage.go | 7 | ||||
-rw-r--r-- | syncapi/storage/storage_test.go | 2 | ||||
-rw-r--r-- | syncapi/storage/storage_wasm.go | 5 | ||||
-rw-r--r-- | syncapi/storage/tables/output_room_events_test.go | 2 | ||||
-rw-r--r-- | syncapi/storage/tables/topology_test.go | 2 | ||||
-rw-r--r-- | syncapi/syncapi.go | 24 |
8 files changed, 28 insertions, 26 deletions
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index b0382512..9cfe7c07 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -21,6 +21,7 @@ import ( // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/syncapi/storage/shared" @@ -35,13 +36,12 @@ type SyncServerDatasource struct { } // NewDatabase creates a new sync server database -func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { var d SyncServerDatasource var err error - if d.db, err = sqlutil.Open(dbProperties); err != nil { + if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil { return nil, err } - d.writer = sqlutil.NewDummyWriter() accountData, err := NewPostgresAccountDataTable(d.db) if err != nil { return nil, err diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index dfc28948..e08a0ba8 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -19,6 +19,7 @@ import ( "database/sql" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/shared" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas" @@ -35,13 +36,12 @@ type SyncServerDatasource struct { // NewDatabase creates a new sync server database // nolint: gocyclo -func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { var d SyncServerDatasource var err error - if d.db, err = sqlutil.Open(dbProperties); err != nil { + if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()); err != nil { return nil, err } - d.writer = sqlutil.NewExclusiveWriter() if err = d.prepare(dbProperties); err != nil { return nil, err } diff --git a/syncapi/storage/storage.go b/syncapi/storage/storage.go index 7f9c28e9..5b20c6cc 100644 --- a/syncapi/storage/storage.go +++ b/syncapi/storage/storage.go @@ -20,18 +20,19 @@ package storage import ( "fmt" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/postgres" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3" ) // NewSyncServerDatasource opens a database connection. -func NewSyncServerDatasource(dbProperties *config.DatabaseOptions) (Database, error) { +func NewSyncServerDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties) + return sqlite3.NewDatabase(base, dbProperties) case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewDatabase(dbProperties) + return postgres.NewDatabase(base, dbProperties) default: return nil, fmt.Errorf("unexpected database type") } diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 15bb769a..1150c2f3 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -17,7 +17,7 @@ var ctx = context.Background() func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { connStr, close := test.PrepareDBConnectionString(t, dbType) - db, err := storage.NewSyncServerDatasource(&config.DatabaseOptions{ + db, err := storage.NewSyncServerDatasource(nil, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), }) if err != nil { diff --git a/syncapi/storage/storage_wasm.go b/syncapi/storage/storage_wasm.go index f7fef962..c1544474 100644 --- a/syncapi/storage/storage_wasm.go +++ b/syncapi/storage/storage_wasm.go @@ -17,15 +17,16 @@ package storage import ( "fmt" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3" ) // NewPublicRoomsServerDatabase opens a database connection. -func NewSyncServerDatasource(dbProperties *config.DatabaseOptions) (Database, error) { +func NewSyncServerDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties) + return sqlite3.NewDatabase(base, dbProperties) case dbProperties.ConnectionString.IsPostgres(): return nil, fmt.Errorf("can't use Postgres implementation") default: diff --git a/syncapi/storage/tables/output_room_events_test.go b/syncapi/storage/tables/output_room_events_test.go index a143e5ec..8bbf879d 100644 --- a/syncapi/storage/tables/output_room_events_test.go +++ b/syncapi/storage/tables/output_room_events_test.go @@ -21,7 +21,7 @@ func newOutputRoomEventsTable(t *testing.T, dbType test.DBType) (tables.Events, connStr, close := test.PrepareDBConnectionString(t, dbType) db, err := sqlutil.Open(&config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), - }) + }, sqlutil.NewExclusiveWriter()) if err != nil { t.Fatalf("failed to open db: %s", err) } diff --git a/syncapi/storage/tables/topology_test.go b/syncapi/storage/tables/topology_test.go index b6ece0b0..2334aae2 100644 --- a/syncapi/storage/tables/topology_test.go +++ b/syncapi/storage/tables/topology_test.go @@ -20,7 +20,7 @@ func newTopologyTable(t *testing.T, dbType test.DBType) (tables.Topology, *sql.D connStr, close := test.PrepareDBConnectionString(t, dbType) db, err := sqlutil.Open(&config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), - }) + }, sqlutil.NewExclusiveWriter()) if err != nil { t.Fatalf("failed to open db: %s", err) } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index b2d333f7..a2b8859c 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -23,9 +23,9 @@ import ( keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/setup/process" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -41,7 +41,7 @@ import ( // AddPublicRoutes sets up and registers HTTP handlers for the SyncAPI // component. func AddPublicRoutes( - process *process.ProcessContext, + base *base.BaseDendrite, router *mux.Router, userAPI userapi.UserInternalAPI, rsAPI api.RoomserverInternalAPI, @@ -49,9 +49,9 @@ func AddPublicRoutes( federation *gomatrixserverlib.FederationClient, cfg *config.SyncAPI, ) { - js, natsClient := jetstream.Prepare(process, &cfg.Matrix.JetStream) + js, natsClient := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) - syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) + syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to sync db") } @@ -86,7 +86,7 @@ func AddPublicRoutes( } keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( - process, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), + base.ProcessContext, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), js, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, ) @@ -95,7 +95,7 @@ func AddPublicRoutes( } roomConsumer := consumers.NewOutputRoomEventConsumer( - process, cfg, js, syncDB, notifier, streams.PDUStreamProvider, + base.ProcessContext, cfg, js, syncDB, notifier, streams.PDUStreamProvider, streams.InviteStreamProvider, rsAPI, userAPIStreamEventProducer, ) if err = roomConsumer.Start(); err != nil { @@ -103,7 +103,7 @@ func AddPublicRoutes( } clientConsumer := consumers.NewOutputClientDataConsumer( - process, cfg, js, syncDB, notifier, streams.AccountDataStreamProvider, + base.ProcessContext, cfg, js, syncDB, notifier, streams.AccountDataStreamProvider, userAPIReadUpdateProducer, ) if err = clientConsumer.Start(); err != nil { @@ -111,28 +111,28 @@ func AddPublicRoutes( } notificationConsumer := consumers.NewOutputNotificationDataConsumer( - process, cfg, js, syncDB, notifier, streams.NotificationDataStreamProvider, + base.ProcessContext, cfg, js, syncDB, notifier, streams.NotificationDataStreamProvider, ) if err = notificationConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start notification data consumer") } typingConsumer := consumers.NewOutputTypingEventConsumer( - process, cfg, js, eduCache, notifier, streams.TypingStreamProvider, + base.ProcessContext, cfg, js, eduCache, notifier, streams.TypingStreamProvider, ) if err = typingConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start typing consumer") } sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer( - process, cfg, js, syncDB, notifier, streams.SendToDeviceStreamProvider, + base.ProcessContext, cfg, js, syncDB, notifier, streams.SendToDeviceStreamProvider, ) if err = sendToDeviceConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start send-to-device consumer") } receiptConsumer := consumers.NewOutputReceiptEventConsumer( - process, cfg, js, syncDB, notifier, streams.ReceiptStreamProvider, + base.ProcessContext, cfg, js, syncDB, notifier, streams.ReceiptStreamProvider, userAPIReadUpdateProducer, ) if err = receiptConsumer.Start(); err != nil { @@ -140,7 +140,7 @@ func AddPublicRoutes( } presenceConsumer := consumers.NewPresenceConsumer( - process, cfg, js, natsClient, syncDB, + base.ProcessContext, cfg, js, natsClient, syncDB, notifier, streams.PresenceStreamProvider, userAPI, ) |