aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-09-01 16:53:38 +0100
committerGitHub <noreply@github.com>2020-09-01 16:53:38 +0100
commit89c772fb782361bf4cb533ae7287f7d1dea947f6 (patch)
tree88df5d3b46b28a50e48f6355b80a952f2faf10aa
parent6d79f043541f522207f298d0f585c01e74e1177d (diff)
Report which component failed to consume (#1375)
-rw-r--r--appservice/consumers/roomserver.go1
-rw-r--r--currentstateserver/consumers/roomserver.go1
-rw-r--r--federationsender/consumers/eduserver.go2
-rw-r--r--federationsender/consumers/keychange.go1
-rw-r--r--federationsender/consumers/roomserver.go1
-rw-r--r--internal/consumers.go3
-rw-r--r--syncapi/consumers/clientapi.go1
-rw-r--r--syncapi/consumers/eduserver_sendtodevice.go1
-rw-r--r--syncapi/consumers/eduserver_typing.go1
-rw-r--r--syncapi/consumers/keychange.go1
-rw-r--r--syncapi/consumers/roomserver.go1
11 files changed, 13 insertions, 1 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index 3c9e52da..560cd237 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -48,6 +48,7 @@ func NewOutputRoomEventConsumer(
workerStates []types.ApplicationServiceWorkerState,
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
+ ComponentName: "appservice/roomserver",
Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent),
Consumer: kafkaConsumer,
PartitionStore: appserviceDB,
diff --git a/currentstateserver/consumers/roomserver.go b/currentstateserver/consumers/roomserver.go
index 23495b24..cb054481 100644
--- a/currentstateserver/consumers/roomserver.go
+++ b/currentstateserver/consumers/roomserver.go
@@ -36,6 +36,7 @@ type OutputRoomEventConsumer struct {
func NewOutputRoomEventConsumer(topicName string, kafkaConsumer sarama.Consumer, store storage.Database, acls *acls.ServerACLs) *OutputRoomEventConsumer {
consumer := &internal.ContinualConsumer{
+ ComponentName: "currentstateserver/roomserver",
Topic: topicName,
Consumer: kafkaConsumer,
PartitionStore: store,
diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go
index e1a42f07..d9ac41b3 100644
--- a/federationsender/consumers/eduserver.go
+++ b/federationsender/consumers/eduserver.go
@@ -50,11 +50,13 @@ func NewOutputEDUConsumer(
) *OutputEDUConsumer {
c := &OutputEDUConsumer{
typingConsumer: &internal.ContinualConsumer{
+ ComponentName: "eduserver/typing",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
},
sendToDeviceConsumer: &internal.ContinualConsumer{
+ ComponentName: "eduserver/sendtodevice",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
diff --git a/federationsender/consumers/keychange.go b/federationsender/consumers/keychange.go
index c1136f10..4f206f5f 100644
--- a/federationsender/consumers/keychange.go
+++ b/federationsender/consumers/keychange.go
@@ -49,6 +49,7 @@ func NewKeyChangeConsumer(
) *KeyChangeConsumer {
c := &KeyChangeConsumer{
consumer: &internal.ContinualConsumer{
+ ComponentName: "federationsender/keychange",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go
index 92b4d6f4..efeb53fa 100644
--- a/federationsender/consumers/roomserver.go
+++ b/federationsender/consumers/roomserver.go
@@ -48,6 +48,7 @@ func NewOutputRoomEventConsumer(
rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
+ ComponentName: "federationsender/roomserver",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
diff --git a/internal/consumers.go b/internal/consumers.go
index c000c171..807cf589 100644
--- a/internal/consumers.go
+++ b/internal/consumers.go
@@ -33,6 +33,7 @@ type PartitionStorer interface {
// A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to
// remember the offset it reached.
type ContinualConsumer struct {
+ ComponentName string
// The kafkaesque topic to consume events from.
// This is the name used in kafka to identify the stream to consume events from.
Topic string
@@ -111,7 +112,7 @@ func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) {
msgErr := c.ProcessMessage(message)
// Advance our position in the stream so that we will start at the right position after a restart.
if err := c.PartitionStore.SetPartitionOffset(context.TODO(), c.Topic, message.Partition, message.Offset); err != nil {
- panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %w", err))
+ panic(fmt.Errorf("the ContinualConsumer in %q failed to SetPartitionOffset: %w", c.ComponentName, err))
}
// Shutdown if we were told to do so.
if msgErr == ErrShutdown {
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index 6a1e590a..d03dd2c4 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -44,6 +44,7 @@ func NewOutputClientDataConsumer(
) *OutputClientDataConsumer {
consumer := internal.ContinualConsumer{
+ ComponentName: "syncapi/clientapi",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData)),
Consumer: kafkaConsumer,
PartitionStore: store,
diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go
index 90bfe3e5..f880f3f2 100644
--- a/syncapi/consumers/eduserver_sendtodevice.go
+++ b/syncapi/consumers/eduserver_sendtodevice.go
@@ -48,6 +48,7 @@ func NewOutputSendToDeviceEventConsumer(
) *OutputSendToDeviceEventConsumer {
consumer := internal.ContinualConsumer{
+ ComponentName: "syncapi/eduserver/sendtodevice",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go
index 523728cd..80d1d000 100644
--- a/syncapi/consumers/eduserver_typing.go
+++ b/syncapi/consumers/eduserver_typing.go
@@ -44,6 +44,7 @@ func NewOutputTypingEventConsumer(
) *OutputTypingEventConsumer {
consumer := internal.ContinualConsumer{
+ ComponentName: "syncapi/eduserver/typing",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index ee95e09d..93fa822d 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -56,6 +56,7 @@ func NewOutputKeyChangeEventConsumer(
) *OutputKeyChangeEventConsumer {
consumer := internal.ContinualConsumer{
+ ComponentName: "syncapi/keychange",
Topic: topic,
Consumer: kafkaConsumer,
PartitionStore: store,
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index bf231d09..67e656c9 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -49,6 +49,7 @@ func NewOutputRoomEventConsumer(
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
+ ComponentName: "syncapi/roomserver",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,