aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers/presence.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/consumers/presence.go')
-rw-r--r--syncapi/consumers/presence.go21
1 files changed, 14 insertions, 7 deletions
diff --git a/syncapi/consumers/presence.go b/syncapi/consumers/presence.go
index 388c08ff..bfd72d60 100644
--- a/syncapi/consumers/presence.go
+++ b/syncapi/consumers/presence.go
@@ -138,9 +138,12 @@ func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
presence := msg.Header.Get("presence")
timestamp := msg.Header.Get("last_active_ts")
fromSync, _ := strconv.ParseBool(msg.Header.Get("from_sync"))
-
logrus.Debugf("syncAPI received presence event: %+v", msg.Header)
+ if fromSync { // do not process local presence changes; we already did this synchronously.
+ return true
+ }
+
ts, err := strconv.Atoi(timestamp)
if err != nil {
return true
@@ -151,15 +154,19 @@ func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
newMsg := msg.Header.Get("status_msg")
statusMsg = &newMsg
}
- // OK is already checked, so no need to do it again
+ // already checked, so no need to check error
p, _ := types.PresenceFromString(presence)
- pos, err := s.db.UpdatePresence(ctx, userID, p, statusMsg, gomatrixserverlib.Timestamp(ts), fromSync)
+
+ s.EmitPresence(ctx, userID, p, statusMsg, ts, fromSync)
+ return true
+}
+
+func (s *PresenceConsumer) EmitPresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, ts int, fromSync bool) {
+ pos, err := s.db.UpdatePresence(ctx, userID, presence, statusMsg, gomatrixserverlib.Timestamp(ts), fromSync)
if err != nil {
- return true
+ logrus.WithError(err).WithField("user", userID).WithField("presence", presence).Warn("failed to updated presence for user")
+ return
}
-
s.stream.Advance(pos)
s.notifier.OnNewPresence(types.StreamingToken{PresencePosition: pos}, userID)
-
- return true
}