diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-10-15 13:27:13 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-15 13:27:13 +0100 |
commit | 49abe359e6a2b0c3f214190b73404c5cf9a0e051 (patch) | |
tree | ade4613526d0f6a306cd7117c8f77ab30b151ea0 /appservice | |
parent | 10f1beb0de7a52ccdd122b05b4adffdbdab4ea2e (diff) |
Start Kafka connections for each component that needs them (#1527)
* Start Kafka connection for each component that needs one
* Fix roomserver unit tests
* Rename to naffkaInstance (@Kegsay review comment)
* Fix import cycle
Diffstat (limited to 'appservice')
-rw-r--r-- | appservice/appservice.go | 5 |
1 files changed, 4 insertions, 1 deletions
diff --git a/appservice/appservice.go b/appservice/appservice.go index e356f68e..cf9a47b7 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -30,6 +30,7 @@ import ( "github.com/matrix-org/dendrite/appservice/workers" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/sirupsen/logrus" @@ -47,6 +48,8 @@ func NewInternalAPI( userAPI userapi.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, ) appserviceAPI.AppServiceQueryAPI { + consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka) + // Create a connection to the appservice postgres DB appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database) if err != nil { @@ -86,7 +89,7 @@ func NewInternalAPI( // We can't add ASes at runtime so this is safe to do. if len(workerStates) > 0 { consumer := consumers.NewOutputRoomEventConsumer( - base.Cfg, base.KafkaConsumer, appserviceDB, + base.Cfg, consumer, appserviceDB, rsAPI, workerStates, ) if err := consumer.Start(); err != nil { |