diff options
author | Kegsay <kegan@matrix.org> | 2020-05-13 12:14:50 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-13 12:14:50 +0100 |
commit | 5e9dce1c0c66736937eeddd5c33c92700d9a65a7 (patch) | |
tree | e0c40667cb17714d160fba2fbc64470eedf9ca7b /syncapi/storage/postgres | |
parent | 31e6a7f1932c11d9b5b682ad06a5b8db9d74a44f (diff) |
syncapi: Rename and split out tokens (#1025)
* syncapi: Rename and split out tokens
Previously we used the badly named `PaginationToken` which was
used for both `/sync` and `/messages` requests. This quickly
became confusing because named fields like `PDUPosition` meant
different things depending on the token type. Instead, we now have
two token types: `TopologyToken` and `StreamingToken`, both of
which have fields which make more sense for their specific situations.
Updated the codebase to use one or the other. `PaginationToken` still
lives on as `syncToken`, an unexported type which both tokens rely on.
This allows us to guarantee that the specific mappings of positions
to a string remain solely under the control of the `types` package.
This enables us to move high-level conceptual things like
"decrement this topological token" to function calls e.g
`TopologicalToken.Decrement()`.
Currently broken because `/messages` seemingly used both stream and
topological tokens, though I need to confirm this.
* final tweaks/hacks
* spurious logging
* Review comments and linting
Diffstat (limited to 'syncapi/storage/postgres')
-rw-r--r-- | syncapi/storage/postgres/syncserver.go | 127 |
1 files changed, 61 insertions, 66 deletions
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 1845ac38..d45bc09e 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -228,69 +228,68 @@ func (d *SyncServerDatasource) GetStateEventsForRoom( return } -func (d *SyncServerDatasource) GetEventsInRange( +func (d *SyncServerDatasource) GetEventsInTopologicalRange( ctx context.Context, - from, to *types.PaginationToken, + from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool, ) (events []types.StreamEvent, err error) { - // If the pagination token's type is types.PaginationTokenTypeTopology, the - // events must be retrieved from the rooms' topology table rather than the - // table contaning the syncapi server's whole stream of events. - if from.Type == types.PaginationTokenTypeTopology { - // Determine the backward and forward limit, i.e. the upper and lower - // limits to the selection in the room's topology, from the direction. - var backwardLimit, forwardLimit, forwardMicroLimit types.StreamPosition - if backwardOrdering { - // Backward ordering is antichronological (latest event to oldest - // one). - backwardLimit = to.PDUPosition - forwardLimit = from.PDUPosition - forwardMicroLimit = from.EDUTypingPosition - } else { - // Forward ordering is chronological (oldest event to latest one). - backwardLimit = from.PDUPosition - forwardLimit = to.PDUPosition - } - - // Select the event IDs from the defined range. - var eIDs []string - eIDs, err = d.topology.selectEventIDsInRange( - ctx, roomID, backwardLimit, forwardLimit, forwardMicroLimit, limit, !backwardOrdering, - ) - if err != nil { - return - } + // Determine the backward and forward limit, i.e. the upper and lower + // limits to the selection in the room's topology, from the direction. + var backwardLimit, forwardLimit, forwardMicroLimit types.StreamPosition + if backwardOrdering { + // Backward ordering is antichronological (latest event to oldest + // one). + backwardLimit = to.Depth() + forwardLimit = from.Depth() + forwardMicroLimit = from.PDUPosition() + } else { + // Forward ordering is chronological (oldest event to latest one). + backwardLimit = from.Depth() + forwardLimit = to.Depth() + } - // Retrieve the events' contents using their IDs. - events, err = d.events.selectEvents(ctx, nil, eIDs) + // Select the event IDs from the defined range. + var eIDs []string + eIDs, err = d.topology.selectEventIDsInRange( + ctx, roomID, backwardLimit, forwardLimit, forwardMicroLimit, limit, !backwardOrdering, + ) + if err != nil { return } - // If the pagination token's type is types.PaginationTokenTypeStream, the - // events must be retrieved from the table contaning the syncapi server's - // whole stream of events. + // Retrieve the events' contents using their IDs. + events, err = d.events.selectEvents(ctx, nil, eIDs) + return +} +// GetEventsInStreamingRange retrieves all of the events on a given ordering using the +// given extremities and limit. +func (d *SyncServerDatasource) GetEventsInStreamingRange( + ctx context.Context, + from, to *types.StreamingToken, + roomID string, limit int, + backwardOrdering bool, +) (events []types.StreamEvent, err error) { if backwardOrdering { // When using backward ordering, we want the most recent events first. if events, err = d.events.selectRecentEvents( - ctx, nil, roomID, to.PDUPosition, from.PDUPosition, limit, false, false, + ctx, nil, roomID, to.PDUPosition(), from.PDUPosition(), limit, false, false, ); err != nil { return } } else { // When using forward ordering, we want the least recent events first. if events, err = d.events.selectEarlyEvents( - ctx, nil, roomID, from.PDUPosition, to.PDUPosition, limit, + ctx, nil, roomID, from.PDUPosition(), to.PDUPosition(), limit, ); err != nil { return } } - - return + return events, err } -func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.PaginationToken, error) { +func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.StreamingToken, error) { return d.syncPositionTx(ctx, nil) } @@ -353,7 +352,7 @@ func (d *SyncServerDatasource) syncStreamPositionTx( func (d *SyncServerDatasource) syncPositionTx( ctx context.Context, txn *sql.Tx, -) (sp types.PaginationToken, err error) { +) (sp types.StreamingToken, err error) { maxEventID, err := d.events.selectMaxEventID(ctx, txn) if err != nil { @@ -373,8 +372,7 @@ func (d *SyncServerDatasource) syncPositionTx( if maxInviteID > maxEventID { maxEventID = maxInviteID } - sp.PDUPosition = types.StreamPosition(maxEventID) - sp.EDUTypingPosition = types.StreamPosition(d.eduCache.GetLatestSyncPosition()) + sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.eduCache.GetLatestSyncPosition())) return } @@ -439,7 +437,7 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( // addTypingDeltaToResponse adds all typing notifications to a sync response // since the specified position. func (d *SyncServerDatasource) addTypingDeltaToResponse( - since types.PaginationToken, + since types.StreamingToken, joinedRoomIDs []string, res *types.Response, ) error { @@ -448,7 +446,7 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse( var err error for _, roomID := range joinedRoomIDs { if typingUsers, updated := d.eduCache.GetTypingUsersIfUpdatedAfter( - roomID, int64(since.EDUTypingPosition), + roomID, int64(since.EDUPosition()), ); updated { ev := gomatrixserverlib.ClientEvent{ Type: gomatrixserverlib.MTyping, @@ -473,12 +471,12 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse( // addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if // the positions of that type are not equal in fromPos and toPos. func (d *SyncServerDatasource) addEDUDeltaToResponse( - fromPos, toPos types.PaginationToken, + fromPos, toPos types.StreamingToken, joinedRoomIDs []string, res *types.Response, ) (err error) { - if fromPos.EDUTypingPosition != toPos.EDUTypingPosition { + if fromPos.EDUPosition() != toPos.EDUPosition() { err = d.addTypingDeltaToResponse( fromPos, joinedRoomIDs, res, ) @@ -490,7 +488,7 @@ func (d *SyncServerDatasource) addEDUDeltaToResponse( func (d *SyncServerDatasource) IncrementalSync( ctx context.Context, device authtypes.Device, - fromPos, toPos types.PaginationToken, + fromPos, toPos types.StreamingToken, numRecentEventsPerRoom int, wantFullState bool, ) (*types.Response, error) { @@ -499,9 +497,9 @@ func (d *SyncServerDatasource) IncrementalSync( var joinedRoomIDs []string var err error - if fromPos.PDUPosition != toPos.PDUPosition || wantFullState { + if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState { joinedRoomIDs, err = d.addPDUDeltaToResponse( - ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, wantFullState, res, + ctx, device, fromPos.PDUPosition(), toPos.PDUPosition(), numRecentEventsPerRoom, wantFullState, res, ) } else { joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership( @@ -530,7 +528,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( numRecentEventsPerRoom int, ) ( res *types.Response, - toPos types.PaginationToken, + toPos types.StreamingToken, joinedRoomIDs []string, err error, ) { @@ -577,7 +575,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 var recentStreamEvents []types.StreamEvent recentStreamEvents, err = d.events.selectRecentEvents( - ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition, + ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(), numRecentEventsPerRoom, true, true, ) if err != nil { @@ -588,27 +586,25 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( // oldest event in the room's topology. var backwardTopologyPos, backwardStreamPos types.StreamPosition backwardTopologyPos, backwardStreamPos, err = d.topology.selectPositionInTopology(ctx, recentStreamEvents[0].EventID()) - if backwardTopologyPos-1 <= 0 { - backwardTopologyPos = types.StreamPosition(1) - } else { - backwardTopologyPos-- + if err != nil { + return } + prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos) + prevBatch.Decrement() // We don't include a device here as we don't need to send down // transaction IDs for complete syncs recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) jr := types.NewJoinResponse() - jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( - types.PaginationTokenTypeTopology, backwardTopologyPos, backwardStreamPos, - ).String() + jr.Timeline.PrevBatch = prevBatch.String() jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = true jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[roomID] = *jr } - if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition, res); err != nil { + if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil { return } @@ -628,7 +624,7 @@ func (d *SyncServerDatasource) CompleteSync( // Use a zero value SyncPosition for fromPos so all EDU states are added. err = d.addEDUDeltaToResponse( - types.PaginationToken{}, toPos, joinedRoomIDs, res, + types.NewStreamToken(0, 0), toPos, joinedRoomIDs, res, ) if err != nil { return nil, err @@ -757,14 +753,15 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( recentEvents := d.StreamEventsToEvents(device, recentStreamEvents) delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back backwardTopologyPos, backwardStreamPos := d.getBackwardTopologyPos(ctx, recentStreamEvents) + prevBatch := types.NewTopologyToken( + backwardTopologyPos, backwardStreamPos, + ) switch delta.membership { case gomatrixserverlib.Join: jr := types.NewJoinResponse() - jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( - types.PaginationTokenTypeTopology, backwardTopologyPos, backwardStreamPos, - ).String() + jr.Timeline.PrevBatch = prevBatch.String() jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) @@ -775,9 +772,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( // TODO: recentEvents may contain events that this user is not allowed to see because they are // no longer in the room. lr := types.NewLeaveResponse() - lr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( - types.PaginationTokenTypeTopology, backwardTopologyPos, backwardStreamPos, - ).String() + lr.Timeline.PrevBatch = prevBatch.String() lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) |