diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2021-01-26 12:56:20 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-26 12:56:20 +0000 |
commit | 9f443317bc578e1897c7eab9b4911f952f39fdbc (patch) | |
tree | 1c758596b56fcf9042c688d9f0204d731dbc216e /appservice | |
parent | 64fb6de6d4f0860cc2b7503cfc36eb743552395b (diff) |
Graceful shutdowns (#1734)
* Initial graceful stop
* Fix dendritejs
* Use process context for outbound federation requests in destination queues
* Reduce logging
* Fix log level
Diffstat (limited to 'appservice')
-rw-r--r-- | appservice/appservice.go | 2 | ||||
-rw-r--r-- | appservice/consumers/roomserver.go | 3 |
2 files changed, 4 insertions, 1 deletions
diff --git a/appservice/appservice.go b/appservice/appservice.go index 7a438041..d783c7eb 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -89,7 +89,7 @@ func NewInternalAPI( // We can't add ASes at runtime so this is safe to do. if len(workerStates) > 0 { consumer := consumers.NewOutputRoomEventConsumer( - base.Cfg, consumer, appserviceDB, + base.ProcessContext, base.Cfg, consumer, appserviceDB, rsAPI, workerStates, ) if err := consumer.Start(); err != nil { diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 0b251d43..5cbffa35 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" "github.com/Shopify/sarama" @@ -41,6 +42,7 @@ type OutputRoomEventConsumer struct { // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call // Start() to begin consuming from room servers. func NewOutputRoomEventConsumer( + process *process.ProcessContext, cfg *config.Dendrite, kafkaConsumer sarama.Consumer, appserviceDB storage.Database, @@ -48,6 +50,7 @@ func NewOutputRoomEventConsumer( workerStates []types.ApplicationServiceWorkerState, ) *OutputRoomEventConsumer { consumer := internal.ContinualConsumer{ + Process: process, ComponentName: "appservice/roomserver", Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent), Consumer: kafkaConsumer, |