diff options
Diffstat (limited to 'setup/jetstream/nats.go')
-rw-r--r-- | setup/jetstream/nats.go | 43 |
1 files changed, 39 insertions, 4 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 748c191b..328cf915 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -1,11 +1,13 @@ package jetstream import ( + "fmt" "reflect" "strings" "sync" "time" + "github.com/getsentry/sentry-go" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/process" "github.com/sirupsen/logrus" @@ -20,7 +22,7 @@ var natsServerMutex sync.Mutex func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { // check if we need an in-process NATS Server if len(cfg.Addresses) != 0 { - return setupNATS(cfg, nil) + return setupNATS(process, cfg, nil) } natsServerMutex.Lock() if natsServer == nil { @@ -56,10 +58,10 @@ func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient if err != nil { logrus.Fatalln("Failed to create NATS client") } - return setupNATS(cfg, nc) + return setupNATS(process, cfg, nc) } -func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) { +func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) { if nc == nil { var err error nc, err = natsclient.Connect(strings.Join(cfg.Addresses, ",")) @@ -117,7 +119,40 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream namespaced.Name = name namespaced.Subjects = subjects if _, err = s.AddStream(&namespaced); err != nil { - logrus.WithError(err).WithField("stream", name).WithField("subjects", subjects).Fatal("Unable to add stream") + logger := logrus.WithError(err).WithFields(logrus.Fields{ + "stream": namespaced.Name, + "subjects": namespaced.Subjects, + }) + + // If the stream was supposed to be in-memory to begin with + // then an error here is fatal so we'll give up. + if namespaced.Storage == natsclient.MemoryStorage { + logger.WithError(err).Fatal("Unable to add in-memory stream") + } + + // The stream was supposed to be on disk. Let's try starting + // Dendrite with the stream in-memory instead. That'll mean that + // we can't recover anything that was queued on the disk but we + // will still be able to start and run hopefully in the meantime. + logger.WithError(err).Error("Unable to add stream") + sentry.CaptureException(fmt.Errorf("Unable to add stream %q: %w", namespaced.Name, err)) + + namespaced.Storage = natsclient.MemoryStorage + if _, err = s.AddStream(&namespaced); err != nil { + // We tried to add the stream in-memory instead but something + // went wrong. That's an unrecoverable situation so we will + // give up at this point. + logger.WithError(err).Fatal("Unable to add in-memory stream") + } + + if stream.Storage != namespaced.Storage { + // We've managed to add the stream in memory. What's on the + // disk will be left alone, but our ability to recover from a + // future crash will be limited. Yell about it. + sentry.CaptureException(fmt.Errorf("Stream %q is running in-memory; this may be due to data corruption in the JetStream storage directory, investigate as soon as possible", namespaced.Name)) + logrus.Warn("Stream is running in-memory; this may be due to data corruption in the JetStream storage directory, investigate as soon as possible") + process.Degraded() + } } } } |