aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers/roomserver.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/consumers/roomserver.go')
-rw-r--r--syncapi/consumers/roomserver.go9
1 files changed, 9 insertions, 0 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 666f900d..81c532f1 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
+ "github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -55,6 +56,7 @@ type OutputRoomEventConsumer struct {
inviteStream streams.StreamProvider
notifier *notifier.Notifier
fts fulltext.Indexer
+ asProducer *producers.AppserviceEventProducer
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@@ -68,6 +70,7 @@ func NewOutputRoomEventConsumer(
inviteStream streams.StreamProvider,
rsAPI api.SyncRoomserverAPI,
fts *fulltext.Search,
+ asProducer *producers.AppserviceEventProducer,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
ctx: process.Context(),
@@ -81,6 +84,7 @@ func NewOutputRoomEventConsumer(
inviteStream: inviteStream,
rsAPI: rsAPI,
fts: fts,
+ asProducer: asProducer,
}
}
@@ -119,6 +123,11 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
}
}
err = s.onNewRoomEvent(s.ctx, *output.NewRoomEvent)
+ if err == nil && s.asProducer != nil {
+ if err = s.asProducer.ProduceRoomEvents(msg); err != nil {
+ log.WithError(err).Warn("failed to produce OutputAppserviceEvent")
+ }
+ }
case api.OutputTypeOldRoomEvent:
err = s.onOldRoomEvent(s.ctx, *output.OldRoomEvent)
case api.OutputTypeNewInviteEvent: