diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-10-15 13:27:13 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-15 13:27:13 +0100 |
commit | 49abe359e6a2b0c3f214190b73404c5cf9a0e051 (patch) | |
tree | ade4613526d0f6a306cd7117c8f77ab30b151ea0 /federationsender | |
parent | 10f1beb0de7a52ccdd122b05b4adffdbdab4ea2e (diff) |
Start Kafka connections for each component that needs them (#1527)
* Start Kafka connection for each component that needs one
* Fix roomserver unit tests
* Rename to naffkaInstance (@Kegsay review comment)
* Fix import cycle
Diffstat (limited to 'federationsender')
-rw-r--r-- | federationsender/federationsender.go | 9 |
1 files changed, 6 insertions, 3 deletions
diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index 2f122328..78791140 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/statistics" "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" @@ -55,6 +56,8 @@ func NewInternalAPI( FailuresUntilBlacklist: cfg.FederationMaxRetries, } + consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + queues := queue.NewOutgoingQueues( federationSenderDB, cfg.Matrix.ServerName, federation, rsAPI, stats, @@ -66,7 +69,7 @@ func NewInternalAPI( ) rsConsumer := consumers.NewOutputRoomEventConsumer( - cfg, base.KafkaConsumer, queues, + cfg, consumer, queues, federationSenderDB, rsAPI, ) if err = rsConsumer.Start(); err != nil { @@ -74,13 +77,13 @@ func NewInternalAPI( } tsConsumer := consumers.NewOutputEDUConsumer( - cfg, base.KafkaConsumer, queues, federationSenderDB, + cfg, consumer, queues, federationSenderDB, ) if err := tsConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start typing server consumer") } keyConsumer := consumers.NewKeyChangeConsumer( - &base.Cfg.KeyServer, base.KafkaConsumer, queues, federationSenderDB, rsAPI, + &base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI, ) if err := keyConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start key server consumer") |