aboutsummaryrefslogtreecommitdiff
path: root/syncapi/syncapi.go
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-04-06 13:11:19 +0200
committerGitHub <noreply@github.com>2022-04-06 13:11:19 +0200
commite5e3350ce168a192dfc6b6b654276d5cffbdbf0f (patch)
tree738e3bd364da85767de8c487d3d9a851ab804e8b /syncapi/syncapi.go
parent16e2d243fc8f3d433a9d7f428e6f782065dc5e89 (diff)
Add presence module V2 (#2312)
* Syncapi presence * Clientapi http presence handler * Why is this here? * Missing files * FederationAPI presence implementation * Add new presence stream * Pinecone update * Pinecone update * Add passing tests * Make linter happy * Add presence producer * Add presence config option * Set user to unavailable after x minutes * Only set currently_active if online Avoid unneeded presence updates when syncing * Tweaks * Query devices for last_active_ts Fixes & tweaks * Export SharedUsers/SharedUsers * Presence stream in MemoryStorage * Remove status_msg_nil * Fix sytest crashes * Make presence types const and use stringer for it * Change options to allow inbound/outbound presence * Fix option & typo * Update configs Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'syncapi/syncapi.go')
-rw-r--r--syncapi/syncapi.go25
1 files changed, 19 insertions, 6 deletions
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index b579467a..384121a8 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -49,7 +49,7 @@ func AddPublicRoutes(
federation *gomatrixserverlib.FederationClient,
cfg *config.SyncAPI,
) {
- js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream)
+ js, natsClient := jetstream.Prepare(process, &cfg.Matrix.JetStream)
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database)
if err != nil {
@@ -57,13 +57,19 @@ func AddPublicRoutes(
}
eduCache := caching.NewTypingCache()
- streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache)
- notifier := notifier.NewNotifier(streams.Latest(context.Background()))
+ notifier := notifier.NewNotifier()
+ streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, notifier)
+ notifier.SetCurrentPosition(streams.Latest(context.Background()))
if err = notifier.Load(context.Background(), syncDB); err != nil {
logrus.WithError(err).Panicf("failed to load notifier ")
}
- requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
+ federationPresenceProducer := &producers.FederationAPIPresenceProducer{
+ Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
+ JetStream: js,
+ }
+
+ requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer)
userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{
JetStream: js,
@@ -75,8 +81,6 @@ func AddPublicRoutes(
Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate),
}
- _ = userAPIReadUpdateProducer
-
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
process, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
js, keyAPI, rsAPI, syncDB, notifier,
@@ -131,5 +135,14 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start receipts consumer")
}
+ presenceConsumer := consumers.NewPresenceConsumer(
+ process, cfg, js, natsClient, syncDB,
+ notifier, streams.PresenceStreamProvider,
+ userAPI,
+ )
+ if err = presenceConsumer.Start(); err != nil {
+ logrus.WithError(err).Panicf("failed to start presence consumer")
+ }
+
routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg)
}