aboutsummaryrefslogtreecommitdiff
path: root/userapi
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-03-23 10:20:18 +0000
committerGitHub <noreply@github.com>2022-03-23 10:20:18 +0000
commit98a5e410d7ecae49f525ddabc55a86d8a6731f22 (patch)
treeb957d21ae71c0d298add10417df8ae53808aaae3 /userapi
parent9572f5ed19abc0b635092108aa6956eaebc60578 (diff)
Per-room consumers (#2293)
* Roomserver input refactoring — again! * Ensure the actor runs again * Preserve consumer after unsubscribe * Another sprinkling of magic * Rename `TopicFor` to `Prefixed` * Recreate the stream if the config is bad * Check streams too * Prefix subjects, preserve inboxes * Recreate if subjects wrong * Remove stream subject * Reconstruct properly * Fix mutex unlock * Comments * Fix tests * Don't drop events * Review comments * Separate `queueInputRoomEvents` function * Re-jig control flow a bit
Diffstat (limited to 'userapi')
-rw-r--r--userapi/consumers/syncapi_readupdate.go2
-rw-r--r--userapi/consumers/syncapi_streamevent.go2
-rw-r--r--userapi/userapi.go4
3 files changed, 4 insertions, 4 deletions
diff --git a/userapi/consumers/syncapi_readupdate.go b/userapi/consumers/syncapi_readupdate.go
index 2e58020b..067f9333 100644
--- a/userapi/consumers/syncapi_readupdate.go
+++ b/userapi/consumers/syncapi_readupdate.go
@@ -47,7 +47,7 @@ func NewOutputReadUpdateConsumer(
db: store,
ServerName: cfg.Matrix.ServerName,
durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIReadUpdateConsumer"),
- topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate),
pgClient: pgClient,
userAPI: userAPI,
syncProducer: syncProducer,
diff --git a/userapi/consumers/syncapi_streamevent.go b/userapi/consumers/syncapi_streamevent.go
index 11081327..da3cd393 100644
--- a/userapi/consumers/syncapi_streamevent.go
+++ b/userapi/consumers/syncapi_streamevent.go
@@ -54,7 +54,7 @@ func NewOutputStreamEventConsumer(
jetstream: js,
db: store,
durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"),
- topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent),
pgClient: pgClient,
userAPI: userAPI,
rsAPI: rsAPI,
diff --git a/userapi/userapi.go b/userapi/userapi.go
index 97bdf7b2..e91ce3a7 100644
--- a/userapi/userapi.go
+++ b/userapi/userapi.go
@@ -54,8 +54,8 @@ func NewInternalAPI(
// it's handled by clientapi, and hence uses its topic. When user
// API handles it for all account data, we can remove it from
// here.
- cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
- cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData),
+ cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
+ cfg.Matrix.JetStream.Prefixed(jetstream.OutputNotificationData),
)
userAPI := &internal.UserInternalAPI{