aboutsummaryrefslogtreecommitdiff
path: root/syncapi/streams/stream_presence.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/streams/stream_presence.go')
-rw-r--r--syncapi/streams/stream_presence.go80
1 files changed, 50 insertions, 30 deletions
diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go
index 030b7c5d..445e46b3 100644
--- a/syncapi/streams/stream_presence.go
+++ b/syncapi/streams/stream_presence.go
@@ -17,6 +17,7 @@ package streams
import (
"context"
"encoding/json"
+ "fmt"
"sync"
"github.com/matrix-org/gomatrixserverlib"
@@ -70,39 +71,25 @@ func (p *PresenceStreamProvider) IncrementalSync(
return from
}
- if len(presences) == 0 {
+ getPresenceForUsers, err := p.getNeededUsersFromRequest(ctx, req, presences)
+ if err != nil {
+ req.Log.WithError(err).Error("getNeededUsersFromRequest failed")
+ return from
+ }
+
+ // Got no presence between range and no presence to get from the database
+ if len(getPresenceForUsers) == 0 && len(presences) == 0 {
return to
}
- // add newly joined rooms user presences
- newlyJoined := joinedRooms(req.Response, req.Device.UserID)
- if len(newlyJoined) > 0 {
- // TODO: Check if this is working better than before.
- if err = p.notifier.LoadRooms(ctx, p.DB, newlyJoined); err != nil {
- req.Log.WithError(err).Error("unable to refresh notifier lists")
- return from
- }
- NewlyJoinedLoop:
- for _, roomID := range newlyJoined {
- roomUsers := p.notifier.JoinedUsers(roomID)
- for i := range roomUsers {
- // we already got a presence from this user
- if _, ok := presences[roomUsers[i]]; ok {
- continue
- }
- // Bear in mind that this might return nil, but at least populating
- // a nil means that there's a map entry so we won't repeat this call.
- presences[roomUsers[i]], err = snapshot.GetPresence(ctx, roomUsers[i])
- if err != nil {
- req.Log.WithError(err).Error("unable to query presence for user")
- _ = snapshot.Rollback()
- return from
- }
- if len(presences) > req.Filter.Presence.Limit {
- break NewlyJoinedLoop
- }
- }
- }
+ dbPresences, err := snapshot.GetPresences(ctx, getPresenceForUsers)
+ if err != nil {
+ req.Log.WithError(err).Error("unable to query presence for user")
+ _ = snapshot.Rollback()
+ return from
+ }
+ for _, presence := range dbPresences {
+ presences[presence.UserID] = presence
}
lastPos := from
@@ -164,6 +151,39 @@ func (p *PresenceStreamProvider) IncrementalSync(
return lastPos
}
+func (p *PresenceStreamProvider) getNeededUsersFromRequest(ctx context.Context, req *types.SyncRequest, presences map[string]*types.PresenceInternal) ([]string, error) {
+ getPresenceForUsers := []string{}
+ // Add presence for users which newly joined a room
+ for userID := range req.MembershipChanges {
+ if _, ok := presences[userID]; ok {
+ continue
+ }
+ getPresenceForUsers = append(getPresenceForUsers, userID)
+ }
+
+ // add newly joined rooms user presences
+ newlyJoined := joinedRooms(req.Response, req.Device.UserID)
+ if len(newlyJoined) == 0 {
+ return getPresenceForUsers, nil
+ }
+
+ // TODO: Check if this is working better than before.
+ if err := p.notifier.LoadRooms(ctx, p.DB, newlyJoined); err != nil {
+ return getPresenceForUsers, fmt.Errorf("unable to refresh notifier lists: %w", err)
+ }
+ for _, roomID := range newlyJoined {
+ roomUsers := p.notifier.JoinedUsers(roomID)
+ for i := range roomUsers {
+ // we already got a presence from this user
+ if _, ok := presences[roomUsers[i]]; ok {
+ continue
+ }
+ getPresenceForUsers = append(getPresenceForUsers, roomUsers[i])
+ }
+ }
+ return getPresenceForUsers, nil
+}
+
func joinedRooms(res *types.Response, userID string) []string {
var roomIDs []string
for roomID, join := range res.Rooms.Join {