aboutsummaryrefslogtreecommitdiff
path: root/setup/jetstream
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-03-23 10:20:18 +0000
committerGitHub <noreply@github.com>2022-03-23 10:20:18 +0000
commit98a5e410d7ecae49f525ddabc55a86d8a6731f22 (patch)
treeb957d21ae71c0d298add10417df8ae53808aaae3 /setup/jetstream
parent9572f5ed19abc0b635092108aa6956eaebc60578 (diff)
Per-room consumers (#2293)
* Roomserver input refactoring — again! * Ensure the actor runs again * Preserve consumer after unsubscribe * Another sprinkling of magic * Rename `TopicFor` to `Prefixed` * Recreate the stream if the config is bad * Check streams too * Prefix subjects, preserve inboxes * Recreate if subjects wrong * Remove stream subject * Reconstruct properly * Fix mutex unlock * Comments * Fix tests * Don't drop events * Review comments * Separate `queueInputRoomEvents` function * Re-jig control flow a bit
Diffstat (limited to 'setup/jetstream')
-rw-r--r--setup/jetstream/nats.go31
-rw-r--r--setup/jetstream/streams.go14
2 files changed, 40 insertions, 5 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index 43cc0331..748c191b 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -1,6 +1,7 @@
package jetstream
import (
+ "reflect"
"strings"
"sync"
"time"
@@ -75,14 +76,35 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
}
for _, stream := range streams { // streams are defined in streams.go
- name := cfg.TopicFor(stream.Name)
+ name := cfg.Prefixed(stream.Name)
info, err := s.StreamInfo(name)
if err != nil && err != natsclient.ErrStreamNotFound {
logrus.WithError(err).Fatal("Unable to get stream info")
}
+ subjects := stream.Subjects
+ if len(subjects) == 0 {
+ // By default we want each stream to listen for the subjects
+ // that are either an exact match for the stream name, or where
+ // the first part of the subject is the stream name. ">" is a
+ // wildcard in NATS for one or more subject tokens. In the case
+ // that the stream is called "Foo", this will match any message
+ // with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc.
+ subjects = []string{name, name + ".>"}
+ }
+ if info != nil {
+ switch {
+ case !reflect.DeepEqual(info.Config.Subjects, subjects):
+ fallthrough
+ case info.Config.Retention != stream.Retention:
+ fallthrough
+ case info.Config.Storage != stream.Storage:
+ if err = s.DeleteStream(name); err != nil {
+ logrus.WithError(err).Fatal("Unable to delete stream")
+ }
+ info = nil
+ }
+ }
if info == nil {
- stream.Subjects = []string{name}
-
// If we're trying to keep everything in memory (e.g. unit tests)
// then overwrite the storage policy.
if cfg.InMemory {
@@ -93,8 +115,9 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
// array, otherwise we end up with namespaces on namespaces.
namespaced := *stream
namespaced.Name = name
+ namespaced.Subjects = subjects
if _, err = s.AddStream(&namespaced); err != nil {
- logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream")
+ logrus.WithError(err).WithField("stream", name).WithField("subjects", subjects).Fatal("Unable to add stream")
}
}
}
diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go
index aa3e95cb..aa979924 100644
--- a/setup/jetstream/streams.go
+++ b/setup/jetstream/streams.go
@@ -1,6 +1,8 @@
package jetstream
import (
+ "fmt"
+ "regexp"
"time"
"github.com/nats-io/nats.go"
@@ -24,10 +26,20 @@ var (
OutputReadUpdate = "OutputReadUpdate"
)
+var safeCharacters = regexp.MustCompile("[^A-Za-z0-9$]+")
+
+func Tokenise(str string) string {
+ return safeCharacters.ReplaceAllString(str, "_")
+}
+
+func InputRoomEventSubj(roomID string) string {
+ return fmt.Sprintf("%s.%s", InputRoomEvent, Tokenise(roomID))
+}
+
var streams = []*nats.StreamConfig{
{
Name: InputRoomEvent,
- Retention: nats.WorkQueuePolicy,
+ Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{