aboutsummaryrefslogtreecommitdiff
path: root/federationapi/federationapi.go
diff options
context:
space:
mode:
Diffstat (limited to 'federationapi/federationapi.go')
-rw-r--r--federationapi/federationapi.go8
1 files changed, 4 insertions, 4 deletions
diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go
index 02c4cfdb..0b181606 100644
--- a/federationapi/federationapi.go
+++ b/federationapi/federationapi.go
@@ -30,7 +30,7 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/dendrite/setup/kafka"
+ "github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/sirupsen/logrus"
@@ -92,7 +92,7 @@ func NewInternalAPI(
FailuresUntilBlacklist: cfg.FederationMaxRetries,
}
- consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
+ js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
queues := queue.NewOutgoingQueues(
federationDB, base.ProcessContext,
@@ -106,7 +106,7 @@ func NewInternalAPI(
)
rsConsumer := consumers.NewOutputRoomEventConsumer(
- base.ProcessContext, cfg, consumer, queues,
+ base.ProcessContext, cfg, js, queues,
federationDB, rsAPI,
)
if err = rsConsumer.Start(); err != nil {
@@ -114,7 +114,7 @@ func NewInternalAPI(
}
tsConsumer := consumers.NewOutputEDUConsumer(
- base.ProcessContext, cfg, consumer, queues, federationDB,
+ base.ProcessContext, cfg, js, queues, federationDB,
)
if err := tsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start typing server consumer")