aboutsummaryrefslogtreecommitdiff
path: root/internal/config/config_kafka.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/config/config_kafka.go')
-rw-r--r--internal/config/config_kafka.go7
1 files changed, 7 insertions, 0 deletions
diff --git a/internal/config/config_kafka.go b/internal/config/config_kafka.go
index e2bd6538..707c92a7 100644
--- a/internal/config/config_kafka.go
+++ b/internal/config/config_kafka.go
@@ -24,6 +24,9 @@ type Kafka struct {
UseNaffka bool `yaml:"use_naffka"`
// The Naffka database is used internally by the naffka library, if used.
Database DatabaseOptions `yaml:"naffka_database"`
+ // The max size a Kafka message passed between consumer/producer can have
+ // Equals roughly max.message.bytes / fetch.message.max.bytes in Kafka
+ MaxMessageBytes *int `yaml:"max_message_bytes"`
}
func (k *Kafka) TopicFor(name string) string {
@@ -36,6 +39,9 @@ func (c *Kafka) Defaults() {
c.Addresses = []string{"localhost:2181"}
c.Database.ConnectionString = DataSource("file:naffka.db")
c.TopicPrefix = "Dendrite"
+
+ maxBytes := 1024 * 1024 * 8 // about 8MB
+ c.MaxMessageBytes = &maxBytes
}
func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
@@ -50,4 +56,5 @@ func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses)))
}
checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix))
+ checkPositive(configErrs, "global.kafka.max_message_bytes", int64(*c.MaxMessageBytes))
}