diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-09-01 16:53:38 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-01 16:53:38 +0100 |
commit | 89c772fb782361bf4cb533ae7287f7d1dea947f6 (patch) | |
tree | 88df5d3b46b28a50e48f6355b80a952f2faf10aa /internal | |
parent | 6d79f043541f522207f298d0f585c01e74e1177d (diff) |
Report which component failed to consume (#1375)
Diffstat (limited to 'internal')
-rw-r--r-- | internal/consumers.go | 3 |
1 files changed, 2 insertions, 1 deletions
diff --git a/internal/consumers.go b/internal/consumers.go index c000c171..807cf589 100644 --- a/internal/consumers.go +++ b/internal/consumers.go @@ -33,6 +33,7 @@ type PartitionStorer interface { // A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to // remember the offset it reached. type ContinualConsumer struct { + ComponentName string // The kafkaesque topic to consume events from. // This is the name used in kafka to identify the stream to consume events from. Topic string @@ -111,7 +112,7 @@ func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) { msgErr := c.ProcessMessage(message) // Advance our position in the stream so that we will start at the right position after a restart. if err := c.PartitionStore.SetPartitionOffset(context.TODO(), c.Topic, message.Partition, message.Offset); err != nil { - panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %w", err)) + panic(fmt.Errorf("the ContinualConsumer in %q failed to SetPartitionOffset: %w", c.ComponentName, err)) } // Shutdown if we were told to do so. if msgErr == ErrShutdown { |