aboutsummaryrefslogtreecommitdiff
path: root/internal/setup/kafka/kafka.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/setup/kafka/kafka.go')
-rw-r--r--internal/setup/kafka/kafka.go53
1 files changed, 53 insertions, 0 deletions
diff --git a/internal/setup/kafka/kafka.go b/internal/setup/kafka/kafka.go
new file mode 100644
index 00000000..9855ae15
--- /dev/null
+++ b/internal/setup/kafka/kafka.go
@@ -0,0 +1,53 @@
+package kafka
+
+import (
+ "github.com/Shopify/sarama"
+ "github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/naffka"
+ naffkaStorage "github.com/matrix-org/naffka/storage"
+ "github.com/sirupsen/logrus"
+)
+
+func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
+ if cfg.UseNaffka {
+ return setupNaffka(cfg)
+ }
+ return setupKafka(cfg)
+}
+
+// setupKafka creates kafka consumer/producer pair from the config.
+func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
+ consumer, err := sarama.NewConsumer(cfg.Addresses, nil)
+ if err != nil {
+ logrus.WithError(err).Panic("failed to start kafka consumer")
+ }
+
+ producer, err := sarama.NewSyncProducer(cfg.Addresses, nil)
+ if err != nil {
+ logrus.WithError(err).Panic("failed to setup kafka producers")
+ }
+
+ return consumer, producer
+}
+
+// In monolith mode with Naffka, we don't have the same constraints about
+// consuming the same topic from more than one place like we do with Kafka.
+// Therefore, we will only open one Naffka connection in case Naffka is
+// running on SQLite.
+var naffkaInstance *naffka.Naffka
+
+// setupNaffka creates kafka consumer/producer pair from the config.
+func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
+ if naffkaInstance != nil {
+ return naffkaInstance, naffkaInstance
+ }
+ naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString))
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to setup naffka database")
+ }
+ naffkaInstance, err = naffka.New(naffkaDB)
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to setup naffka")
+ }
+ return naffkaInstance, naffkaInstance
+}