aboutsummaryrefslogtreecommitdiff
path: root/syncapi/streams/stream_presence.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/streams/stream_presence.go')
-rw-r--r--syncapi/streams/stream_presence.go12
1 files changed, 10 insertions, 2 deletions
diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go
index 614b88d4..675a7a17 100644
--- a/syncapi/streams/stream_presence.go
+++ b/syncapi/streams/stream_presence.go
@@ -53,7 +53,8 @@ func (p *PresenceStreamProvider) IncrementalSync(
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
- presences, err := p.DB.PresenceAfter(ctx, from)
+ // We pull out a larger number than the filter asks for, since we're filtering out events later
+ presences, err := p.DB.PresenceAfter(ctx, from, gomatrixserverlib.EventFilter{Limit: 1000})
if err != nil {
req.Log.WithError(err).Error("p.DB.PresenceAfter failed")
return from
@@ -72,6 +73,7 @@ func (p *PresenceStreamProvider) IncrementalSync(
req.Log.WithError(err).Error("unable to refresh notifier lists")
return from
}
+ NewlyJoinedLoop:
for _, roomID := range newlyJoined {
roomUsers := p.notifier.JoinedUsers(roomID)
for i := range roomUsers {
@@ -86,11 +88,14 @@ func (p *PresenceStreamProvider) IncrementalSync(
req.Log.WithError(err).Error("unable to query presence for user")
return from
}
+ if len(presences) > req.Filter.Presence.Limit {
+ break NewlyJoinedLoop
+ }
}
}
}
- lastPos := to
+ lastPos := from
for _, presence := range presences {
if presence == nil {
continue
@@ -135,6 +140,9 @@ func (p *PresenceStreamProvider) IncrementalSync(
if presence.StreamPos > lastPos {
lastPos = presence.StreamPos
}
+ if len(req.Response.Presence.Events) == req.Filter.Presence.Limit {
+ break
+ }
p.cache.Store(cacheKey, presence)
}