diff options
author | Till <2353100+S7evinK@users.noreply.github.com> | 2022-04-06 13:11:19 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-06 13:11:19 +0200 |
commit | e5e3350ce168a192dfc6b6b654276d5cffbdbf0f (patch) | |
tree | 738e3bd364da85767de8c487d3d9a851ab804e8b /syncapi/syncapi.go | |
parent | 16e2d243fc8f3d433a9d7f428e6f782065dc5e89 (diff) |
Add presence module V2 (#2312)
* Syncapi presence
* Clientapi http presence handler
* Why is this here?
* Missing files
* FederationAPI presence implementation
* Add new presence stream
* Pinecone update
* Pinecone update
* Add passing tests
* Make linter happy
* Add presence producer
* Add presence config option
* Set user to unavailable after x minutes
* Only set currently_active if online
Avoid unneeded presence updates when syncing
* Tweaks
* Query devices for last_active_ts
Fixes & tweaks
* Export SharedUsers/SharedUsers
* Presence stream in MemoryStorage
* Remove status_msg_nil
* Fix sytest crashes
* Make presence types const and use stringer for it
* Change options to allow inbound/outbound presence
* Fix option & typo
* Update configs
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'syncapi/syncapi.go')
-rw-r--r-- | syncapi/syncapi.go | 25 |
1 files changed, 19 insertions, 6 deletions
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index b579467a..384121a8 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -49,7 +49,7 @@ func AddPublicRoutes( federation *gomatrixserverlib.FederationClient, cfg *config.SyncAPI, ) { - js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream) + js, natsClient := jetstream.Prepare(process, &cfg.Matrix.JetStream) syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) if err != nil { @@ -57,13 +57,19 @@ func AddPublicRoutes( } eduCache := caching.NewTypingCache() - streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache) - notifier := notifier.NewNotifier(streams.Latest(context.Background())) + notifier := notifier.NewNotifier() + streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, notifier) + notifier.SetCurrentPosition(streams.Latest(context.Background())) if err = notifier.Load(context.Background(), syncDB); err != nil { logrus.WithError(err).Panicf("failed to load notifier ") } - requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier) + federationPresenceProducer := &producers.FederationAPIPresenceProducer{ + Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), + JetStream: js, + } + + requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer) userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{ JetStream: js, @@ -75,8 +81,6 @@ func AddPublicRoutes( Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate), } - _ = userAPIReadUpdateProducer - keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( process, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), js, keyAPI, rsAPI, syncDB, notifier, @@ -131,5 +135,14 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start receipts consumer") } + presenceConsumer := consumers.NewPresenceConsumer( + process, cfg, js, natsClient, syncDB, + notifier, streams.PresenceStreamProvider, + userAPI, + ) + if err = presenceConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start presence consumer") + } + routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg) } |