aboutsummaryrefslogtreecommitdiff
path: root/setup/config
diff options
context:
space:
mode:
authorS7evinK <tfaelligen@gmail.com>2022-01-05 18:44:49 +0100
committerGitHub <noreply@github.com>2022-01-05 17:44:49 +0000
commit161f14517669410d3e8207dc41eea5c9695f7e17 (patch)
tree20db8ed83d92c688206242f84880ff2e35a1d5eb /setup/config
parenta47b12dc7d692e0ddd4aaa0801dafc9bb462aad9 (diff)
Add NATS JetStream support (#1866)
* Add NATS JetStream support Update shopify/sarama * Fix addresses * Don't change Addresses in Defaults * Update saramajetstream * Add missing error check Keep typing events for at least one minute * Use all configured NATS addresses * Update saramajetstream * Try setting up with NATS * Make sure NATS uses own persistent directory (TODO: make this configurable) * Update go.mod/go.sum * Jetstream package * Various other refactoring * Build fixes * Config tweaks, make random jetstream storage path for CI * Disable interest policies * Try to sane default on jetstream base path * Try to use in-memory for CI * Restore storage/retention * Update nats.go dependency * Adapt changes to config * Remove unneeded TopicFor * Dep update * Revert "Remove unneeded TopicFor" This reverts commit f5a4e4a339b6f94ec215778dca22204adaa893d1. * Revert changes made to streams * Fix build problems * Update nats-server * Update go.mod/go.sum * Roomserver input API queuing using NATS * Fix topic naming * Prometheus metrics * More refactoring to remove saramajetstream * Add missing topic * Don't try to populate map that doesn't exist * Roomserver output topic * Update go.mod/go.sum * Message acknowledgements * Ack tweaks * Try to resume transaction re-sends * Try to resume transaction re-sends * Update to matrix-org/gomatrixserverlib@91dadfb * Remove internal.PartitionStorer from components that don't consume keychanges * Try to reduce re-allocations a bit in resolveConflictsV2 * Tweak delivery options on RS input * Publish send-to-device messages into correct JetStream subject * Async and sync roomserver input * Update dendrite-config.yaml * Remove roomserver tests for now (they need rewriting) * Remove roomserver test again (was merged back in) * Update documentation * Docker updates * More Docker updates * Update Docker readme again * Fix lint issues * Send final event in `processEvent` synchronously (since this might stop Sytest from being so upset) * Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that * Go 1.16 instead of Go 1.13 for upgrade tests and Complement * Revert "Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that" This reverts commit 368675283fc44501f227639811bdb16dd5deef8c. * Don't report any errors on `/send` to see what fun that creates * Fix panics on closed channel sends * Enforce state key matches sender * Do the same for leave * Various tweaks to make tests happier Squashed commit of the following: commit 13f9028e7a63662759ce7c55504a9d2423058668 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 15:47:14 2022 +0000 Do the same for leave commit e6be7f05c349fafbdddfe818337a17a60c867be1 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 15:33:42 2022 +0000 Enforce state key matches sender commit 85ede6d64bf10ce9b91cdd6d80f87350ee55242f Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 14:07:04 2022 +0000 Fix panics on closed channel sends commit 9755494a98bed62450f8001d8128e40481d27e15 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 13:38:22 2022 +0000 Don't report any errors on `/send` to see what fun that creates commit 3bb4f87b5dd56882febb4db5621db484c8789b7c Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 13:00:26 2022 +0000 Revert "Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that" This reverts commit 368675283fc44501f227639811bdb16dd5deef8c. commit fe2673ed7be9559eaca134424e403a4faca100b0 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 12:09:34 2022 +0000 Go 1.16 instead of Go 1.13 for upgrade tests and Complement commit 368675283fc44501f227639811bdb16dd5deef8c Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 11:51:45 2022 +0000 Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that commit b028dfc08577bcf52e6cb498026e15fa5d46d07c Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 10:29:08 2022 +0000 Send final event in `processEvent` synchronously (since this might stop Sytest from being so upset) * Merge in NATS Server v2.6.6 and nats.go v1.13 into the in-process connection fork * Add `jetstream.WithJetStreamMessage` to make ack/nak-ing less messy, use process context in consumers * Fix consumer component name in federation API * Add comment explaining where streams are defined * Tweaks to roomserver input with comments * Finish that sentence that I apparently forgot to finish in INSTALL.md * Bump version number of config to 2 * Add comments around asynchronous sends to roomserver in processEventWithMissingState * More useful error message when the config version does not match * Set version in generate-config * Fix version in config.Defaults Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'setup/config')
-rw-r--r--setup/config/config.go11
-rw-r--r--setup/config/config_global.go8
-rw-r--r--setup/config/config_jetstream.go40
-rw-r--r--setup/config/config_kafka.go63
-rw-r--r--setup/config/config_test.go2
5 files changed, 53 insertions, 71 deletions
diff --git a/setup/config/config.go b/setup/config/config.go
index 404b7178..eb371a54 100644
--- a/setup/config/config.go
+++ b/setup/config/config.go
@@ -40,7 +40,7 @@ var keyIDRegexp = regexp.MustCompile("^ed25519:[a-zA-Z0-9_]+$")
// Version is the current version of the config format.
// This will change whenever we make breaking changes to the config format.
-const Version = 1
+const Version = 2
// Dendrite contains all the config used by a dendrite process.
// Relative paths are resolved relative to the current working directory
@@ -292,7 +292,7 @@ func (config *Dendrite) Derive() error {
// SetDefaults sets default config values if they are not explicitly set.
func (c *Dendrite) Defaults(generate bool) {
- c.Version = 1
+ c.Version = Version
c.Global.Defaults(generate)
c.ClientAPI.Defaults(generate)
@@ -325,6 +325,7 @@ func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) {
}
func (c *Dendrite) Wiring() {
+ c.Global.JetStream.Matrix = &c.Global
c.ClientAPI.Matrix = &c.Global
c.EDUServer.Matrix = &c.Global
c.FederationAPI.Matrix = &c.Global
@@ -420,7 +421,11 @@ func (config *Dendrite) check(_ bool) error { // monolithic
if config.Version != Version {
configErrs.Add(fmt.Sprintf(
- "unknown config version %q, expected %q", config.Version, Version,
+ "config version is %q, expected %q - this means that the format of the configuration "+
+ "file has changed in some significant way, so please revisit the sample config "+
+ "and ensure you are not missing any important options that may have been added "+
+ "or changed recently!",
+ config.Version, Version,
))
return configErrs
}
diff --git a/setup/config/config_global.go b/setup/config/config_global.go
index 20ee6d37..6f2306a6 100644
--- a/setup/config/config_global.go
+++ b/setup/config/config_global.go
@@ -46,8 +46,8 @@ type Global struct {
// Defaults to an empty array.
TrustedIDServers []string `yaml:"trusted_third_party_id_servers"`
- // Kafka/Naffka configuration
- Kafka Kafka `yaml:"kafka"`
+ // JetStream configuration
+ JetStream JetStream `yaml:"jetstream"`
// Metrics configuration
Metrics Metrics `yaml:"metrics"`
@@ -68,7 +68,7 @@ func (c *Global) Defaults(generate bool) {
}
c.KeyValidityPeriod = time.Hour * 24 * 7
- c.Kafka.Defaults(generate)
+ c.JetStream.Defaults(generate)
c.Metrics.Defaults(generate)
c.DNSCache.Defaults()
c.Sentry.Defaults()
@@ -78,7 +78,7 @@ func (c *Global) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkNotEmpty(configErrs, "global.server_name", string(c.ServerName))
checkNotEmpty(configErrs, "global.private_key", string(c.PrivateKeyPath))
- c.Kafka.Verify(configErrs, isMonolith)
+ c.JetStream.Verify(configErrs, isMonolith)
c.Metrics.Verify(configErrs, isMonolith)
c.Sentry.Verify(configErrs, isMonolith)
c.DNSCache.Verify(configErrs, isMonolith)
diff --git a/setup/config/config_jetstream.go b/setup/config/config_jetstream.go
new file mode 100644
index 00000000..0bd84899
--- /dev/null
+++ b/setup/config/config_jetstream.go
@@ -0,0 +1,40 @@
+package config
+
+import (
+ "fmt"
+)
+
+type JetStream struct {
+ Matrix *Global `yaml:"-"`
+
+ // Persistent directory to store JetStream streams in.
+ StoragePath Path `yaml:"storage_path"`
+ // A list of NATS addresses to connect to. If none are specified, an
+ // internal NATS server will be used when running in monolith mode only.
+ Addresses []string `yaml:"addresses"`
+ // The prefix to use for stream names for this homeserver - really only
+ // useful if running more than one Dendrite on the same NATS deployment.
+ TopicPrefix string `yaml:"topic_prefix"`
+ // Keep all storage in memory. This is mostly useful for unit tests.
+ InMemory bool `yaml:"in_memory"`
+}
+
+func (c *JetStream) TopicFor(name string) string {
+ return fmt.Sprintf("%s%s", c.TopicPrefix, name)
+}
+
+func (c *JetStream) Defaults(generate bool) {
+ c.Addresses = []string{}
+ c.TopicPrefix = "Dendrite"
+ if generate {
+ c.StoragePath = Path("./")
+ }
+}
+
+func (c *JetStream) Verify(configErrs *ConfigErrors, isMonolith bool) {
+ // If we are running in a polylith deployment then we need at least
+ // one NATS JetStream server to talk to.
+ if !isMonolith {
+ checkNotZero(configErrs, "global.jetstream.addresses", int64(len(c.Addresses)))
+ }
+}
diff --git a/setup/config/config_kafka.go b/setup/config/config_kafka.go
deleted file mode 100644
index 5a61f17e..00000000
--- a/setup/config/config_kafka.go
+++ /dev/null
@@ -1,63 +0,0 @@
-package config
-
-import "fmt"
-
-// Defined Kafka topics.
-const (
- TopicOutputTypingEvent = "OutputTypingEvent"
- TopicOutputSendToDeviceEvent = "OutputSendToDeviceEvent"
- TopicOutputKeyChangeEvent = "OutputKeyChangeEvent"
- TopicOutputRoomEvent = "OutputRoomEvent"
- TopicOutputClientData = "OutputClientData"
- TopicOutputReceiptEvent = "OutputReceiptEvent"
-)
-
-type Kafka struct {
- // A list of kafka addresses to connect to.
- Addresses []string `yaml:"addresses"`
- // The prefix to use for Kafka topic names for this homeserver - really only
- // useful if running more than one Dendrite on the same Kafka deployment.
- TopicPrefix string `yaml:"topic_prefix"`
- // Whether to use naffka instead of kafka.
- // Naffka can only be used when running dendrite as a single monolithic server.
- // Kafka can be used both with a monolithic server and when running the
- // components as separate servers.
- UseNaffka bool `yaml:"use_naffka"`
- // The Naffka database is used internally by the naffka library, if used.
- Database DatabaseOptions `yaml:"naffka_database"`
- // The max size a Kafka message passed between consumer/producer can have
- // Equals roughly max.message.bytes / fetch.message.max.bytes in Kafka
- MaxMessageBytes *int `yaml:"max_message_bytes"`
-}
-
-func (k *Kafka) TopicFor(name string) string {
- return fmt.Sprintf("%s%s", k.TopicPrefix, name)
-}
-
-func (c *Kafka) Defaults(generate bool) {
- c.UseNaffka = true
- c.Database.Defaults(10)
- if generate {
- c.Addresses = []string{"localhost:2181"}
- c.Database.ConnectionString = DataSource("file:naffka.db")
- }
- c.TopicPrefix = "Dendrite"
-
- maxBytes := 1024 * 1024 * 8 // about 8MB
- c.MaxMessageBytes = &maxBytes
-}
-
-func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
- if c.UseNaffka {
- if !isMonolith {
- configErrs.Add("naffka can only be used in a monolithic server")
- }
- checkNotEmpty(configErrs, "global.kafka.database.connection_string", string(c.Database.ConnectionString))
- } else {
- // If we aren't using naffka then we need to have at least one kafka
- // server to talk to.
- checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses)))
- }
- checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix))
- checkPositive(configErrs, "global.kafka.max_message_bytes", int64(*c.MaxMessageBytes))
-}
diff --git a/setup/config/config_test.go b/setup/config/config_test.go
index ffe9edab..5aa54929 100644
--- a/setup/config/config_test.go
+++ b/setup/config/config_test.go
@@ -33,7 +33,7 @@ func TestLoadConfigRelative(t *testing.T) {
}
const testConfig = `
-version: 1
+version: 2
global:
server_name: localhost
private_key: matrix_key.pem