aboutsummaryrefslogtreecommitdiff
path: root/internal/config/config_kafka.go
diff options
context:
space:
mode:
authorS7evinK <tfaelligen@gmail.com>2020-10-27 15:11:37 +0100
committerGitHub <noreply@github.com>2020-10-27 14:11:37 +0000
commitd5675feb96b7e3996ca2768fddb9f534dc268816 (patch)
treef2b458c6bbe9a89d3e28474ffefa38614cdb7bfa /internal/config/config_kafka.go
parentc5888bb64cc4b393bccf0fa79d667680144902d0 (diff)
Add possibilty to configure MaxMessageBytes for sarama (#1563)
* Add configuration for max_message_bytes for sarama * Log all errors when sending multiple messages Signed-off-by: Till Faelligen <tfaelligen@gmail.com> * Add missing config * - Better comments on what MaxMessageBytes is used for - Also sets the size the consumer may use
Diffstat (limited to 'internal/config/config_kafka.go')
-rw-r--r--internal/config/config_kafka.go7
1 files changed, 7 insertions, 0 deletions
diff --git a/internal/config/config_kafka.go b/internal/config/config_kafka.go
index e2bd6538..707c92a7 100644
--- a/internal/config/config_kafka.go
+++ b/internal/config/config_kafka.go
@@ -24,6 +24,9 @@ type Kafka struct {
UseNaffka bool `yaml:"use_naffka"`
// The Naffka database is used internally by the naffka library, if used.
Database DatabaseOptions `yaml:"naffka_database"`
+ // The max size a Kafka message passed between consumer/producer can have
+ // Equals roughly max.message.bytes / fetch.message.max.bytes in Kafka
+ MaxMessageBytes *int `yaml:"max_message_bytes"`
}
func (k *Kafka) TopicFor(name string) string {
@@ -36,6 +39,9 @@ func (c *Kafka) Defaults() {
c.Addresses = []string{"localhost:2181"}
c.Database.ConnectionString = DataSource("file:naffka.db")
c.TopicPrefix = "Dendrite"
+
+ maxBytes := 1024 * 1024 * 8 // about 8MB
+ c.MaxMessageBytes = &maxBytes
}
func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
@@ -50,4 +56,5 @@ func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses)))
}
checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix))
+ checkPositive(configErrs, "global.kafka.max_message_bytes", int64(*c.MaxMessageBytes))
}