diff options
Diffstat (limited to 'syncapi/syncapi.go')
-rw-r--r-- | syncapi/syncapi.go | 24 |
1 files changed, 23 insertions, 1 deletions
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 72462459..cb9890ff 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -31,6 +31,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/consumers" "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/producers" "github.com/matrix-org/dendrite/syncapi/routing" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/streams" @@ -64,6 +65,18 @@ func AddPublicRoutes( requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier) + userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{ + JetStream: js, + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent), + } + + userAPIReadUpdateProducer := &producers.UserAPIReadProducer{ + JetStream: js, + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate), + } + + _ = userAPIReadUpdateProducer + keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( process, cfg, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), js, keyAPI, rsAPI, syncDB, notifier, @@ -75,7 +88,7 @@ func AddPublicRoutes( roomConsumer := consumers.NewOutputRoomEventConsumer( process, cfg, js, syncDB, notifier, streams.PDUStreamProvider, - streams.InviteStreamProvider, rsAPI, + streams.InviteStreamProvider, rsAPI, userAPIStreamEventProducer, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer") @@ -83,11 +96,19 @@ func AddPublicRoutes( clientConsumer := consumers.NewOutputClientDataConsumer( process, cfg, js, syncDB, notifier, streams.AccountDataStreamProvider, + userAPIReadUpdateProducer, ) if err = clientConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start client data consumer") } + notificationConsumer := consumers.NewOutputNotificationDataConsumer( + process, cfg, js, syncDB, notifier, streams.NotificationDataStreamProvider, + ) + if err = notificationConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start notification data consumer") + } + typingConsumer := consumers.NewOutputTypingEventConsumer( process, cfg, js, syncDB, eduCache, notifier, streams.TypingStreamProvider, ) @@ -104,6 +125,7 @@ func AddPublicRoutes( receiptConsumer := consumers.NewOutputReceiptEventConsumer( process, cfg, js, syncDB, notifier, streams.ReceiptStreamProvider, + userAPIReadUpdateProducer, ) if err = receiptConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start receipts consumer") |