aboutsummaryrefslogtreecommitdiff
path: root/appservice
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-10-15 13:27:13 +0100
committerGitHub <noreply@github.com>2020-10-15 13:27:13 +0100
commit49abe359e6a2b0c3f214190b73404c5cf9a0e051 (patch)
treeade4613526d0f6a306cd7117c8f77ab30b151ea0 /appservice
parent10f1beb0de7a52ccdd122b05b4adffdbdab4ea2e (diff)
Start Kafka connections for each component that needs them (#1527)
* Start Kafka connection for each component that needs one * Fix roomserver unit tests * Rename to naffkaInstance (@Kegsay review comment) * Fix import cycle
Diffstat (limited to 'appservice')
-rw-r--r--appservice/appservice.go5
1 files changed, 4 insertions, 1 deletions
diff --git a/appservice/appservice.go b/appservice/appservice.go
index e356f68e..cf9a47b7 100644
--- a/appservice/appservice.go
+++ b/appservice/appservice.go
@@ -30,6 +30,7 @@ import (
"github.com/matrix-org/dendrite/appservice/workers"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/setup"
+ "github.com/matrix-org/dendrite/internal/setup/kafka"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/sirupsen/logrus"
@@ -47,6 +48,8 @@ func NewInternalAPI(
userAPI userapi.UserInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
) appserviceAPI.AppServiceQueryAPI {
+ consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka)
+
// Create a connection to the appservice postgres DB
appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database)
if err != nil {
@@ -86,7 +89,7 @@ func NewInternalAPI(
// We can't add ASes at runtime so this is safe to do.
if len(workerStates) > 0 {
consumer := consumers.NewOutputRoomEventConsumer(
- base.Cfg, base.KafkaConsumer, appserviceDB,
+ base.Cfg, consumer, appserviceDB,
rsAPI, workerStates,
)
if err := consumer.Start(); err != nil {