diff options
author | Neil <neil@nats.io> | 2023-07-07 18:59:34 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-07-07 19:59:34 +0200 |
commit | e93bdd56fd2c155eaf577e337e565f2054408fd4 (patch) | |
tree | 44852f843d13991e7cf9e9da88ea343c6ae1cebd /setup | |
parent | c08c7405dbe9d88c1364f6f1f2466db5045506cc (diff) |
Set max age for roomserver input stream to avoid excessive interior deletes (#3145)
If old messages build up in the input stream and do not get processed
successfully, this can create a significant drift between the stream
first sequence and the consumer ack floors, which results in a slow and
expensive start-up when interest-based retention is in use.
If a message is sat in the stream for 24 hours, it's probably not going
to get processed successfully, so let NATS drop them instead. Dendrite
can reconcile by fetching missing events later if it needs to.
---------
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'setup')
-rw-r--r-- | setup/jetstream/nats.go | 23 | ||||
-rw-r--r-- | setup/jetstream/streams.go | 1 |
2 files changed, 21 insertions, 3 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index e440879c..8820e86b 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -87,6 +87,7 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS return js, nc } +// nolint:gocyclo func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) { if nc == nil { var err error @@ -126,16 +127,32 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc subjects = []string{name, name + ".>"} } if info != nil { + // If the stream config doesn't match what we expect, try to update + // it. If that doesn't work then try to blow it away and we'll then + // recreate it in the next section. + // Each specific option that we set must be checked by hand, as if + // you DeepEqual the whole config struct, it will always show that + // there's a difference because the NATS Server will return defaults + // in the stream info. switch { case !reflect.DeepEqual(info.Config.Subjects, subjects): fallthrough case info.Config.Retention != stream.Retention: fallthrough case info.Config.Storage != stream.Storage: - if err = s.DeleteStream(name); err != nil { - logrus.WithError(err).Fatal("Unable to delete stream") + fallthrough + case info.Config.MaxAge != stream.MaxAge: + // Try updating the stream first, as many things can be updated + // non-destructively. + if info, err = s.UpdateStream(stream); err != nil { + logrus.WithError(err).Warnf("Unable to update stream %q, recreating...", name) + // We failed to update the stream, this is a last attempt to get + // things working but may result in data loss. + if err = s.DeleteStream(name); err != nil { + logrus.WithError(err).Fatalf("Unable to delete stream %q", name) + } + info = nil } - info = nil } } if info == nil { diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 590f0cbd..74140792 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -48,6 +48,7 @@ var streams = []*nats.StreamConfig{ Name: InputRoomEvent, Retention: nats.InterestPolicy, Storage: nats.FileStorage, + MaxAge: time.Hour * 24, }, { Name: InputDeviceListUpdate, |