aboutsummaryrefslogtreecommitdiff
path: root/setup/jetstream/nats.go
diff options
context:
space:
mode:
Diffstat (limited to 'setup/jetstream/nats.go')
-rw-r--r--setup/jetstream/nats.go31
1 files changed, 27 insertions, 4 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index 43cc0331..748c191b 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -1,6 +1,7 @@
package jetstream
import (
+ "reflect"
"strings"
"sync"
"time"
@@ -75,14 +76,35 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
}
for _, stream := range streams { // streams are defined in streams.go
- name := cfg.TopicFor(stream.Name)
+ name := cfg.Prefixed(stream.Name)
info, err := s.StreamInfo(name)
if err != nil && err != natsclient.ErrStreamNotFound {
logrus.WithError(err).Fatal("Unable to get stream info")
}
+ subjects := stream.Subjects
+ if len(subjects) == 0 {
+ // By default we want each stream to listen for the subjects
+ // that are either an exact match for the stream name, or where
+ // the first part of the subject is the stream name. ">" is a
+ // wildcard in NATS for one or more subject tokens. In the case
+ // that the stream is called "Foo", this will match any message
+ // with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc.
+ subjects = []string{name, name + ".>"}
+ }
+ if info != nil {
+ switch {
+ case !reflect.DeepEqual(info.Config.Subjects, subjects):
+ fallthrough
+ case info.Config.Retention != stream.Retention:
+ fallthrough
+ case info.Config.Storage != stream.Storage:
+ if err = s.DeleteStream(name); err != nil {
+ logrus.WithError(err).Fatal("Unable to delete stream")
+ }
+ info = nil
+ }
+ }
if info == nil {
- stream.Subjects = []string{name}
-
// If we're trying to keep everything in memory (e.g. unit tests)
// then overwrite the storage policy.
if cfg.InMemory {
@@ -93,8 +115,9 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
// array, otherwise we end up with namespaces on namespaces.
namespaced := *stream
namespaced.Name = name
+ namespaced.Subjects = subjects
if _, err = s.AddStream(&namespaced); err != nil {
- logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream")
+ logrus.WithError(err).WithField("stream", name).WithField("subjects", subjects).Fatal("Unable to add stream")
}
}
}