aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-03-25 12:38:16 +0000
committerGitHub <noreply@github.com>2022-03-25 12:38:16 +0000
commitb113217a6d4023aa2b0c468eb2be38dd493ad821 (patch)
tree34d7813444aa1441aea7bc7779ae33c7c7764d4c /syncapi
parente6d4bdeed5a05f26677f81c02f7a43c84a4a947e (diff)
Use most recent event in response to get latest stream position in incremental sync (#2302)
* Use latest event position in response for advancing the stream position in an incremental sync * Create some calm * Use To in worst case * Don't waste CPU cycles on an empty response after all * Bug fixes * Fix another bug
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/streams/stream_pdu.go56
1 files changed, 45 insertions, 11 deletions
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 1afcbe75..ccdac086 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -147,7 +147,6 @@ func (p *PDUStreamProvider) IncrementalSync(
To: to,
Backwards: from > to,
}
- newPos = to
var err error
var stateDeltas []types.StateDelta
@@ -172,14 +171,26 @@ func (p *PDUStreamProvider) IncrementalSync(
req.Rooms[roomID] = gomatrixserverlib.Join
}
+ if len(stateDeltas) == 0 {
+ return to
+ }
+
+ newPos = from
for _, delta := range stateDeltas {
- if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil {
+ var pos types.StreamPosition
+ if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
- return newPos
+ return to
+ }
+ switch {
+ case r.Backwards && pos < newPos:
+ fallthrough
+ case !r.Backwards && pos > newPos:
+ newPos = pos
}
}
- return r.To
+ return newPos
}
func (p *PDUStreamProvider) addRoomDeltaToResponse(
@@ -189,7 +200,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
delta types.StateDelta,
eventFilter *gomatrixserverlib.RoomEventFilter,
res *types.Response,
-) error {
+) (types.StreamPosition, error) {
if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
// make sure we don't leak recent events after the leave event.
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
@@ -204,19 +215,42 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
eventFilter, true, true,
)
if err != nil {
- return err
+ return r.From, err
}
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents)
if err != nil {
- return err
+ return r.From, err
}
- // XXX: should we ever get this far if we have no recent events or state in this room?
- // in practice we do for peeks, but possibly not joins?
+ // If we didn't return any events at all then don't bother doing anything else.
if len(recentEvents) == 0 && len(delta.StateEvents) == 0 {
- return nil
+ return r.To, nil
+ }
+
+ // Sort the events so that we can pick out the latest events from both sections.
+ recentEvents = gomatrixserverlib.HeaderedReverseTopologicalOrdering(recentEvents, gomatrixserverlib.TopologicalOrderByPrevEvents)
+ delta.StateEvents = gomatrixserverlib.HeaderedReverseTopologicalOrdering(delta.StateEvents, gomatrixserverlib.TopologicalOrderByAuthEvents)
+
+ // Work out what the highest stream position is for all of the events in this
+ // room that were returned.
+ latestPosition := r.To
+ updateLatestPosition := func(mostRecentEventID string) {
+ if _, pos, err := p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil {
+ switch {
+ case r.Backwards && pos > latestPosition:
+ fallthrough
+ case !r.Backwards && pos < latestPosition:
+ latestPosition = pos
+ }
+ }
+ }
+ if len(recentEvents) > 0 {
+ updateLatestPosition(recentEvents[len(recentEvents)-1].EventID())
+ }
+ if len(delta.StateEvents) > 0 {
+ updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID())
}
switch delta.Membership {
@@ -250,7 +284,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
res.Rooms.Leave[delta.RoomID] = *lr
}
- return nil
+ return latestPosition, nil
}
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(