diff options
author | kegsay <kegan@matrix.org> | 2022-05-17 15:53:08 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-17 15:53:08 +0100 |
commit | b3162755a9053bbb30a83f00928ff0a0852ad32e (patch) | |
tree | 551faedac5f0a060c9cf749cdf45915328f7bfac /syncapi/consumers | |
parent | ac92e047728efc3d50d6dddbe392ca44afd63a38 (diff) |
bugfix: fix race condition when updating presence via /sync (#2470)
* bugfix: fix race condition when updating presence via /sync
Previously when presence is updated via /sync, we would send the presence update
asyncly via NATS. This created a race condition:
- If the presence update is processed quickly, the /sync which triggered the presence
update would see an online presence.
- If the presence update was processed slowly, the /sync which triggered the presence
update would see an offline presence.
This is the root cause behind the flakey sytest: 'User sees their own presence in a sync'.
The fix is to ensure we update the database/advance the stream position synchronously
for local users.
* Bugfix for test
Diffstat (limited to 'syncapi/consumers')
-rw-r--r-- | syncapi/consumers/presence.go | 21 |
1 files changed, 14 insertions, 7 deletions
diff --git a/syncapi/consumers/presence.go b/syncapi/consumers/presence.go index 388c08ff..bfd72d60 100644 --- a/syncapi/consumers/presence.go +++ b/syncapi/consumers/presence.go @@ -138,9 +138,12 @@ func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { presence := msg.Header.Get("presence") timestamp := msg.Header.Get("last_active_ts") fromSync, _ := strconv.ParseBool(msg.Header.Get("from_sync")) - logrus.Debugf("syncAPI received presence event: %+v", msg.Header) + if fromSync { // do not process local presence changes; we already did this synchronously. + return true + } + ts, err := strconv.Atoi(timestamp) if err != nil { return true @@ -151,15 +154,19 @@ func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { newMsg := msg.Header.Get("status_msg") statusMsg = &newMsg } - // OK is already checked, so no need to do it again + // already checked, so no need to check error p, _ := types.PresenceFromString(presence) - pos, err := s.db.UpdatePresence(ctx, userID, p, statusMsg, gomatrixserverlib.Timestamp(ts), fromSync) + + s.EmitPresence(ctx, userID, p, statusMsg, ts, fromSync) + return true +} + +func (s *PresenceConsumer) EmitPresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, ts int, fromSync bool) { + pos, err := s.db.UpdatePresence(ctx, userID, presence, statusMsg, gomatrixserverlib.Timestamp(ts), fromSync) if err != nil { - return true + logrus.WithError(err).WithField("user", userID).WithField("presence", presence).Warn("failed to updated presence for user") + return } - s.stream.Advance(pos) s.notifier.OnNewPresence(types.StreamingToken{PresencePosition: pos}, userID) - - return true } |