diff options
author | Kegsay <kegan@matrix.org> | 2020-07-27 09:19:55 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-27 09:19:55 +0100 |
commit | 61963a74ae84df94238cf384419acf4d59c311c1 (patch) | |
tree | b079ce2234a79734d60da8f495218e4d4dee0960 /internal/consumers.go | |
parent | abef9bc04f17fb1f0cfaf98f305588878fe9ee21 (diff) |
Add logic for determining when device lists have changed due to membership changes (#1220)
* Flesh out structure for handling device list updates for room membership changes
* First cut untested algorithm
* Add tests for determining changed/left device lists
* Linting
* Unbreak tests
* Sigh.. linting
Diffstat (limited to 'internal/consumers.go')
-rw-r--r-- | internal/consumers.go | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/internal/consumers.go b/internal/consumers.go index d7917f23..c000c171 100644 --- a/internal/consumers.go +++ b/internal/consumers.go @@ -58,11 +58,17 @@ var ErrShutdown = fmt.Errorf("shutdown") // Returns nil once all the goroutines are started. // Returns an error if it can't start consuming for any of the partitions. func (c *ContinualConsumer) Start() error { + _, err := c.StartOffsets() + return err +} + +// StartOffsets is the same as Start but returns the loaded offsets as well. +func (c *ContinualConsumer) StartOffsets() ([]sqlutil.PartitionOffset, error) { offsets := map[int32]int64{} partitions, err := c.Consumer.Partitions(c.Topic) if err != nil { - return err + return nil, err } for _, partition := range partitions { // Default all the offsets to the beginning of the stream. @@ -71,7 +77,7 @@ func (c *ContinualConsumer) Start() error { storedOffsets, err := c.PartitionStore.PartitionOffsets(context.TODO(), c.Topic) if err != nil { - return err + return nil, err } for _, offset := range storedOffsets { // We've already processed events from this partition so advance the offset to where we got to. @@ -87,7 +93,7 @@ func (c *ContinualConsumer) Start() error { for _, p := range partitionConsumers { p.Close() // nolint: errcheck } - return err + return nil, err } partitionConsumers = append(partitionConsumers, pc) } @@ -95,7 +101,7 @@ func (c *ContinualConsumer) Start() error { go c.consumePartition(pc) } - return nil + return storedOffsets, nil } // consumePartition consumes the room events for a single partition of the kafkaesque stream. |