diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-08-10 15:18:37 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-10 15:18:37 +0100 |
commit | 52eeeb16279497e24ed6b1e34a7a16fc69b587d1 (patch) | |
tree | cfcf9c482afe230ae251bd1d472cb59d9cc9e906 /internal | |
parent | 4b09f445c992fd0a389efc34d75aaa7e5bd50e9c (diff) |
Prefix-defined Kafka topics (#1254)
* Prefix-defined Kafka topics
* Fix current state server test
Diffstat (limited to 'internal')
-rw-r--r-- | internal/config/config_global.go | 54 | ||||
-rw-r--r-- | internal/config/config_kafka.go | 52 |
2 files changed, 52 insertions, 54 deletions
diff --git a/internal/config/config_global.go b/internal/config/config_global.go index 9456dd3f..785a8033 100644 --- a/internal/config/config_global.go +++ b/internal/config/config_global.go @@ -60,60 +60,6 @@ func (c *Global) Verify(configErrs *ConfigErrors, isMonolith bool) { c.Metrics.Verify(configErrs, isMonolith) } -type Kafka struct { - // A list of kafka addresses to connect to. - Addresses []string `yaml:"addresses"` - // Whether to use naffka instead of kafka. - // Naffka can only be used when running dendrite as a single monolithic server. - // Kafka can be used both with a monolithic server and when running the - // components as separate servers. - UseNaffka bool `yaml:"use_naffka"` - // The Naffka database is used internally by the naffka library, if used. - Database DatabaseOptions `yaml:"naffka_database"` - // The names of the topics to use when reading and writing from kafka. - Topics struct { - // Topic for roomserver/api.OutputRoomEvent events. - OutputRoomEvent Topic `yaml:"output_room_event"` - // Topic for sending account data from client API to sync API - OutputClientData Topic `yaml:"output_client_data"` - // Topic for eduserver/api.OutputTypingEvent events. - OutputTypingEvent Topic `yaml:"output_typing_event"` - // Topic for eduserver/api.OutputSendToDeviceEvent events. - OutputSendToDeviceEvent Topic `yaml:"output_send_to_device_event"` - // Topic for keyserver when new device keys are added. - OutputKeyChangeEvent Topic `yaml:"output_key_change_event"` - } -} - -func (c *Kafka) Defaults() { - c.UseNaffka = true - c.Database.Defaults() - c.Database.ConnectionString = DataSource("file:naffka.db") - c.Topics.OutputRoomEvent = "OutputRoomEventTopic" - c.Topics.OutputClientData = "OutputClientDataTopic" - c.Topics.OutputTypingEvent = "OutputTypingEventTopic" - c.Topics.OutputSendToDeviceEvent = "OutputSendToDeviceEventTopic" - c.Topics.OutputKeyChangeEvent = "OutputKeyChangeEventTopic" -} - -func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) { - if c.UseNaffka { - if !isMonolith { - configErrs.Add("naffka can only be used in a monolithic server") - } - checkNotEmpty(configErrs, "global.kafka.database.connection_string", string(c.Database.ConnectionString)) - } else { - // If we aren't using naffka then we need to have at least one kafka - // server to talk to. - checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses))) - } - checkNotEmpty(configErrs, "global.kafka.topics.output_room_event", string(c.Topics.OutputRoomEvent)) - checkNotEmpty(configErrs, "global.kafka.topics.output_client_data", string(c.Topics.OutputClientData)) - checkNotEmpty(configErrs, "global.kafka.topics.output_typing_event", string(c.Topics.OutputTypingEvent)) - checkNotEmpty(configErrs, "global.kafka.topics.output_send_to_device_event", string(c.Topics.OutputSendToDeviceEvent)) - checkNotEmpty(configErrs, "global.kafka.topics.output_key_change_event", string(c.Topics.OutputKeyChangeEvent)) -} - // The configuration to use for Prometheus metrics type Metrics struct { // Whether or not the metrics are enabled diff --git a/internal/config/config_kafka.go b/internal/config/config_kafka.go new file mode 100644 index 00000000..43a27cf2 --- /dev/null +++ b/internal/config/config_kafka.go @@ -0,0 +1,52 @@ +package config + +import "fmt" + +// Defined Kafka topics. +const ( + TopicOutputTypingEvent = "OutputTypingEvent" + TopicOutputSendToDeviceEvent = "OutputSendToDeviceEvent" + TopicOutputKeyChangeEvent = "OutputKeyChangeEvent" + TopicOutputRoomEvent = "OutputRoomEvent" + TopicOutputClientData = "OutputClientData" +) + +type Kafka struct { + // A list of kafka addresses to connect to. + Addresses []string `yaml:"addresses"` + // Whether to use naffka instead of kafka. + // Naffka can only be used when running dendrite as a single monolithic server. + // Kafka can be used both with a monolithic server and when running the + // components as separate servers. + UseNaffka bool `yaml:"use_naffka"` + // The Naffka database is used internally by the naffka library, if used. + Database DatabaseOptions `yaml:"naffka_database"` + // The prefix to use for Kafka topic names for this homeserver - really only + // useful if running more than one Dendrite on the same Kafka deployment. + TopicPrefix string `yaml:"topic_prefix"` +} + +func (k *Kafka) TopicFor(name string) string { + return fmt.Sprintf("%s%s", k.TopicPrefix, name) +} + +func (c *Kafka) Defaults() { + c.UseNaffka = true + c.Database.Defaults() + c.Database.ConnectionString = DataSource("file:naffka.db") + c.TopicPrefix = "Dendrite" +} + +func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) { + if c.UseNaffka { + if !isMonolith { + configErrs.Add("naffka can only be used in a monolithic server") + } + checkNotEmpty(configErrs, "global.kafka.database.connection_string", string(c.Database.ConnectionString)) + } else { + // If we aren't using naffka then we need to have at least one kafka + // server to talk to. + checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses))) + } + checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix)) +} |