aboutsummaryrefslogtreecommitdiff
path: root/setup
diff options
context:
space:
mode:
Diffstat (limited to 'setup')
-rw-r--r--setup/base/base.go2
-rw-r--r--setup/config/config.go11
-rw-r--r--setup/config/config_global.go8
-rw-r--r--setup/config/config_jetstream.go40
-rw-r--r--setup/config/config_kafka.go63
-rw-r--r--setup/config/config_test.go2
-rw-r--r--setup/jetstream/helpers.go11
-rw-r--r--setup/jetstream/nats.go93
-rw-r--r--setup/jetstream/streams.go61
-rw-r--r--setup/kafka/kafka.go58
-rw-r--r--setup/mscs/msc2836/msc2836.go2
11 files changed, 219 insertions, 132 deletions
diff --git a/setup/base/base.go b/setup/base/base.go
index 06c97117..819fe1ad 100644
--- a/setup/base/base.go
+++ b/setup/base/base.go
@@ -81,8 +81,6 @@ type BaseDendrite struct {
Cfg *config.Dendrite
Caches *caching.Caches
DNSCache *gomatrixserverlib.DNSCache
- // KafkaConsumer sarama.Consumer
- // KafkaProducer sarama.SyncProducer
}
const NoListener = ""
diff --git a/setup/config/config.go b/setup/config/config.go
index 404b7178..eb371a54 100644
--- a/setup/config/config.go
+++ b/setup/config/config.go
@@ -40,7 +40,7 @@ var keyIDRegexp = regexp.MustCompile("^ed25519:[a-zA-Z0-9_]+$")
// Version is the current version of the config format.
// This will change whenever we make breaking changes to the config format.
-const Version = 1
+const Version = 2
// Dendrite contains all the config used by a dendrite process.
// Relative paths are resolved relative to the current working directory
@@ -292,7 +292,7 @@ func (config *Dendrite) Derive() error {
// SetDefaults sets default config values if they are not explicitly set.
func (c *Dendrite) Defaults(generate bool) {
- c.Version = 1
+ c.Version = Version
c.Global.Defaults(generate)
c.ClientAPI.Defaults(generate)
@@ -325,6 +325,7 @@ func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) {
}
func (c *Dendrite) Wiring() {
+ c.Global.JetStream.Matrix = &c.Global
c.ClientAPI.Matrix = &c.Global
c.EDUServer.Matrix = &c.Global
c.FederationAPI.Matrix = &c.Global
@@ -420,7 +421,11 @@ func (config *Dendrite) check(_ bool) error { // monolithic
if config.Version != Version {
configErrs.Add(fmt.Sprintf(
- "unknown config version %q, expected %q", config.Version, Version,
+ "config version is %q, expected %q - this means that the format of the configuration "+
+ "file has changed in some significant way, so please revisit the sample config "+
+ "and ensure you are not missing any important options that may have been added "+
+ "or changed recently!",
+ config.Version, Version,
))
return configErrs
}
diff --git a/setup/config/config_global.go b/setup/config/config_global.go
index 20ee6d37..6f2306a6 100644
--- a/setup/config/config_global.go
+++ b/setup/config/config_global.go
@@ -46,8 +46,8 @@ type Global struct {
// Defaults to an empty array.
TrustedIDServers []string `yaml:"trusted_third_party_id_servers"`
- // Kafka/Naffka configuration
- Kafka Kafka `yaml:"kafka"`
+ // JetStream configuration
+ JetStream JetStream `yaml:"jetstream"`
// Metrics configuration
Metrics Metrics `yaml:"metrics"`
@@ -68,7 +68,7 @@ func (c *Global) Defaults(generate bool) {
}
c.KeyValidityPeriod = time.Hour * 24 * 7
- c.Kafka.Defaults(generate)
+ c.JetStream.Defaults(generate)
c.Metrics.Defaults(generate)
c.DNSCache.Defaults()
c.Sentry.Defaults()
@@ -78,7 +78,7 @@ func (c *Global) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkNotEmpty(configErrs, "global.server_name", string(c.ServerName))
checkNotEmpty(configErrs, "global.private_key", string(c.PrivateKeyPath))
- c.Kafka.Verify(configErrs, isMonolith)
+ c.JetStream.Verify(configErrs, isMonolith)
c.Metrics.Verify(configErrs, isMonolith)
c.Sentry.Verify(configErrs, isMonolith)
c.DNSCache.Verify(configErrs, isMonolith)
diff --git a/setup/config/config_jetstream.go b/setup/config/config_jetstream.go
new file mode 100644
index 00000000..0bd84899
--- /dev/null
+++ b/setup/config/config_jetstream.go
@@ -0,0 +1,40 @@
+package config
+
+import (
+ "fmt"
+)
+
+type JetStream struct {
+ Matrix *Global `yaml:"-"`
+
+ // Persistent directory to store JetStream streams in.
+ StoragePath Path `yaml:"storage_path"`
+ // A list of NATS addresses to connect to. If none are specified, an
+ // internal NATS server will be used when running in monolith mode only.
+ Addresses []string `yaml:"addresses"`
+ // The prefix to use for stream names for this homeserver - really only
+ // useful if running more than one Dendrite on the same NATS deployment.
+ TopicPrefix string `yaml:"topic_prefix"`
+ // Keep all storage in memory. This is mostly useful for unit tests.
+ InMemory bool `yaml:"in_memory"`
+}
+
+func (c *JetStream) TopicFor(name string) string {
+ return fmt.Sprintf("%s%s", c.TopicPrefix, name)
+}
+
+func (c *JetStream) Defaults(generate bool) {
+ c.Addresses = []string{}
+ c.TopicPrefix = "Dendrite"
+ if generate {
+ c.StoragePath = Path("./")
+ }
+}
+
+func (c *JetStream) Verify(configErrs *ConfigErrors, isMonolith bool) {
+ // If we are running in a polylith deployment then we need at least
+ // one NATS JetStream server to talk to.
+ if !isMonolith {
+ checkNotZero(configErrs, "global.jetstream.addresses", int64(len(c.Addresses)))
+ }
+}
diff --git a/setup/config/config_kafka.go b/setup/config/config_kafka.go
deleted file mode 100644
index 5a61f17e..00000000
--- a/setup/config/config_kafka.go
+++ /dev/null
@@ -1,63 +0,0 @@
-package config
-
-import "fmt"
-
-// Defined Kafka topics.
-const (
- TopicOutputTypingEvent = "OutputTypingEvent"
- TopicOutputSendToDeviceEvent = "OutputSendToDeviceEvent"
- TopicOutputKeyChangeEvent = "OutputKeyChangeEvent"
- TopicOutputRoomEvent = "OutputRoomEvent"
- TopicOutputClientData = "OutputClientData"
- TopicOutputReceiptEvent = "OutputReceiptEvent"
-)
-
-type Kafka struct {
- // A list of kafka addresses to connect to.
- Addresses []string `yaml:"addresses"`
- // The prefix to use for Kafka topic names for this homeserver - really only
- // useful if running more than one Dendrite on the same Kafka deployment.
- TopicPrefix string `yaml:"topic_prefix"`
- // Whether to use naffka instead of kafka.
- // Naffka can only be used when running dendrite as a single monolithic server.
- // Kafka can be used both with a monolithic server and when running the
- // components as separate servers.
- UseNaffka bool `yaml:"use_naffka"`
- // The Naffka database is used internally by the naffka library, if used.
- Database DatabaseOptions `yaml:"naffka_database"`
- // The max size a Kafka message passed between consumer/producer can have
- // Equals roughly max.message.bytes / fetch.message.max.bytes in Kafka
- MaxMessageBytes *int `yaml:"max_message_bytes"`
-}
-
-func (k *Kafka) TopicFor(name string) string {
- return fmt.Sprintf("%s%s", k.TopicPrefix, name)
-}
-
-func (c *Kafka) Defaults(generate bool) {
- c.UseNaffka = true
- c.Database.Defaults(10)
- if generate {
- c.Addresses = []string{"localhost:2181"}
- c.Database.ConnectionString = DataSource("file:naffka.db")
- }
- c.TopicPrefix = "Dendrite"
-
- maxBytes := 1024 * 1024 * 8 // about 8MB
- c.MaxMessageBytes = &maxBytes
-}
-
-func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
- if c.UseNaffka {
- if !isMonolith {
- configErrs.Add("naffka can only be used in a monolithic server")
- }
- checkNotEmpty(configErrs, "global.kafka.database.connection_string", string(c.Database.ConnectionString))
- } else {
- // If we aren't using naffka then we need to have at least one kafka
- // server to talk to.
- checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses)))
- }
- checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix))
- checkPositive(configErrs, "global.kafka.max_message_bytes", int64(*c.MaxMessageBytes))
-}
diff --git a/setup/config/config_test.go b/setup/config/config_test.go
index ffe9edab..5aa54929 100644
--- a/setup/config/config_test.go
+++ b/setup/config/config_test.go
@@ -33,7 +33,7 @@ func TestLoadConfigRelative(t *testing.T) {
}
const testConfig = `
-version: 1
+version: 2
global:
server_name: localhost
private_key: matrix_key.pem
diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go
new file mode 100644
index 00000000..2d563226
--- /dev/null
+++ b/setup/jetstream/helpers.go
@@ -0,0 +1,11 @@
+package jetstream
+
+import "github.com/nats-io/nats.go"
+
+func WithJetStreamMessage(msg *nats.Msg, f func(msg *nats.Msg) bool) {
+ if f(msg) {
+ _ = msg.Ack()
+ } else {
+ _ = msg.Nak()
+ }
+}
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
new file mode 100644
index 00000000..eec68d82
--- /dev/null
+++ b/setup/jetstream/nats.go
@@ -0,0 +1,93 @@
+package jetstream
+
+import (
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/Shopify/sarama"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/sirupsen/logrus"
+
+ saramajs "github.com/S7evinK/saramajetstream"
+ natsserver "github.com/nats-io/nats-server/v2/server"
+ "github.com/nats-io/nats.go"
+ natsclient "github.com/nats-io/nats.go"
+)
+
+var natsServer *natsserver.Server
+var natsServerMutex sync.Mutex
+
+func Prepare(cfg *config.JetStream) (nats.JetStreamContext, sarama.Consumer, sarama.SyncProducer) {
+ // check if we need an in-process NATS Server
+ if len(cfg.Addresses) != 0 {
+ return setupNATS(cfg, nil)
+ }
+ natsServerMutex.Lock()
+ if natsServer == nil {
+ var err error
+ natsServer, err = natsserver.NewServer(&natsserver.Options{
+ ServerName: "monolith",
+ DontListen: true,
+ JetStream: true,
+ StoreDir: string(cfg.StoragePath),
+ NoSystemAccount: true,
+ AllowNewAccounts: false,
+ })
+ if err != nil {
+ panic(err)
+ }
+ natsServer.ConfigureLogger()
+ go natsServer.Start()
+ }
+ natsServerMutex.Unlock()
+ if !natsServer.ReadyForConnections(time.Second * 10) {
+ logrus.Fatalln("NATS did not start in time")
+ }
+ nc, err := natsclient.Connect("", natsclient.InProcessServer(natsServer))
+ if err != nil {
+ logrus.Fatalln("Failed to create NATS client")
+ }
+ return setupNATS(cfg, nc)
+}
+
+func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContext, sarama.Consumer, sarama.SyncProducer) {
+ if nc == nil {
+ var err error
+ nc, err = nats.Connect(strings.Join(cfg.Addresses, ","))
+ if err != nil {
+ logrus.WithError(err).Panic("Unable to connect to NATS")
+ return nil, nil, nil
+ }
+ }
+
+ s, err := nc.JetStream()
+ if err != nil {
+ logrus.WithError(err).Panic("Unable to get JetStream context")
+ return nil, nil, nil
+ }
+
+ for _, stream := range streams { // streams are defined in streams.go
+ name := cfg.TopicFor(stream.Name)
+ info, err := s.StreamInfo(name)
+ if err != nil && err != natsclient.ErrStreamNotFound {
+ logrus.WithError(err).Fatal("Unable to get stream info")
+ }
+ if info == nil {
+ stream.Subjects = []string{name}
+ // If we're trying to keep everything in memory (e.g. unit tests)
+ // then overwrite the storage policy.
+ if cfg.InMemory {
+ stream.Storage = nats.MemoryStorage
+ }
+
+ if _, err = s.AddStream(stream); err != nil {
+ logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream")
+ }
+ }
+ }
+
+ consumer := saramajs.NewJetStreamConsumer(nc, s, "")
+ producer := saramajs.NewJetStreamProducer(nc, s, "")
+ return s, consumer, producer
+}
diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go
new file mode 100644
index 00000000..0fd31082
--- /dev/null
+++ b/setup/jetstream/streams.go
@@ -0,0 +1,61 @@
+package jetstream
+
+import (
+ "time"
+
+ "github.com/nats-io/nats.go"
+)
+
+const (
+ UserID = "user_id"
+ RoomID = "room_id"
+)
+
+var (
+ InputRoomEvent = "InputRoomEvent"
+ OutputRoomEvent = "OutputRoomEvent"
+ OutputSendToDeviceEvent = "OutputSendToDeviceEvent"
+ OutputKeyChangeEvent = "OutputKeyChangeEvent"
+ OutputTypingEvent = "OutputTypingEvent"
+ OutputClientData = "OutputClientData"
+ OutputReceiptEvent = "OutputReceiptEvent"
+)
+
+var streams = []*nats.StreamConfig{
+ {
+ Name: InputRoomEvent,
+ Retention: nats.InterestPolicy,
+ Storage: nats.FileStorage,
+ },
+ {
+ Name: OutputRoomEvent,
+ Retention: nats.InterestPolicy,
+ Storage: nats.FileStorage,
+ },
+ {
+ Name: OutputSendToDeviceEvent,
+ Retention: nats.InterestPolicy,
+ Storage: nats.FileStorage,
+ },
+ {
+ Name: OutputKeyChangeEvent,
+ Retention: nats.LimitsPolicy,
+ Storage: nats.FileStorage,
+ },
+ {
+ Name: OutputTypingEvent,
+ Retention: nats.InterestPolicy,
+ Storage: nats.MemoryStorage,
+ MaxAge: time.Second * 60,
+ },
+ {
+ Name: OutputClientData,
+ Retention: nats.InterestPolicy,
+ Storage: nats.FileStorage,
+ },
+ {
+ Name: OutputReceiptEvent,
+ Retention: nats.InterestPolicy,
+ Storage: nats.FileStorage,
+ },
+}
diff --git a/setup/kafka/kafka.go b/setup/kafka/kafka.go
deleted file mode 100644
index a2902c96..00000000
--- a/setup/kafka/kafka.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package kafka
-
-import (
- "github.com/Shopify/sarama"
- "github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/naffka"
- naffkaStorage "github.com/matrix-org/naffka/storage"
- "github.com/sirupsen/logrus"
-)
-
-func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
- if cfg.UseNaffka {
- return setupNaffka(cfg)
- }
- return setupKafka(cfg)
-}
-
-// setupKafka creates kafka consumer/producer pair from the config.
-func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
- sCfg := sarama.NewConfig()
- sCfg.Producer.MaxMessageBytes = *cfg.MaxMessageBytes
- sCfg.Producer.Return.Successes = true
- sCfg.Consumer.Fetch.Default = int32(*cfg.MaxMessageBytes)
-
- consumer, err := sarama.NewConsumer(cfg.Addresses, sCfg)
- if err != nil {
- logrus.WithError(err).Panic("failed to start kafka consumer")
- }
-
- producer, err := sarama.NewSyncProducer(cfg.Addresses, sCfg)
- if err != nil {
- logrus.WithError(err).Panic("failed to setup kafka producers")
- }
-
- return consumer, producer
-}
-
-// In monolith mode with Naffka, we don't have the same constraints about
-// consuming the same topic from more than one place like we do with Kafka.
-// Therefore, we will only open one Naffka connection in case Naffka is
-// running on SQLite.
-var naffkaInstance *naffka.Naffka
-
-// setupNaffka creates kafka consumer/producer pair from the config.
-func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
- if naffkaInstance != nil {
- return naffkaInstance, naffkaInstance
- }
- naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString))
- if err != nil {
- logrus.WithError(err).Panic("Failed to setup naffka database")
- }
- naffkaInstance, err = naffka.New(naffkaDB)
- if err != nil {
- logrus.WithError(err).Panic("Failed to setup naffka")
- }
- return naffkaInstance, naffkaInstance
-}
diff --git a/setup/mscs/msc2836/msc2836.go b/setup/mscs/msc2836/msc2836.go
index 7e2ecfb9..e048d736 100644
--- a/setup/mscs/msc2836/msc2836.go
+++ b/setup/mscs/msc2836/msc2836.go
@@ -649,7 +649,7 @@ func (rc *reqCtx) injectResponseToRoomserver(res *gomatrixserverlib.MSC2836Event
})
}
// we've got the data by this point so use a background context
- err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires)
+ err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires, false)
if err != nil {
util.GetLogger(rc.ctx).WithError(err).Error("failed to inject MSC2836EventRelationshipsResponse into the roomserver")
}