aboutsummaryrefslogtreecommitdiff
path: root/setup/kafka/kafka.go
diff options
context:
space:
mode:
Diffstat (limited to 'setup/kafka/kafka.go')
-rw-r--r--setup/kafka/kafka.go58
1 files changed, 0 insertions, 58 deletions
diff --git a/setup/kafka/kafka.go b/setup/kafka/kafka.go
deleted file mode 100644
index a2902c96..00000000
--- a/setup/kafka/kafka.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package kafka
-
-import (
- "github.com/Shopify/sarama"
- "github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/naffka"
- naffkaStorage "github.com/matrix-org/naffka/storage"
- "github.com/sirupsen/logrus"
-)
-
-func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
- if cfg.UseNaffka {
- return setupNaffka(cfg)
- }
- return setupKafka(cfg)
-}
-
-// setupKafka creates kafka consumer/producer pair from the config.
-func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
- sCfg := sarama.NewConfig()
- sCfg.Producer.MaxMessageBytes = *cfg.MaxMessageBytes
- sCfg.Producer.Return.Successes = true
- sCfg.Consumer.Fetch.Default = int32(*cfg.MaxMessageBytes)
-
- consumer, err := sarama.NewConsumer(cfg.Addresses, sCfg)
- if err != nil {
- logrus.WithError(err).Panic("failed to start kafka consumer")
- }
-
- producer, err := sarama.NewSyncProducer(cfg.Addresses, sCfg)
- if err != nil {
- logrus.WithError(err).Panic("failed to setup kafka producers")
- }
-
- return consumer, producer
-}
-
-// In monolith mode with Naffka, we don't have the same constraints about
-// consuming the same topic from more than one place like we do with Kafka.
-// Therefore, we will only open one Naffka connection in case Naffka is
-// running on SQLite.
-var naffkaInstance *naffka.Naffka
-
-// setupNaffka creates kafka consumer/producer pair from the config.
-func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
- if naffkaInstance != nil {
- return naffkaInstance, naffkaInstance
- }
- naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString))
- if err != nil {
- logrus.WithError(err).Panic("Failed to setup naffka database")
- }
- naffkaInstance, err = naffka.New(naffkaDB)
- if err != nil {
- logrus.WithError(err).Panic("Failed to setup naffka")
- }
- return naffkaInstance, naffkaInstance
-}