aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2021-01-26 12:56:20 +0000
committerGitHub <noreply@github.com>2021-01-26 12:56:20 +0000
commit9f443317bc578e1897c7eab9b4911f952f39fdbc (patch)
tree1c758596b56fcf9042c688d9f0204d731dbc216e /syncapi
parent64fb6de6d4f0860cc2b7503cfc36eb743552395b (diff)
Graceful shutdowns (#1734)
* Initial graceful stop * Fix dendritejs * Use process context for outbound federation requests in destination queues * Reduce logging * Fix log level
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/clientapi.go4
-rw-r--r--syncapi/consumers/eduserver_receipts.go3
-rw-r--r--syncapi/consumers/eduserver_sendtodevice.go3
-rw-r--r--syncapi/consumers/eduserver_typing.go3
-rw-r--r--syncapi/consumers/keychange.go3
-rw-r--r--syncapi/consumers/roomserver.go3
-rw-r--r--syncapi/syncapi.go14
7 files changed, 26 insertions, 7 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index 4958f221..8dab513c 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -38,14 +39,15 @@ type OutputClientDataConsumer struct {
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
func NewOutputClientDataConsumer(
+ process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputClientDataConsumer {
-
consumer := internal.ContinualConsumer{
+ Process: process,
ComponentName: "syncapi/clientapi",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData)),
Consumer: kafkaConsumer,
diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go
index bd538eff..59890807 100644
--- a/syncapi/consumers/eduserver_receipts.go
+++ b/syncapi/consumers/eduserver_receipts.go
@@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -39,6 +40,7 @@ type OutputReceiptEventConsumer struct {
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
// Call Start() to begin consuming from the EDU server.
func NewOutputReceiptEventConsumer(
+ process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
@@ -47,6 +49,7 @@ func NewOutputReceiptEventConsumer(
) *OutputReceiptEventConsumer {
consumer := internal.ContinualConsumer{
+ Process: process,
ComponentName: "syncapi/eduserver/receipt",
Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
Consumer: kafkaConsumer,
diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go
index 6e774b5b..668d3078 100644
--- a/syncapi/consumers/eduserver_sendtodevice.go
+++ b/syncapi/consumers/eduserver_sendtodevice.go
@@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -42,6 +43,7 @@ type OutputSendToDeviceEventConsumer struct {
// NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer.
// Call Start() to begin consuming from the EDU server.
func NewOutputSendToDeviceEventConsumer(
+ process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
@@ -50,6 +52,7 @@ func NewOutputSendToDeviceEventConsumer(
) *OutputSendToDeviceEventConsumer {
consumer := internal.ContinualConsumer{
+ Process: process,
ComponentName: "syncapi/eduserver/sendtodevice",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)),
Consumer: kafkaConsumer,
diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go
index 3edf6675..7d7ab3bf 100644
--- a/syncapi/consumers/eduserver_typing.go
+++ b/syncapi/consumers/eduserver_typing.go
@@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -39,6 +40,7 @@ type OutputTypingEventConsumer struct {
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
// Call Start() to begin consuming from the EDU server.
func NewOutputTypingEventConsumer(
+ process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
@@ -48,6 +50,7 @@ func NewOutputTypingEventConsumer(
) *OutputTypingEventConsumer {
consumer := internal.ContinualConsumer{
+ Process: process,
ComponentName: "syncapi/eduserver/typing",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)),
Consumer: kafkaConsumer,
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index af7b280f..0e1a790d 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -46,6 +47,7 @@ type OutputKeyChangeEventConsumer struct {
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
// Call Start() to begin consuming from the key server.
func NewOutputKeyChangeEventConsumer(
+ process *process.ProcessContext,
serverName gomatrixserverlib.ServerName,
topic string,
kafkaConsumer sarama.Consumer,
@@ -57,6 +59,7 @@ func NewOutputKeyChangeEventConsumer(
) *OutputKeyChangeEventConsumer {
consumer := internal.ContinualConsumer{
+ Process: process,
ComponentName: "syncapi/keychange",
Topic: topic,
Consumer: kafkaConsumer,
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index a8cc5f71..85e73df6 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -43,6 +44,7 @@ type OutputRoomEventConsumer struct {
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
+ process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
@@ -53,6 +55,7 @@ func NewOutputRoomEventConsumer(
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
+ Process: process,
ComponentName: "syncapi/roomserver",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
Consumer: kafkaConsumer,
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 4a09940d..84c7140c 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/kafka"
+ "github.com/matrix-org/dendrite/setup/process"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@@ -39,6 +40,7 @@ import (
// AddPublicRoutes sets up and registers HTTP handlers for the SyncAPI
// component.
func AddPublicRoutes(
+ process *process.ProcessContext,
router *mux.Router,
userAPI userapi.UserInternalAPI,
rsAPI api.RoomserverInternalAPI,
@@ -63,7 +65,7 @@ func AddPublicRoutes(
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
- cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
+ process, cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider,
)
if err = keyChangeConsumer.Start(); err != nil {
@@ -71,7 +73,7 @@ func AddPublicRoutes(
}
roomConsumer := consumers.NewOutputRoomEventConsumer(
- cfg, consumer, syncDB, notifier, streams.PDUStreamProvider,
+ process, cfg, consumer, syncDB, notifier, streams.PDUStreamProvider,
streams.InviteStreamProvider, rsAPI,
)
if err = roomConsumer.Start(); err != nil {
@@ -79,28 +81,28 @@ func AddPublicRoutes(
}
clientConsumer := consumers.NewOutputClientDataConsumer(
- cfg, consumer, syncDB, notifier, streams.AccountDataStreamProvider,
+ process, cfg, consumer, syncDB, notifier, streams.AccountDataStreamProvider,
)
if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer")
}
typingConsumer := consumers.NewOutputTypingEventConsumer(
- cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider,
+ process, cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider,
)
if err = typingConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start typing consumer")
}
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
- cfg, consumer, syncDB, notifier, streams.SendToDeviceStreamProvider,
+ process, cfg, consumer, syncDB, notifier, streams.SendToDeviceStreamProvider,
)
if err = sendToDeviceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
}
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
- cfg, consumer, syncDB, notifier, streams.ReceiptStreamProvider,
+ process, cfg, consumer, syncDB, notifier, streams.ReceiptStreamProvider,
)
if err = receiptConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start receipts consumer")