diff options
Diffstat (limited to 'internal/setup/kafka/kafka.go')
-rw-r--r-- | internal/setup/kafka/kafka.go | 53 |
1 files changed, 53 insertions, 0 deletions
diff --git a/internal/setup/kafka/kafka.go b/internal/setup/kafka/kafka.go new file mode 100644 index 00000000..9855ae15 --- /dev/null +++ b/internal/setup/kafka/kafka.go @@ -0,0 +1,53 @@ +package kafka + +import ( + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/naffka" + naffkaStorage "github.com/matrix-org/naffka/storage" + "github.com/sirupsen/logrus" +) + +func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + if cfg.UseNaffka { + return setupNaffka(cfg) + } + return setupKafka(cfg) +} + +// setupKafka creates kafka consumer/producer pair from the config. +func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + consumer, err := sarama.NewConsumer(cfg.Addresses, nil) + if err != nil { + logrus.WithError(err).Panic("failed to start kafka consumer") + } + + producer, err := sarama.NewSyncProducer(cfg.Addresses, nil) + if err != nil { + logrus.WithError(err).Panic("failed to setup kafka producers") + } + + return consumer, producer +} + +// In monolith mode with Naffka, we don't have the same constraints about +// consuming the same topic from more than one place like we do with Kafka. +// Therefore, we will only open one Naffka connection in case Naffka is +// running on SQLite. +var naffkaInstance *naffka.Naffka + +// setupNaffka creates kafka consumer/producer pair from the config. +func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + if naffkaInstance != nil { + return naffkaInstance, naffkaInstance + } + naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString)) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka database") + } + naffkaInstance, err = naffka.New(naffkaDB) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka") + } + return naffkaInstance, naffkaInstance +} |