From 61963a74ae84df94238cf384419acf4d59c311c1 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Mon, 27 Jul 2020 09:19:55 +0100 Subject: Add logic for determining when device lists have changed due to membership changes (#1220) * Flesh out structure for handling device list updates for room membership changes * First cut untested algorithm * Add tests for determining changed/left device lists * Linting * Unbreak tests * Sigh.. linting --- internal/consumers.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) (limited to 'internal') diff --git a/internal/consumers.go b/internal/consumers.go index d7917f23..c000c171 100644 --- a/internal/consumers.go +++ b/internal/consumers.go @@ -58,11 +58,17 @@ var ErrShutdown = fmt.Errorf("shutdown") // Returns nil once all the goroutines are started. // Returns an error if it can't start consuming for any of the partitions. func (c *ContinualConsumer) Start() error { + _, err := c.StartOffsets() + return err +} + +// StartOffsets is the same as Start but returns the loaded offsets as well. +func (c *ContinualConsumer) StartOffsets() ([]sqlutil.PartitionOffset, error) { offsets := map[int32]int64{} partitions, err := c.Consumer.Partitions(c.Topic) if err != nil { - return err + return nil, err } for _, partition := range partitions { // Default all the offsets to the beginning of the stream. @@ -71,7 +77,7 @@ func (c *ContinualConsumer) Start() error { storedOffsets, err := c.PartitionStore.PartitionOffsets(context.TODO(), c.Topic) if err != nil { - return err + return nil, err } for _, offset := range storedOffsets { // We've already processed events from this partition so advance the offset to where we got to. @@ -87,7 +93,7 @@ func (c *ContinualConsumer) Start() error { for _, p := range partitionConsumers { p.Close() // nolint: errcheck } - return err + return nil, err } partitionConsumers = append(partitionConsumers, pc) } @@ -95,7 +101,7 @@ func (c *ContinualConsumer) Start() error { go c.consumePartition(pc) } - return nil + return storedOffsets, nil } // consumePartition consumes the room events for a single partition of the kafkaesque stream. -- cgit v1.2.3