aboutsummaryrefslogtreecommitdiff
path: root/appservice
diff options
context:
space:
mode:
Diffstat (limited to 'appservice')
-rw-r--r--appservice/appservice.go2
-rw-r--r--appservice/consumers/roomserver.go3
2 files changed, 4 insertions, 1 deletions
diff --git a/appservice/appservice.go b/appservice/appservice.go
index 7a438041..d783c7eb 100644
--- a/appservice/appservice.go
+++ b/appservice/appservice.go
@@ -89,7 +89,7 @@ func NewInternalAPI(
// We can't add ASes at runtime so this is safe to do.
if len(workerStates) > 0 {
consumer := consumers.NewOutputRoomEventConsumer(
- base.Cfg, consumer, appserviceDB,
+ base.ProcessContext, base.Cfg, consumer, appserviceDB,
rsAPI, workerStates,
)
if err := consumer.Start(); err != nil {
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index 0b251d43..5cbffa35 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/Shopify/sarama"
@@ -41,6 +42,7 @@ type OutputRoomEventConsumer struct {
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call
// Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
+ process *process.ProcessContext,
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
appserviceDB storage.Database,
@@ -48,6 +50,7 @@ func NewOutputRoomEventConsumer(
workerStates []types.ApplicationServiceWorkerState,
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
+ Process: process,
ComponentName: "appservice/roomserver",
Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent),
Consumer: kafkaConsumer,