diff options
author | Till <2353100+S7evinK@users.noreply.github.com> | 2023-12-12 12:13:55 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-12 12:13:55 +0100 |
commit | 1555b3542d7da8243abc21e6a29758ebe419e5b5 (patch) | |
tree | fb717176258a440eb05d4101338fba867b62210e /syncapi/consumers | |
parent | 185ad6b00de8079fb6a638ed63e392dfe9f2b49f (diff) |
Introduce a new stream for the appservice consumer (#3277)
This introduces a new stream the syncAPI produces to once it processed a
`OutputRoomEvent` and the appservices consumes.
This is to work around a race condition where appservices receive an
event before the syncAPI has handled it, this can result in e.g. calls
to `/joined_members` returning a wrong membership list.
Diffstat (limited to 'syncapi/consumers')
-rw-r--r-- | syncapi/consumers/roomserver.go | 9 |
1 files changed, 9 insertions, 0 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 666f900d..81c532f1 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -31,6 +31,7 @@ import ( "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/producers" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/synctypes" @@ -55,6 +56,7 @@ type OutputRoomEventConsumer struct { inviteStream streams.StreamProvider notifier *notifier.Notifier fts fulltext.Indexer + asProducer *producers.AppserviceEventProducer } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. @@ -68,6 +70,7 @@ func NewOutputRoomEventConsumer( inviteStream streams.StreamProvider, rsAPI api.SyncRoomserverAPI, fts *fulltext.Search, + asProducer *producers.AppserviceEventProducer, ) *OutputRoomEventConsumer { return &OutputRoomEventConsumer{ ctx: process.Context(), @@ -81,6 +84,7 @@ func NewOutputRoomEventConsumer( inviteStream: inviteStream, rsAPI: rsAPI, fts: fts, + asProducer: asProducer, } } @@ -119,6 +123,11 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms } } err = s.onNewRoomEvent(s.ctx, *output.NewRoomEvent) + if err == nil && s.asProducer != nil { + if err = s.asProducer.ProduceRoomEvents(msg); err != nil { + log.WithError(err).Warn("failed to produce OutputAppserviceEvent") + } + } case api.OutputTypeOldRoomEvent: err = s.onOldRoomEvent(s.ctx, *output.OldRoomEvent) case api.OutputTypeNewInviteEvent: |