aboutsummaryrefslogtreecommitdiff
path: root/setup
diff options
context:
space:
mode:
author Neil <neil@nats.io> 2023-07-07 18:59:34 +0100
committer GitHub <noreply@github.com> 2023-07-07 19:59:34 +0200
commit e93bdd56fd2c155eaf577e337e565f2054408fd4 (patch)
tree 44852f843d13991e7cf9e9da88ea343c6ae1cebd /setup
parent c08c7405dbe9d88c1364f6f1f2466db5045506cc (diff)
Set max age for roomserver input stream to avoid excessive interior deletes (#3145)
If old messages build up in the input stream and do not get processed successfully, this can create a significant drift between the stream's first sequence and the consumer ack floors, which results in a slow and expensive start-up when interest-based retention is in use. If a message has sat in the stream for 24 hours, it's probably not going to get processed successfully, so let NATS drop it instead. Dendrite can reconcile by fetching missing events later if it needs to.
---------
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'setup')
-rw-r--r-- setup/jetstream/nats.go 23
-rw-r--r-- setup/jetstream/streams.go 1
2 files changed, 21 insertions, 3 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index e440879c..8820e86b 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -87,6 +87,7 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
return js, nc
}
+// nolint:gocyclo
func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
if nc == nil {
var err error
@@ -126,16 +127,32 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
subjects = []string{name, name + ".>"}
}
if info != nil {
+ // If the stream config doesn't match what we expect, try to update
+ // it. If that doesn't work then try to blow it away and we'll then
+ // recreate it in the next section.
+ // Each specific option that we set must be checked by hand, as if
+ // you DeepEqual the whole config struct, it will always show that
+ // there's a difference because the NATS Server will return defaults
+ // in the stream info.
switch {
case !reflect.DeepEqual(info.Config.Subjects, subjects):
fallthrough
case info.Config.Retention != stream.Retention:
fallthrough
case info.Config.Storage != stream.Storage:
- if err = s.DeleteStream(name); err != nil {
- logrus.WithError(err).Fatal("Unable to delete stream")
+ fallthrough
+ case info.Config.MaxAge != stream.MaxAge:
+ // Try updating the stream first, as many things can be updated
+ // non-destructively.
+ if info, err = s.UpdateStream(stream); err != nil {
+ logrus.WithError(err).Warnf("Unable to update stream %q, recreating...", name)
+ // We failed to update the stream, this is a last attempt to get
+ // things working but may result in data loss.
+ if err = s.DeleteStream(name); err != nil {
+ logrus.WithError(err).Fatalf("Unable to delete stream %q", name)
+ }
+ info = nil
}
- info = nil
}
}
if info == nil {
diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go
index 590f0cbd..74140792 100644
--- a/setup/jetstream/streams.go
+++ b/setup/jetstream/streams.go
@@ -48,6 +48,7 @@ var streams = []*nats.StreamConfig{
Name: InputRoomEvent,
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
+ MaxAge: time.Hour * 24,
},
{
Name: InputDeviceListUpdate,