about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Neil Alexander <neilalexander@users.noreply.github.com>, 2022-03-25 12:24:21 +0000
committer: GitHub <noreply@github.com>, 2022-03-25 12:24:21 +0000
commit: e6d4bdeed5a05f26677f81c02f7a43c84a4a947e (patch)
tree: d82d880e797695d48090ffe6423d2759d4413c0b
parent: 5e780d3ca232af08079c7bebb6166519dda1906c (diff)
Try to recover from corrupted NATS streams in memory temporarily (#2301)
-rw-r--r-- setup/jetstream/nats.go 43
-rw-r--r-- setup/process/process.go 17
2 files changed, 56 insertions, 4 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index 748c191b..328cf915 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -1,11 +1,13 @@
package jetstream
import (
+ "fmt"
"reflect"
"strings"
"sync"
"time"
+ "github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/sirupsen/logrus"
@@ -20,7 +22,7 @@ var natsServerMutex sync.Mutex
func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
// check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 {
- return setupNATS(cfg, nil)
+ return setupNATS(process, cfg, nil)
}
natsServerMutex.Lock()
if natsServer == nil {
@@ -56,10 +58,10 @@ func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient
if err != nil {
logrus.Fatalln("Failed to create NATS client")
}
- return setupNATS(cfg, nc)
+ return setupNATS(process, cfg, nc)
}
-func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
+func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
if nc == nil {
var err error
nc, err = natsclient.Connect(strings.Join(cfg.Addresses, ","))
@@ -117,7 +119,40 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
namespaced.Name = name
namespaced.Subjects = subjects
if _, err = s.AddStream(&namespaced); err != nil {
- logrus.WithError(err).WithField("stream", name).WithField("subjects", subjects).Fatal("Unable to add stream")
+ logger := logrus.WithError(err).WithFields(logrus.Fields{
+ "stream": namespaced.Name,
+ "subjects": namespaced.Subjects,
+ })
+
+ // If the stream was supposed to be in-memory to begin with
+ // then an error here is fatal so we'll give up.
+ if namespaced.Storage == natsclient.MemoryStorage {
+ logger.WithError(err).Fatal("Unable to add in-memory stream")
+ }
+
+ // The stream was supposed to be on disk. Let's try starting
+ // Dendrite with the stream in-memory instead. That'll mean that
+ // we can't recover anything that was queued on the disk but we
+ // will still be able to start and run hopefully in the meantime.
+ logger.WithError(err).Error("Unable to add stream")
+ sentry.CaptureException(fmt.Errorf("Unable to add stream %q: %w", namespaced.Name, err))
+
+ namespaced.Storage = natsclient.MemoryStorage
+ if _, err = s.AddStream(&namespaced); err != nil {
+ // We tried to add the stream in-memory instead but something
+ // went wrong. That's an unrecoverable situation so we will
+ // give up at this point.
+ logger.WithError(err).Fatal("Unable to add in-memory stream")
+ }
+
+ if stream.Storage != namespaced.Storage {
+ // We've managed to add the stream in memory. What's on the
+ // disk will be left alone, but our ability to recover from a
+ // future crash will be limited. Yell about it.
+ sentry.CaptureException(fmt.Errorf("Stream %q is running in-memory; this may be due to data corruption in the JetStream storage directory, investigate as soon as possible", namespaced.Name))
+ logrus.Warn("Stream is running in-memory; this may be due to data corruption in the JetStream storage directory, investigate as soon as possible")
+ process.Degraded()
+ }
}
}
}
diff --git a/setup/process/process.go b/setup/process/process.go
index d55751d7..01eb26e2 100644
--- a/setup/process/process.go
+++ b/setup/process/process.go
@@ -2,13 +2,19 @@ package process
import (
"context"
+ "fmt"
"sync"
+
+ "github.com/getsentry/sentry-go"
+ "github.com/sirupsen/logrus"
+ "go.uber.org/atomic"
)
type ProcessContext struct {
wg *sync.WaitGroup // used to wait for components to shutdown
ctx context.Context // cancelled when Stop is called
shutdown context.CancelFunc // shut down Dendrite
+ degraded atomic.Bool // set to true by Degraded, read by IsDegraded
}
func NewProcessContext() *ProcessContext {
@@ -43,3 +49,14 @@ func (b *ProcessContext) WaitForShutdown() <-chan struct{} {
func (b *ProcessContext) WaitForComponentsToFinish() {
b.wg.Wait()
}
+
+func (b *ProcessContext) Degraded() { // Degraded flags the process as running in a degraded state; calls after the first do nothing.
+ if b.degraded.CAS(false, true) { // compare-and-swap succeeds only on the false->true transition, so the warning and Sentry event are emitted once
+ logrus.Warn("Dendrite is running in a degraded state")
+ sentry.CaptureException(fmt.Errorf("Process is running in a degraded state"))
+ }
+}
+
+func (b *ProcessContext) IsDegraded() bool { // IsDegraded reports whether Degraded has been called on this context.
+ return b.degraded.Load() // atomic read of the flag set in Degraded
+}