diff options
Diffstat (limited to 'syncapi/syncapi.go')
-rw-r--r-- | syncapi/syncapi.go | 19 |
1 files changed, 10 insertions, 9 deletions
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 84c7140c..39bc233a 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -24,7 +24,7 @@ import ( keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -48,7 +48,7 @@ func AddPublicRoutes( federation *gomatrixserverlib.FederationClient, cfg *config.SyncAPI, ) { - consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream) syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) if err != nil { @@ -65,15 +65,16 @@ func AddPublicRoutes( requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier) keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( - process, cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), - consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, + process, cfg.Matrix.ServerName, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), + consumer, keyAPI, rsAPI, syncDB, notifier, + streams.DeviceListStreamProvider, ) if err = keyChangeConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start key change consumer") } roomConsumer := consumers.NewOutputRoomEventConsumer( - process, cfg, consumer, syncDB, notifier, streams.PDUStreamProvider, + process, cfg, js, syncDB, notifier, streams.PDUStreamProvider, streams.InviteStreamProvider, rsAPI, ) if err = roomConsumer.Start(); err != nil { @@ -81,28 +82,28 @@ func AddPublicRoutes( } clientConsumer := consumers.NewOutputClientDataConsumer( - process, cfg, consumer, syncDB, notifier, streams.AccountDataStreamProvider, + process, cfg, js, syncDB, notifier, streams.AccountDataStreamProvider, ) if err = clientConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start client data consumer") } typingConsumer := consumers.NewOutputTypingEventConsumer( - process, cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider, + process, cfg, js, syncDB, eduCache, notifier, streams.TypingStreamProvider, ) if err = typingConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start typing consumer") } sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer( - process, cfg, consumer, syncDB, notifier, streams.SendToDeviceStreamProvider, + process, cfg, js, syncDB, notifier, streams.SendToDeviceStreamProvider, ) if err = sendToDeviceConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start send-to-device consumer") } receiptConsumer := consumers.NewOutputReceiptEventConsumer( - process, cfg, consumer, syncDB, notifier, streams.ReceiptStreamProvider, + process, cfg, js, syncDB, notifier, streams.ReceiptStreamProvider, ) if err = receiptConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start receipts consumer") |