diff options
Diffstat (limited to 'setup')
-rw-r--r-- | setup/base/base.go | 2 | ||||
-rw-r--r-- | setup/config/config.go | 11 | ||||
-rw-r--r-- | setup/config/config_global.go | 8 | ||||
-rw-r--r-- | setup/config/config_jetstream.go | 40 | ||||
-rw-r--r-- | setup/config/config_kafka.go | 63 | ||||
-rw-r--r-- | setup/config/config_test.go | 2 | ||||
-rw-r--r-- | setup/jetstream/helpers.go | 11 | ||||
-rw-r--r-- | setup/jetstream/nats.go | 93 | ||||
-rw-r--r-- | setup/jetstream/streams.go | 61 | ||||
-rw-r--r-- | setup/kafka/kafka.go | 58 | ||||
-rw-r--r-- | setup/mscs/msc2836/msc2836.go | 2 |
11 files changed, 219 insertions, 132 deletions
diff --git a/setup/base/base.go b/setup/base/base.go index 06c97117..819fe1ad 100644 --- a/setup/base/base.go +++ b/setup/base/base.go @@ -81,8 +81,6 @@ type BaseDendrite struct { Cfg *config.Dendrite Caches *caching.Caches DNSCache *gomatrixserverlib.DNSCache - // KafkaConsumer sarama.Consumer - // KafkaProducer sarama.SyncProducer } const NoListener = "" diff --git a/setup/config/config.go b/setup/config/config.go index 404b7178..eb371a54 100644 --- a/setup/config/config.go +++ b/setup/config/config.go @@ -40,7 +40,7 @@ var keyIDRegexp = regexp.MustCompile("^ed25519:[a-zA-Z0-9_]+$") // Version is the current version of the config format. // This will change whenever we make breaking changes to the config format. -const Version = 1 +const Version = 2 // Dendrite contains all the config used by a dendrite process. // Relative paths are resolved relative to the current working directory @@ -292,7 +292,7 @@ func (config *Dendrite) Derive() error { // SetDefaults sets default config values if they are not explicitly set. func (c *Dendrite) Defaults(generate bool) { - c.Version = 1 + c.Version = Version c.Global.Defaults(generate) c.ClientAPI.Defaults(generate) @@ -325,6 +325,7 @@ func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) { } func (c *Dendrite) Wiring() { + c.Global.JetStream.Matrix = &c.Global c.ClientAPI.Matrix = &c.Global c.EDUServer.Matrix = &c.Global c.FederationAPI.Matrix = &c.Global @@ -420,7 +421,11 @@ func (config *Dendrite) check(_ bool) error { // monolithic if config.Version != Version { configErrs.Add(fmt.Sprintf( - "unknown config version %q, expected %q", config.Version, Version, + "config version is %q, expected %q - this means that the format of the configuration "+ + "file has changed in some significant way, so please revisit the sample config "+ + "and ensure you are not missing any important options that may have been added "+ + "or changed recently!", + config.Version, Version, )) return configErrs } diff --git a/setup/config/config_global.go b/setup/config/config_global.go index 20ee6d37..6f2306a6 100644 --- a/setup/config/config_global.go +++ b/setup/config/config_global.go @@ -46,8 +46,8 @@ type Global struct { // Defaults to an empty array. TrustedIDServers []string `yaml:"trusted_third_party_id_servers"` - // Kafka/Naffka configuration - Kafka Kafka `yaml:"kafka"` + // JetStream configuration + JetStream JetStream `yaml:"jetstream"` // Metrics configuration Metrics Metrics `yaml:"metrics"` @@ -68,7 +68,7 @@ func (c *Global) Defaults(generate bool) { } c.KeyValidityPeriod = time.Hour * 24 * 7 - c.Kafka.Defaults(generate) + c.JetStream.Defaults(generate) c.Metrics.Defaults(generate) c.DNSCache.Defaults() c.Sentry.Defaults() @@ -78,7 +78,7 @@ func (c *Global) Verify(configErrs *ConfigErrors, isMonolith bool) { checkNotEmpty(configErrs, "global.server_name", string(c.ServerName)) checkNotEmpty(configErrs, "global.private_key", string(c.PrivateKeyPath)) - c.Kafka.Verify(configErrs, isMonolith) + c.JetStream.Verify(configErrs, isMonolith) c.Metrics.Verify(configErrs, isMonolith) c.Sentry.Verify(configErrs, isMonolith) c.DNSCache.Verify(configErrs, isMonolith) diff --git a/setup/config/config_jetstream.go b/setup/config/config_jetstream.go new file mode 100644 index 00000000..0bd84899 --- /dev/null +++ b/setup/config/config_jetstream.go @@ -0,0 +1,40 @@ +package config + +import ( + "fmt" +) + +type JetStream struct { + Matrix *Global `yaml:"-"` + + // Persistent directory to store JetStream streams in. + StoragePath Path `yaml:"storage_path"` + // A list of NATS addresses to connect to. If none are specified, an + // internal NATS server will be used when running in monolith mode only. + Addresses []string `yaml:"addresses"` + // The prefix to use for stream names for this homeserver - really only + // useful if running more than one Dendrite on the same NATS deployment. + TopicPrefix string `yaml:"topic_prefix"` + // Keep all storage in memory. This is mostly useful for unit tests. + InMemory bool `yaml:"in_memory"` +} + +func (c *JetStream) TopicFor(name string) string { + return fmt.Sprintf("%s%s", c.TopicPrefix, name) +} + +func (c *JetStream) Defaults(generate bool) { + c.Addresses = []string{} + c.TopicPrefix = "Dendrite" + if generate { + c.StoragePath = Path("./") + } +} + +func (c *JetStream) Verify(configErrs *ConfigErrors, isMonolith bool) { + // If we are running in a polylith deployment then we need at least + // one NATS JetStream server to talk to. + if !isMonolith { + checkNotZero(configErrs, "global.jetstream.addresses", int64(len(c.Addresses))) + } +} diff --git a/setup/config/config_kafka.go b/setup/config/config_kafka.go deleted file mode 100644 index 5a61f17e..00000000 --- a/setup/config/config_kafka.go +++ /dev/null @@ -1,63 +0,0 @@ -package config - -import "fmt" - -// Defined Kafka topics. -const ( - TopicOutputTypingEvent = "OutputTypingEvent" - TopicOutputSendToDeviceEvent = "OutputSendToDeviceEvent" - TopicOutputKeyChangeEvent = "OutputKeyChangeEvent" - TopicOutputRoomEvent = "OutputRoomEvent" - TopicOutputClientData = "OutputClientData" - TopicOutputReceiptEvent = "OutputReceiptEvent" -) - -type Kafka struct { - // A list of kafka addresses to connect to. - Addresses []string `yaml:"addresses"` - // The prefix to use for Kafka topic names for this homeserver - really only - // useful if running more than one Dendrite on the same Kafka deployment. - TopicPrefix string `yaml:"topic_prefix"` - // Whether to use naffka instead of kafka. - // Naffka can only be used when running dendrite as a single monolithic server. - // Kafka can be used both with a monolithic server and when running the - // components as separate servers. - UseNaffka bool `yaml:"use_naffka"` - // The Naffka database is used internally by the naffka library, if used. - Database DatabaseOptions `yaml:"naffka_database"` - // The max size a Kafka message passed between consumer/producer can have - // Equals roughly max.message.bytes / fetch.message.max.bytes in Kafka - MaxMessageBytes *int `yaml:"max_message_bytes"` -} - -func (k *Kafka) TopicFor(name string) string { - return fmt.Sprintf("%s%s", k.TopicPrefix, name) -} - -func (c *Kafka) Defaults(generate bool) { - c.UseNaffka = true - c.Database.Defaults(10) - if generate { - c.Addresses = []string{"localhost:2181"} - c.Database.ConnectionString = DataSource("file:naffka.db") - } - c.TopicPrefix = "Dendrite" - - maxBytes := 1024 * 1024 * 8 // about 8MB - c.MaxMessageBytes = &maxBytes -} - -func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) { - if c.UseNaffka { - if !isMonolith { - configErrs.Add("naffka can only be used in a monolithic server") - } - checkNotEmpty(configErrs, "global.kafka.database.connection_string", string(c.Database.ConnectionString)) - } else { - // If we aren't using naffka then we need to have at least one kafka - // server to talk to. - checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses))) - } - checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix)) - checkPositive(configErrs, "global.kafka.max_message_bytes", int64(*c.MaxMessageBytes)) -} diff --git a/setup/config/config_test.go b/setup/config/config_test.go index ffe9edab..5aa54929 100644 --- a/setup/config/config_test.go +++ b/setup/config/config_test.go @@ -33,7 +33,7 @@ func TestLoadConfigRelative(t *testing.T) { } const testConfig = ` -version: 1 +version: 2 global: server_name: localhost private_key: matrix_key.pem diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go new file mode 100644 index 00000000..2d563226 --- /dev/null +++ b/setup/jetstream/helpers.go @@ -0,0 +1,11 @@ +package jetstream + +import "github.com/nats-io/nats.go" + +func WithJetStreamMessage(msg *nats.Msg, f func(msg *nats.Msg) bool) { + if f(msg) { + _ = msg.Ack() + } else { + _ = msg.Nak() + } +} diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go new file mode 100644 index 00000000..eec68d82 --- /dev/null +++ b/setup/jetstream/nats.go @@ -0,0 +1,93 @@ +package jetstream + +import ( + "strings" + "sync" + "time" + + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/setup/config" + "github.com/sirupsen/logrus" + + saramajs "github.com/S7evinK/saramajetstream" + natsserver "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" + natsclient "github.com/nats-io/nats.go" +) + +var natsServer *natsserver.Server +var natsServerMutex sync.Mutex + +func Prepare(cfg *config.JetStream) (nats.JetStreamContext, sarama.Consumer, sarama.SyncProducer) { + // check if we need an in-process NATS Server + if len(cfg.Addresses) != 0 { + return setupNATS(cfg, nil) + } + natsServerMutex.Lock() + if natsServer == nil { + var err error + natsServer, err = natsserver.NewServer(&natsserver.Options{ + ServerName: "monolith", + DontListen: true, + JetStream: true, + StoreDir: string(cfg.StoragePath), + NoSystemAccount: true, + AllowNewAccounts: false, + }) + if err != nil { + panic(err) + } + natsServer.ConfigureLogger() + go natsServer.Start() + } + natsServerMutex.Unlock() + if !natsServer.ReadyForConnections(time.Second * 10) { + logrus.Fatalln("NATS did not start in time") + } + nc, err := natsclient.Connect("", natsclient.InProcessServer(natsServer)) + if err != nil { + logrus.Fatalln("Failed to create NATS client") + } + return setupNATS(cfg, nc) +} + +func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContext, sarama.Consumer, sarama.SyncProducer) { + if nc == nil { + var err error + nc, err = nats.Connect(strings.Join(cfg.Addresses, ",")) + if err != nil { + logrus.WithError(err).Panic("Unable to connect to NATS") + return nil, nil, nil + } + } + + s, err := nc.JetStream() + if err != nil { + logrus.WithError(err).Panic("Unable to get JetStream context") + return nil, nil, nil + } + + for _, stream := range streams { // streams are defined in streams.go + name := cfg.TopicFor(stream.Name) + info, err := s.StreamInfo(name) + if err != nil && err != natsclient.ErrStreamNotFound { + logrus.WithError(err).Fatal("Unable to get stream info") + } + if info == nil { + stream.Subjects = []string{name} + // If we're trying to keep everything in memory (e.g. unit tests) + // then overwrite the storage policy. + if cfg.InMemory { + stream.Storage = nats.MemoryStorage + } + + if _, err = s.AddStream(stream); err != nil { + logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream") + } + } + } + + consumer := saramajs.NewJetStreamConsumer(nc, s, "") + producer := saramajs.NewJetStreamProducer(nc, s, "") + return s, consumer, producer +} diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go new file mode 100644 index 00000000..0fd31082 --- /dev/null +++ b/setup/jetstream/streams.go @@ -0,0 +1,61 @@ +package jetstream + +import ( + "time" + + "github.com/nats-io/nats.go" +) + +const ( + UserID = "user_id" + RoomID = "room_id" +) + +var ( + InputRoomEvent = "InputRoomEvent" + OutputRoomEvent = "OutputRoomEvent" + OutputSendToDeviceEvent = "OutputSendToDeviceEvent" + OutputKeyChangeEvent = "OutputKeyChangeEvent" + OutputTypingEvent = "OutputTypingEvent" + OutputClientData = "OutputClientData" + OutputReceiptEvent = "OutputReceiptEvent" +) + +var streams = []*nats.StreamConfig{ + { + Name: InputRoomEvent, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, + { + Name: OutputRoomEvent, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, + { + Name: OutputSendToDeviceEvent, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, + { + Name: OutputKeyChangeEvent, + Retention: nats.LimitsPolicy, + Storage: nats.FileStorage, + }, + { + Name: OutputTypingEvent, + Retention: nats.InterestPolicy, + Storage: nats.MemoryStorage, + MaxAge: time.Second * 60, + }, + { + Name: OutputClientData, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, + { + Name: OutputReceiptEvent, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, +} diff --git a/setup/kafka/kafka.go b/setup/kafka/kafka.go deleted file mode 100644 index a2902c96..00000000 --- a/setup/kafka/kafka.go +++ /dev/null @@ -1,58 +0,0 @@ -package kafka - -import ( - "github.com/Shopify/sarama" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/naffka" - naffkaStorage "github.com/matrix-org/naffka/storage" - "github.com/sirupsen/logrus" -) - -func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { - if cfg.UseNaffka { - return setupNaffka(cfg) - } - return setupKafka(cfg) -} - -// setupKafka creates kafka consumer/producer pair from the config. -func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { - sCfg := sarama.NewConfig() - sCfg.Producer.MaxMessageBytes = *cfg.MaxMessageBytes - sCfg.Producer.Return.Successes = true - sCfg.Consumer.Fetch.Default = int32(*cfg.MaxMessageBytes) - - consumer, err := sarama.NewConsumer(cfg.Addresses, sCfg) - if err != nil { - logrus.WithError(err).Panic("failed to start kafka consumer") - } - - producer, err := sarama.NewSyncProducer(cfg.Addresses, sCfg) - if err != nil { - logrus.WithError(err).Panic("failed to setup kafka producers") - } - - return consumer, producer -} - -// In monolith mode with Naffka, we don't have the same constraints about -// consuming the same topic from more than one place like we do with Kafka. -// Therefore, we will only open one Naffka connection in case Naffka is -// running on SQLite. -var naffkaInstance *naffka.Naffka - -// setupNaffka creates kafka consumer/producer pair from the config. -func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { - if naffkaInstance != nil { - return naffkaInstance, naffkaInstance - } - naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString)) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } - naffkaInstance, err = naffka.New(naffkaDB) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka") - } - return naffkaInstance, naffkaInstance -} diff --git a/setup/mscs/msc2836/msc2836.go b/setup/mscs/msc2836/msc2836.go index 7e2ecfb9..e048d736 100644 --- a/setup/mscs/msc2836/msc2836.go +++ b/setup/mscs/msc2836/msc2836.go @@ -649,7 +649,7 @@ func (rc *reqCtx) injectResponseToRoomserver(res *gomatrixserverlib.MSC2836Event }) } // we've got the data by this point so use a background context - err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires) + err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires, false) if err != nil { util.GetLogger(rc.ctx).WithError(err).Error("failed to inject MSC2836EventRelationshipsResponse into the roomserver") } |