diff options
Diffstat (limited to 'internal/consumers.go')
-rw-r--r-- | internal/consumers.go | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/internal/consumers.go b/internal/consumers.go index d7917f23..c000c171 100644 --- a/internal/consumers.go +++ b/internal/consumers.go @@ -58,11 +58,17 @@ var ErrShutdown = fmt.Errorf("shutdown") // Returns nil once all the goroutines are started. // Returns an error if it can't start consuming for any of the partitions. func (c *ContinualConsumer) Start() error { + _, err := c.StartOffsets() + return err +} + +// StartOffsets is the same as Start but returns the loaded offsets as well. +func (c *ContinualConsumer) StartOffsets() ([]sqlutil.PartitionOffset, error) { offsets := map[int32]int64{} partitions, err := c.Consumer.Partitions(c.Topic) if err != nil { - return err + return nil, err } for _, partition := range partitions { // Default all the offsets to the beginning of the stream. @@ -71,7 +77,7 @@ func (c *ContinualConsumer) Start() error { storedOffsets, err := c.PartitionStore.PartitionOffsets(context.TODO(), c.Topic) if err != nil { - return err + return nil, err } for _, offset := range storedOffsets { // We've already processed events from this partition so advance the offset to where we got to. @@ -87,7 +93,7 @@ func (c *ContinualConsumer) Start() error { for _, p := range partitionConsumers { p.Close() // nolint: errcheck } - return err + return nil, err } partitionConsumers = append(partitionConsumers, pc) } @@ -95,7 +101,7 @@ func (c *ContinualConsumer) Start() error { go c.consumePartition(pc) } - return nil + return storedOffsets, nil } // consumePartition consumes the room events for a single partition of the kafkaesque stream. |