diff options
Diffstat (limited to 'setup/jetstream/nats.go')
-rw-r--r-- | setup/jetstream/nats.go | 19 |
1 files changed, 7 insertions, 12 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 5d7937b5..77ad2b72 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -5,20 +5,17 @@ import ( "sync" "time" - "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/setup/config" "github.com/sirupsen/logrus" - saramajs "github.com/S7evinK/saramajetstream" natsserver "github.com/nats-io/nats-server/v2/server" - "github.com/nats-io/nats.go" natsclient "github.com/nats-io/nats.go" ) var natsServer *natsserver.Server var natsServerMutex sync.Mutex -func Prepare(cfg *config.JetStream) (nats.JetStreamContext, sarama.Consumer, sarama.SyncProducer) { +func Prepare(cfg *config.JetStream) natsclient.JetStreamContext { // check if we need an in-process NATS Server if len(cfg.Addresses) != 0 { return setupNATS(cfg, nil) @@ -52,20 +49,20 @@ func Prepare(cfg *config.JetStream) (nats.JetStreamContext, sarama.Consumer, sar return setupNATS(cfg, nc) } -func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContext, sarama.Consumer, sarama.SyncProducer) { +func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) natsclient.JetStreamContext { if nc == nil { var err error - nc, err = nats.Connect(strings.Join(cfg.Addresses, ",")) + nc, err = natsclient.Connect(strings.Join(cfg.Addresses, ",")) if err != nil { logrus.WithError(err).Panic("Unable to connect to NATS") - return nil, nil, nil + return nil } } s, err := nc.JetStream() if err != nil { logrus.WithError(err).Panic("Unable to get JetStream context") - return nil, nil, nil + return nil } for _, stream := range streams { // streams are defined in streams.go @@ -80,7 +77,7 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContex // If we're trying to keep everything in memory (e.g. unit tests) // then overwrite the storage policy. if cfg.InMemory { - stream.Storage = nats.MemoryStorage + stream.Storage = natsclient.MemoryStorage } // Namespace the streams without modifying the original streams @@ -93,7 +90,5 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContex } } - consumer := saramajs.NewJetStreamConsumer(nc, s, "") - producer := saramajs.NewJetStreamProducer(nc, s, "") - return s, consumer, producer + return s } |