aboutsummaryrefslogtreecommitdiff
path: root/internal/consumers.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/consumers.go')
-rw-r--r--internal/consumers.go14
1 files changed, 14 insertions, 0 deletions
diff --git a/internal/consumers.go b/internal/consumers.go
index 807cf589..3a4e0b7f 100644
--- a/internal/consumers.go
+++ b/internal/consumers.go
@@ -20,6 +20,8 @@ import (
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/setup/process"
+ "github.com/sirupsen/logrus"
)
// A PartitionStorer has the storage APIs needed by the consumer.
@@ -33,6 +35,9 @@ type PartitionStorer interface {
// A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to
// remember the offset it reached.
type ContinualConsumer struct {
+ // The parent context for the listener, stop consuming when this context is done
+ Process *process.ProcessContext
+ // The component name
ComponentName string
// The kafkaesque topic to consume events from.
// This is the name used in kafka to identify the stream to consume events from.
@@ -100,6 +105,15 @@ func (c *ContinualConsumer) StartOffsets() ([]sqlutil.PartitionOffset, error) {
}
for _, pc := range partitionConsumers {
go c.consumePartition(pc)
+ if c.Process != nil {
+ c.Process.ComponentStarted()
+ go func(pc sarama.PartitionConsumer) {
+ <-c.Process.WaitForShutdown()
+ _ = pc.Close()
+ c.Process.ComponentFinished()
+ logrus.Infof("Stopped consumer for %q topic %q", c.ComponentName, c.Topic)
+ }(pc)
+ }
}
return storedOffsets, nil