aboutsummaryrefslogtreecommitdiff
path: root/setup/jetstream/nats.go
diff options
context:
space:
mode:
Diffstat (limited to 'setup/jetstream/nats.go')
-rw-r--r--setup/jetstream/nats.go19
1 files changed, 7 insertions, 12 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index 5d7937b5..77ad2b72 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -5,20 +5,17 @@ import (
"sync"
"time"
- "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/setup/config"
"github.com/sirupsen/logrus"
- saramajs "github.com/S7evinK/saramajetstream"
natsserver "github.com/nats-io/nats-server/v2/server"
- "github.com/nats-io/nats.go"
natsclient "github.com/nats-io/nats.go"
)
var natsServer *natsserver.Server
var natsServerMutex sync.Mutex
-func Prepare(cfg *config.JetStream) (nats.JetStreamContext, sarama.Consumer, sarama.SyncProducer) {
+func Prepare(cfg *config.JetStream) natsclient.JetStreamContext {
// check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 {
return setupNATS(cfg, nil)
@@ -52,20 +49,20 @@ func Prepare(cfg *config.JetStream) (nats.JetStreamContext, sarama.Consumer, sar
return setupNATS(cfg, nc)
}
-func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContext, sarama.Consumer, sarama.SyncProducer) {
+func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) natsclient.JetStreamContext {
if nc == nil {
var err error
- nc, err = nats.Connect(strings.Join(cfg.Addresses, ","))
+ nc, err = natsclient.Connect(strings.Join(cfg.Addresses, ","))
if err != nil {
logrus.WithError(err).Panic("Unable to connect to NATS")
- return nil, nil, nil
+ return nil
}
}
s, err := nc.JetStream()
if err != nil {
logrus.WithError(err).Panic("Unable to get JetStream context")
- return nil, nil, nil
+ return nil
}
for _, stream := range streams { // streams are defined in streams.go
@@ -80,7 +77,7 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContex
// If we're trying to keep everything in memory (e.g. unit tests)
// then overwrite the storage policy.
if cfg.InMemory {
- stream.Storage = nats.MemoryStorage
+ stream.Storage = natsclient.MemoryStorage
}
// Namespace the streams without modifying the original streams
@@ -93,7 +90,5 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContex
}
}
- consumer := saramajs.NewJetStreamConsumer(nc, s, "")
- producer := saramajs.NewJetStreamProducer(nc, s, "")
- return s, consumer, producer
+ return s
}