diff options
author | Joakim Recht <recht@braindump.dk> | 2024-01-25 20:10:46 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-25 20:10:46 +0100 |
commit | 00217a69d12aa65865966601504060f3b422dc93 (patch) | |
tree | c8489fbb20d7c96ea16dfd4a2d836d5167160d73 /syncapi | |
parent | d58daf96655d8533c40be674fd11b74c38f40999 (diff) |
Only fetch events once for all rooms (#3311)
This refactors `PDUStreamProvider` a bit so that it doesn't trigger a
database query per room, but instead utilizes the fact that it's
possible to bulk query. This improves sync performance significantly
when you have 1000s of rooms.
### Pull Request Checklist
<!-- Please read
https://matrix-org.github.io/dendrite/development/contributing before
submitting your pull request -->
* [x] I have added Go unit tests or [Complement integration
tests](https://github.com/matrix-org/complement) for this PR _or_ I have
justified why this PR doesn't need tests
* [x] Pull request includes a [sign off below using a legally
identifiable
name](https://matrix-org.github.io/dendrite/development/contributing#sign-off)
_or_ I have already signed off privately
Signed-off-by: `Joakim Recht <joakim@beyondwork.ai>`
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/streams/stream_pdu.go | 94 |
1 files changed, 70 insertions, 24 deletions
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 3abb0b3c..790f5bd1 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -203,6 +203,12 @@ func (p *PDUStreamProvider) IncrementalSync( req.Log.WithError(err).Error("unable to update event filter with ignored users") } + dbEvents, err := p.getRecentEvents(ctx, stateDeltas, r, eventFilter, snapshot) + if err != nil { + req.Log.WithError(err).Error("unable to get recent events") + return r.From + } + newPos = from for _, delta := range stateDeltas { newRange := r @@ -218,7 +224,7 @@ func (p *PDUStreamProvider) IncrementalSync( } } var pos types.StreamPosition - if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req); err != nil { + if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req, dbEvents); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone { return newPos @@ -240,6 +246,66 @@ func (p *PDUStreamProvider) IncrementalSync( return newPos } +func (p *PDUStreamProvider) getRecentEvents(ctx context.Context, stateDeltas []types.StateDelta, r types.Range, eventFilter synctypes.RoomEventFilter, snapshot storage.DatabaseTransaction) (map[string]types.RecentEvents, error) { + var roomIDs []string + var newlyJoinedRoomIDs []string + for _, delta := range stateDeltas { + if delta.NewlyJoined { + newlyJoinedRoomIDs = append(newlyJoinedRoomIDs, delta.RoomID) + } else { + roomIDs = append(roomIDs, delta.RoomID) + } + } + dbEvents := make(map[string]types.RecentEvents) + if len(roomIDs) > 0 { + events, err := snapshot.RecentEvents( + ctx, roomIDs, r, + &eventFilter, true, true, + ) + if err != nil { + if err != sql.ErrNoRows { + return nil, err + } + } + for k, v := range events { + dbEvents[k] = v + } + } + if len(newlyJoinedRoomIDs) > 0 { + // For rooms that were joined in this sync, try to fetch + // as much timeline events as allowed by the filter. + + filter := eventFilter + // If we're going backwards, grep at least X events, this is mostly to satisfy Sytest + if eventFilter.Limit < recentEventBackwardsLimit { + filter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way + diff := r.From - r.To + if diff > 0 && diff < recentEventBackwardsLimit { + filter.Limit = int(diff) + } + } + + events, err := snapshot.RecentEvents( + ctx, newlyJoinedRoomIDs, types.Range{ + From: r.To, + To: 0, + Backwards: true, + }, + &filter, true, true, + ) + if err != nil { + if err != sql.ErrNoRows { + return nil, err + } + } + for k, v := range events { + dbEvents[k] = v + } + } + + return dbEvents, nil +} + // Limit the recent events to X when going backwards const recentEventBackwardsLimit = 100 @@ -253,29 +319,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( eventFilter *synctypes.RoomEventFilter, stateFilter *synctypes.StateFilter, req *types.SyncRequest, + dbEvents map[string]types.RecentEvents, ) (types.StreamPosition, error) { var err error - originalLimit := eventFilter.Limit - // If we're going backwards, grep at least X events, this is mostly to satisfy Sytest - if r.Backwards && originalLimit < recentEventBackwardsLimit { - eventFilter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way - diff := r.From - r.To - if diff > 0 && diff < recentEventBackwardsLimit { - eventFilter.Limit = int(diff) - } - } - - dbEvents, err := snapshot.RecentEvents( - ctx, []string{delta.RoomID}, r, - eventFilter, true, true, - ) - if err != nil { - if err == sql.ErrNoRows { - return r.To, nil - } - return r.From, fmt.Errorf("p.DB.RecentEvents: %w", err) - } - recentStreamEvents := dbEvents[delta.RoomID].Events limited := dbEvents[delta.RoomID].Limited @@ -337,9 +383,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( logrus.WithError(err).Error("unable to apply history visibility filter") } - if r.Backwards && len(events) > originalLimit { + if r.Backwards && len(events) > eventFilter.Limit { // We're going backwards and the events are ordered chronologically, so take the last `limit` events - events = events[len(events)-originalLimit:] + events = events[len(events)-eventFilter.Limit:] limited = true } |