aboutsummaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorS7evinK <tfaelligen@gmail.com>2020-10-27 15:11:37 +0100
committerGitHub <noreply@github.com>2020-10-27 14:11:37 +0000
commitd5675feb96b7e3996ca2768fddb9f534dc268816 (patch)
treef2b458c6bbe9a89d3e28474ffefa38614cdb7bfa /internal
parentc5888bb64cc4b393bccf0fa79d667680144902d0 (diff)
Add possibilty to configure MaxMessageBytes for sarama (#1563)
* Add configuration for max_message_bytes for sarama * Log all errors when sending multiple messages Signed-off-by: Till Faelligen <tfaelligen@gmail.com> * Add missing config * - Better comments on what MaxMessageBytes is used for - Also sets the size the consumer may use
Diffstat (limited to 'internal')
-rw-r--r--internal/config/config_kafka.go7
-rw-r--r--internal/setup/kafka/kafka.go9
2 files changed, 14 insertions, 2 deletions
diff --git a/internal/config/config_kafka.go b/internal/config/config_kafka.go
index e2bd6538..707c92a7 100644
--- a/internal/config/config_kafka.go
+++ b/internal/config/config_kafka.go
@@ -24,6 +24,9 @@ type Kafka struct {
UseNaffka bool `yaml:"use_naffka"`
// The Naffka database is used internally by the naffka library, if used.
Database DatabaseOptions `yaml:"naffka_database"`
+ // The max size a Kafka message passed between consumer/producer can have
+ // Equals roughly max.message.bytes / fetch.message.max.bytes in Kafka
+ MaxMessageBytes *int `yaml:"max_message_bytes"`
}
func (k *Kafka) TopicFor(name string) string {
@@ -36,6 +39,9 @@ func (c *Kafka) Defaults() {
c.Addresses = []string{"localhost:2181"}
c.Database.ConnectionString = DataSource("file:naffka.db")
c.TopicPrefix = "Dendrite"
+
+ maxBytes := 1024 * 1024 * 8 // about 8MB
+ c.MaxMessageBytes = &maxBytes
}
func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
@@ -50,4 +56,5 @@ func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses)))
}
checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix))
+ checkPositive(configErrs, "global.kafka.max_message_bytes", int64(*c.MaxMessageBytes))
}
diff --git a/internal/setup/kafka/kafka.go b/internal/setup/kafka/kafka.go
index 9855ae15..091025ec 100644
--- a/internal/setup/kafka/kafka.go
+++ b/internal/setup/kafka/kafka.go
@@ -17,12 +17,17 @@ func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProdu
// setupKafka creates kafka consumer/producer pair from the config.
func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
- consumer, err := sarama.NewConsumer(cfg.Addresses, nil)
+ sCfg := sarama.NewConfig()
+ sCfg.Producer.MaxMessageBytes = *cfg.MaxMessageBytes
+ sCfg.Producer.Return.Successes = true
+ sCfg.Consumer.Fetch.Default = int32(*cfg.MaxMessageBytes)
+
+ consumer, err := sarama.NewConsumer(cfg.Addresses, sCfg)
if err != nil {
logrus.WithError(err).Panic("failed to start kafka consumer")
}
- producer, err := sarama.NewSyncProducer(cfg.Addresses, nil)
+ producer, err := sarama.NewSyncProducer(cfg.Addresses, sCfg)
if err != nil {
logrus.WithError(err).Panic("failed to setup kafka producers")
}