aboutsummaryrefslogtreecommitdiff
path: root/setup/jetstream/nats.go
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-01-07 17:31:57 +0000
committerGitHub <noreply@github.com>2022-01-07 17:31:57 +0000
commit16035b97373849d74961e15616f3f1449f0a5abd (patch)
treeac99aeba1814aa2e9df950912e08ef595148a969 /setup/jetstream/nats.go
parenta42232143526de360309b112b57cf0d95adf47cb (diff)
NATS JetStream tweaks (#2086)
* Use named NATS durable consumers * Build fixes * Remove dupe call to SetFederationAPI * Use namespaced consumer name * Fix namespacing * Fix unit tests hopefully
Diffstat (limited to 'setup/jetstream/nats.go')
-rw-r--r--setup/jetstream/nats.go7
1 files changed, 6 insertions, 1 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index eec68d82..6dbbd1f4 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -75,13 +75,18 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContex
}
if info == nil {
stream.Subjects = []string{name}
+
// If we're trying to keep everything in memory (e.g. unit tests)
// then overwrite the storage policy.
if cfg.InMemory {
stream.Storage = nats.MemoryStorage
}
- if _, err = s.AddStream(stream); err != nil {
+ // Namespace the streams without modifying the original streams
+ // array, otherwise we end up with namespaces on namespaces.
+ namespaced := *stream
+ namespaced.Name = name
+ if _, err = s.AddStream(&namespaced); err != nil {
logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream")
}
}