aboutsummaryrefslogtreecommitdiff
path: root/setup/jetstream/nats.go
diff options
context:
space:
mode:
Diffstat (limited to 'setup/jetstream/nats.go')
-rw-r--r--setup/jetstream/nats.go43
1 files changed, 39 insertions, 4 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index 748c191b..328cf915 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -1,11 +1,13 @@
package jetstream
import (
+ "fmt"
"reflect"
"strings"
"sync"
"time"
+ "github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/sirupsen/logrus"
@@ -20,7 +22,7 @@ var natsServerMutex sync.Mutex
func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
// check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 {
- return setupNATS(cfg, nil)
+ return setupNATS(process, cfg, nil)
}
natsServerMutex.Lock()
if natsServer == nil {
@@ -56,10 +58,10 @@ func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient
if err != nil {
logrus.Fatalln("Failed to create NATS client")
}
- return setupNATS(cfg, nc)
+ return setupNATS(process, cfg, nc)
}
-func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
+func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
if nc == nil {
var err error
nc, err = natsclient.Connect(strings.Join(cfg.Addresses, ","))
@@ -117,7 +119,40 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
namespaced.Name = name
namespaced.Subjects = subjects
if _, err = s.AddStream(&namespaced); err != nil {
- logrus.WithError(err).WithField("stream", name).WithField("subjects", subjects).Fatal("Unable to add stream")
+ logger := logrus.WithError(err).WithFields(logrus.Fields{
+ "stream": namespaced.Name,
+ "subjects": namespaced.Subjects,
+ })
+
+ // If the stream was supposed to be in-memory to begin with
+ // then an error here is fatal so we'll give up.
+ if namespaced.Storage == natsclient.MemoryStorage {
+ logger.WithError(err).Fatal("Unable to add in-memory stream")
+ }
+
+ // The stream was supposed to be on disk. Let's try starting
+ // Dendrite with the stream in-memory instead. That'll mean that
+ // we can't recover anything that was queued on the disk but we
+ // will still be able to start and run hopefully in the meantime.
+ logger.WithError(err).Error("Unable to add stream")
+ sentry.CaptureException(fmt.Errorf("Unable to add stream %q: %w", namespaced.Name, err))
+
+ namespaced.Storage = natsclient.MemoryStorage
+ if _, err = s.AddStream(&namespaced); err != nil {
+ // We tried to add the stream in-memory instead but something
+ // went wrong. That's an unrecoverable situation so we will
+ // give up at this point.
+ logger.WithError(err).Fatal("Unable to add in-memory stream")
+ }
+
+ if stream.Storage != namespaced.Storage {
+ // We've managed to add the stream in memory. What's on the
+ // disk will be left alone, but our ability to recover from a
+ // future crash will be limited. Yell about it.
+ sentry.CaptureException(fmt.Errorf("Stream %q is running in-memory; this may be due to data corruption in the JetStream storage directory, investigate as soon as possible", namespaced.Name))
+ logrus.Warn("Stream is running in-memory; this may be due to data corruption in the JetStream storage directory, investigate as soon as possible")
+ process.Degraded()
+ }
}
}
}