aboutsummaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-09-01 16:53:38 +0100
committerGitHub <noreply@github.com>2020-09-01 16:53:38 +0100
commit89c772fb782361bf4cb533ae7287f7d1dea947f6 (patch)
tree88df5d3b46b28a50e48f6355b80a952f2faf10aa /internal
parent6d79f043541f522207f298d0f585c01e74e1177d (diff)
Report which component failed to consume (#1375)
Diffstat (limited to 'internal')
-rw-r--r--internal/consumers.go3
1 files changed, 2 insertions, 1 deletions
diff --git a/internal/consumers.go b/internal/consumers.go
index c000c171..807cf589 100644
--- a/internal/consumers.go
+++ b/internal/consumers.go
@@ -33,6 +33,7 @@ type PartitionStorer interface {
// A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to
// remember the offset it reached.
type ContinualConsumer struct {
+ ComponentName string
// The kafkaesque topic to consume events from.
// This is the name used in kafka to identify the stream to consume events from.
Topic string
@@ -111,7 +112,7 @@ func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) {
msgErr := c.ProcessMessage(message)
// Advance our position in the stream so that we will start at the right position after a restart.
if err := c.PartitionStore.SetPartitionOffset(context.TODO(), c.Topic, message.Partition, message.Offset); err != nil {
- panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %w", err))
+ panic(fmt.Errorf("the ContinualConsumer in %q failed to SetPartitionOffset: %w", c.ComponentName, err))
}
// Shutdown if we were told to do so.
if msgErr == ErrShutdown {