diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-03-23 10:20:18 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-23 10:20:18 +0000 |
commit | 98a5e410d7ecae49f525ddabc55a86d8a6731f22 (patch) | |
tree | b957d21ae71c0d298add10417df8ae53808aaae3 /setup/jetstream | |
parent | 9572f5ed19abc0b635092108aa6956eaebc60578 (diff) |
Per-room consumers (#2293)
* Roomserver input refactoring — again!
* Ensure the actor runs again
* Preserve consumer after unsubscribe
* Another sprinkling of magic
* Rename `TopicFor` to `Prefixed`
* Recreate the stream if the config is bad
* Check streams too
* Prefix subjects, preserve inboxes
* Recreate if subjects wrong
* Remove stream subject
* Reconstruct properly
* Fix mutex unlock
* Comments
* Fix tests
* Don't drop events
* Review comments
* Separate `queueInputRoomEvents` function
* Re-jig control flow a bit
Diffstat (limited to 'setup/jetstream')
-rw-r--r-- | setup/jetstream/nats.go | 31 | ||||
-rw-r--r-- | setup/jetstream/streams.go | 14 |
2 files changed, 40 insertions, 5 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 43cc0331..748c191b 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -1,6 +1,7 @@ package jetstream import ( + "reflect" "strings" "sync" "time" @@ -75,14 +76,35 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream } for _, stream := range streams { // streams are defined in streams.go - name := cfg.TopicFor(stream.Name) + name := cfg.Prefixed(stream.Name) info, err := s.StreamInfo(name) if err != nil && err != natsclient.ErrStreamNotFound { logrus.WithError(err).Fatal("Unable to get stream info") } + subjects := stream.Subjects + if len(subjects) == 0 { + // By default we want each stream to listen for the subjects + // that are either an exact match for the stream name, or where + // the first part of the subject is the stream name. ">" is a + // wildcard in NATS for one or more subject tokens. In the case + // that the stream is called "Foo", this will match any message + // with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc. + subjects = []string{name, name + ".>"} + } + if info != nil { + switch { + case !reflect.DeepEqual(info.Config.Subjects, subjects): + fallthrough + case info.Config.Retention != stream.Retention: + fallthrough + case info.Config.Storage != stream.Storage: + if err = s.DeleteStream(name); err != nil { + logrus.WithError(err).Fatal("Unable to delete stream") + } + info = nil + } + } if info == nil { - stream.Subjects = []string{name} - // If we're trying to keep everything in memory (e.g. unit tests) // then overwrite the storage policy. if cfg.InMemory { @@ -93,8 +115,9 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream // array, otherwise we end up with namespaces on namespaces. namespaced := *stream namespaced.Name = name + namespaced.Subjects = subjects if _, err = s.AddStream(&namespaced); err != nil { - logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream") + logrus.WithError(err).WithField("stream", name).WithField("subjects", subjects).Fatal("Unable to add stream") } } } diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index aa3e95cb..aa979924 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -1,6 +1,8 @@ package jetstream import ( + "fmt" + "regexp" "time" "github.com/nats-io/nats.go" @@ -24,10 +26,20 @@ var ( OutputReadUpdate = "OutputReadUpdate" ) +var safeCharacters = regexp.MustCompile("[^A-Za-z0-9$]+") + +func Tokenise(str string) string { + return safeCharacters.ReplaceAllString(str, "_") +} + +func InputRoomEventSubj(roomID string) string { + return fmt.Sprintf("%s.%s", InputRoomEvent, Tokenise(roomID)) +} + var streams = []*nats.StreamConfig{ { Name: InputRoomEvent, - Retention: nats.WorkQueuePolicy, + Retention: nats.InterestPolicy, Storage: nats.FileStorage, }, { |