diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-03-25 12:24:21 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-25 12:24:21 +0000 |
commit | e6d4bdeed5a05f26677f81c02f7a43c84a4a947e (patch) | |
tree | d82d880e797695d48090ffe6423d2759d4413c0b | |
parent | 5e780d3ca232af08079c7bebb6166519dda1906c (diff) |
Try to recover from corrupted NATS streams in memory temporarily (#2301)
-rw-r--r-- | setup/jetstream/nats.go | 43 | ||||
-rw-r--r-- | setup/process/process.go | 17 |
2 files changed, 56 insertions, 4 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 748c191b..328cf915 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -1,11 +1,13 @@ package jetstream import ( + "fmt" "reflect" "strings" "sync" "time" + "github.com/getsentry/sentry-go" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/process" "github.com/sirupsen/logrus" @@ -20,7 +22,7 @@ var natsServerMutex sync.Mutex func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { // check if we need an in-process NATS Server if len(cfg.Addresses) != 0 { - return setupNATS(cfg, nil) + return setupNATS(process, cfg, nil) } natsServerMutex.Lock() if natsServer == nil { @@ -56,10 +58,10 @@ func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient if err != nil { logrus.Fatalln("Failed to create NATS client") } - return setupNATS(cfg, nc) + return setupNATS(process, cfg, nc) } -func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) { +func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) { if nc == nil { var err error nc, err = natsclient.Connect(strings.Join(cfg.Addresses, ",")) @@ -117,7 +119,40 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream namespaced.Name = name namespaced.Subjects = subjects if _, err = s.AddStream(&namespaced); err != nil { - logrus.WithError(err).WithField("stream", name).WithField("subjects", subjects).Fatal("Unable to add stream") + logger := logrus.WithError(err).WithFields(logrus.Fields{ + "stream": namespaced.Name, + "subjects": namespaced.Subjects, + }) + + // If the stream was supposed to be in-memory to begin with + // then an error here is fatal so we'll give up. + if namespaced.Storage == natsclient.MemoryStorage { + logger.WithError(err).Fatal("Unable to add in-memory stream") + } + + // The stream was supposed to be on disk. Let's try starting + // Dendrite with the stream in-memory instead. That'll mean that + // we can't recover anything that was queued on the disk but we + // will still be able to start and run hopefully in the meantime. + logger.WithError(err).Error("Unable to add stream") + sentry.CaptureException(fmt.Errorf("Unable to add stream %q: %w", namespaced.Name, err)) + + namespaced.Storage = natsclient.MemoryStorage + if _, err = s.AddStream(&namespaced); err != nil { + // We tried to add the stream in-memory instead but something + // went wrong. That's an unrecoverable situation so we will + // give up at this point. + logger.WithError(err).Fatal("Unable to add in-memory stream") + } + + if stream.Storage != namespaced.Storage { + // We've managed to add the stream in memory. What's on the + // disk will be left alone, but our ability to recover from a + // future crash will be limited. Yell about it. + sentry.CaptureException(fmt.Errorf("Stream %q is running in-memory; this may be due to data corruption in the JetStream storage directory, investigate as soon as possible", namespaced.Name)) + logrus.Warn("Stream is running in-memory; this may be due to data corruption in the JetStream storage directory, investigate as soon as possible") + process.Degraded() + } } } } diff --git a/setup/process/process.go b/setup/process/process.go index d55751d7..01eb26e2 100644 --- a/setup/process/process.go +++ b/setup/process/process.go @@ -2,13 +2,19 @@ package process import ( "context" + "fmt" "sync" + + "github.com/getsentry/sentry-go" + "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) type ProcessContext struct { wg *sync.WaitGroup // used to wait for components to shutdown ctx context.Context // cancelled when Stop is called shutdown context.CancelFunc // shut down Dendrite + degraded atomic.Bool } func NewProcessContext() *ProcessContext { @@ -43,3 +49,14 @@ func (b *ProcessContext) WaitForShutdown() <-chan struct{} { func (b *ProcessContext) WaitForComponentsToFinish() { b.wg.Wait() } + +func (b *ProcessContext) Degraded() { + if b.degraded.CAS(false, true) { + logrus.Warn("Dendrite is running in a degraded state") + sentry.CaptureException(fmt.Errorf("Process is running in a degraded state")) + } +} + +func (b *ProcessContext) IsDegraded() bool { + return b.degraded.Load() +} |