diff options
Diffstat (limited to 'syncapi/sync')
-rw-r--r-- | syncapi/sync/requestpool.go | 15 | ||||
-rw-r--r-- | syncapi/sync/requestpool_test.go | 8 |
2 files changed, 22 insertions, 1 deletions
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index fdf46cdd..ad151f70 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -53,19 +53,24 @@ type RequestPool struct { streams *streams.Streams Notifier *notifier.Notifier producer PresencePublisher + consumer PresenceConsumer } type PresencePublisher interface { SendPresence(userID string, presence types.Presence, statusMsg *string) error } +type PresenceConsumer interface { + EmitPresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, ts int, fromSync bool) +} + // NewRequestPool makes a new RequestPool func NewRequestPool( db storage.Database, cfg *config.SyncAPI, userAPI userapi.SyncUserAPI, keyAPI keyapi.SyncKeyAPI, rsAPI roomserverAPI.SyncRoomserverAPI, streams *streams.Streams, notifier *notifier.Notifier, - producer PresencePublisher, enableMetrics bool, + producer PresencePublisher, consumer PresenceConsumer, enableMetrics bool, ) *RequestPool { if enableMetrics { prometheus.MustRegister( @@ -83,6 +88,7 @@ func NewRequestPool( streams: streams, Notifier: notifier, producer: producer, + consumer: consumer, } go rp.cleanLastSeen() go rp.cleanPresence(db, time.Minute*5) @@ -160,6 +166,13 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user logrus.WithError(err).Error("Unable to publish presence message from sync") return } + + // now synchronously update our view of the world. It's critical we do this before calculating + // the /sync response else we may not return presence: online immediately. + rp.consumer.EmitPresence( + context.Background(), userID, presenceID, newPresence.ClientFields.StatusMsg, + int(gomatrixserverlib.AsTimestamp(time.Now())), true, + ) } func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device) { diff --git a/syncapi/sync/requestpool_test.go b/syncapi/sync/requestpool_test.go index 5e52bc7c..0c720952 100644 --- a/syncapi/sync/requestpool_test.go +++ b/syncapi/sync/requestpool_test.go @@ -38,6 +38,12 @@ func (d dummyDB) MaxStreamPositionForPresence(ctx context.Context) (types.Stream return 0, nil } +type dummyConsumer struct{} + +func (d dummyConsumer) EmitPresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, ts int, fromSync bool) { + +} + func TestRequestPool_updatePresence(t *testing.T) { type args struct { presence string @@ -45,6 +51,7 @@ func TestRequestPool_updatePresence(t *testing.T) { sleep time.Duration } publisher := &dummyPublisher{} + consumer := &dummyConsumer{} syncMap := sync.Map{} tests := []struct { @@ -101,6 +108,7 @@ func TestRequestPool_updatePresence(t *testing.T) { rp := &RequestPool{ presence: &syncMap, producer: publisher, + consumer: consumer, cfg: &config.SyncAPI{ Matrix: &config.Global{ JetStream: config.JetStream{ |