aboutsummaryrefslogtreecommitdiff
path: root/setup/jetstream/nats.go
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-03-23 10:20:18 +0000
committerGitHub <noreply@github.com>2022-03-23 10:20:18 +0000
commit98a5e410d7ecae49f525ddabc55a86d8a6731f22 (patch)
treeb957d21ae71c0d298add10417df8ae53808aaae3 /setup/jetstream/nats.go
parent9572f5ed19abc0b635092108aa6956eaebc60578 (diff)
Per-room consumers (#2293)
* Roomserver input refactoring — again! * Ensure the actor runs again * Preserve consumer after unsubscribe * Another sprinkling of magic * Rename `TopicFor` to `Prefixed` * Recreate the stream if the config is bad * Check streams too * Prefix subjects, preserve inboxes * Recreate if subjects wrong * Remove stream subject * Reconstruct properly * Fix mutex unlock * Comments * Fix tests * Don't drop events * Review comments * Separate `queueInputRoomEvents` function * Re-jig control flow a bit
Diffstat (limited to 'setup/jetstream/nats.go')
-rw-r--r--setup/jetstream/nats.go31
1 files changed, 27 insertions, 4 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index 43cc0331..748c191b 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -1,6 +1,7 @@
package jetstream
import (
+ "reflect"
"strings"
"sync"
"time"
@@ -75,14 +76,35 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
}
for _, stream := range streams { // streams are defined in streams.go
- name := cfg.TopicFor(stream.Name)
+ name := cfg.Prefixed(stream.Name)
info, err := s.StreamInfo(name)
if err != nil && err != natsclient.ErrStreamNotFound {
logrus.WithError(err).Fatal("Unable to get stream info")
}
+ subjects := stream.Subjects
+ if len(subjects) == 0 {
+ // By default we want each stream to listen for the subjects
+ // that are either an exact match for the stream name, or where
+ // the first part of the subject is the stream name. ">" is a
+ // wildcard in NATS for one or more subject tokens. In the case
+ // that the stream is called "Foo", this will match any message
+ // with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc.
+ subjects = []string{name, name + ".>"}
+ }
+ if info != nil {
+ switch {
+ case !reflect.DeepEqual(info.Config.Subjects, subjects):
+ fallthrough
+ case info.Config.Retention != stream.Retention:
+ fallthrough
+ case info.Config.Storage != stream.Storage:
+ if err = s.DeleteStream(name); err != nil {
+ logrus.WithError(err).Fatal("Unable to delete stream")
+ }
+ info = nil
+ }
+ }
if info == nil {
- stream.Subjects = []string{name}
-
// If we're trying to keep everything in memory (e.g. unit tests)
// then overwrite the storage policy.
if cfg.InMemory {
@@ -93,8 +115,9 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
// array, otherwise we end up with namespaces on namespaces.
namespaced := *stream
namespaced.Name = name
+ namespaced.Subjects = subjects
if _, err = s.AddStream(&namespaced); err != nil {
- logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream")
+ logrus.WithError(err).WithField("stream", name).WithField("subjects", subjects).Fatal("Unable to add stream")
}
}
}