diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2021-01-26 12:56:20 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-26 12:56:20 +0000 |
commit | 9f443317bc578e1897c7eab9b4911f952f39fdbc (patch) | |
tree | 1c758596b56fcf9042c688d9f0204d731dbc216e /internal | |
parent | 64fb6de6d4f0860cc2b7503cfc36eb743552395b (diff) |
Graceful shutdowns (#1734)
* Initial graceful stop
* Fix dendritejs
* Use process context for outbound federation requests in destination queues
* Reduce logging
* Fix log level
Diffstat (limited to 'internal')
-rw-r--r-- | internal/consumers.go | 14 |
1 files changed, 14 insertions, 0 deletions
diff --git a/internal/consumers.go b/internal/consumers.go index 807cf589..3a4e0b7f 100644 --- a/internal/consumers.go +++ b/internal/consumers.go @@ -20,6 +20,8 @@ import ( "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/process" + "github.com/sirupsen/logrus" ) // A PartitionStorer has the storage APIs needed by the consumer. @@ -33,6 +35,9 @@ type PartitionStorer interface { // A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to // remember the offset it reached. type ContinualConsumer struct { + // The parent context for the listener, stop consuming when this context is done + Process *process.ProcessContext + // The component name ComponentName string // The kafkaesque topic to consume events from. // This is the name used in kafka to identify the stream to consume events from. @@ -100,6 +105,15 @@ func (c *ContinualConsumer) StartOffsets() ([]sqlutil.PartitionOffset, error) { } for _, pc := range partitionConsumers { go c.consumePartition(pc) + if c.Process != nil { + c.Process.ComponentStarted() + go func(pc sarama.PartitionConsumer) { + <-c.Process.WaitForShutdown() + _ = pc.Close() + c.Process.ComponentFinished() + logrus.Infof("Stopped consumer for %q topic %q", c.ComponentName, c.Topic) + }(pc) + } } return storedOffsets, nil |