diff options
Diffstat (limited to 'federationapi/federationapi.go')
-rw-r--r-- | federationapi/federationapi.go | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index 02c4cfdb..0b181606 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -30,7 +30,7 @@ import ( roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/sirupsen/logrus" @@ -92,7 +92,7 @@ func NewInternalAPI( FailuresUntilBlacklist: cfg.FederationMaxRetries, } - consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream) queues := queue.NewOutgoingQueues( federationDB, base.ProcessContext, @@ -106,7 +106,7 @@ func NewInternalAPI( ) rsConsumer := consumers.NewOutputRoomEventConsumer( - base.ProcessContext, cfg, consumer, queues, + base.ProcessContext, cfg, js, queues, federationDB, rsAPI, ) if err = rsConsumer.Start(); err != nil { @@ -114,7 +114,7 @@ func NewInternalAPI( } tsConsumer := consumers.NewOutputEDUConsumer( - base.ProcessContext, cfg, consumer, queues, federationDB, + base.ProcessContext, cfg, js, queues, federationDB, ) if err := tsConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start typing server consumer") |