diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-05-03 16:35:06 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-03 16:35:06 +0100 |
commit | 4ad5f9c982fe5dc9e306a9269621ead8c31248cf (patch) | |
tree | 9eac975c1d7232b35ce4d0c7f658db3c2289f0ab | |
parent | 979a551f1e2aeb9f3417df5e52a7279230b7a3ba (diff) |
Global database connection pool (for monolith mode) (#2411)
* Allow monolith components to share a single database pool
* Don't yell about missing connection strings
* Rename field
* Setup tweaks
* Fix panic
* Improve configuration checks
* Update config
* Fix lint errors
* Update comments
71 files changed, 345 insertions, 240 deletions
diff --git a/appservice/appservice.go b/appservice/appservice.go index b9909186..0db2c100 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -62,7 +62,7 @@ func NewInternalAPI( js, _ := jetstream.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) // Create a connection to the appservice postgres DB - appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database) + appserviceDB, err := storage.NewDatabase(base, &base.Cfg.AppServiceAPI.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to appservice db") } diff --git a/appservice/storage/postgres/storage.go b/appservice/storage/postgres/storage.go index eaf947ff..a4c04b2c 100644 --- a/appservice/storage/postgres/storage.go +++ b/appservice/storage/postgres/storage.go @@ -22,6 +22,7 @@ import ( // Import postgres database driver _ "github.com/lib/pq" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" ) @@ -35,13 +36,12 @@ type Database struct { } // NewDatabase opens a new database -func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*Database, error) { var result Database var err error - if result.db, err = sqlutil.Open(dbProperties); err != nil { + if result.db, result.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil { return nil, err } - result.writer = sqlutil.NewDummyWriter() if err = result.prepare(); err != nil { return nil, err } diff --git a/appservice/storage/sqlite3/storage.go b/appservice/storage/sqlite3/storage.go index 9260c7fe..ad62b362 100644 --- a/appservice/storage/sqlite3/storage.go +++ b/appservice/storage/sqlite3/storage.go @@ -21,6 +21,7 @@ import ( // Import SQLite database driver "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" ) @@ -34,13 +35,12 @@ type Database struct { } // NewDatabase opens a new database -func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*Database, error) { var result Database var err error - if result.db, err = sqlutil.Open(dbProperties); err != nil { + if result.db, result.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()); err != nil { return nil, err } - result.writer = sqlutil.NewExclusiveWriter() if err = result.prepare(); err != nil { return nil, err } diff --git a/appservice/storage/storage.go b/appservice/storage/storage.go index 97b8501e..89d5e0cc 100644 --- a/appservice/storage/storage.go +++ b/appservice/storage/storage.go @@ -22,17 +22,18 @@ import ( "github.com/matrix-org/dendrite/appservice/storage/postgres" "github.com/matrix-org/dendrite/appservice/storage/sqlite3" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) // NewDatabase opens a new Postgres or Sqlite database (based on dataSourceName scheme) // and sets DB connection parameters -func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) { +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties) + return sqlite3.NewDatabase(base, dbProperties) case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewDatabase(dbProperties) + return postgres.NewDatabase(base, dbProperties) default: return nil, fmt.Errorf("unexpected database type") } diff --git a/appservice/storage/storage_wasm.go b/appservice/storage/storage_wasm.go index 07d0e9ee..23025459 100644 --- a/appservice/storage/storage_wasm.go +++ b/appservice/storage/storage_wasm.go @@ -18,13 +18,14 @@ import ( "fmt" "github.com/matrix-org/dendrite/appservice/storage/sqlite3" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) -func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) { +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties) + return sqlite3.NewDatabase(base, dbProperties) case dbProperties.ConnectionString.IsPostgres(): return nil, fmt.Errorf("can't use Postgres implementation") default: diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index d047f3ff..8cf663d0 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -268,7 +268,6 @@ func (m *DendriteMonolith) Start() { base := base.NewBaseDendrite(cfg, "Monolith") defer base.Close() // nolint: errcheck - accountDB := base.CreateAccountsDB() federation := conn.CreateFederationClient(base, m.PineconeQUIC) serverKeyAPI := &signing.YggdrasilKeys{} @@ -281,7 +280,7 @@ func (m *DendriteMonolith) Start() { ) keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI) - m.userAPI = userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, base.PushGatewayHTTPClient()) + m.userAPI = userapi.NewInternalAPI(base, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, base.PushGatewayHTTPClient()) keyAPI.SetUserAPI(m.userAPI) asAPI := appservice.NewInternalAPI(base, m.userAPI, rsAPI) @@ -295,7 +294,6 @@ func (m *DendriteMonolith) Start() { monolith := setup.Monolith{ Config: base.Cfg, - AccountDB: accountDB, Client: conn.CreateClient(base, m.PineconeQUIC), FedClient: federation, KeyRing: keyRing, @@ -309,7 +307,7 @@ func (m *DendriteMonolith) Start() { ExtUserDirectoryProvider: userProvider, } monolith.AddAllPublicRoutes( - base.ProcessContext, + base, base.PublicClientAPIMux, base.PublicFederationAPIMux, base.PublicKeyAPIMux, diff --git a/build/gobind-yggdrasil/monolith.go b/build/gobind-yggdrasil/monolith.go index 4e95e397..2c7d4e91 100644 --- a/build/gobind-yggdrasil/monolith.go +++ b/build/gobind-yggdrasil/monolith.go @@ -107,7 +107,6 @@ func (m *DendriteMonolith) Start() { m.processContext = base.ProcessContext defer base.Close() // nolint: errcheck - accountDB := base.CreateAccountsDB() federation := ygg.CreateFederationClient(base) serverKeyAPI := &signing.YggdrasilKeys{} @@ -120,7 +119,7 @@ func (m *DendriteMonolith) Start() { ) keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, federation) - userAPI := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, base.PushGatewayHTTPClient()) + userAPI := userapi.NewInternalAPI(base, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, base.PushGatewayHTTPClient()) keyAPI.SetUserAPI(userAPI) asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) @@ -132,7 +131,6 @@ func (m *DendriteMonolith) Start() { monolith := setup.Monolith{ Config: base.Cfg, - AccountDB: accountDB, Client: ygg.CreateClient(base), FedClient: federation, KeyRing: keyRing, @@ -147,7 +145,7 @@ func (m *DendriteMonolith) Start() { ), } monolith.AddAllPublicRoutes( - base.ProcessContext, + base, base.PublicClientAPIMux, base.PublicFederationAPIMux, base.PublicKeyAPIMux, diff --git a/cmd/create-account/main.go b/cmd/create-account/main.go index 2719f868..7a566052 100644 --- a/cmd/create-account/main.go +++ b/cmd/create-account/main.go @@ -25,8 +25,8 @@ import ( "strings" "github.com/matrix-org/dendrite/setup" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/dendrite/userapi/storage" "github.com/sirupsen/logrus" "golang.org/x/term" ) @@ -99,8 +99,18 @@ func main() { } } - b := base.NewBaseDendrite(cfg, "Monolith") - accountDB := b.CreateAccountsDB() + accountDB, err := storage.NewUserAPIDatabase( + nil, + &cfg.UserAPI.AccountDatabase, + cfg.Global.ServerName, + cfg.UserAPI.BCryptCost, + cfg.UserAPI.OpenIDTokenLifetimeMS, + 0, // TODO + cfg.Global.ServerNotices.LocalPart, + ) + if err != nil { + logrus.WithError(err).Fatalln("Failed to connect to the database") + } accType := api.AccountTypeUser if *isAdmin { diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 785e7b46..33487e64 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -149,7 +149,6 @@ func main() { base := base.NewBaseDendrite(cfg, "Monolith") defer base.Close() // nolint: errcheck - accountDB := base.CreateAccountsDB() federation := conn.CreateFederationClient(base, pQUIC) serverKeyAPI := &signing.YggdrasilKeys{} @@ -162,7 +161,7 @@ func main() { ) keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI) - userAPI := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient()) + userAPI := userapi.NewInternalAPI(base, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient()) keyAPI.SetUserAPI(userAPI) asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) @@ -174,7 +173,6 @@ func main() { monolith := setup.Monolith{ Config: base.Cfg, - AccountDB: accountDB, Client: conn.CreateClient(base, pQUIC), FedClient: federation, KeyRing: keyRing, @@ -188,7 +186,7 @@ func main() { ExtUserDirectoryProvider: userProvider, } monolith.AddAllPublicRoutes( - base.ProcessContext, + base, base.PublicClientAPIMux, base.PublicFederationAPIMux, base.PublicKeyAPIMux, diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index f9234319..df9ba512 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -104,7 +104,6 @@ func main() { base := base.NewBaseDendrite(cfg, "Monolith") defer base.Close() // nolint: errcheck - accountDB := base.CreateAccountsDB() federation := ygg.CreateFederationClient(base) serverKeyAPI := &signing.YggdrasilKeys{} @@ -117,7 +116,7 @@ func main() { ) rsAPI := rsComponent - userAPI := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient()) + userAPI := userapi.NewInternalAPI(base, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient()) keyAPI.SetUserAPI(userAPI) asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) @@ -130,7 +129,6 @@ func main() { monolith := setup.Monolith{ Config: base.Cfg, - AccountDB: accountDB, Client: ygg.CreateClient(base), FedClient: federation, KeyRing: keyRing, @@ -145,7 +143,7 @@ func main() { ), } monolith.AddAllPublicRoutes( - base.ProcessContext, + base, base.PublicClientAPIMux, base.PublicFederationAPIMux, base.PublicKeyAPIMux, diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 5fd5c0b5..4c7c4297 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -71,7 +71,6 @@ func main() { base := basepkg.NewBaseDendrite(cfg, "Monolith", options...) defer base.Close() // nolint: errcheck - accountDB := base.CreateAccountsDB() federation := base.CreateFederationClient() rsImpl := roomserver.NewInternalAPI(base) @@ -104,7 +103,7 @@ func main() { } pgClient := base.PushGatewayHTTPClient() - userImpl := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, pgClient) + userImpl := userapi.NewInternalAPI(base, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, pgClient) userAPI := userImpl if base.UseHTTPAPIs { userapi.AddInternalRoutes(base.InternalAPIMux, userAPI) @@ -135,7 +134,6 @@ func main() { monolith := setup.Monolith{ Config: base.Cfg, - AccountDB: accountDB, Client: base.CreateClient(), FedClient: federation, KeyRing: keyRing, @@ -146,7 +144,7 @@ func main() { KeyAPI: keyAPI, } monolith.AddAllPublicRoutes( - base.ProcessContext, + base, base.PublicClientAPIMux, base.PublicFederationAPIMux, base.PublicKeyAPIMux, diff --git a/cmd/dendrite-polylith-multi/personalities/mediaapi.go b/cmd/dendrite-polylith-multi/personalities/mediaapi.go index fa9d36a3..8c0bfa19 100644 --- a/cmd/dendrite-polylith-multi/personalities/mediaapi.go +++ b/cmd/dendrite-polylith-multi/personalities/mediaapi.go @@ -24,7 +24,10 @@ func MediaAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { userAPI := base.UserAPIClient() client := base.CreateClient() - mediaapi.AddPublicRoutes(base.PublicMediaAPIMux, &base.Cfg.MediaAPI, &base.Cfg.ClientAPI.RateLimiting, userAPI, client) + mediaapi.AddPublicRoutes( + base, base.PublicMediaAPIMux, &base.Cfg.MediaAPI, &base.Cfg.ClientAPI.RateLimiting, + userAPI, client, + ) base.SetupAndServeHTTP( base.Cfg.MediaAPI.InternalAPI.Listen, diff --git a/cmd/dendrite-polylith-multi/personalities/syncapi.go b/cmd/dendrite-polylith-multi/personalities/syncapi.go index 6fee8419..f9f1c5a0 100644 --- a/cmd/dendrite-polylith-multi/personalities/syncapi.go +++ b/cmd/dendrite-polylith-multi/personalities/syncapi.go @@ -27,7 +27,7 @@ func SyncAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { rsAPI := base.RoomserverHTTPClient() syncapi.AddPublicRoutes( - base.ProcessContext, + base, base.PublicClientAPIMux, userAPI, rsAPI, base.KeyServerHTTPClient(), federation, &cfg.SyncAPI, diff --git a/cmd/dendrite-polylith-multi/personalities/userapi.go b/cmd/dendrite-polylith-multi/personalities/userapi.go index f1fa379c..3fe5a43d 100644 --- a/cmd/dendrite-polylith-multi/personalities/userapi.go +++ b/cmd/dendrite-polylith-multi/personalities/userapi.go @@ -21,10 +21,8 @@ import ( ) func UserAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { - accountDB := base.CreateAccountsDB() - userAPI := userapi.NewInternalAPI( - base, accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, + base, &cfg.UserAPI, cfg.Derived.ApplicationServices, base.KeyServerHTTPClient(), base.RoomserverHTTPClient(), base.PushGatewayHTTPClient(), ) diff --git a/cmd/dendritejs-pinecone/main.go b/cmd/dendritejs-pinecone/main.go index 5ecf1f2f..ead38136 100644 --- a/cmd/dendritejs-pinecone/main.go +++ b/cmd/dendritejs-pinecone/main.go @@ -180,7 +180,6 @@ func startup() { base := base.NewBaseDendrite(cfg, "Monolith") defer base.Close() // nolint: errcheck - accountDB := base.CreateAccountsDB() federation := conn.CreateFederationClient(base, pSessions) keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, federation) @@ -189,7 +188,7 @@ func startup() { rsAPI := roomserver.NewInternalAPI(base) - userAPI := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient()) + userAPI := userapi.NewInternalAPI(base, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient()) keyAPI.SetUserAPI(userAPI) asQuery := appservice.NewInternalAPI( @@ -201,7 +200,6 @@ func startup() { monolith := setup.Monolith{ Config: base.Cfg, - AccountDB: accountDB, Client: conn.CreateClient(base, pSessions), FedClient: federation, KeyRing: keyRing, @@ -215,7 +213,7 @@ func startup() { ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pSessions, fedSenderAPI, federation), } monolith.AddAllPublicRoutes( - base.ProcessContext, + base, base.PublicClientAPIMux, base.PublicFederationAPIMux, base.PublicKeyAPIMux, diff --git a/cmd/resolve-state/main.go b/cmd/resolve-state/main.go index 30331fbb..c52fd6c4 100644 --- a/cmd/resolve-state/main.go +++ b/cmd/resolve-state/main.go @@ -45,7 +45,7 @@ func main() { panic(err) } - roomserverDB, err := storage.Open(&cfg.RoomServer.Database, cache) + roomserverDB, err := storage.Open(nil, &cfg.RoomServer.Database, cache) if err != nil { panic(err) } diff --git a/dendrite-config.yaml b/dendrite-config.yaml index 1c11ef96..1647af15 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -54,6 +54,16 @@ global: # considered valid by other homeservers. key_validity_period: 168h0m0s + # Global database connection pool, for PostgreSQL monolith deployments only. If + # this section is populated then you can omit the "database" blocks in all other + # sections. For polylith deployments, or monolith deployments using SQLite databases, + # you must configure the "database" block for each component instead. + # database: + # connection_string: postgres://user:pass@hostname/database?sslmode=disable + # max_open_conns: 100 + # max_idle_conns: 5 + # conn_max_lifetime: -1 + # The server name to delegate server-server communications to, with optional port # e.g. localhost:443 well_known_server_name: "" diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index 5bfe237a..1848a242 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -91,7 +91,7 @@ func NewInternalAPI( ) api.FederationInternalAPI { cfg := &base.Cfg.FederationAPI - federationDB, err := storage.NewDatabase(&cfg.Database, base.Caches, base.Cfg.Global.ServerName) + federationDB, err := storage.NewDatabase(base, &cfg.Database, base.Caches, base.Cfg.Global.ServerName) if err != nil { logrus.WithError(err).Panic("failed to connect to federation sender db") } diff --git a/federationapi/storage/postgres/storage.go b/federationapi/storage/postgres/storage.go index b2aea692..9863afb2 100644 --- a/federationapi/storage/postgres/storage.go +++ b/federationapi/storage/postgres/storage.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/federationapi/storage/shared" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" ) @@ -35,13 +36,12 @@ type Database struct { } // NewDatabase opens a new database -func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationCache, serverName gomatrixserverlib.ServerName) (*Database, error) { +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache caching.FederationCache, serverName gomatrixserverlib.ServerName) (*Database, error) { var d Database var err error - if d.db, err = sqlutil.Open(dbProperties); err != nil { + if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil { return nil, err } - d.writer = sqlutil.NewDummyWriter() joinedHosts, err := NewPostgresJoinedHostsTable(d.db) if err != nil { return nil, err diff --git a/federationapi/storage/sqlite3/storage.go b/federationapi/storage/sqlite3/storage.go index c2e83211..7d0cee90 100644 --- a/federationapi/storage/sqlite3/storage.go +++ b/federationapi/storage/sqlite3/storage.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/federationapi/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" ) @@ -34,13 +35,12 @@ type Database struct { } // NewDatabase opens a new database -func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationCache, serverName gomatrixserverlib.ServerName) (*Database, error) { +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache caching.FederationCache, serverName gomatrixserverlib.ServerName) (*Database, error) { var d Database var err error - if d.db, err = sqlutil.Open(dbProperties); err != nil { + if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()); err != nil { return nil, err } - d.writer = sqlutil.NewExclusiveWriter() joinedHosts, err := NewSQLiteJoinedHostsTable(d.db) if err != nil { return nil, err diff --git a/federationapi/storage/storage.go b/federationapi/storage/storage.go index 4b52ca20..f246b9bc 100644 --- a/federationapi/storage/storage.go +++ b/federationapi/storage/storage.go @@ -23,17 +23,18 @@ import ( "github.com/matrix-org/dendrite/federationapi/storage/postgres" "github.com/matrix-org/dendrite/federationapi/storage/sqlite3" "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" ) // NewDatabase opens a new database -func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationCache, serverName gomatrixserverlib.ServerName) (Database, error) { +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache caching.FederationCache, serverName gomatrixserverlib.ServerName) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties, cache, serverName) + return sqlite3.NewDatabase(base, dbProperties, cache, serverName) case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewDatabase(dbProperties, cache, serverName) + return postgres.NewDatabase(base, dbProperties, cache, serverName) default: return nil, fmt.Errorf("unexpected database type") } diff --git a/federationapi/storage/storage_wasm.go b/federationapi/storage/storage_wasm.go index 09abed63..84d5a3a4 100644 --- a/federationapi/storage/storage_wasm.go +++ b/federationapi/storage/storage_wasm.go @@ -19,15 +19,16 @@ import ( "github.com/matrix-org/dendrite/federationapi/storage/sqlite3" "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" ) // NewDatabase opens a new database -func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationCache, serverName gomatrixserverlib.ServerName) (Database, error) { +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache caching.FederationCache, serverName gomatrixserverlib.ServerName) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties, cache, serverName) + return sqlite3.NewDatabase(base, dbProperties, cache, serverName) case dbProperties.ConnectionString.IsPostgres(): return nil, fmt.Errorf("can't use Postgres implementation") default: diff --git a/internal/sqlutil/sqlutil.go b/internal/sqlutil/sqlutil.go new file mode 100644 index 00000000..0cdae6d3 --- /dev/null +++ b/internal/sqlutil/sqlutil.go @@ -0,0 +1,51 @@ +package sqlutil + +import ( + "database/sql" + "fmt" + "regexp" + + "github.com/matrix-org/dendrite/setup/config" + "github.com/sirupsen/logrus" +) + +// Open opens a database specified by its database driver name and a driver-specific data source name, +// usually consisting of at least a database name and connection information. Includes tracing driver +// if DENDRITE_TRACE_SQL=1 +func Open(dbProperties *config.DatabaseOptions, writer Writer) (*sql.DB, error) { + var err error + var driverName, dsn string + switch { + case dbProperties.ConnectionString.IsSQLite(): + driverName = "sqlite3" + dsn, err = ParseFileURI(dbProperties.ConnectionString) + if err != nil { + return nil, fmt.Errorf("ParseFileURI: %w", err) + } + case dbProperties.ConnectionString.IsPostgres(): + driverName = "postgres" + dsn = string(dbProperties.ConnectionString) + default: + return nil, fmt.Errorf("invalid database connection string %q", dbProperties.ConnectionString) + } + if tracingEnabled { + // install the wrapped driver + driverName += "-trace" + } + db, err := sql.Open(driverName, dsn) + if err != nil { + return nil, err + } + if driverName != "sqlite3" { + logrus.WithFields(logrus.Fields{ + "MaxOpenConns": dbProperties.MaxOpenConns(), + "MaxIdleConns": dbProperties.MaxIdleConns(), + "ConnMaxLifetime": dbProperties.ConnMaxLifetime(), + "dataSourceName": regexp.MustCompile(`://[^@]*@`).ReplaceAllLiteralString(dsn, "://"), + }).Debug("Setting DB connection limits") + db.SetMaxOpenConns(dbProperties.MaxOpenConns()) + db.SetMaxIdleConns(dbProperties.MaxIdleConns()) + db.SetConnMaxLifetime(dbProperties.ConnMaxLifetime()) + } + return db, nil +} diff --git a/internal/sqlutil/trace.go b/internal/sqlutil/trace.go index 51eaa1b4..c1673861 100644 --- a/internal/sqlutil/trace.go +++ b/internal/sqlutil/trace.go @@ -16,19 +16,16 @@ package sqlutil import ( "context" - "database/sql" "database/sql/driver" "fmt" "io" "os" - "regexp" "runtime" "strconv" "strings" "sync" "time" - "github.com/matrix-org/dendrite/setup/config" "github.com/ngrok/sqlmw" "github.com/sirupsen/logrus" ) @@ -96,47 +93,6 @@ func trackGoID(query string) { logrus.Warnf("unsafe goid %d: SQL executed not on an ExclusiveWriter: %s", thisGoID, q) } -// Open opens a database specified by its database driver name and a driver-specific data source name, -// usually consisting of at least a database name and connection information. Includes tracing driver -// if DENDRITE_TRACE_SQL=1 -func Open(dbProperties *config.DatabaseOptions) (*sql.DB, error) { - var err error - var driverName, dsn string - switch { - case dbProperties.ConnectionString.IsSQLite(): - driverName = "sqlite3" - dsn, err = ParseFileURI(dbProperties.ConnectionString) - if err != nil { - return nil, fmt.Errorf("ParseFileURI: %w", err) - } - case dbProperties.ConnectionString.IsPostgres(): - driverName = "postgres" - dsn = string(dbProperties.ConnectionString) - default: - return nil, fmt.Errorf("invalid database connection string %q", dbProperties.ConnectionString) - } - if tracingEnabled { - // install the wrapped driver - driverName += "-trace" - } - db, err := sql.Open(driverName, dsn) - if err != nil { - return nil, err - } - if driverName != "sqlite3" { - logrus.WithFields(logrus.Fields{ - "MaxOpenConns": dbProperties.MaxOpenConns(), - "MaxIdleConns": dbProperties.MaxIdleConns(), - "ConnMaxLifetime": dbProperties.ConnMaxLifetime(), - "dataSourceName": regexp.MustCompile(`://[^@]*@`).ReplaceAllLiteralString(dsn, "://"), - }).Debug("Setting DB connection limits") - db.SetMaxOpenConns(dbProperties.MaxOpenConns()) - db.SetMaxIdleConns(dbProperties.MaxIdleConns()) - db.SetConnMaxLifetime(dbProperties.ConnMaxLifetime()) - } - return db, nil -} - func init() { registerDrivers() } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index c557dfba..007a48a5 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -41,7 +41,7 @@ func NewInternalAPI( ) api.KeyInternalAPI { js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) - db, err := storage.NewDatabase(&cfg.Database) + db, err := storage.NewDatabase(base, &cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to key server database") } diff --git a/keyserver/storage/postgres/storage.go b/keyserver/storage/postgres/storage.go index d4c7e2cc..b8f70acf 100644 --- a/keyserver/storage/postgres/storage.go +++ b/keyserver/storage/postgres/storage.go @@ -18,13 +18,14 @@ import ( "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/keyserver/storage/postgres/deltas" "github.com/matrix-org/dendrite/keyserver/storage/shared" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) // NewDatabase creates a new sync server database -func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) { +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*shared.Database, error) { var err error - db, err := sqlutil.Open(dbProperties) + db, writer, err := base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()) if err != nil { return nil, err } @@ -63,7 +64,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) } d := &shared.Database{ DB: db, - Writer: sqlutil.NewDummyWriter(), + Writer: writer, OneTimeKeysTable: otk, DeviceKeysTable: dk, KeyChangesTable: kc, diff --git a/keyserver/storage/sqlite3/storage.go b/keyserver/storage/sqlite3/storage.go index 84d4cdf5..aeea9eac 100644 --- a/keyserver/storage/sqlite3/storage.go +++ b/keyserver/storage/sqlite3/storage.go @@ -18,11 +18,12 @@ import ( "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/keyserver/storage/shared" "github.com/matrix-org/dendrite/keyserver/storage/sqlite3/deltas" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) -func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) { - db, err := sqlutil.Open(dbProperties) +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*shared.Database, error) { + db, writer, err := base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()) if err != nil { return nil, err } @@ -62,7 +63,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) } d := &shared.Database{ DB: db, - Writer: sqlutil.NewExclusiveWriter(), + Writer: writer, OneTimeKeysTable: otk, DeviceKeysTable: dk, KeyChangesTable: kc, diff --git a/keyserver/storage/storage.go b/keyserver/storage/storage.go index 742e8463..ab6a3540 100644 --- a/keyserver/storage/storage.go +++ b/keyserver/storage/storage.go @@ -22,17 +22,18 @@ import ( "github.com/matrix-org/dendrite/keyserver/storage/postgres" "github.com/matrix-org/dendrite/keyserver/storage/sqlite3" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) // NewDatabase opens a new Postgres or Sqlite database (based on dataSourceName scheme) // and sets postgres connection parameters -func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) { +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties) + return sqlite3.NewDatabase(base, dbProperties) case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewDatabase(dbProperties) + return postgres.NewDatabase(base, dbProperties) default: return nil, fmt.Errorf("unexpected database type") } diff --git a/keyserver/storage/storage_test.go b/keyserver/storage/storage_test.go index 84d2098a..9940eac6 100644 --- a/keyserver/storage/storage_test.go +++ b/keyserver/storage/storage_test.go @@ -22,7 +22,7 @@ func MustCreateDatabase(t *testing.T) (Database, func()) { log.Fatal(err) } t.Logf("Database %s", tmpfile.Name()) - db, err := NewDatabase(&config.DatabaseOptions{ + db, err := NewDatabase(nil, &config.DatabaseOptions{ ConnectionString: config.DataSource(fmt.Sprintf("file://%s", tmpfile.Name())), }) if err != nil { diff --git a/keyserver/storage/storage_wasm.go b/keyserver/storage/storage_wasm.go index 8b31bfd0..75c9053e 100644 --- a/keyserver/storage/storage_wasm.go +++ b/keyserver/storage/storage_wasm.go @@ -18,13 +18,14 @@ import ( "fmt" "github.com/matrix-org/dendrite/keyserver/storage/sqlite3" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) -func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) { +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties) + return sqlite3.NewDatabase(base, dbProperties) case dbProperties.ConnectionString.IsPostgres(): return nil, fmt.Errorf("can't use Postgres implementation") default: diff --git a/mediaapi/mediaapi.go b/mediaapi/mediaapi.go index e5daf480..f2fa1438 100644 --- a/mediaapi/mediaapi.go +++ b/mediaapi/mediaapi.go @@ -18,6 +18,7 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/mediaapi/routing" "github.com/matrix-org/dendrite/mediaapi/storage" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -26,13 +27,14 @@ import ( // AddPublicRoutes sets up and registers HTTP handlers for the MediaAPI component. func AddPublicRoutes( + base *base.BaseDendrite, router *mux.Router, cfg *config.MediaAPI, rateLimit *config.RateLimiting, userAPI userapi.UserInternalAPI, client *gomatrixserverlib.Client, ) { - mediaDB, err := storage.NewMediaAPIDatasource(&cfg.Database) + mediaDB, err := storage.NewMediaAPIDatasource(base, &cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to media db") } diff --git a/mediaapi/routing/upload_test.go b/mediaapi/routing/upload_test.go index e04c010f..420d0eba 100644 --- a/mediaapi/routing/upload_test.go +++ b/mediaapi/routing/upload_test.go @@ -50,7 +50,7 @@ func Test_uploadRequest_doUpload(t *testing.T) { _ = os.Mkdir(testdataPath, os.ModePerm) defer fileutils.RemoveDir(types.Path(testdataPath), nil) - db, err := storage.NewMediaAPIDatasource(&config.DatabaseOptions{ + db, err := storage.NewMediaAPIDatasource(nil, &config.DatabaseOptions{ ConnectionString: "file::memory:?cache=shared", MaxOpenConnections: 100, MaxIdleConnections: 2, diff --git a/mediaapi/storage/postgres/mediaapi.go b/mediaapi/storage/postgres/mediaapi.go index ea70e575..30ec64f8 100644 --- a/mediaapi/storage/postgres/mediaapi.go +++ b/mediaapi/storage/postgres/mediaapi.go @@ -20,12 +20,13 @@ import ( _ "github.com/lib/pq" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/mediaapi/storage/shared" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) // NewDatabase opens a postgres database. -func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) { - db, err := sqlutil.Open(dbProperties) +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*shared.Database, error) { + db, writer, err := base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()) if err != nil { return nil, err } @@ -41,6 +42,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) MediaRepository: mediaRepo, Thumbnails: thumbnails, DB: db, - Writer: sqlutil.NewExclusiveWriter(), + Writer: writer, }, nil } diff --git a/mediaapi/storage/sqlite3/mediaapi.go b/mediaapi/storage/sqlite3/mediaapi.go index abf32936..c0ab10e9 100644 --- a/mediaapi/storage/sqlite3/mediaapi.go +++ b/mediaapi/storage/sqlite3/mediaapi.go @@ -19,12 +19,13 @@ import ( // Import the postgres database driver. "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/mediaapi/storage/shared" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) // NewDatabase opens a SQLIte database. -func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) { - db, err := sqlutil.Open(dbProperties) +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*shared.Database, error) { + db, writer, err := base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()) if err != nil { return nil, err } @@ -40,6 +41,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) MediaRepository: mediaRepo, Thumbnails: thumbnails, DB: db, - Writer: sqlutil.NewExclusiveWriter(), + Writer: writer, }, nil } diff --git a/mediaapi/storage/storage.go b/mediaapi/storage/storage.go index baa242e5..f673ae7e 100644 --- a/mediaapi/storage/storage.go +++ b/mediaapi/storage/storage.go @@ -22,16 +22,17 @@ import ( "github.com/matrix-org/dendrite/mediaapi/storage/postgres" "github.com/matrix-org/dendrite/mediaapi/storage/sqlite3" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) // NewMediaAPIDatasource opens a database connection. -func NewMediaAPIDatasource(dbProperties *config.DatabaseOptions) (Database, error) { +func NewMediaAPIDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties) + return sqlite3.NewDatabase(base, dbProperties) case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewDatabase(dbProperties) + return postgres.NewDatabase(base, dbProperties) default: return nil, fmt.Errorf("unexpected database type") } diff --git a/mediaapi/storage/storage_test.go b/mediaapi/storage/storage_test.go index fa88cd8e..81f0a5d2 100644 --- a/mediaapi/storage/storage_test.go +++ b/mediaapi/storage/storage_test.go @@ -13,7 +13,7 @@ import ( func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { connStr, close := test.PrepareDBConnectionString(t, dbType) - db, err := storage.NewMediaAPIDatasource(&config.DatabaseOptions{ + db, err := storage.NewMediaAPIDatasource(nil, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), }) if err != nil { diff --git a/mediaapi/storage/storage_wasm.go b/mediaapi/storage/storage_wasm.go index f67f9d5e..41e4a28c 100644 --- a/mediaapi/storage/storage_wasm.go +++ b/mediaapi/storage/storage_wasm.go @@ -18,14 +18,15 @@ import ( "fmt" "github.com/matrix-org/dendrite/mediaapi/storage/sqlite3" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) // Open opens a postgres database. -func NewMediaAPIDatasource(dbProperties *config.DatabaseOptions) (Database, error) { +func NewMediaAPIDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties) + return sqlite3.NewDatabase(base, dbProperties) case dbProperties.ConnectionString.IsPostgres(): return nil, fmt.Errorf("can't use Postgres implementation") default: diff --git a/roomserver/internal/input/input_test.go b/roomserver/internal/input/input_test.go index 81c86ae3..5d34842b 100644 --- a/roomserver/internal/input/input_test.go +++ b/roomserver/internal/input/input_test.go @@ -53,6 +53,7 @@ func TestSingleTransactionOnInput(t *testing.T) { t.Fatal(err) } db, err := storage.Open( + nil, &config.DatabaseOptions{ ConnectionString: "", MaxOpenConnections: 1, diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 36e3c526..46261eb3 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -45,7 +45,7 @@ func NewInternalAPI( perspectiveServerNames = append(perspectiveServerNames, kp.ServerName) } - roomserverDB, err := storage.Open(&cfg.Database, base.Caches) + roomserverDB, err := storage.Open(base, &cfg.Database, base.Caches) if err != nil { logrus.WithError(err).Panicf("failed to connect to room server db") } diff --git a/roomserver/storage/postgres/storage.go b/roomserver/storage/postgres/storage.go index b5e05c98..da8d2584 100644 --- a/roomserver/storage/postgres/storage.go +++ b/roomserver/storage/postgres/storage.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/storage/postgres/deltas" "github.com/matrix-org/dendrite/roomserver/storage/shared" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) @@ -35,11 +36,11 @@ type Database struct { } // Open a postgres database. -func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (*Database, error) { +func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (*Database, error) { var d Database - var db *sql.DB var err error - if db, err = sqlutil.Open(dbProperties); err != nil { + db, writer, err := base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()) + if err != nil { return nil, fmt.Errorf("sqlutil.Open: %w", err) } @@ -59,7 +60,7 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) // Then prepare the statements. Now that the migrations have run, any columns referred // to in the database code should now exist. - if err := d.prepare(db, cache); err != nil { + if err := d.prepare(db, writer, cache); err != nil { return nil, err } @@ -110,7 +111,7 @@ func (d *Database) create(db *sql.DB) error { return nil } -func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) error { +func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.RoomServerCaches) error { eventStateKeys, err := prepareEventStateKeysTable(db) if err != nil { return err @@ -166,7 +167,7 @@ func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) error { d.Database = shared.Database{ DB: db, Cache: cache, - Writer: sqlutil.NewDummyWriter(), + Writer: writer, EventTypesTable: eventTypes, EventStateKeysTable: eventStateKeys, EventJSONTable: eventJSON, diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index 325c253b..e6cf1a53 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -18,12 +18,14 @@ package sqlite3 import ( "context" "database/sql" + "fmt" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/storage/shared" "github.com/matrix-org/dendrite/roomserver/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" ) @@ -34,12 +36,12 @@ type Database struct { } // Open a sqlite database. -func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (*Database, error) { +func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (*Database, error) { var d Database - var db *sql.DB var err error - if db, err = sqlutil.Open(dbProperties); err != nil { - return nil, err + db, writer, err := base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()) + if err != nil { + return nil, fmt.Errorf("sqlutil.Open: %w", err) } //db.Exec("PRAGMA journal_mode=WAL;") @@ -49,7 +51,7 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) // cause the roomserver to be unresponsive to new events because something will // acquire the global mutex and never unlock it because it is waiting for a connection // which it will never obtain. - db.SetMaxOpenConns(20) + // db.SetMaxOpenConns(20) // Create the tables. if err := d.create(db); err != nil { @@ -67,7 +69,7 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) // Then prepare the statements. Now that the migrations have run, any columns referred // to in the database code should now exist. - if err := d.prepare(db, cache); err != nil { + if err := d.prepare(db, writer, cache); err != nil { return nil, err } @@ -118,7 +120,7 @@ func (d *Database) create(db *sql.DB) error { return nil } -func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) error { +func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.RoomServerCaches) error { eventStateKeys, err := prepareEventStateKeysTable(db) if err != nil { return err @@ -174,7 +176,7 @@ func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) error { d.Database = shared.Database{ DB: db, Cache: cache, - Writer: sqlutil.NewExclusiveWriter(), + Writer: writer, EventsTable: events, EventTypesTable: eventTypes, EventStateKeysTable: eventStateKeys, diff --git a/roomserver/storage/storage.go b/roomserver/storage/storage.go index 9f98ea3e..8a87b7d7 100644 --- a/roomserver/storage/storage.go +++ b/roomserver/storage/storage.go @@ -23,16 +23,17 @@ import ( "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/roomserver/storage/postgres" "github.com/matrix-org/dendrite/roomserver/storage/sqlite3" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) // Open opens a database connection. -func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (Database, error) { +func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.Open(dbProperties, cache) + return sqlite3.Open(base, dbProperties, cache) case dbProperties.ConnectionString.IsPostgres(): - return postgres.Open(dbProperties, cache) + return postgres.Open(base, dbProperties, cache) default: return nil, fmt.Errorf("unexpected database type") } diff --git a/roomserver/storage/storage_wasm.go b/roomserver/storage/storage_wasm.go index dfc374e6..df5a56ac 100644 --- a/roomserver/storage/storage_wasm.go +++ b/roomserver/storage/storage_wasm.go @@ -19,14 +19,15 @@ import ( "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/roomserver/storage/sqlite3" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) // NewPublicRoomsServerDatabase opens a database connection. -func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (Database, error) { +func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.Open(dbProperties, cache) + return sqlite3.Open(base, dbProperties, cache) case dbProperties.ConnectionString.IsPostgres(): return nil, fmt.Errorf("can't use Postgres implementation") default: diff --git a/setup/base/base.go b/setup/base/base.go index 4b771aa3..9b227b70 100644 --- a/setup/base/base.go +++ b/setup/base/base.go @@ -17,6 +17,7 @@ package base import ( "context" "crypto/tls" + "database/sql" "fmt" "io" "net" @@ -32,6 +33,7 @@ import ( "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/pushgateway" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/atomic" @@ -40,7 +42,6 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/process" - userdb "github.com/matrix-org/dendrite/userapi/storage" "github.com/gorilla/mux" "github.com/kardianos/minwinsvc" @@ -81,6 +82,8 @@ type BaseDendrite struct { Cfg *config.Dendrite Caches *caching.Caches DNSCache *gomatrixserverlib.DNSCache + Database *sql.DB + DatabaseWriter sqlutil.Writer } const NoListener = "" @@ -112,7 +115,8 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base } configErrors := &config.ConfigErrors{} - cfg.Verify(configErrors, componentName == "Monolith") // TODO: better way? + isMonolith := componentName == "Monolith" // TODO: better way? + cfg.Verify(configErrors, isMonolith) if len(*configErrors) > 0 { for _, err := range *configErrors { logrus.Errorf("Configuration error: %s", err) @@ -185,6 +189,24 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base }, } + // If we're in monolith mode, we'll set up a global pool of database + // connections. A component is welcome to use this pool if they don't + // have a separate database config of their own. + var db *sql.DB + var writer sqlutil.Writer + if cfg.Global.DatabaseOptions.ConnectionString != "" { + if !isMonolith { + logrus.Panic("Using a global database connection pool is not supported in polylith deployments") + } + if cfg.Global.DatabaseOptions.ConnectionString.IsSQLite() { + logrus.Panic("Using a global database connection pool is not supported with SQLite databases") + } + if db, err = sqlutil.Open(&cfg.Global.DatabaseOptions, sqlutil.NewDummyWriter()); err != nil { + logrus.WithError(err).Panic("Failed to set up global database connections") + } + logrus.Debug("Using global database connection pool") + } + // Ideally we would only use SkipClean on routes which we know can allow '/' but due to // https://github.com/gorilla/mux/issues/460 we have to attach this at the top router. // When used in conjunction with UseEncodedPath() we get the behaviour we want when parsing @@ -214,6 +236,8 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base DendriteAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.DendriteAdminPathPrefix).Subrouter().UseEncodedPath(), SynapseAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.SynapseAdminPathPrefix).Subrouter().UseEncodedPath(), apiHttpClient: &apiClient, + Database: db, // set if monolith with global connection pool only + DatabaseWriter: writer, // set if monolith with global connection pool only } } @@ -222,6 +246,29 @@ func (b *BaseDendrite) Close() error { return b.tracerCloser.Close() } +// DatabaseConnection assists in setting up a database connection. It accepts +// the database properties and a new writer for the given component. If we're +// running in monolith mode with a global connection pool configured then we +// will return that connection, along with the global writer, effectively +// ignoring the options provided. Otherwise we'll open a new database connection +// using the supplied options and writer. Note that it's possible for the pointer +// receiver to be nil here – that's deliberate as some of the unit tests don't +// have a BaseDendrite and just want a connection with the supplied config +// without any pooling stuff. +func (b *BaseDendrite) DatabaseConnection(dbProperties *config.DatabaseOptions, writer sqlutil.Writer) (*sql.DB, sqlutil.Writer, error) { + if dbProperties.ConnectionString != "" || b == nil { + // Open a new database connection using the supplied config. + db, err := sqlutil.Open(dbProperties, writer) + return db, writer, err + } + if b.Database != nil && b.DatabaseWriter != nil { + // Ignore the supplied config and return the global pool and + // writer. + return b.Database, b.DatabaseWriter, nil + } + return nil, nil, fmt.Errorf("no database connections configured") +} + // AppserviceHTTPClient returns the AppServiceQueryAPI for hitting the appservice component over HTTP. func (b *BaseDendrite) AppserviceHTTPClient() appserviceAPI.AppServiceQueryAPI { a, err := asinthttp.NewAppserviceClient(b.Cfg.AppServiceURL(), b.apiHttpClient) @@ -273,24 +320,6 @@ func (b *BaseDendrite) PushGatewayHTTPClient() pushgateway.Client { return pushgateway.NewHTTPClient(b.Cfg.UserAPI.PushGatewayDisableTLSValidation) } -// CreateAccountsDB creates a new instance of the accounts database. Should only -// be called once per component. -func (b *BaseDendrite) CreateAccountsDB() userdb.Database { - db, err := userdb.NewUserAPIDatabase( - &b.Cfg.UserAPI.AccountDatabase, - b.Cfg.Global.ServerName, - b.Cfg.UserAPI.BCryptCost, - b.Cfg.UserAPI.OpenIDTokenLifetimeMS, - userapi.DefaultLoginTokenLifetime, - b.Cfg.Global.ServerNotices.LocalPart, - ) - if err != nil { - logrus.WithError(err).Panicf("failed to connect to accounts db") - } - - return db -} - // CreateClient creates a new client (normally used for media fetch requests). // Should only be called once per component. func (b *BaseDendrite) CreateClient() *gomatrixserverlib.Client { diff --git a/setup/config/config_appservice.go b/setup/config/config_appservice.go index 3f4e1c91..d93b6ebe 100644 --- a/setup/config/config_appservice.go +++ b/setup/config/config_appservice.go @@ -52,7 +52,9 @@ func (c *AppServiceAPI) Defaults(generate bool) { func (c *AppServiceAPI) Verify(configErrs *ConfigErrors, isMonolith bool) { checkURL(configErrs, "app_service_api.internal_api.listen", string(c.InternalAPI.Listen)) checkURL(configErrs, "app_service_api.internal_api.bind", string(c.InternalAPI.Connect)) - checkNotEmpty(configErrs, "app_service_api.database.connection_string", string(c.Database.ConnectionString)) + if c.Matrix.DatabaseOptions.ConnectionString == "" { + checkNotEmpty(configErrs, "app_service_api.database.connection_string", string(c.Database.ConnectionString)) + } } // ApplicationServiceNamespace is the namespace that a specific application diff --git a/setup/config/config_federationapi.go b/setup/config/config_federationapi.go index 176334dd..f62a23e1 100644 --- a/setup/config/config_federationapi.go +++ b/setup/config/config_federationapi.go @@ -49,7 +49,9 @@ func (c *FederationAPI) Verify(configErrs *ConfigErrors, isMonolith bool) { if !isMonolith { checkURL(configErrs, "federation_api.external_api.listen", string(c.ExternalAPI.Listen)) } - checkNotEmpty(configErrs, "federation_api.database.connection_string", string(c.Database.ConnectionString)) + if c.Matrix.DatabaseOptions.ConnectionString == "" { + checkNotEmpty(configErrs, "federation_api.database.connection_string", string(c.Database.ConnectionString)) + } } // The config for setting a proxy to use for server->server requests diff --git a/setup/config/config_global.go b/setup/config/config_global.go index c1650f07..d609e246 100644 --- a/setup/config/config_global.go +++ b/setup/config/config_global.go @@ -34,6 +34,13 @@ type Global struct { // Defaults to 24 hours. KeyValidityPeriod time.Duration `yaml:"key_validity_period"` + // Global pool of database connections, which is used only in monolith mode. If a + // component does not specify any database options of its own, then this pool of + // connections will be used instead. This way we don't have to manage connection + // counts on a per-component basis, but can instead do it for the entire monolith. + // In a polylith deployment, this will be ignored. + DatabaseOptions DatabaseOptions `yaml:"database"` + // The server name to delegate server-server communications to, with optional port WellKnownServerName string `yaml:"well_known_server_name"` diff --git a/setup/config/config_keyserver.go b/setup/config/config_keyserver.go index 6180ccbc..9e2d54cd 100644 --- a/setup/config/config_keyserver.go +++ b/setup/config/config_keyserver.go @@ -20,5 +20,7 @@ func (c *KeyServer) Defaults(generate bool) { func (c *KeyServer) Verify(configErrs *ConfigErrors, isMonolith bool) { checkURL(configErrs, "key_server.internal_api.listen", string(c.InternalAPI.Listen)) checkURL(configErrs, "key_server.internal_api.bind", string(c.InternalAPI.Connect)) - checkNotEmpty(configErrs, "key_server.database.connection_string", string(c.Database.ConnectionString)) + if c.Matrix.DatabaseOptions.ConnectionString == "" { + checkNotEmpty(configErrs, "key_server.database.connection_string", string(c.Database.ConnectionString)) + } } diff --git a/setup/config/config_mediaapi.go b/setup/config/config_mediaapi.go index c85020d2..273de322 100644 --- a/setup/config/config_mediaapi.go +++ b/setup/config/config_mediaapi.go @@ -58,7 +58,9 @@ func (c *MediaAPI) Verify(configErrs *ConfigErrors, isMonolith bool) { if !isMonolith { checkURL(configErrs, "media_api.external_api.listen", string(c.ExternalAPI.Listen)) } - checkNotEmpty(configErrs, "media_api.database.connection_string", string(c.Database.ConnectionString)) + if c.Matrix.DatabaseOptions.ConnectionString == "" { + checkNotEmpty(configErrs, "media_api.database.connection_string", string(c.Database.ConnectionString)) + } checkNotEmpty(configErrs, "media_api.base_path", string(c.BasePath)) checkPositive(configErrs, "media_api.max_file_size_bytes", int64(c.MaxFileSizeBytes)) diff --git a/setup/config/config_mscs.go b/setup/config/config_mscs.go index 66a4c80c..b992f715 100644 --- a/setup/config/config_mscs.go +++ b/setup/config/config_mscs.go @@ -31,5 +31,7 @@ func (c *MSCs) Enabled(msc string) bool { } func (c *MSCs) Verify(configErrs *ConfigErrors, isMonolith bool) { - checkNotEmpty(configErrs, "mscs.database.connection_string", string(c.Database.ConnectionString)) + if c.Matrix.DatabaseOptions.ConnectionString == "" { + checkNotEmpty(configErrs, "mscs.database.connection_string", string(c.Database.ConnectionString)) + } } diff --git a/setup/config/config_roomserver.go b/setup/config/config_roomserver.go index 73abb4f4..8a322734 100644 --- a/setup/config/config_roomserver.go +++ b/setup/config/config_roomserver.go @@ -20,5 +20,7 @@ func (c *RoomServer) Defaults(generate bool) { func (c *RoomServer) Verify(configErrs *ConfigErrors, isMonolith bool) { checkURL(configErrs, "room_server.internal_api.listen", string(c.InternalAPI.Listen)) checkURL(configErrs, "room_server.internal_ap.bind", string(c.InternalAPI.Connect)) - checkNotEmpty(configErrs, "room_server.database.connection_string", string(c.Database.ConnectionString)) + if c.Matrix.DatabaseOptions.ConnectionString == "" { + checkNotEmpty(configErrs, "room_server.database.connection_string", string(c.Database.ConnectionString)) + } } diff --git a/setup/config/config_syncapi.go b/setup/config/config_syncapi.go index dc813cb7..48fd9f50 100644 --- a/setup/config/config_syncapi.go +++ b/setup/config/config_syncapi.go @@ -27,5 +27,7 @@ func (c *SyncAPI) Verify(configErrs *ConfigErrors, isMonolith bool) { if !isMonolith { checkURL(configErrs, "sync_api.external_api.listen", string(c.ExternalAPI.Listen)) } - checkNotEmpty(configErrs, "sync_api.database", string(c.Database.ConnectionString)) + if c.Matrix.DatabaseOptions.ConnectionString == "" { + checkNotEmpty(configErrs, "sync_api.database", string(c.Database.ConnectionString)) + } } diff --git a/setup/config/config_userapi.go b/setup/config/config_userapi.go index 570dc603..4aa3b57b 100644 --- a/setup/config/config_userapi.go +++ b/setup/config/config_userapi.go @@ -37,6 +37,8 @@ func (c *UserAPI) Defaults(generate bool) { func (c *UserAPI) Verify(configErrs *ConfigErrors, isMonolith bool) { checkURL(configErrs, "user_api.internal_api.listen", string(c.InternalAPI.Listen)) checkURL(configErrs, "user_api.internal_api.connect", string(c.InternalAPI.Connect)) - checkNotEmpty(configErrs, "user_api.account_database.connection_string", string(c.AccountDatabase.ConnectionString)) + if c.Matrix.DatabaseOptions.ConnectionString == "" { + checkNotEmpty(configErrs, "user_api.account_database.connection_string", string(c.AccountDatabase.ConnectionString)) + } checkPositive(configErrs, "user_api.openid_token_lifetime_ms", c.OpenIDTokenLifetimeMS) } diff --git a/setup/monolith.go b/setup/monolith.go index c86ec7b6..a414172c 100644 --- a/setup/monolith.go +++ b/setup/monolith.go @@ -25,11 +25,10 @@ import ( keyAPI "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/mediaapi" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi" userapi "github.com/matrix-org/dendrite/userapi/api" - userdb "github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/gomatrixserverlib" ) @@ -37,7 +36,6 @@ import ( // all components of Dendrite, for use in monolith mode. type Monolith struct { Config *config.Dendrite - AccountDB userdb.Database KeyRing *gomatrixserverlib.KeyRing Client *gomatrixserverlib.Client FedClient *gomatrixserverlib.FederationClient @@ -54,26 +52,28 @@ type Monolith struct { } // AddAllPublicRoutes attaches all public paths to the given router -func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ssMux, keyMux, wkMux, mediaMux, synapseMux, dendriteMux *mux.Router) { +func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite, csMux, ssMux, keyMux, wkMux, mediaMux, synapseMux, dendriteMux *mux.Router) { userDirectoryProvider := m.ExtUserDirectoryProvider if userDirectoryProvider == nil { userDirectoryProvider = m.UserAPI } clientapi.AddPublicRoutes( - process, csMux, synapseMux, dendriteMux, &m.Config.ClientAPI, - m.FedClient, m.RoomserverAPI, - m.AppserviceAPI, transactions.New(), + base.ProcessContext, csMux, synapseMux, dendriteMux, &m.Config.ClientAPI, + m.FedClient, m.RoomserverAPI, m.AppserviceAPI, transactions.New(), m.FederationAPI, m.UserAPI, userDirectoryProvider, m.KeyAPI, m.ExtPublicRoomsProvider, &m.Config.MSCs, ) federationapi.AddPublicRoutes( - process, ssMux, keyMux, wkMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient, - m.KeyRing, m.RoomserverAPI, m.FederationAPI, + base.ProcessContext, ssMux, keyMux, wkMux, &m.Config.FederationAPI, + m.UserAPI, m.FedClient, m.KeyRing, m.RoomserverAPI, m.FederationAPI, m.KeyAPI, &m.Config.MSCs, nil, ) - mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, &m.Config.ClientAPI.RateLimiting, m.UserAPI, m.Client) + mediaapi.AddPublicRoutes( + base, mediaMux, &m.Config.MediaAPI, &m.Config.ClientAPI.RateLimiting, + m.UserAPI, m.Client, + ) syncapi.AddPublicRoutes( - process, csMux, m.UserAPI, m.RoomserverAPI, + base, csMux, m.UserAPI, m.RoomserverAPI, m.KeyAPI, m.FedClient, &m.Config.SyncAPI, ) } diff --git a/setup/mscs/msc2836/msc2836.go b/setup/mscs/msc2836/msc2836.go index 29c781a8..452b1458 100644 --- a/setup/mscs/msc2836/msc2836.go +++ b/setup/mscs/msc2836/msc2836.go @@ -102,7 +102,7 @@ func Enable( base *base.BaseDendrite, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationInternalAPI, userAPI userapi.UserInternalAPI, keyRing gomatrixserverlib.JSONVerifier, ) error { - db, err := NewDatabase(&base.Cfg.MSCs.Database) + db, err := NewDatabase(base, &base.Cfg.MSCs.Database) if err != nil { return fmt.Errorf("cannot enable MSC2836: %w", err) } diff --git a/setup/mscs/msc2836/storage.go b/setup/mscs/msc2836/storage.go index 72523916..827e82f7 100644 --- a/setup/mscs/msc2836/storage.go +++ b/setup/mscs/msc2836/storage.go @@ -8,6 +8,7 @@ import ( "encoding/json" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -58,19 +59,17 @@ type DB struct { } // NewDatabase loads the database for msc2836 -func NewDatabase(dbOpts *config.DatabaseOptions) (Database, error) { +func NewDatabase(base *base.BaseDendrite, dbOpts *config.DatabaseOptions) (Database, error) { if dbOpts.ConnectionString.IsPostgres() { - return newPostgresDatabase(dbOpts) + return newPostgresDatabase(base, dbOpts) } - return newSQLiteDatabase(dbOpts) + return newSQLiteDatabase(base, dbOpts) } -func newPostgresDatabase(dbOpts *config.DatabaseOptions) (Database, error) { - d := DB{ - writer: sqlutil.NewDummyWriter(), - } +func newPostgresDatabase(base *base.BaseDendrite, dbOpts *config.DatabaseOptions) (Database, error) { + d := DB{} var err error - if d.db, err = sqlutil.Open(dbOpts); err != nil { + if d.db, d.writer, err = base.DatabaseConnection(dbOpts, sqlutil.NewDummyWriter()); err != nil { return nil, err } _, err = d.db.Exec(` @@ -145,12 +144,10 @@ func newPostgresDatabase(dbOpts *config.DatabaseOptions) (Database, error) { return &d, err } -func newSQLiteDatabase(dbOpts *config.DatabaseOptions) (Database, error) { - d := DB{ - writer: sqlutil.NewExclusiveWriter(), - } +func newSQLiteDatabase(base *base.BaseDendrite, dbOpts *config.DatabaseOptions) (Database, error) { + d := DB{} var err error - if d.db, err = sqlutil.Open(dbOpts); err != nil { + if d.db, d.writer, err = base.DatabaseConnection(dbOpts, sqlutil.NewExclusiveWriter()); err != nil { return nil, err } _, err = d.db.Exec(` diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index b0382512..9cfe7c07 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -21,6 +21,7 @@ import ( // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/syncapi/storage/shared" @@ -35,13 +36,12 @@ type SyncServerDatasource struct { } // NewDatabase creates a new sync server database -func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { var d SyncServerDatasource var err error - if d.db, err = sqlutil.Open(dbProperties); err != nil { + if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil { return nil, err } - d.writer = sqlutil.NewDummyWriter() accountData, err := NewPostgresAccountDataTable(d.db) if err != nil { return nil, err diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index dfc28948..e08a0ba8 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -19,6 +19,7 @@ import ( "database/sql" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/shared" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas" @@ -35,13 +36,12 @@ type SyncServerDatasource struct { // NewDatabase creates a new sync server database // nolint: gocyclo -func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { var d SyncServerDatasource var err error - if d.db, err = sqlutil.Open(dbProperties); err != nil { + if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()); err != nil { return nil, err } - d.writer = sqlutil.NewExclusiveWriter() if err = d.prepare(dbProperties); err != nil { return nil, err } diff --git a/syncapi/storage/storage.go b/syncapi/storage/storage.go index 7f9c28e9..5b20c6cc 100644 --- a/syncapi/storage/storage.go +++ b/syncapi/storage/storage.go @@ -20,18 +20,19 @@ package storage import ( "fmt" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/postgres" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3" ) // NewSyncServerDatasource opens a database connection. -func NewSyncServerDatasource(dbProperties *config.DatabaseOptions) (Database, error) { +func NewSyncServerDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties) + return sqlite3.NewDatabase(base, dbProperties) case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewDatabase(dbProperties) + return postgres.NewDatabase(base, dbProperties) default: return nil, fmt.Errorf("unexpected database type") } diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 15bb769a..1150c2f3 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -17,7 +17,7 @@ var ctx = context.Background() func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { connStr, close := test.PrepareDBConnectionString(t, dbType) - db, err := storage.NewSyncServerDatasource(&config.DatabaseOptions{ + db, err := storage.NewSyncServerDatasource(nil, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), }) if err != nil { diff --git a/syncapi/storage/storage_wasm.go b/syncapi/storage/storage_wasm.go index f7fef962..c1544474 100644 --- a/syncapi/storage/storage_wasm.go +++ b/syncapi/storage/storage_wasm.go @@ -17,15 +17,16 @@ package storage import ( "fmt" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3" ) // NewPublicRoomsServerDatabase opens a database connection. -func NewSyncServerDatasource(dbProperties *config.DatabaseOptions) (Database, error) { +func NewSyncServerDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties) + return sqlite3.NewDatabase(base, dbProperties) case dbProperties.ConnectionString.IsPostgres(): return nil, fmt.Errorf("can't use Postgres implementation") default: diff --git a/syncapi/storage/tables/output_room_events_test.go b/syncapi/storage/tables/output_room_events_test.go index a143e5ec..8bbf879d 100644 --- a/syncapi/storage/tables/output_room_events_test.go +++ b/syncapi/storage/tables/output_room_events_test.go @@ -21,7 +21,7 @@ func newOutputRoomEventsTable(t *testing.T, dbType test.DBType) (tables.Events, connStr, close := test.PrepareDBConnectionString(t, dbType) db, err := sqlutil.Open(&config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), - }) + }, sqlutil.NewExclusiveWriter()) if err != nil { t.Fatalf("failed to open db: %s", err) } diff --git a/syncapi/storage/tables/topology_test.go b/syncapi/storage/tables/topology_test.go index b6ece0b0..2334aae2 100644 --- a/syncapi/storage/tables/topology_test.go +++ b/syncapi/storage/tables/topology_test.go @@ -20,7 +20,7 @@ func newTopologyTable(t *testing.T, dbType test.DBType) (tables.Topology, *sql.D connStr, close := test.PrepareDBConnectionString(t, dbType) db, err := sqlutil.Open(&config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), - }) + }, sqlutil.NewExclusiveWriter()) if err != nil { t.Fatalf("failed to open db: %s", err) } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index b2d333f7..a2b8859c 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -23,9 +23,9 @@ import ( keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/setup/process" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -41,7 +41,7 @@ import ( // AddPublicRoutes sets up and registers HTTP handlers for the SyncAPI // component. func AddPublicRoutes( - process *process.ProcessContext, + base *base.BaseDendrite, router *mux.Router, userAPI userapi.UserInternalAPI, rsAPI api.RoomserverInternalAPI, @@ -49,9 +49,9 @@ func AddPublicRoutes( federation *gomatrixserverlib.FederationClient, cfg *config.SyncAPI, ) { - js, natsClient := jetstream.Prepare(process, &cfg.Matrix.JetStream) + js, natsClient := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) - syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) + syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to sync db") } @@ -86,7 +86,7 @@ func AddPublicRoutes( } keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( - process, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), + base.ProcessContext, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), js, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, ) @@ -95,7 +95,7 @@ func AddPublicRoutes( } roomConsumer := consumers.NewOutputRoomEventConsumer( - process, cfg, js, syncDB, notifier, streams.PDUStreamProvider, + base.ProcessContext, cfg, js, syncDB, notifier, streams.PDUStreamProvider, streams.InviteStreamProvider, rsAPI, userAPIStreamEventProducer, ) if err = roomConsumer.Start(); err != nil { @@ -103,7 +103,7 @@ func AddPublicRoutes( } clientConsumer := consumers.NewOutputClientDataConsumer( - process, cfg, js, syncDB, notifier, streams.AccountDataStreamProvider, + base.ProcessContext, cfg, js, syncDB, notifier, streams.AccountDataStreamProvider, userAPIReadUpdateProducer, ) if err = clientConsumer.Start(); err != nil { @@ -111,28 +111,28 @@ func AddPublicRoutes( } notificationConsumer := consumers.NewOutputNotificationDataConsumer( - process, cfg, js, syncDB, notifier, streams.NotificationDataStreamProvider, + base.ProcessContext, cfg, js, syncDB, notifier, streams.NotificationDataStreamProvider, ) if err = notificationConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start notification data consumer") } typingConsumer := consumers.NewOutputTypingEventConsumer( - process, cfg, js, eduCache, notifier, streams.TypingStreamProvider, + base.ProcessContext, cfg, js, eduCache, notifier, streams.TypingStreamProvider, ) if err = typingConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start typing consumer") } sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer( - process, cfg, js, syncDB, notifier, streams.SendToDeviceStreamProvider, + base.ProcessContext, cfg, js, syncDB, notifier, streams.SendToDeviceStreamProvider, ) if err = sendToDeviceConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start send-to-device consumer") } receiptConsumer := consumers.NewOutputReceiptEventConsumer( - process, cfg, js, syncDB, notifier, streams.ReceiptStreamProvider, + base.ProcessContext, cfg, js, syncDB, notifier, streams.ReceiptStreamProvider, userAPIReadUpdateProducer, ) if err = receiptConsumer.Start(); err != nil { @@ -140,7 +140,7 @@ func AddPublicRoutes( } presenceConsumer := consumers.NewPresenceConsumer( - process, cfg, js, natsClient, syncDB, + base.ProcessContext, cfg, js, natsClient, syncDB, notifier, streams.PresenceStreamProvider, userAPI, ) diff --git a/userapi/storage/postgres/storage.go b/userapi/storage/postgres/storage.go index b2a51760..74100a72 100644 --- a/userapi/storage/postgres/storage.go +++ b/userapi/storage/postgres/storage.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/userapi/storage/shared" @@ -30,8 +31,8 @@ import ( ) // NewDatabase creates a new accounts and profiles database -func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, serverNoticesLocalpart string) (*shared.Database, error) { - db, err := sqlutil.Open(dbProperties) +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, serverNoticesLocalpart string) (*shared.Database, error) { + db, writer, err := base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()) if err != nil { return nil, err } @@ -107,7 +108,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver Notifications: notificationsTable, ServerName: serverName, DB: db, - Writer: sqlutil.NewDummyWriter(), + Writer: writer, LoginTokenLifetime: loginTokenLifetime, BcryptCost: bcryptCost, OpenIDTokenLifetimeMS: openIDTokenLifetimeMS, diff --git a/userapi/storage/sqlite3/storage.go b/userapi/storage/sqlite3/storage.go index 03c013f0..6858d3d1 100644 --- a/userapi/storage/sqlite3/storage.go +++ b/userapi/storage/sqlite3/storage.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi/storage/shared" @@ -31,8 +32,8 @@ import ( ) // NewDatabase creates a new accounts and profiles database -func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, serverNoticesLocalpart string) (*shared.Database, error) { - db, err := sqlutil.Open(dbProperties) +func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, serverNoticesLocalpart string) (*shared.Database, error) { + db, writer, err := base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()) if err != nil { return nil, err } @@ -108,7 +109,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver Notifications: notificationsTable, ServerName: serverName, DB: db, - Writer: sqlutil.NewExclusiveWriter(), + Writer: writer, LoginTokenLifetime: loginTokenLifetime, BcryptCost: bcryptCost, OpenIDTokenLifetimeMS: openIDTokenLifetimeMS, diff --git a/userapi/storage/storage.go b/userapi/storage/storage.go index faf1ce75..42221e75 100644 --- a/userapi/storage/storage.go +++ b/userapi/storage/storage.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi/storage/postgres" "github.com/matrix-org/dendrite/userapi/storage/sqlite3" @@ -30,12 +31,12 @@ import ( // NewUserAPIDatabase opens a new Postgres or Sqlite database (based on dataSourceName scheme) // and sets postgres connection parameters -func NewUserAPIDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, serverNoticesLocalpart string) (Database, error) { +func NewUserAPIDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, serverNoticesLocalpart string) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime, serverNoticesLocalpart) + return sqlite3.NewDatabase(base, dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime, serverNoticesLocalpart) case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewDatabase(dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime, serverNoticesLocalpart) + return postgres.NewDatabase(base, dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime, serverNoticesLocalpart) default: return nil, fmt.Errorf("unexpected database type") } diff --git a/userapi/storage/storage_test.go b/userapi/storage/storage_test.go index ef25b800..79d5a8da 100644 --- a/userapi/storage/storage_test.go +++ b/userapi/storage/storage_test.go @@ -29,7 +29,7 @@ var ( func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { connStr, close := test.PrepareDBConnectionString(t, dbType) - db, err := storage.NewUserAPIDatabase(&config.DatabaseOptions{ + db, err := storage.NewUserAPIDatabase(nil, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), }, "localhost", bcrypt.MinCost, openIDLifetimeMS, loginTokenLifetime, "_server") if err != nil { diff --git a/userapi/storage/storage_wasm.go b/userapi/storage/storage_wasm.go index a8e6f031..5d5d292e 100644 --- a/userapi/storage/storage_wasm.go +++ b/userapi/storage/storage_wasm.go @@ -18,12 +18,14 @@ import ( "fmt" "time" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi/storage/sqlite3" "github.com/matrix-org/gomatrixserverlib" ) func NewUserAPIDatabase( + base *base.BaseDendrite, dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, @@ -33,7 +35,7 @@ func NewUserAPIDatabase( ) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime, serverNoticesLocalpart) + return sqlite3.NewDatabase(base, dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime, serverNoticesLocalpart) case dbProperties.ConnectionString.IsPostgres(): return nil, fmt.Errorf("can't use Postgres implementation") default: diff --git a/userapi/userapi.go b/userapi/userapi.go index e91ce3a7..9174119e 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -42,12 +42,25 @@ func AddInternalRoutes(router *mux.Router, intAPI api.UserInternalAPI) { // NewInternalAPI returns a concerete implementation of the internal API. Callers // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( - base *base.BaseDendrite, db storage.Database, cfg *config.UserAPI, + base *base.BaseDendrite, cfg *config.UserAPI, appServices []config.ApplicationService, keyAPI keyapi.KeyInternalAPI, rsAPI rsapi.RoomserverInternalAPI, pgClient pushgateway.Client, ) api.UserInternalAPI { js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + db, err := storage.NewUserAPIDatabase( + base, + &cfg.AccountDatabase, + cfg.Matrix.ServerName, + cfg.BCryptCost, + cfg.OpenIDTokenLifetimeMS, + api.DefaultLoginTokenLifetime, + cfg.Matrix.ServerNotices.LocalPart, + ) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to accounts db") + } + syncProducer := producers.NewSyncAPI( db, js, // TODO: user API should handle syncs for account data. Right now, diff --git a/userapi/userapi_test.go b/userapi/userapi_test.go index 076b4f3c..64e23909 100644 --- a/userapi/userapi_test.go +++ b/userapi/userapi_test.go @@ -52,7 +52,7 @@ func MustMakeInternalAPI(t *testing.T, opts apiTestOpts) (api.UserInternalAPI, s MaxOpenConnections: 1, MaxIdleConnections: 1, } - accountDB, err := storage.NewUserAPIDatabase(dbopts, serverName, bcrypt.MinCost, config.DefaultOpenIDTokenLifetimeMS, opts.loginTokenLifetime, "") + accountDB, err := storage.NewUserAPIDatabase(nil, dbopts, serverName, bcrypt.MinCost, config.DefaultOpenIDTokenLifetimeMS, opts.loginTokenLifetime, "") if err != nil { t.Fatalf("failed to create account DB: %s", err) } |