aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorJoakim Recht <recht@braindump.dk>2024-01-25 20:10:46 +0100
committerGitHub <noreply@github.com>2024-01-25 20:10:46 +0100
commit00217a69d12aa65865966601504060f3b422dc93 (patch)
treec8489fbb20d7c96ea16dfd4a2d836d5167160d73 /syncapi
parentd58daf96655d8533c40be674fd11b74c38f40999 (diff)
Only fetch events once for all rooms (#3311)
This refactors `PDUStreamProvider` a bit so that it doesn't trigger a database query per room, but instead utilizes the fact that it's possible to bulk query. This improves sync performance significantly when you have 1000s of rooms. ### Pull Request Checklist <!-- Please read https://matrix-org.github.io/dendrite/development/contributing before submitting your pull request --> * [x] I have added Go unit tests or [Complement integration tests](https://github.com/matrix-org/complement) for this PR _or_ I have justified why this PR doesn't need tests * [x] Pull request includes a [sign off below using a legally identifiable name](https://matrix-org.github.io/dendrite/development/contributing#sign-off) _or_ I have already signed off privately Signed-off-by: `Joakim Recht <joakim@beyondwork.ai>`
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/streams/stream_pdu.go94
1 file changed, 70 insertions, 24 deletions
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 3abb0b3c..790f5bd1 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -203,6 +203,12 @@ func (p *PDUStreamProvider) IncrementalSync(
req.Log.WithError(err).Error("unable to update event filter with ignored users")
}
+ dbEvents, err := p.getRecentEvents(ctx, stateDeltas, r, eventFilter, snapshot)
+ if err != nil {
+ req.Log.WithError(err).Error("unable to get recent events")
+ return r.From
+ }
+
newPos = from
for _, delta := range stateDeltas {
newRange := r
@@ -218,7 +224,7 @@ func (p *PDUStreamProvider) IncrementalSync(
}
}
var pos types.StreamPosition
- if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req); err != nil {
+ if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req, dbEvents); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone {
return newPos
@@ -240,6 +246,66 @@ func (p *PDUStreamProvider) IncrementalSync(
return newPos
}
+func (p *PDUStreamProvider) getRecentEvents(ctx context.Context, stateDeltas []types.StateDelta, r types.Range, eventFilter synctypes.RoomEventFilter, snapshot storage.DatabaseTransaction) (map[string]types.RecentEvents, error) {
+ var roomIDs []string
+ var newlyJoinedRoomIDs []string
+ for _, delta := range stateDeltas {
+ if delta.NewlyJoined {
+ newlyJoinedRoomIDs = append(newlyJoinedRoomIDs, delta.RoomID)
+ } else {
+ roomIDs = append(roomIDs, delta.RoomID)
+ }
+ }
+ dbEvents := make(map[string]types.RecentEvents)
+ if len(roomIDs) > 0 {
+ events, err := snapshot.RecentEvents(
+ ctx, roomIDs, r,
+ &eventFilter, true, true,
+ )
+ if err != nil {
+ if err != sql.ErrNoRows {
+ return nil, err
+ }
+ }
+ for k, v := range events {
+ dbEvents[k] = v
+ }
+ }
+ if len(newlyJoinedRoomIDs) > 0 {
+ // For rooms that were joined in this sync, try to fetch
+ // as many timeline events as allowed by the filter.
+
+ filter := eventFilter
+ // If we're going backwards, grab at least X events, this is mostly to satisfy Sytest
+ if eventFilter.Limit < recentEventBackwardsLimit {
+ filter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way
+ diff := r.From - r.To
+ if diff > 0 && diff < recentEventBackwardsLimit {
+ filter.Limit = int(diff)
+ }
+ }
+
+ events, err := snapshot.RecentEvents(
+ ctx, newlyJoinedRoomIDs, types.Range{
+ From: r.To,
+ To: 0,
+ Backwards: true,
+ },
+ &filter, true, true,
+ )
+ if err != nil {
+ if err != sql.ErrNoRows {
+ return nil, err
+ }
+ }
+ for k, v := range events {
+ dbEvents[k] = v
+ }
+ }
+
+ return dbEvents, nil
+}
+
// Limit the recent events to X when going backwards
const recentEventBackwardsLimit = 100
@@ -253,29 +319,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
eventFilter *synctypes.RoomEventFilter,
stateFilter *synctypes.StateFilter,
req *types.SyncRequest,
+ dbEvents map[string]types.RecentEvents,
) (types.StreamPosition, error) {
var err error
- originalLimit := eventFilter.Limit
- // If we're going backwards, grep at least X events, this is mostly to satisfy Sytest
- if r.Backwards && originalLimit < recentEventBackwardsLimit {
- eventFilter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way
- diff := r.From - r.To
- if diff > 0 && diff < recentEventBackwardsLimit {
- eventFilter.Limit = int(diff)
- }
- }
-
- dbEvents, err := snapshot.RecentEvents(
- ctx, []string{delta.RoomID}, r,
- eventFilter, true, true,
- )
- if err != nil {
- if err == sql.ErrNoRows {
- return r.To, nil
- }
- return r.From, fmt.Errorf("p.DB.RecentEvents: %w", err)
- }
-
recentStreamEvents := dbEvents[delta.RoomID].Events
limited := dbEvents[delta.RoomID].Limited
@@ -337,9 +383,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
logrus.WithError(err).Error("unable to apply history visibility filter")
}
- if r.Backwards && len(events) > originalLimit {
+ if r.Backwards && len(events) > eventFilter.Limit {
// We're going backwards and the events are ordered chronologically, so take the last `limit` events
- events = events[len(events)-originalLimit:]
+ events = events[len(events)-eventFilter.Limit:]
limited = true
}