aboutsummaryrefslogtreecommitdiff
path: root/federationsender
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-10-15 13:27:13 +0100
committerGitHub <noreply@github.com>2020-10-15 13:27:13 +0100
commit49abe359e6a2b0c3f214190b73404c5cf9a0e051 (patch)
treeade4613526d0f6a306cd7117c8f77ab30b151ea0 /federationsender
parent10f1beb0de7a52ccdd122b05b4adffdbdab4ea2e (diff)
Start Kafka connections for each component that needs them (#1527)
* Start Kafka connection for each component that needs one * Fix roomserver unit tests * Rename to naffkaInstance (@Kegsay review comment) * Fix import cycle
Diffstat (limited to 'federationsender')
-rw-r--r--federationsender/federationsender.go9
1 files changed, 6 insertions, 3 deletions
diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go
index 2f122328..78791140 100644
--- a/federationsender/federationsender.go
+++ b/federationsender/federationsender.go
@@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/statistics"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/internal/setup"
+ "github.com/matrix-org/dendrite/internal/setup/kafka"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
@@ -55,6 +56,8 @@ func NewInternalAPI(
FailuresUntilBlacklist: cfg.FederationMaxRetries,
}
+ consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
+
queues := queue.NewOutgoingQueues(
federationSenderDB, cfg.Matrix.ServerName, federation,
rsAPI, stats,
@@ -66,7 +69,7 @@ func NewInternalAPI(
)
rsConsumer := consumers.NewOutputRoomEventConsumer(
- cfg, base.KafkaConsumer, queues,
+ cfg, consumer, queues,
federationSenderDB, rsAPI,
)
if err = rsConsumer.Start(); err != nil {
@@ -74,13 +77,13 @@ func NewInternalAPI(
}
tsConsumer := consumers.NewOutputEDUConsumer(
- cfg, base.KafkaConsumer, queues, federationSenderDB,
+ cfg, consumer, queues, federationSenderDB,
)
if err := tsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start typing server consumer")
}
keyConsumer := consumers.NewKeyChangeConsumer(
- &base.Cfg.KeyServer, base.KafkaConsumer, queues, federationSenderDB, rsAPI,
+ &base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI,
)
if err := keyConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start key server consumer")