From 17e046f18fc182731c112e1dd653cc7021ebd422 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Fri, 1 May 2020 12:41:38 +0100 Subject: Fix prev_batch tokens (#999) --- syncapi/storage/postgres/syncserver.go | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) (limited to 'syncapi/storage/postgres/syncserver.go') diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 744ae7b8..a6de1517 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -320,12 +320,7 @@ func (d *SyncServerDatasource) EventsAtTopologicalPosition( func (d *SyncServerDatasource) EventPositionInTopology( ctx context.Context, eventID string, ) (depth types.StreamPosition, stream types.StreamPosition, err error) { - depth, err = d.topology.selectPositionInTopology(ctx, eventID) - if err != nil { - return - } - stream, err = d.events.selectStreamPositionForEventID(ctx, eventID) - return + return d.topology.selectPositionInTopology(ctx, eventID) } func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { @@ -591,8 +586,8 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( // Retrieve the backward topology position, i.e. the position of the // oldest event in the room's topology. - var backwardTopologyPos types.StreamPosition - backwardTopologyPos, err = d.topology.selectPositionInTopology(ctx, recentStreamEvents[0].EventID()) + var backwardTopologyPos, backwardStreamPos types.StreamPosition + backwardTopologyPos, backwardStreamPos, err = d.topology.selectPositionInTopology(ctx, recentStreamEvents[0].EventID()) if backwardTopologyPos-1 <= 0 { backwardTopologyPos = types.StreamPosition(1) } else { @@ -605,7 +600,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( stateEvents = removeDuplicates(stateEvents, recentEvents) jr := types.NewJoinResponse() jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( - types.PaginationTokenTypeTopology, backwardTopologyPos, 0, + types.PaginationTokenTypeTopology, backwardTopologyPos, backwardStreamPos, ).String() jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = true @@ -720,9 +715,9 @@ func (d *SyncServerDatasource) addInvitesToResponse( func (d *SyncServerDatasource) getBackwardTopologyPos( ctx context.Context, events []types.StreamEvent, -) (pos types.StreamPosition) { +) (pos, spos types.StreamPosition) { if len(events) > 0 { - pos, _ = d.topology.selectPositionInTopology(ctx, events[0].EventID()) + pos, spos, _ = d.topology.selectPositionInTopology(ctx, events[0].EventID()) } if pos-1 <= 0 { pos = types.StreamPosition(1) @@ -761,14 +756,14 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( } recentEvents := d.StreamEventsToEvents(device, recentStreamEvents) delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back - backwardTopologyPos := d.getBackwardTopologyPos(ctx, recentStreamEvents) + backwardTopologyPos, backwardStreamPos := d.getBackwardTopologyPos(ctx, recentStreamEvents) switch delta.membership { case gomatrixserverlib.Join: jr := types.NewJoinResponse() jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( - types.PaginationTokenTypeTopology, backwardTopologyPos, 0, + types.PaginationTokenTypeTopology, backwardTopologyPos, backwardStreamPos, ).String() jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true @@ -781,7 +776,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( // no longer in the room. lr := types.NewLeaveResponse() lr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( - types.PaginationTokenTypeTopology, backwardTopologyPos, 0, + types.PaginationTokenTypeTopology, backwardTopologyPos, backwardStreamPos, ).String() lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true -- cgit v1.2.3