diff options
author | Kegsay <kegan@matrix.org> | 2020-05-14 09:53:55 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-14 09:53:55 +0100 |
commit | 9ed68a3125d9024f52bf89810abf3b203f4b25b7 (patch) | |
tree | f30a532965d841de3d8faa370d3bf5cc8beb5ba1 /syncapi/storage/postgres/syncserver.go | |
parent | a25d477cdb8f1ba49b3b5e9d931f808ae45b4853 (diff) |
Factor out account data and events table (#1031)
* Factor out account data
* Factor out events table and EDU cache
* linting
* fix npe
Diffstat (limited to 'syncapi/storage/postgres/syncserver.go')
-rw-r--r-- | syncapi/storage/postgres/syncserver.go | 180 |
1 files changed, 26 insertions, 154 deletions
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 9883c362..4fa08ce5 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -20,9 +20,6 @@ import ( "database/sql" "encoding/json" "fmt" - "time" - - "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -50,15 +47,12 @@ type stateDelta struct { // SyncServerDatasource represents a sync server datasource which manages // both the database for PDUs and caches for EDUs. type SyncServerDatasource struct { + shared.Database db *sql.DB common.PartitionOffsetStatements - accountData accountDataStatements - events outputRoomEventsStatements roomstate currentRoomStateStatements - eduCache *cache.EDUCache topology outputRoomEventsTopologyStatements backwardExtremities tables.BackwardsExtremities - shared *shared.Database } // NewSyncServerDatasource creates a new sync server database @@ -71,10 +65,12 @@ func NewSyncServerDatasource(dbDataSourceName string, dbProperties common.DbProp if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil { return nil, err } - if err = d.accountData.prepare(d.db); err != nil { + accountData, err := NewPostgresAccountDataTable(d.db) + if err != nil { return nil, err } - if err = d.events.prepare(d.db); err != nil { + events, err := NewPostgresEventsTable(d.db) + if err != nil { return nil, err } if err = d.roomstate.prepare(d.db); err != nil { @@ -91,10 +87,12 @@ func NewSyncServerDatasource(dbDataSourceName string, dbProperties common.DbProp if err != nil { return nil, err } - d.eduCache = cache.New() - d.shared = &shared.Database{ - DB: d.db, - Invites: invites, + d.Database = shared.Database{ + DB: d.db, + Invites: invites, + AccountData: accountData, + OutputEvents: events, + EDUCache: cache.New(), } return &d, nil } @@ -103,17 +101,6 @@ func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[s return d.roomstate.selectJoinedUsers(ctx) } -func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) { - streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs) - if err != nil { - return nil, err - } - - // We don't include a device here as we only include transaction IDs in - // incremental syncs. - return d.StreamEventsToEvents(nil, streamEvents), nil -} - // handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of // the events listed in the event's 'prev_events'. This function also updates the backwards extremities table // to account for the fact that the given event is no longer a backwards extremity, but may be marked as such. @@ -124,7 +111,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx // Check if we have all of the event's previous events. If an event is // missing, add it to the room's backward extremities. - prevEvents, err := d.events.selectEvents(ctx, txn, ev.PrevEventIDs()) + prevEvents, err := d.Database.OutputEvents.SelectEvents(ctx, txn, ev.PrevEventIDs()) if err != nil { return err } @@ -157,7 +144,7 @@ func (d *SyncServerDatasource) WriteEvent( ) (pduPosition types.StreamPosition, returnErr error) { returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { var err error - pos, err := d.events.insertEvent( + pos, err := d.Database.OutputEvents.InsertEvent( ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, ) if err != nil { @@ -265,36 +252,10 @@ func (d *SyncServerDatasource) GetEventsInTopologicalRange( } // Retrieve the events' contents using their IDs. - events, err = d.events.selectEvents(ctx, nil, eIDs) + events, err = d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs) return } -// GetEventsInStreamingRange retrieves all of the events on a given ordering using the -// given extremities and limit. -func (d *SyncServerDatasource) GetEventsInStreamingRange( - ctx context.Context, - from, to *types.StreamingToken, - roomID string, limit int, - backwardOrdering bool, -) (events []types.StreamEvent, err error) { - if backwardOrdering { - // When using backward ordering, we want the most recent events first. - if events, err = d.events.selectRecentEvents( - ctx, nil, roomID, to.PDUPosition(), from.PDUPosition(), limit, false, false, - ); err != nil { - return - } - } else { - // When using forward ordering, we want the least recent events first. - if events, err = d.events.selectEarlyEvents( - ctx, nil, roomID, from.PDUPosition(), to.PDUPosition(), limit, - ); err != nil { - return - } - } - return events, err -} - func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.StreamingToken, error) { return d.syncPositionTx(ctx, nil) } @@ -319,7 +280,7 @@ func (d *SyncServerDatasource) EventsAtTopologicalPosition( return nil, err } - return d.events.selectEvents(ctx, nil, eIDs) + return d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs) } func (d *SyncServerDatasource) EventPositionInTopology( @@ -328,57 +289,29 @@ func (d *SyncServerDatasource) EventPositionInTopology( return d.topology.selectPositionInTopology(ctx, eventID) } -func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { - return d.syncStreamPositionTx(ctx, nil) -} - -func (d *SyncServerDatasource) syncStreamPositionTx( - ctx context.Context, txn *sql.Tx, -) (types.StreamPosition, error) { - maxID, err := d.events.selectMaxEventID(ctx, txn) - if err != nil { - return 0, err - } - maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn) - if err != nil { - return 0, err - } - if maxAccountDataID > maxID { - maxID = maxAccountDataID - } - maxInviteID, err := d.shared.Invites.SelectMaxInviteID(ctx, txn) - if err != nil { - return 0, err - } - if maxInviteID > maxID { - maxID = maxInviteID - } - return types.StreamPosition(maxID), nil -} - func (d *SyncServerDatasource) syncPositionTx( ctx context.Context, txn *sql.Tx, ) (sp types.StreamingToken, err error) { - maxEventID, err := d.events.selectMaxEventID(ctx, txn) + maxEventID, err := d.Database.OutputEvents.SelectMaxEventID(ctx, txn) if err != nil { return sp, err } - maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn) + maxAccountDataID, err := d.Database.AccountData.SelectMaxAccountDataID(ctx, txn) if err != nil { return sp, err } if maxAccountDataID > maxEventID { maxEventID = maxAccountDataID } - maxInviteID, err := d.shared.Invites.SelectMaxInviteID(ctx, txn) + maxInviteID, err := d.Database.Invites.SelectMaxInviteID(ctx, txn) if err != nil { return sp, err } if maxInviteID > maxEventID { maxEventID = maxInviteID } - sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.eduCache.GetLatestSyncPosition())) + sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.Database.EDUCache.GetLatestSyncPosition())) return } @@ -451,7 +384,7 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse( var ok bool var err error for _, roomID := range joinedRoomIDs { - if typingUsers, updated := d.eduCache.GetTypingUsersIfUpdatedAfter( + if typingUsers, updated := d.Database.EDUCache.GetTypingUsersIfUpdatedAfter( roomID, int64(since.EDUPosition()), ); updated { ev := gomatrixserverlib.ClientEvent{ @@ -580,7 +513,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( // TODO: When filters are added, we may need to call this multiple times to get enough events. // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 var recentStreamEvents []types.StreamEvent - recentStreamEvents, err = d.events.selectRecentEvents( + recentStreamEvents, err = d.Database.OutputEvents.SelectRecentEvents( ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(), numRecentEventsPerRoom, true, true, ) @@ -653,54 +586,13 @@ var txReadOnlySnapshot = sql.TxOptions{ ReadOnly: true, } -func (d *SyncServerDatasource) GetAccountDataInRange( - ctx context.Context, userID string, oldPos, newPos types.StreamPosition, - accountDataFilterPart *gomatrixserverlib.EventFilter, -) (map[string][]string, error) { - return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart) -} - -func (d *SyncServerDatasource) UpsertAccountData( - ctx context.Context, userID, roomID, dataType string, -) (types.StreamPosition, error) { - return d.accountData.insertAccountData(ctx, userID, roomID, dataType) -} - -func (d *SyncServerDatasource) AddInviteEvent( - ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent, -) (sp types.StreamPosition, err error) { - return d.shared.AddInviteEvent(ctx, inviteEvent) -} - -func (d *SyncServerDatasource) RetireInviteEvent( - ctx context.Context, inviteEventID string, -) error { - return d.shared.RetireInviteEvent(ctx, inviteEventID) -} - -func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) { - d.eduCache.SetTimeoutCallback(fn) -} - -func (d *SyncServerDatasource) AddTypingUser( - userID, roomID string, expireTime *time.Time, -) types.StreamPosition { - return types.StreamPosition(d.eduCache.AddTypingUser(userID, roomID, expireTime)) -} - -func (d *SyncServerDatasource) RemoveTypingUser( - userID, roomID string, -) types.StreamPosition { - return types.StreamPosition(d.eduCache.RemoveUser(userID, roomID)) -} - func (d *SyncServerDatasource) addInvitesToResponse( ctx context.Context, txn *sql.Tx, userID string, fromPos, toPos types.StreamPosition, res *types.Response, ) error { - invites, err := d.shared.Invites.SelectInviteEventsInRange( + invites, err := d.Database.Invites.SelectInviteEventsInRange( ctx, txn, userID, fromPos, toPos, ) if err != nil { @@ -750,7 +642,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( // This is all "okay" assuming history_visibility == "shared" which it is by default. endPos = delta.membershipPos } - recentStreamEvents, err := d.events.selectRecentEvents( + recentStreamEvents, err := d.Database.OutputEvents.SelectRecentEvents( ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos), numRecentEventsPerRoom, true, true, ) @@ -841,7 +733,7 @@ func (d *SyncServerDatasource) fetchMissingStateEvents( ) ([]types.StreamEvent, error) { // Fetch from the events table first so we pick up the stream ID for the // event. - events, err := d.events.selectEvents(ctx, txn, eventIDs) + events, err := d.Database.OutputEvents.SelectEvents(ctx, txn, eventIDs) if err != nil { return nil, err } @@ -895,7 +787,7 @@ func (d *SyncServerDatasource) getStateDeltas( var deltas []stateDelta // get all the state events ever between these two positions - stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilter) + stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter) if err != nil { return nil, nil, err } @@ -981,7 +873,7 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync( } // Get all the state events ever between these two positions - stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilter) + stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter) if err != nil { return nil, nil, err } @@ -1025,26 +917,6 @@ func (d *SyncServerDatasource) currentStateStreamEventsForRoom( return s, nil } -func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent { - out := make([]gomatrixserverlib.HeaderedEvent, len(in)) - for i := 0; i < len(in); i++ { - out[i] = in[i].HeaderedEvent - if device != nil && in[i].TransactionID != nil { - if device.UserID == in[i].Sender() && device.SessionID == in[i].TransactionID.SessionID { - err := out[i].SetUnsignedField( - "transaction_id", in[i].TransactionID.TransactionID, - ) - if err != nil { - logrus.WithFields(logrus.Fields{ - "event_id": out[i].EventID(), - }).WithError(err).Warnf("Failed to add transaction ID to event") - } - } - } - } - return out -} - // There may be some overlap where events in stateEvents are already in recentEvents, so filter // them out so we don't include them twice in the /sync response. They should be in recentEvents // only, so clients get to the correct state once they have rolled forward. |