aboutsummaryrefslogtreecommitdiff
path: root/setup
diff options
context:
space:
mode:
authorS7evinK <2353100+S7evinK@users.noreply.github.com>2022-02-04 14:08:13 +0100
committerGitHub <noreply@github.com>2022-02-04 13:08:13 +0000
commit9de7efa0b095f40457f0e348632c77326dcb4a42 (patch)
treec543823b7d4ce25371b975a88bc5270c3a9d7352 /setup
parent532f445c4e31396fc3aa4f52e0e078cd499bc39a (diff)
Remove sarama/saramajetstream dependencies (#2138)
* Remove dependency on saramajetstream & sarama Signed-off-by: Till Faelligen <tfaelligen@gmail.com> * Remove internal.ContinualConsumer from federationapi * Remove internal.ContinualConsumer from syncapi * Remove internal.ContinualConsumer from keyserver * Move to new Prepare function * Remove saramajetstream & sarama dependency * Delete unneeded file * Remove duplicate import * Log error instead of silently irgnoring it * Move `OffsetNewest` and `OffsetOldest` into keyserver types, change them to be more sane values * Fix comments Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'setup')
-rw-r--r--setup/jetstream/nats.go19
1 files changed, 7 insertions, 12 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index 5d7937b5..77ad2b72 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -5,20 +5,17 @@ import (
"sync"
"time"
- "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/setup/config"
"github.com/sirupsen/logrus"
- saramajs "github.com/S7evinK/saramajetstream"
natsserver "github.com/nats-io/nats-server/v2/server"
- "github.com/nats-io/nats.go"
natsclient "github.com/nats-io/nats.go"
)
var natsServer *natsserver.Server
var natsServerMutex sync.Mutex
-func Prepare(cfg *config.JetStream) (nats.JetStreamContext, sarama.Consumer, sarama.SyncProducer) {
+func Prepare(cfg *config.JetStream) natsclient.JetStreamContext {
// check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 {
return setupNATS(cfg, nil)
@@ -52,20 +49,20 @@ func Prepare(cfg *config.JetStream) (nats.JetStreamContext, sarama.Consumer, sar
return setupNATS(cfg, nc)
}
-func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContext, sarama.Consumer, sarama.SyncProducer) {
+func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) natsclient.JetStreamContext {
if nc == nil {
var err error
- nc, err = nats.Connect(strings.Join(cfg.Addresses, ","))
+ nc, err = natsclient.Connect(strings.Join(cfg.Addresses, ","))
if err != nil {
logrus.WithError(err).Panic("Unable to connect to NATS")
- return nil, nil, nil
+ return nil
}
}
s, err := nc.JetStream()
if err != nil {
logrus.WithError(err).Panic("Unable to get JetStream context")
- return nil, nil, nil
+ return nil
}
for _, stream := range streams { // streams are defined in streams.go
@@ -80,7 +77,7 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContex
// If we're trying to keep everything in memory (e.g. unit tests)
// then overwrite the storage policy.
if cfg.InMemory {
- stream.Storage = nats.MemoryStorage
+ stream.Storage = natsclient.MemoryStorage
}
// Namespace the streams without modifying the original streams
@@ -93,7 +90,5 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContex
}
}
- consumer := saramajs.NewJetStreamConsumer(nc, s, "")
- producer := saramajs.NewJetStreamProducer(nc, s, "")
- return s, consumer, producer
+ return s
}