From a25d477cdb8f1ba49b3b5e9d931f808ae45b4853 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Wed, 13 May 2020 17:28:42 +0100 Subject: Initial syncapi storage refactor to share pq/sqlite code (#1030) * Initial syncapi storage refactor to share pq/sqlite code This goes down a different route than https://github.com/matrix-org/dendrite/pull/985 which tried to even reduce the boilerplate of `ExecContext` etc. The previous pattern fails badly when there are subtle differences in parameters and hence the shared boilerplate to read from `QueryContext` breaks. Rather than attacking it at that level, the main place where we want to reuse code is for the `syncserver.go` itself - the database implementation which has lots of complex logic. So instead, this commit: - Makes `invites_table.go` an interface. - Makes `SyncServerDatasource` use that interface - This means some functions are now identical for pq/sqlite, so factor them out to a temporary `shared.Database` struct which will grow until it replaces all of `SyncServerDatasource`. * Missing files --- syncapi/storage/postgres/syncserver.go | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) (limited to 'syncapi/storage/postgres/syncserver.go') diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 90976168..9883c362 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -32,6 +32,7 @@ import ( _ "github.com/lib/pq" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/syncapi/storage/shared" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" @@ -54,10 +55,10 @@ type SyncServerDatasource struct { accountData accountDataStatements events outputRoomEventsStatements roomstate currentRoomStateStatements - invites inviteEventsStatements eduCache *cache.EDUCache topology outputRoomEventsTopologyStatements backwardExtremities tables.BackwardsExtremities + shared *shared.Database } // NewSyncServerDatasource creates a new sync server database @@ -79,7 +80,8 @@ func NewSyncServerDatasource(dbDataSourceName string, dbProperties common.DbProp if err = d.roomstate.prepare(d.db); err != nil { return nil, err } - if err = d.invites.prepare(d.db); err != nil { + invites, err := NewPostgresInvitesTable(d.db) + if err != nil { return nil, err } if err = d.topology.prepare(d.db); err != nil { @@ -90,6 +92,10 @@ func NewSyncServerDatasource(dbDataSourceName string, dbProperties common.DbProp return nil, err } d.eduCache = cache.New() + d.shared = &shared.Database{ + DB: d.db, + Invites: invites, + } return &d, nil } @@ -340,7 +346,7 @@ func (d *SyncServerDatasource) syncStreamPositionTx( if maxAccountDataID > maxID { maxID = maxAccountDataID } - maxInviteID, err := d.invites.selectMaxInviteID(ctx, txn) + maxInviteID, err := d.shared.Invites.SelectMaxInviteID(ctx, txn) if err != nil { return 0, err } @@ -365,7 +371,7 @@ func (d *SyncServerDatasource) syncPositionTx( if maxAccountDataID > maxEventID { maxEventID = maxAccountDataID } - maxInviteID, err := d.invites.selectMaxInviteID(ctx, txn) + maxInviteID, err := d.shared.Invites.SelectMaxInviteID(ctx, txn) if err != nil { return sp, err } @@ -662,17 +668,14 @@ func (d *SyncServerDatasource) UpsertAccountData( func (d *SyncServerDatasource) AddInviteEvent( ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent, -) (types.StreamPosition, error) { - return d.invites.insertInviteEvent(ctx, inviteEvent) +) (sp types.StreamPosition, err error) { + return d.shared.AddInviteEvent(ctx, inviteEvent) } func (d *SyncServerDatasource) RetireInviteEvent( ctx context.Context, inviteEventID string, ) error { - // TODO: Record that invite has been retired in a stream so that we can - // notify the user in an incremental sync. - err := d.invites.deleteInviteEvent(ctx, inviteEventID) - return err + return d.shared.RetireInviteEvent(ctx, inviteEventID) } func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) { @@ -697,7 +700,7 @@ func (d *SyncServerDatasource) addInvitesToResponse( fromPos, toPos types.StreamPosition, res *types.Response, ) error { - invites, err := d.invites.selectInviteEventsInRange( + invites, err := d.shared.Invites.SelectInviteEventsInRange( ctx, txn, userID, fromPos, toPos, ) if err != nil { -- cgit v1.2.3