aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers
diff options
context:
space:
mode:
authorkegsay <kegan@matrix.org>2022-05-17 15:53:08 +0100
committerGitHub <noreply@github.com>2022-05-17 15:53:08 +0100
commitb3162755a9053bbb30a83f00928ff0a0852ad32e (patch)
tree551faedac5f0a060c9cf749cdf45915328f7bfac /syncapi/consumers
parentac92e047728efc3d50d6dddbe392ca44afd63a38 (diff)
bugfix: fix race condition when updating presence via /sync (#2470)
* bugfix: fix race condition when updating presence via /sync Previously when presence is updated via /sync, we would send the presence update asyncly via NATS. This created a race condition: - If the presence update is processed quickly, the /sync which triggered the presence update would see an online presence. - If the presence update was processed slowly, the /sync which triggered the presence update would see an offline presence. This is the root cause behind the flakey sytest: 'User sees their own presence in a sync'. The fix is to ensure we update the database/advance the stream position synchronously for local users. * Bugfix for test
Diffstat (limited to 'syncapi/consumers')
-rw-r--r--syncapi/consumers/presence.go21
1 files changed, 14 insertions, 7 deletions
diff --git a/syncapi/consumers/presence.go b/syncapi/consumers/presence.go
index 388c08ff..bfd72d60 100644
--- a/syncapi/consumers/presence.go
+++ b/syncapi/consumers/presence.go
@@ -138,9 +138,12 @@ func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
presence := msg.Header.Get("presence")
timestamp := msg.Header.Get("last_active_ts")
fromSync, _ := strconv.ParseBool(msg.Header.Get("from_sync"))
-
logrus.Debugf("syncAPI received presence event: %+v", msg.Header)
+ if fromSync { // do not process local presence changes; we already did this synchronously.
+ return true
+ }
+
ts, err := strconv.Atoi(timestamp)
if err != nil {
return true
@@ -151,15 +154,19 @@ func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
newMsg := msg.Header.Get("status_msg")
statusMsg = &newMsg
}
- // OK is already checked, so no need to do it again
+ // already checked, so no need to check error
p, _ := types.PresenceFromString(presence)
- pos, err := s.db.UpdatePresence(ctx, userID, p, statusMsg, gomatrixserverlib.Timestamp(ts), fromSync)
+
+ s.EmitPresence(ctx, userID, p, statusMsg, ts, fromSync)
+ return true
+}
+
+func (s *PresenceConsumer) EmitPresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, ts int, fromSync bool) {
+ pos, err := s.db.UpdatePresence(ctx, userID, presence, statusMsg, gomatrixserverlib.Timestamp(ts), fromSync)
if err != nil {
- return true
+ logrus.WithError(err).WithField("user", userID).WithField("presence", presence).Warn("failed to updated presence for user")
+ return
}
-
s.stream.Advance(pos)
s.notifier.OnNewPresence(types.StreamingToken{PresencePosition: pos}, userID)
-
- return true
}