aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--appservice/appservice.go2
-rw-r--r--appservice/consumers/roomserver.go3
-rw-r--r--build/gobind/monolith.go1
-rw-r--r--cmd/dendrite-demo-libp2p/main.go3
-rw-r--r--cmd/dendrite-demo-yggdrasil/main.go4
-rw-r--r--cmd/dendrite-monolith-server/main.go3
-rw-r--r--cmd/dendrite-polylith-multi/main.go3
-rw-r--r--cmd/dendrite-polylith-multi/personalities/syncapi.go1
-rw-r--r--cmd/dendritejs/main.go1
-rw-r--r--federationsender/consumers/eduserver.go5
-rw-r--r--federationsender/consumers/keychange.go3
-rw-r--r--federationsender/consumers/roomserver.go3
-rw-r--r--federationsender/federationsender.go9
-rw-r--r--federationsender/queue/destinationqueue.go4
-rw-r--r--federationsender/queue/queue.go5
-rw-r--r--internal/consumers.go14
-rw-r--r--setup/base.go64
-rw-r--r--setup/monolith.go5
-rw-r--r--setup/process/process.go45
-rw-r--r--syncapi/consumers/clientapi.go4
-rw-r--r--syncapi/consumers/eduserver_receipts.go3
-rw-r--r--syncapi/consumers/eduserver_sendtodevice.go3
-rw-r--r--syncapi/consumers/eduserver_typing.go3
-rw-r--r--syncapi/consumers/keychange.go3
-rw-r--r--syncapi/consumers/roomserver.go3
-rw-r--r--syncapi/syncapi.go14
26 files changed, 187 insertions, 24 deletions
diff --git a/appservice/appservice.go b/appservice/appservice.go
index 7a438041..d783c7eb 100644
--- a/appservice/appservice.go
+++ b/appservice/appservice.go
@@ -89,7 +89,7 @@ func NewInternalAPI(
// We can't add ASes at runtime so this is safe to do.
if len(workerStates) > 0 {
consumer := consumers.NewOutputRoomEventConsumer(
- base.Cfg, consumer, appserviceDB,
+ base.ProcessContext, base.Cfg, consumer, appserviceDB,
rsAPI, workerStates,
)
if err := consumer.Start(); err != nil {
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index 0b251d43..5cbffa35 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/Shopify/sarama"
@@ -41,6 +42,7 @@ type OutputRoomEventConsumer struct {
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call
// Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
+ process *process.ProcessContext,
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
appserviceDB storage.Database,
@@ -48,6 +50,7 @@ func NewOutputRoomEventConsumer(
workerStates []types.ApplicationServiceWorkerState,
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
+ Process: process,
ComponentName: "appservice/roomserver",
Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent),
Consumer: kafkaConsumer,
diff --git a/build/gobind/monolith.go b/build/gobind/monolith.go
index 8cd5cb8b..332d156b 100644
--- a/build/gobind/monolith.go
+++ b/build/gobind/monolith.go
@@ -166,6 +166,7 @@ func (m *DendriteMonolith) Start() {
),
}
monolith.AddAllPublicRoutes(
+ base.ProcessContext,
base.PublicClientAPIMux,
base.PublicFederationAPIMux,
base.PublicKeyAPIMux,
diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go
index 31e7739a..0610ec77 100644
--- a/cmd/dendrite-demo-libp2p/main.go
+++ b/cmd/dendrite-demo-libp2p/main.go
@@ -192,6 +192,7 @@ func main() {
ExtPublicRoomsProvider: provider,
}
monolith.AddAllPublicRoutes(
+ base.Base.ProcessContext,
base.Base.PublicClientAPIMux,
base.Base.PublicFederationAPIMux,
base.Base.PublicKeyAPIMux,
@@ -234,5 +235,5 @@ func main() {
}
// We want to block forever to let the HTTP and HTTPS handler serve the APIs
- select {}
+ base.Base.WaitForShutdown()
}
diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go
index aea6f7c4..8091298b 100644
--- a/cmd/dendrite-demo-yggdrasil/main.go
+++ b/cmd/dendrite-demo-yggdrasil/main.go
@@ -150,6 +150,7 @@ func main() {
),
}
monolith.AddAllPublicRoutes(
+ base.ProcessContext,
base.PublicClientAPIMux,
base.PublicFederationAPIMux,
base.PublicKeyAPIMux,
@@ -200,5 +201,6 @@ func main() {
}
}()
- select {}
+ // We want to block forever to let the HTTP and HTTPS handler serve the APIs
+ base.WaitForShutdown()
}
diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go
index 55bac6fe..b82f7321 100644
--- a/cmd/dendrite-monolith-server/main.go
+++ b/cmd/dendrite-monolith-server/main.go
@@ -144,6 +144,7 @@ func main() {
KeyAPI: keyAPI,
}
monolith.AddAllPublicRoutes(
+ base.ProcessContext,
base.PublicClientAPIMux,
base.PublicFederationAPIMux,
base.PublicKeyAPIMux,
@@ -176,5 +177,5 @@ func main() {
}
// We want to block forever to let the HTTP and HTTPS handler serve the APIs
- select {}
+ base.WaitForShutdown()
}
diff --git a/cmd/dendrite-polylith-multi/main.go b/cmd/dendrite-polylith-multi/main.go
index 979ab436..d3c52967 100644
--- a/cmd/dendrite-polylith-multi/main.go
+++ b/cmd/dendrite-polylith-multi/main.go
@@ -74,5 +74,6 @@ func main() {
base := setup.NewBaseDendrite(cfg, component, false) // TODO
defer base.Close() // nolint: errcheck
- start(base, cfg)
+ go start(base, cfg)
+ base.WaitForShutdown()
}
diff --git a/cmd/dendrite-polylith-multi/personalities/syncapi.go b/cmd/dendrite-polylith-multi/personalities/syncapi.go
index 1c33286e..b9b20229 100644
--- a/cmd/dendrite-polylith-multi/personalities/syncapi.go
+++ b/cmd/dendrite-polylith-multi/personalities/syncapi.go
@@ -27,6 +27,7 @@ func SyncAPI(base *setup.BaseDendrite, cfg *config.Dendrite) {
rsAPI := base.RoomserverHTTPClient()
syncapi.AddPublicRoutes(
+ base.ProcessContext,
base.PublicClientAPIMux, userAPI, rsAPI,
base.KeyServerHTTPClient(),
federation, &cfg.SyncAPI,
diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go
index b77d141d..0dfa4681 100644
--- a/cmd/dendritejs/main.go
+++ b/cmd/dendritejs/main.go
@@ -231,6 +231,7 @@ func main() {
ExtPublicRoomsProvider: p2pPublicRoomProvider,
}
monolith.AddAllPublicRoutes(
+ base.ProcessContext,
base.PublicClientAPIMux,
base.PublicFederationAPIMux,
base.PublicKeyAPIMux,
diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go
index 6d11eb88..639cd731 100644
--- a/federationsender/consumers/eduserver.go
+++ b/federationsender/consumers/eduserver.go
@@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
@@ -44,6 +45,7 @@ type OutputEDUConsumer struct {
// NewOutputEDUConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers.
func NewOutputEDUConsumer(
+ process *process.ProcessContext,
cfg *config.FederationSender,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
@@ -51,18 +53,21 @@ func NewOutputEDUConsumer(
) *OutputEDUConsumer {
c := &OutputEDUConsumer{
typingConsumer: &internal.ContinualConsumer{
+ Process: process,
ComponentName: "eduserver/typing",
Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
},
sendToDeviceConsumer: &internal.ContinualConsumer{
+ Process: process,
ComponentName: "eduserver/sendtodevice",
Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
},
receiptConsumer: &internal.ContinualConsumer{
+ Process: process,
ComponentName: "eduserver/receipt",
Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
Consumer: kafkaConsumer,
diff --git a/federationsender/consumers/keychange.go b/federationsender/consumers/keychange.go
index 5006ac28..9e146390 100644
--- a/federationsender/consumers/keychange.go
+++ b/federationsender/consumers/keychange.go
@@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
@@ -41,6 +42,7 @@ type KeyChangeConsumer struct {
// NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers.
func NewKeyChangeConsumer(
+ process *process.ProcessContext,
cfg *config.KeyServer,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
@@ -49,6 +51,7 @@ func NewKeyChangeConsumer(
) *KeyChangeConsumer {
c := &KeyChangeConsumer{
consumer: &internal.ContinualConsumer{
+ Process: process,
ComponentName: "federationsender/keychange",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
Consumer: kafkaConsumer,
diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go
index 846468fa..f9c4a5c2 100644
--- a/federationsender/consumers/roomserver.go
+++ b/federationsender/consumers/roomserver.go
@@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
@@ -41,6 +42,7 @@ type OutputRoomEventConsumer struct {
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
+ process *process.ProcessContext,
cfg *config.FederationSender,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
@@ -48,6 +50,7 @@ func NewOutputRoomEventConsumer(
rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
+ Process: process,
ComponentName: "federationsender/roomserver",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
Consumer: kafkaConsumer,
diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go
index a24e0f48..9aab91d4 100644
--- a/federationsender/federationsender.go
+++ b/federationsender/federationsender.go
@@ -59,7 +59,8 @@ func NewInternalAPI(
consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
queues := queue.NewOutgoingQueues(
- federationSenderDB, cfg.Matrix.DisableFederation,
+ federationSenderDB, base.ProcessContext,
+ cfg.Matrix.DisableFederation,
cfg.Matrix.ServerName, federation, rsAPI, stats,
&queue.SigningInfo{
KeyID: cfg.Matrix.KeyID,
@@ -69,7 +70,7 @@ func NewInternalAPI(
)
rsConsumer := consumers.NewOutputRoomEventConsumer(
- cfg, consumer, queues,
+ base.ProcessContext, cfg, consumer, queues,
federationSenderDB, rsAPI,
)
if err = rsConsumer.Start(); err != nil {
@@ -77,13 +78,13 @@ func NewInternalAPI(
}
tsConsumer := consumers.NewOutputEDUConsumer(
- cfg, consumer, queues, federationSenderDB,
+ base.ProcessContext, cfg, consumer, queues, federationSenderDB,
)
if err := tsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start typing server consumer")
}
keyConsumer := consumers.NewKeyChangeConsumer(
- &base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI,
+ base.ProcessContext, &base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI,
)
if err := keyConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start key server consumer")
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go
index c8b0bf65..99b9e449 100644
--- a/federationsender/queue/destinationqueue.go
+++ b/federationsender/queue/destinationqueue.go
@@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
@@ -46,6 +47,7 @@ const (
// at a time.
type destinationQueue struct {
db storage.Database
+ process *process.ProcessContext
signing *SigningInfo
rsAPI api.RoomserverInternalAPI
client *gomatrixserverlib.FederationClient // federation client
@@ -411,7 +413,7 @@ func (oq *destinationQueue) nextTransaction(
// TODO: we should check for 500-ish fails vs 400-ish here,
// since we shouldn't queue things indefinitely in response
// to a 400-ish error
- ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
+ ctx, cancel := context.WithTimeout(oq.process.Context(), time.Minute*5)
defer cancel()
_, err := oq.client.SendTransaction(ctx, t)
switch err.(type) {
diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go
index 8054856e..4453ddb0 100644
--- a/federationsender/queue/queue.go
+++ b/federationsender/queue/queue.go
@@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
@@ -36,6 +37,7 @@ import (
// matrix servers
type OutgoingQueues struct {
db storage.Database
+ process *process.ProcessContext
disabled bool
rsAPI api.RoomserverInternalAPI
origin gomatrixserverlib.ServerName
@@ -80,6 +82,7 @@ var destinationQueueBackingOff = prometheus.NewGauge(
// NewOutgoingQueues makes a new OutgoingQueues
func NewOutgoingQueues(
db storage.Database,
+ process *process.ProcessContext,
disabled bool,
origin gomatrixserverlib.ServerName,
client *gomatrixserverlib.FederationClient,
@@ -89,6 +92,7 @@ func NewOutgoingQueues(
) *OutgoingQueues {
queues := &OutgoingQueues{
disabled: disabled,
+ process: process,
db: db,
rsAPI: rsAPI,
origin: origin,
@@ -151,6 +155,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
destinationQueueTotal.Inc()
oq = &destinationQueue{
db: oqs.db,
+ process: oqs.process,
rsAPI: oqs.rsAPI,
origin: oqs.origin,
destination: destination,
diff --git a/internal/consumers.go b/internal/consumers.go
index 807cf589..3a4e0b7f 100644
--- a/internal/consumers.go
+++ b/internal/consumers.go
@@ -20,6 +20,8 @@ import (
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/setup/process"
+ "github.com/sirupsen/logrus"
)
// A PartitionStorer has the storage APIs needed by the consumer.
@@ -33,6 +35,9 @@ type PartitionStorer interface {
// A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to
// remember the offset it reached.
type ContinualConsumer struct {
+ // The parent context for the listener, stop consuming when this context is done
+ Process *process.ProcessContext
+ // The component name
ComponentName string
// The kafkaesque topic to consume events from.
// This is the name used in kafka to identify the stream to consume events from.
@@ -100,6 +105,15 @@ func (c *ContinualConsumer) StartOffsets() ([]sqlutil.PartitionOffset, error) {
}
for _, pc := range partitionConsumers {
go c.consumePartition(pc)
+ if c.Process != nil {
+ c.Process.ComponentStarted()
+ go func(pc sarama.PartitionConsumer) {
+ <-c.Process.WaitForShutdown()
+ _ = pc.Close()
+ c.Process.ComponentFinished()
+ logrus.Infof("Stopped consumer for %q topic %q", c.ComponentName, c.Topic)
+ }(pc)
+ }
}
return storedOffsets, nil
diff --git a/setup/base.go b/setup/base.go
index c30e6910..6522426c 100644
--- a/setup/base.go
+++ b/setup/base.go
@@ -15,22 +15,28 @@
package setup
import (
+ "context"
"crypto/tls"
"fmt"
"io"
"net"
"net/http"
"net/url"
+ "os"
+ "os/signal"
+ "syscall"
"time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus/promhttp"
+ "go.uber.org/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/gorilla/mux"
@@ -61,6 +67,7 @@ import (
// should only be used during start up.
// Must be closed when shutting down.
type BaseDendrite struct {
+ *process.ProcessContext
componentName string
tracerCloser io.Closer
PublicClientAPIMux *mux.Router
@@ -161,7 +168,9 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
// We need to be careful with media APIs if they read from a filesystem to make sure they
// are not inadvertently reading paths without cleaning, else this could introduce a
// directory traversal attack e.g /../../../etc/passwd
+
return &BaseDendrite{
+ ProcessContext: process.NewProcessContext(),
componentName: componentName,
UseHTTPAPIs: useHTTPAPIs,
tracerCloser: closer,
@@ -354,14 +363,26 @@ func (b *BaseDendrite) SetupAndServeHTTP(
if internalAddr != NoListener && internalAddr != externalAddr {
go func() {
+ var internalShutdown atomic.Bool // RegisterOnShutdown can be called more than once
logrus.Infof("Starting internal %s listener on %s", b.componentName, internalServ.Addr)
+ b.ProcessContext.ComponentStarted()
+ internalServ.RegisterOnShutdown(func() {
+ if internalShutdown.CAS(false, true) {
+ b.ProcessContext.ComponentFinished()
+ logrus.Infof("Stopped internal HTTP listener")
+ }
+ })
if certFile != nil && keyFile != nil {
if err := internalServ.ListenAndServeTLS(*certFile, *keyFile); err != nil {
- logrus.WithError(err).Fatal("failed to serve HTTPS")
+ if err != http.ErrServerClosed {
+ logrus.WithError(err).Fatal("failed to serve HTTPS")
+ }
}
} else {
if err := internalServ.ListenAndServe(); err != nil {
- logrus.WithError(err).Fatal("failed to serve HTTP")
+ if err != http.ErrServerClosed {
+ logrus.WithError(err).Fatal("failed to serve HTTP")
+ }
}
}
logrus.Infof("Stopped internal %s listener on %s", b.componentName, internalServ.Addr)
@@ -370,19 +391,52 @@ func (b *BaseDendrite) SetupAndServeHTTP(
if externalAddr != NoListener {
go func() {
+ var externalShutdown atomic.Bool // RegisterOnShutdown can be called more than once
logrus.Infof("Starting external %s listener on %s", b.componentName, externalServ.Addr)
+ b.ProcessContext.ComponentStarted()
+ externalServ.RegisterOnShutdown(func() {
+ if externalShutdown.CAS(false, true) {
+ b.ProcessContext.ComponentFinished()
+ logrus.Infof("Stopped external HTTP listener")
+ }
+ })
if certFile != nil && keyFile != nil {
if err := externalServ.ListenAndServeTLS(*certFile, *keyFile); err != nil {
- logrus.WithError(err).Fatal("failed to serve HTTPS")
+ if err != http.ErrServerClosed {
+ logrus.WithError(err).Fatal("failed to serve HTTPS")
+ }
}
} else {
if err := externalServ.ListenAndServe(); err != nil {
- logrus.WithError(err).Fatal("failed to serve HTTP")
+ if err != http.ErrServerClosed {
+ logrus.WithError(err).Fatal("failed to serve HTTP")
+ }
}
}
logrus.Infof("Stopped external %s listener on %s", b.componentName, externalServ.Addr)
}()
}
- select {}
+ <-b.ProcessContext.WaitForShutdown()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+
+ _ = internalServ.Shutdown(ctx)
+ _ = externalServ.Shutdown(ctx)
+ logrus.Infof("Stopped HTTP listeners")
+}
+
+func (b *BaseDendrite) WaitForShutdown() {
+ sigs := make(chan os.Signal, 1)
+ signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+ <-sigs
+ signal.Reset(syscall.SIGINT, syscall.SIGTERM)
+
+ logrus.Warnf("Shutdown signal received")
+
+ b.ProcessContext.ShutdownDendrite()
+ b.ProcessContext.WaitForComponentsToFinish()
+
+ logrus.Warnf("Dendrite is exiting now")
}
diff --git a/setup/monolith.go b/setup/monolith.go
index fd84ef53..a740ebb7 100644
--- a/setup/monolith.go
+++ b/setup/monolith.go
@@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/mediaapi"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
serverKeyAPI "github.com/matrix-org/dendrite/signingkeyserver/api"
"github.com/matrix-org/dendrite/syncapi"
userapi "github.com/matrix-org/dendrite/userapi/api"
@@ -56,7 +57,7 @@ type Monolith struct {
}
// AddAllPublicRoutes attaches all public paths to the given router
-func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router) {
+func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ssMux, keyMux, mediaMux *mux.Router) {
clientapi.AddPublicRoutes(
csMux, &m.Config.ClientAPI, m.AccountDB,
m.FedClient, m.RoomserverAPI,
@@ -71,7 +72,7 @@ func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router
)
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client)
syncapi.AddPublicRoutes(
- csMux, m.UserAPI, m.RoomserverAPI,
+ process, csMux, m.UserAPI, m.RoomserverAPI,
m.KeyAPI, m.FedClient, &m.Config.SyncAPI,
)
}
diff --git a/setup/process/process.go b/setup/process/process.go
new file mode 100644
index 00000000..d55751d7
--- /dev/null
+++ b/setup/process/process.go
@@ -0,0 +1,45 @@
+package process
+
+import (
+ "context"
+ "sync"
+)
+
+type ProcessContext struct {
+ wg *sync.WaitGroup // used to wait for components to shutdown
+ ctx context.Context // cancelled when Stop is called
+ shutdown context.CancelFunc // shut down Dendrite
+}
+
+func NewProcessContext() *ProcessContext {
+ ctx, shutdown := context.WithCancel(context.Background())
+ return &ProcessContext{
+ ctx: ctx,
+ shutdown: shutdown,
+ wg: &sync.WaitGroup{},
+ }
+}
+
+func (b *ProcessContext) Context() context.Context {
+ return context.WithValue(b.ctx, "scope", "process") // nolint:staticcheck
+}
+
+func (b *ProcessContext) ComponentStarted() {
+ b.wg.Add(1)
+}
+
+func (b *ProcessContext) ComponentFinished() {
+ b.wg.Done()
+}
+
+func (b *ProcessContext) ShutdownDendrite() {
+ b.shutdown()
+}
+
+func (b *ProcessContext) WaitForShutdown() <-chan struct{} {
+ return b.ctx.Done()
+}
+
+func (b *ProcessContext) WaitForComponentsToFinish() {
+ b.wg.Wait()
+}
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index 4958f221..8dab513c 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -38,14 +39,15 @@ type OutputClientDataConsumer struct {
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
func NewOutputClientDataConsumer(
+ process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputClientDataConsumer {
-
consumer := internal.ContinualConsumer{
+ Process: process,
ComponentName: "syncapi/clientapi",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData)),
Consumer: kafkaConsumer,
diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go
index bd538eff..59890807 100644
--- a/syncapi/consumers/eduserver_receipts.go
+++ b/syncapi/consumers/eduserver_receipts.go
@@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -39,6 +40,7 @@ type OutputReceiptEventConsumer struct {
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
// Call Start() to begin consuming from the EDU server.
func NewOutputReceiptEventConsumer(
+ process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
@@ -47,6 +49,7 @@ func NewOutputReceiptEventConsumer(
) *OutputReceiptEventConsumer {
consumer := internal.ContinualConsumer{
+ Process: process,
ComponentName: "syncapi/eduserver/receipt",
Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
Consumer: kafkaConsumer,
diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go
index 6e774b5b..668d3078 100644
--- a/syncapi/consumers/eduserver_sendtodevice.go
+++ b/syncapi/consumers/eduserver_sendtodevice.go
@@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -42,6 +43,7 @@ type OutputSendToDeviceEventConsumer struct {
// NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer.
// Call Start() to begin consuming from the EDU server.
func NewOutputSendToDeviceEventConsumer(
+ process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
@@ -50,6 +52,7 @@ func NewOutputSendToDeviceEventConsumer(
) *OutputSendToDeviceEventConsumer {
consumer := internal.ContinualConsumer{
+ Process: process,
ComponentName: "syncapi/eduserver/sendtodevice",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)),
Consumer: kafkaConsumer,
diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go
index 3edf6675..7d7ab3bf 100644
--- a/syncapi/consumers/eduserver_typing.go
+++ b/syncapi/consumers/eduserver_typing.go
@@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -39,6 +40,7 @@ type OutputTypingEventConsumer struct {
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
// Call Start() to begin consuming from the EDU server.
func NewOutputTypingEventConsumer(
+ process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
@@ -48,6 +50,7 @@ func NewOutputTypingEventConsumer(
) *OutputTypingEventConsumer {
consumer := internal.ContinualConsumer{
+ Process: process,
ComponentName: "syncapi/eduserver/typing",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)),
Consumer: kafkaConsumer,
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index af7b280f..0e1a790d 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -46,6 +47,7 @@ type OutputKeyChangeEventConsumer struct {
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
// Call Start() to begin consuming from the key server.
func NewOutputKeyChangeEventConsumer(
+ process *process.ProcessContext,
serverName gomatrixserverlib.ServerName,
topic string,
kafkaConsumer sarama.Consumer,
@@ -57,6 +59,7 @@ func NewOutputKeyChangeEventConsumer(
) *OutputKeyChangeEventConsumer {
consumer := internal.ContinualConsumer{
+ Process: process,
ComponentName: "syncapi/keychange",
Topic: topic,
Consumer: kafkaConsumer,
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index a8cc5f71..85e73df6 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -43,6 +44,7 @@ type OutputRoomEventConsumer struct {
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
+ process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
@@ -53,6 +55,7 @@ func NewOutputRoomEventConsumer(
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
+ Process: process,
ComponentName: "syncapi/roomserver",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
Consumer: kafkaConsumer,
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 4a09940d..84c7140c 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/kafka"
+ "github.com/matrix-org/dendrite/setup/process"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@@ -39,6 +40,7 @@ import (
// AddPublicRoutes sets up and registers HTTP handlers for the SyncAPI
// component.
func AddPublicRoutes(
+ process *process.ProcessContext,
router *mux.Router,
userAPI userapi.UserInternalAPI,
rsAPI api.RoomserverInternalAPI,
@@ -63,7 +65,7 @@ func AddPublicRoutes(
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
- cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
+ process, cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider,
)
if err = keyChangeConsumer.Start(); err != nil {
@@ -71,7 +73,7 @@ func AddPublicRoutes(
}
roomConsumer := consumers.NewOutputRoomEventConsumer(
- cfg, consumer, syncDB, notifier, streams.PDUStreamProvider,
+ process, cfg, consumer, syncDB, notifier, streams.PDUStreamProvider,
streams.InviteStreamProvider, rsAPI,
)
if err = roomConsumer.Start(); err != nil {
@@ -79,28 +81,28 @@ func AddPublicRoutes(
}
clientConsumer := consumers.NewOutputClientDataConsumer(
- cfg, consumer, syncDB, notifier, streams.AccountDataStreamProvider,
+ process, cfg, consumer, syncDB, notifier, streams.AccountDataStreamProvider,
)
if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer")
}
typingConsumer := consumers.NewOutputTypingEventConsumer(
- cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider,
+ process, cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider,
)
if err = typingConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start typing consumer")
}
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
- cfg, consumer, syncDB, notifier, streams.SendToDeviceStreamProvider,
+ process, cfg, consumer, syncDB, notifier, streams.SendToDeviceStreamProvider,
)
if err = sendToDeviceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
}
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
- cfg, consumer, syncDB, notifier, streams.ReceiptStreamProvider,
+ process, cfg, consumer, syncDB, notifier, streams.ReceiptStreamProvider,
)
if err = receiptConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start receipts consumer")