aboutsummaryrefslogtreecommitdiff
path: root/internal/consumers.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/consumers.go')
-rw-r--r--internal/consumers.go14
1 files changed, 10 insertions, 4 deletions
diff --git a/internal/consumers.go b/internal/consumers.go
index d7917f23..c000c171 100644
--- a/internal/consumers.go
+++ b/internal/consumers.go
@@ -58,11 +58,17 @@ var ErrShutdown = fmt.Errorf("shutdown")
// Returns nil once all the goroutines are started.
// Returns an error if it can't start consuming for any of the partitions.
func (c *ContinualConsumer) Start() error {
+ _, err := c.StartOffsets()
+ return err
+}
+
+// StartOffsets is the same as Start but returns the loaded offsets as well.
+func (c *ContinualConsumer) StartOffsets() ([]sqlutil.PartitionOffset, error) {
offsets := map[int32]int64{}
partitions, err := c.Consumer.Partitions(c.Topic)
if err != nil {
- return err
+ return nil, err
}
for _, partition := range partitions {
// Default all the offsets to the beginning of the stream.
@@ -71,7 +77,7 @@ func (c *ContinualConsumer) Start() error {
storedOffsets, err := c.PartitionStore.PartitionOffsets(context.TODO(), c.Topic)
if err != nil {
- return err
+ return nil, err
}
for _, offset := range storedOffsets {
// We've already processed events from this partition so advance the offset to where we got to.
@@ -87,7 +93,7 @@ func (c *ContinualConsumer) Start() error {
for _, p := range partitionConsumers {
p.Close() // nolint: errcheck
}
- return err
+ return nil, err
}
partitionConsumers = append(partitionConsumers, pc)
}
@@ -95,7 +101,7 @@ func (c *ContinualConsumer) Start() error {
go c.consumePartition(pc)
}
- return nil
+ return storedOffsets, nil
}
// consumePartition consumes the room events for a single partition of the kafkaesque stream.