diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-03-25 12:38:16 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-25 12:38:16 +0000 |
commit | b113217a6d4023aa2b0c468eb2be38dd493ad821 (patch) | |
tree | 34d7813444aa1441aea7bc7779ae33c7c7764d4c /syncapi | |
parent | e6d4bdeed5a05f26677f81c02f7a43c84a4a947e (diff) |
Use most recent event in response to get latest stream position in incremental sync (#2302)
* Use latest event position in response for advancing the stream position in an incremental sync
* Create some calm
* Use To in worst case
* Don't waste CPU cycles on an empty response after all
* Bug fixes
* Fix another bug
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/streams/stream_pdu.go | 56 |
1 files changed, 45 insertions, 11 deletions
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 1afcbe75..ccdac086 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -147,7 +147,6 @@ func (p *PDUStreamProvider) IncrementalSync( To: to, Backwards: from > to, } - newPos = to var err error var stateDeltas []types.StateDelta @@ -172,14 +171,26 @@ func (p *PDUStreamProvider) IncrementalSync( req.Rooms[roomID] = gomatrixserverlib.Join } + if len(stateDeltas) == 0 { + return to + } + + newPos = from for _, delta := range stateDeltas { - if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil { + var pos types.StreamPosition + if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") - return newPos + return to + } + switch { + case r.Backwards && pos < newPos: + fallthrough + case !r.Backwards && pos > newPos: + newPos = pos } } - return r.To + return newPos } func (p *PDUStreamProvider) addRoomDeltaToResponse( @@ -189,7 +200,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( delta types.StateDelta, eventFilter *gomatrixserverlib.RoomEventFilter, res *types.Response, -) error { +) (types.StreamPosition, error) { if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave { // make sure we don't leak recent events after the leave event. // TODO: History visibility makes this somewhat complex to handle correctly. For example: @@ -204,19 +215,42 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( eventFilter, true, true, ) if err != nil { - return err + return r.From, err } recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents) if err != nil { - return err + return r.From, err } - // XXX: should we ever get this far if we have no recent events or state in this room? - // in practice we do for peeks, but possibly not joins? + // If we didn't return any events at all then don't bother doing anything else. if len(recentEvents) == 0 && len(delta.StateEvents) == 0 { - return nil + return r.To, nil + } + + // Sort the events so that we can pick out the latest events from both sections. + recentEvents = gomatrixserverlib.HeaderedReverseTopologicalOrdering(recentEvents, gomatrixserverlib.TopologicalOrderByPrevEvents) + delta.StateEvents = gomatrixserverlib.HeaderedReverseTopologicalOrdering(delta.StateEvents, gomatrixserverlib.TopologicalOrderByAuthEvents) + + // Work out what the highest stream position is for all of the events in this + // room that were returned. + latestPosition := r.To + updateLatestPosition := func(mostRecentEventID string) { + if _, pos, err := p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil { + switch { + case r.Backwards && pos > latestPosition: + fallthrough + case !r.Backwards && pos < latestPosition: + latestPosition = pos + } + } + } + if len(recentEvents) > 0 { + updateLatestPosition(recentEvents[len(recentEvents)-1].EventID()) + } + if len(delta.StateEvents) > 0 { + updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID()) } switch delta.Membership { @@ -250,7 +284,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( res.Rooms.Leave[delta.RoomID] = *lr } - return nil + return latestPosition, nil } func (p *PDUStreamProvider) getJoinResponseForCompleteSync( |