diff options
Diffstat (limited to 'syncapi/streams/stream_presence.go')
-rw-r--r-- | syncapi/streams/stream_presence.go | 12 |
1 files changed, 10 insertions, 2 deletions
diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go index 614b88d4..675a7a17 100644 --- a/syncapi/streams/stream_presence.go +++ b/syncapi/streams/stream_presence.go @@ -53,7 +53,8 @@ func (p *PresenceStreamProvider) IncrementalSync( req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition { - presences, err := p.DB.PresenceAfter(ctx, from) + // We pull out a larger number than the filter asks for, since we're filtering out events later + presences, err := p.DB.PresenceAfter(ctx, from, gomatrixserverlib.EventFilter{Limit: 1000}) if err != nil { req.Log.WithError(err).Error("p.DB.PresenceAfter failed") return from @@ -72,6 +73,7 @@ func (p *PresenceStreamProvider) IncrementalSync( req.Log.WithError(err).Error("unable to refresh notifier lists") return from } + NewlyJoinedLoop: for _, roomID := range newlyJoined { roomUsers := p.notifier.JoinedUsers(roomID) for i := range roomUsers { @@ -86,11 +88,14 @@ func (p *PresenceStreamProvider) IncrementalSync( req.Log.WithError(err).Error("unable to query presence for user") return from } + if len(presences) > req.Filter.Presence.Limit { + break NewlyJoinedLoop + } } } } - lastPos := to + lastPos := from for _, presence := range presences { if presence == nil { continue @@ -135,6 +140,9 @@ func (p *PresenceStreamProvider) IncrementalSync( if presence.StreamPos > lastPos { lastPos = presence.StreamPos } + if len(req.Response.Presence.Events) == req.Filter.Presence.Limit { + break + } p.cache.Store(cacheKey, presence) } |