diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-01-07 17:31:57 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-07 17:31:57 +0000 |
commit | 16035b97373849d74961e15616f3f1449f0a5abd (patch) | |
tree | ac99aeba1814aa2e9df950912e08ef595148a969 /setup | |
parent | a42232143526de360309b112b57cf0d95adf47cb (diff) |
NATS JetStream tweaks (#2086)
* Use named NATS durable consumers
* Build fixes
* Remove dupe call to SetFederationAPI
* Use namespaced consumer name
* Fix namespacing
* Fix unit tests hopefully
Diffstat (limited to 'setup')
-rw-r--r-- | setup/config/config_jetstream.go | 6 | ||||
-rw-r--r-- | setup/jetstream/nats.go | 7 |
2 files changed, 12 insertions, 1 deletions
diff --git a/setup/config/config_jetstream.go b/setup/config/config_jetstream.go index 0bd84899..94e2d88b 100644 --- a/setup/config/config_jetstream.go +++ b/setup/config/config_jetstream.go @@ -2,6 +2,8 @@ package config import ( "fmt" + + "github.com/nats-io/nats.go" ) type JetStream struct { @@ -23,6 +25,10 @@ func (c *JetStream) TopicFor(name string) string { return fmt.Sprintf("%s%s", c.TopicPrefix, name) } +func (c *JetStream) Durable(name string) nats.SubOpt { + return nats.Durable(c.TopicFor(name)) +} + func (c *JetStream) Defaults(generate bool) { c.Addresses = []string{} c.TopicPrefix = "Dendrite" diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index eec68d82..6dbbd1f4 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -75,13 +75,18 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContex } if info == nil { stream.Subjects = []string{name} + // If we're trying to keep everything in memory (e.g. unit tests) // then overwrite the storage policy. if cfg.InMemory { stream.Storage = nats.MemoryStorage } - if _, err = s.AddStream(stream); err != nil { + // Namespace the streams without modifying the original streams + // array, otherwise we end up with namespaces on namespaces. + namespaced := *stream + namespaced.Name = name + if _, err = s.AddStream(&namespaced); err != nil { logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream") } } |