diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-01-07 17:31:57 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-07 17:31:57 +0000 |
commit | 16035b97373849d74961e15616f3f1449f0a5abd (patch) | |
tree | ac99aeba1814aa2e9df950912e08ef595148a969 /appservice | |
parent | a42232143526de360309b112b57cf0d95adf47cb (diff) |
NATS JetStream tweaks (#2086)
* Use named NATS durable consumers
* Build fixes
* Remove dupe call to SetFederationAPI
* Use namespaced consumer name
* Fix namespacing
* Fix unit tests hopefully
Diffstat (limited to 'appservice')
-rw-r--r-- | appservice/consumers/roomserver.go | 4 |
1 files changed, 3 insertions, 1 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 139b5724..8aea5c34 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -34,6 +34,7 @@ import ( type OutputRoomEventConsumer struct { ctx context.Context jetstream nats.JetStreamContext + durable nats.SubOpt topic string asDB storage.Database rsAPI api.RoomserverInternalAPI @@ -54,6 +55,7 @@ func NewOutputRoomEventConsumer( return &OutputRoomEventConsumer{ ctx: process.Context(), jetstream: js, + durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"), topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), asDB: appserviceDB, rsAPI: rsAPI, @@ -64,7 +66,7 @@ func NewOutputRoomEventConsumer( // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) return err } |