diff options
author | S7evinK <tfaelligen@gmail.com> | 2020-10-27 15:11:37 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-27 14:11:37 +0000 |
commit | d5675feb96b7e3996ca2768fddb9f534dc268816 (patch) | |
tree | f2b458c6bbe9a89d3e28474ffefa38614cdb7bfa /internal/config/config_kafka.go | |
parent | c5888bb64cc4b393bccf0fa79d667680144902d0 (diff) |
Add possibilty to configure MaxMessageBytes for sarama (#1563)
* Add configuration for max_message_bytes for sarama
* Log all errors when sending multiple messages
Signed-off-by: Till Faelligen <tfaelligen@gmail.com>
* Add missing config
* - Better comments on what MaxMessageBytes is used for
- Also sets the size the consumer may use
Diffstat (limited to 'internal/config/config_kafka.go')
-rw-r--r-- | internal/config/config_kafka.go | 7 |
1 files changed, 7 insertions, 0 deletions
diff --git a/internal/config/config_kafka.go b/internal/config/config_kafka.go index e2bd6538..707c92a7 100644 --- a/internal/config/config_kafka.go +++ b/internal/config/config_kafka.go @@ -24,6 +24,9 @@ type Kafka struct { UseNaffka bool `yaml:"use_naffka"` // The Naffka database is used internally by the naffka library, if used. Database DatabaseOptions `yaml:"naffka_database"` + // The max size a Kafka message passed between consumer/producer can have + // Equals roughly max.message.bytes / fetch.message.max.bytes in Kafka + MaxMessageBytes *int `yaml:"max_message_bytes"` } func (k *Kafka) TopicFor(name string) string { @@ -36,6 +39,9 @@ func (c *Kafka) Defaults() { c.Addresses = []string{"localhost:2181"} c.Database.ConnectionString = DataSource("file:naffka.db") c.TopicPrefix = "Dendrite" + + maxBytes := 1024 * 1024 * 8 // about 8MB + c.MaxMessageBytes = &maxBytes } func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) { @@ -50,4 +56,5 @@ func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) { checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses))) } checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix)) + checkPositive(configErrs, "global.kafka.max_message_bytes", int64(*c.MaxMessageBytes)) } |