aboutsummaryrefslogtreecommitdiff
path: root/federationsender
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-09-01 16:53:38 +0100
committerGitHub <noreply@github.com>2020-09-01 16:53:38 +0100
commit89c772fb782361bf4cb533ae7287f7d1dea947f6 (patch)
tree88df5d3b46b28a50e48f6355b80a952f2faf10aa /federationsender
parent6d79f043541f522207f298d0f585c01e74e1177d (diff)
Report which component failed to consume (#1375)
Diffstat (limited to 'federationsender')
-rw-r--r--federationsender/consumers/eduserver.go2
-rw-r--r--federationsender/consumers/keychange.go1
-rw-r--r--federationsender/consumers/roomserver.go1
3 files changed, 4 insertions, 0 deletions
diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go
index e1a42f07..d9ac41b3 100644
--- a/federationsender/consumers/eduserver.go
+++ b/federationsender/consumers/eduserver.go
@@ -50,11 +50,13 @@ func NewOutputEDUConsumer(
) *OutputEDUConsumer {
c := &OutputEDUConsumer{
typingConsumer: &internal.ContinualConsumer{
+ ComponentName: "eduserver/typing",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
},
sendToDeviceConsumer: &internal.ContinualConsumer{
+ ComponentName: "eduserver/sendtodevice",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
diff --git a/federationsender/consumers/keychange.go b/federationsender/consumers/keychange.go
index c1136f10..4f206f5f 100644
--- a/federationsender/consumers/keychange.go
+++ b/federationsender/consumers/keychange.go
@@ -49,6 +49,7 @@ func NewKeyChangeConsumer(
) *KeyChangeConsumer {
c := &KeyChangeConsumer{
consumer: &internal.ContinualConsumer{
+ ComponentName: "federationsender/keychange",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go
index 92b4d6f4..efeb53fa 100644
--- a/federationsender/consumers/roomserver.go
+++ b/federationsender/consumers/roomserver.go
@@ -48,6 +48,7 @@ func NewOutputRoomEventConsumer(
rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
+ ComponentName: "federationsender/roomserver",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,