aboutsummaryrefslogtreecommitdiff
path: root/appservice
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-01-07 17:31:57 +0000
committerGitHub <noreply@github.com>2022-01-07 17:31:57 +0000
commit16035b97373849d74961e15616f3f1449f0a5abd (patch)
treeac99aeba1814aa2e9df950912e08ef595148a969 /appservice
parenta42232143526de360309b112b57cf0d95adf47cb (diff)
NATS JetStream tweaks (#2086)
* Use named NATS durable consumers * Build fixes * Remove dupe call to SetFederationAPI * Use namespaced consumer name * Fix namespacing * Fix unit tests hopefully
Diffstat (limited to 'appservice')
-rw-r--r--appservice/consumers/roomserver.go4
1 files changed, 3 insertions, 1 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index 139b5724..8aea5c34 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -34,6 +34,7 @@ import (
type OutputRoomEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
+ durable nats.SubOpt
topic string
asDB storage.Database
rsAPI api.RoomserverInternalAPI
@@ -54,6 +55,7 @@ func NewOutputRoomEventConsumer(
return &OutputRoomEventConsumer{
ctx: process.Context(),
jetstream: js,
+ durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"),
topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent),
asDB: appserviceDB,
rsAPI: rsAPI,
@@ -64,7 +66,7 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
- _, err := s.jetstream.Subscribe(s.topic, s.onMessage)
+ _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}