aboutsummaryrefslogtreecommitdiff
path: root/appservice/appservice.go
diff options
context:
space:
mode:
Diffstat (limited to 'appservice/appservice.go')
-rw-r--r--appservice/appservice.go6
1 files changed, 3 insertions, 3 deletions
diff --git a/appservice/appservice.go b/appservice/appservice.go
index 5f16c10b..924a609e 100644
--- a/appservice/appservice.go
+++ b/appservice/appservice.go
@@ -32,7 +32,7 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/dendrite/setup/kafka"
+ "github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/sirupsen/logrus"
)
@@ -58,7 +58,7 @@ func NewInternalAPI(
},
},
}
- consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka)
+ js, _, _ := jetstream.Prepare(&base.Cfg.Global.JetStream)
// Create a connection to the appservice postgres DB
appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database)
@@ -97,7 +97,7 @@ func NewInternalAPI(
// We can't add ASes at runtime so this is safe to do.
if len(workerStates) > 0 {
consumer := consumers.NewOutputRoomEventConsumer(
- base.ProcessContext, base.Cfg, consumer, appserviceDB,
+ base.ProcessContext, base.Cfg, js, appserviceDB,
rsAPI, workerStates,
)
if err := consumer.Start(); err != nil {