aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2023-12-12 12:13:55 +0100
committerGitHub <noreply@github.com>2023-12-12 12:13:55 +0100
commit1555b3542d7da8243abc21e6a29758ebe419e5b5 (patch)
treefb717176258a440eb05d4101338fba867b62210e /syncapi
parent185ad6b00de8079fb6a638ed63e392dfe9f2b49f (diff)
Introduce a new stream for the appservice consumer (#3277)
This introduces a new stream the syncAPI produces to once it processed a `OutputRoomEvent` and the appservices consumes. This is to work around a race condition where appservices receive an event before the syncAPI has handled it, this can result in e.g. calls to `/joined_members` returning a wrong membership list.
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/roomserver.go9
-rw-r--r--syncapi/producers/appservices.go33
-rw-r--r--syncapi/syncapi.go9
3 files changed, 50 insertions, 1 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 666f900d..81c532f1 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
+ "github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/synctypes"
@@ -55,6 +56,7 @@ type OutputRoomEventConsumer struct {
inviteStream streams.StreamProvider
notifier *notifier.Notifier
fts fulltext.Indexer
+ asProducer *producers.AppserviceEventProducer
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@@ -68,6 +70,7 @@ func NewOutputRoomEventConsumer(
inviteStream streams.StreamProvider,
rsAPI api.SyncRoomserverAPI,
fts *fulltext.Search,
+ asProducer *producers.AppserviceEventProducer,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
ctx: process.Context(),
@@ -81,6 +84,7 @@ func NewOutputRoomEventConsumer(
inviteStream: inviteStream,
rsAPI: rsAPI,
fts: fts,
+ asProducer: asProducer,
}
}
@@ -119,6 +123,11 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
}
}
err = s.onNewRoomEvent(s.ctx, *output.NewRoomEvent)
+ if err == nil && s.asProducer != nil {
+ if err = s.asProducer.ProduceRoomEvents(msg); err != nil {
+ log.WithError(err).Warn("failed to produce OutputAppserviceEvent")
+ }
+ }
case api.OutputTypeOldRoomEvent:
err = s.onOldRoomEvent(s.ctx, *output.OldRoomEvent)
case api.OutputTypeNewInviteEvent:
diff --git a/syncapi/producers/appservices.go b/syncapi/producers/appservices.go
new file mode 100644
index 00000000..bb0dbb9c
--- /dev/null
+++ b/syncapi/producers/appservices.go
@@ -0,0 +1,33 @@
+// Copyright 2023 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producers
+
+import (
+ "github.com/nats-io/nats.go"
+)
+
+// AppserviceEventProducer produces events for the appservice API to consume
+type AppserviceEventProducer struct {
+ Topic string
+ JetStream nats.JetStreamContext
+}
+
+func (a *AppserviceEventProducer) ProduceRoomEvents(
+ msg *nats.Msg,
+) error {
+ msg.Subject = a.Topic
+ _, err := a.JetStream.PublishMsg(msg)
+ return err
+}
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 091e3db4..0418ffc0 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -100,9 +100,16 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start key change consumer")
}
+ var asProducer *producers.AppserviceEventProducer
+ if len(dendriteCfg.AppServiceAPI.Derived.ApplicationServices) > 0 {
+ asProducer = &producers.AppserviceEventProducer{
+ JetStream: js, Topic: dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputAppserviceEvent),
+ }
+ }
+
roomConsumer := consumers.NewOutputRoomEventConsumer(
processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.PDUStreamProvider,
- streams.InviteStreamProvider, rsAPI, fts,
+ streams.InviteStreamProvider, rsAPI, fts, asProducer,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")