aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMark Haines <mjark@negativecurvature.net>2017-08-16 13:36:41 +0100
committerGitHub <noreply@github.com>2017-08-16 13:36:41 +0100
commitc27d1fdfb40032956ed5d13d767bcf12bfc5a65c (patch)
tree90007b3f79ec509ad3140646f0b11bd86db282d3 /src
parent0d894e3da583d4829534c6852d26890dbf297adb (diff)
Optionally use naffka in the monolithic server (#183)
* dependency injection for the kafka consumers/producers * Optionally use naffka in the monolithic server * remember to call setupKafka() * tweak imports * fix integration tests * Add use_naffka to the example config * Update comment on the listen APIs
Diffstat (limited to 'src')
-rw-r--r--src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go19
-rw-r--r--src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go12
-rw-r--r--src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go12
-rw-r--r--src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go38
-rw-r--r--src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go17
-rw-r--r--src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go88
-rw-r--r--src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go19
-rw-r--r--src/github.com/matrix-org/dendrite/common/config/config.go62
-rw-r--r--src/github.com/matrix-org/dendrite/common/config/config_test.go3
-rw-r--r--src/github.com/matrix-org/dendrite/common/test/config.go1
-rw-r--r--src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go18
-rw-r--r--src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go13
-rw-r--r--src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go17
13 files changed, 189 insertions, 130 deletions
diff --git a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go
index 1e232078..314ca42b 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go
@@ -17,12 +17,13 @@ package consumers
import (
"encoding/json"
- log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
+
+ log "github.com/Sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
)
@@ -35,12 +36,12 @@ type OutputRoomEvent struct {
}
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
-func NewOutputRoomEvent(cfg *config.Dendrite, store *accounts.Database) (*OutputRoomEvent, error) {
- kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
- if err != nil {
- return nil, err
- }
- roomServerURL := cfg.RoomServerURL()
+func NewOutputRoomEvent(
+ cfg *config.Dendrite,
+ kafkaConsumer sarama.Consumer,
+ store *accounts.Database,
+ queryAPI api.RoomserverQueryAPI,
+) *OutputRoomEvent {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
@@ -50,12 +51,12 @@ func NewOutputRoomEvent(cfg *config.Dendrite, store *accounts.Database) (*Output
s := &OutputRoomEvent{
roomServerConsumer: &consumer,
db: store,
- query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil),
+ query: queryAPI,
serverName: string(cfg.Matrix.ServerName),
}
consumer.ProcessMessage = s.onMessage
- return s, nil
+ return s
}
// Start consuming from room servers
diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go b/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go
index 2597089e..dba104f9 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go
@@ -28,18 +28,6 @@ type SyncAPIProducer struct {
Producer sarama.SyncProducer
}
-// NewSyncAPIProducer creates a new SyncAPIProducer
-func NewSyncAPIProducer(kafkaURIs []string, topic string) (*SyncAPIProducer, error) {
- producer, err := sarama.NewSyncProducer(kafkaURIs, nil)
- if err != nil {
- return nil, err
- }
- return &SyncAPIProducer{
- Topic: topic,
- Producer: producer,
- }, nil
-}
-
// SendData sends account data to the sync API server
func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error {
var m sarama.ProducerMessage
diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go b/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go
index f76be0d7..2f2ed756 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go
@@ -34,18 +34,6 @@ type profileUpdate struct {
NewValue string `json:"new_value"` // The attribute's value after the update
}
-// NewUserUpdateProducer creates a new UserUpdateProducer
-func NewUserUpdateProducer(kafkaURIs []string, topic string) (*UserUpdateProducer, error) {
- producer, err := sarama.NewSyncProducer(kafkaURIs, nil)
- if err != nil {
- return nil, err
- }
- return &UserUpdateProducer{
- Topic: topic,
- Producer: producer,
- }, nil
-}
-
// SendUpdate sends an update using kafka to notify the roomserver of the
// profile update. Returns an error if the update failed to send.
func (p *UserUpdateProducer) SendUpdate(
diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go
index 5d195bee..53ebdb93 100644
--- a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go
+++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go
@@ -33,6 +33,7 @@ import (
"github.com/matrix-org/gomatrixserverlib"
log "github.com/Sirupsen/logrus"
+ sarama "gopkg.in/Shopify/sarama.v1"
)
var (
@@ -50,24 +51,28 @@ func main() {
log.Fatalf("Invalid config file: %s", err)
}
- log.Info("config: ", cfg)
-
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
aliasAPI := api.NewRoomserverAliasAPIHTTP(cfg.RoomServerURL(), nil)
inputAPI := api.NewRoomserverInputAPIHTTP(cfg.RoomServerURL(), nil)
roomserverProducer := producers.NewRoomserverProducer(inputAPI)
- userUpdateProducer, err := producers.NewUserUpdateProducer(
- cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates),
- )
+
+ kafkaProducer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil)
if err != nil {
- log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err)
+ log.WithFields(log.Fields{
+ log.ErrorKey: err,
+ "addresses": cfg.Kafka.Addresses,
+ }).Panic("Failed to setup kafka producers")
}
- syncProducer, err := producers.NewSyncAPIProducer(
- cfg.Kafka.Addresses, string(cfg.Kafka.Topics.OutputClientData),
- )
- if err != nil {
- log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err)
+
+ userUpdateProducer := &producers.UserUpdateProducer{
+ Producer: kafkaProducer,
+ Topic: string(cfg.Kafka.Topics.UserUpdates),
+ }
+
+ syncProducer := &producers.SyncAPIProducer{
+ Producer: kafkaProducer,
+ Topic: string(cfg.Kafka.Topics.OutputClientData),
}
federation := gomatrixserverlib.NewFederationClient(
@@ -90,15 +95,20 @@ func main() {
keyRing := gomatrixserverlib.KeyRing{
KeyFetchers: []gomatrixserverlib.KeyFetcher{
// TODO: Use perspective key fetchers for production.
- &gomatrixserverlib.DirectKeyFetcher{federation.Client},
+ &gomatrixserverlib.DirectKeyFetcher{Client: federation.Client},
},
KeyDatabase: keyDB,
}
- consumer, err := consumers.NewOutputRoomEvent(cfg, accountDB)
+ kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
if err != nil {
- log.Panicf("startup: failed to create room server consumer: %s", err)
+ log.WithFields(log.Fields{
+ log.ErrorKey: err,
+ "addresses": cfg.Kafka.Addresses,
+ }).Panic("Failed to setup kafka consumers")
}
+
+ consumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, accountDB, queryAPI)
if err = consumer.Start(); err != nil {
log.Panicf("startup: failed to start room server consumer")
}
diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go
index 9052c3f8..dfc2dc2f 100644
--- a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go
+++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go
@@ -25,9 +25,11 @@ import (
"github.com/matrix-org/dendrite/federationsender/consumers"
"github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage"
+ "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/Sirupsen/logrus"
+ sarama "gopkg.in/Shopify/sarama.v1"
)
var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
@@ -45,7 +47,15 @@ func main() {
log.Fatalf("Invalid config file: %s", err)
}
- log.Info("config: ", cfg)
+ kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
+ if err != nil {
+ log.WithFields(log.Fields{
+ log.ErrorKey: err,
+ "addresses": cfg.Kafka.Addresses,
+ }).Panic("Failed to setup kafka consumers")
+ }
+
+ queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
db, err := storage.NewDatabase(string(cfg.Database.FederationSender))
if err != nil {
@@ -58,10 +68,7 @@ func main() {
queues := queue.NewOutgoingQueues(cfg.Matrix.ServerName, federation)
- consumer, err := consumers.NewOutputRoomEvent(cfg, queues, db)
- if err != nil {
- log.WithError(err).Panicf("startup: failed to create room server consumer")
- }
+ consumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, queues, db, queryAPI)
if err = consumer.Start(); err != nil {
log.WithError(err).Panicf("startup: failed to start room server consumer")
}
diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go
index f0046d7b..ee86469d 100644
--- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go
+++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go
@@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/naffka"
mediaapi_routing "github.com/matrix-org/dendrite/mediaapi/routing"
mediaapi_storage "github.com/matrix-org/dendrite/mediaapi/storage"
@@ -72,7 +73,7 @@ func main() {
if *configPath == "" {
log.Fatal("--config must be supplied")
}
- cfg, err := config.Load(*configPath)
+ cfg, err := config.LoadMonolithic(*configPath)
if err != nil {
log.Fatalf("Invalid config file: %s", err)
}
@@ -80,6 +81,7 @@ func main() {
m := newMonolith(cfg)
m.setupDatabases()
m.setupFederation()
+ m.setupKafka()
m.setupRoomServer()
m.setupProducers()
m.setupNotifiers()
@@ -125,6 +127,9 @@ type monolith struct {
queryAPI *roomserver_query.RoomserverQueryAPI
aliasAPI *roomserver_alias.RoomserverAliasAPI
+ kafkaConsumer sarama.Consumer
+ kafkaProducer sarama.SyncProducer
+
roomServerProducer *producers.RoomserverProducer
userUpdateProducer *producers.UserUpdateProducer
syncProducer *producers.SyncAPIProducer
@@ -182,15 +187,39 @@ func (m *monolith) setupFederation() {
}
}
-func (m *monolith) setupRoomServer() {
- kafkaProducer, err := sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil)
- if err != nil {
- panic(err)
+func (m *monolith) setupKafka() {
+ var err error
+ if m.cfg.Kafka.UseNaffka {
+ naff, err := naffka.New(&naffka.MemoryDatabase{})
+ if err != nil {
+ log.WithFields(log.Fields{
+ log.ErrorKey: err,
+ }).Panic("Failed to setup naffka")
+ }
+ m.kafkaConsumer = naff
+ m.kafkaProducer = naff
+ } else {
+ m.kafkaConsumer, err = sarama.NewConsumer(m.cfg.Kafka.Addresses, nil)
+ if err != nil {
+ log.WithFields(log.Fields{
+ log.ErrorKey: err,
+ "addresses": m.cfg.Kafka.Addresses,
+ }).Panic("Failed to setup kafka consumers")
+ }
+ m.kafkaProducer, err = sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil)
+ if err != nil {
+ log.WithFields(log.Fields{
+ log.ErrorKey: err,
+ "addresses": m.cfg.Kafka.Addresses,
+ }).Panic("Failed to setup kafka producers")
+ }
}
+}
+func (m *monolith) setupRoomServer() {
m.inputAPI = &roomserver_input.RoomserverInputAPI{
DB: m.roomServerDB,
- Producer: kafkaProducer,
+ Producer: m.kafkaProducer,
OutputRoomEventTopic: string(m.cfg.Kafka.Topics.OutputRoomEvent),
}
@@ -207,19 +236,14 @@ func (m *monolith) setupRoomServer() {
}
func (m *monolith) setupProducers() {
- var err error
m.roomServerProducer = producers.NewRoomserverProducer(m.inputAPI)
- m.userUpdateProducer, err = producers.NewUserUpdateProducer(
- m.cfg.Kafka.Addresses, string(m.cfg.Kafka.Topics.UserUpdates),
- )
- if err != nil {
- log.Panicf("Failed to setup kafka producers(%q): %s", m.cfg.Kafka.Addresses, err)
+ m.userUpdateProducer = &producers.UserUpdateProducer{
+ Producer: m.kafkaProducer,
+ Topic: string(m.cfg.Kafka.Topics.UserUpdates),
}
- m.syncProducer, err = producers.NewSyncAPIProducer(
- m.cfg.Kafka.Addresses, string(m.cfg.Kafka.Topics.OutputClientData),
- )
- if err != nil {
- log.Panicf("Failed to setup kafka producers(%q): %s", m.cfg.Kafka.Addresses, err)
+ m.syncProducer = &producers.SyncAPIProducer{
+ Producer: m.kafkaProducer,
+ Topic: string(m.cfg.Kafka.Topics.OutputClientData),
}
}
@@ -236,42 +260,34 @@ func (m *monolith) setupNotifiers() {
}
func (m *monolith) setupConsumers() {
- clientAPIConsumer, err := clientapi_consumers.NewOutputRoomEvent(m.cfg, m.accountDB)
- if err != nil {
- log.Panicf("startup: failed to create room server consumer: %s", err)
- }
+ var err error
+
+ clientAPIConsumer := clientapi_consumers.NewOutputRoomEvent(
+ m.cfg, m.kafkaConsumer, m.accountDB, m.queryAPI,
+ )
if err = clientAPIConsumer.Start(); err != nil {
log.Panicf("startup: failed to start room server consumer")
}
- syncAPIRoomConsumer, err := syncapi_consumers.NewOutputRoomEvent(
- m.cfg, m.syncAPINotifier, m.syncAPIDB,
+ syncAPIRoomConsumer := syncapi_consumers.NewOutputRoomEvent(
+ m.cfg, m.kafkaConsumer, m.syncAPINotifier, m.syncAPIDB, m.queryAPI,
)
- if err != nil {
- log.Panicf("startup: failed to create room server consumer: %s", err)
- }
if err = syncAPIRoomConsumer.Start(); err != nil {
log.Panicf("startup: failed to start room server consumer: %s", err)
}
- syncAPIClientConsumer, err := syncapi_consumers.NewOutputClientData(
- m.cfg, m.syncAPINotifier, m.syncAPIDB,
+ syncAPIClientConsumer := syncapi_consumers.NewOutputClientData(
+ m.cfg, m.kafkaConsumer, m.syncAPINotifier, m.syncAPIDB,
)
- if err != nil {
- log.Panicf("startup: failed to create client API server consumer: %s", err)
- }
if err = syncAPIClientConsumer.Start(); err != nil {
log.Panicf("startup: failed to start client API server consumer: %s", err)
}
federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation)
- federationSenderRoomConsumer, err := federationsender_consumers.NewOutputRoomEvent(
- m.cfg, federationSenderQueues, m.federationSenderDB,
+ federationSenderRoomConsumer := federationsender_consumers.NewOutputRoomEvent(
+ m.cfg, m.kafkaConsumer, federationSenderQueues, m.federationSenderDB, m.queryAPI,
)
- if err != nil {
- log.WithError(err).Panicf("startup: failed to create room server consumer")
- }
if err = federationSenderRoomConsumer.Start(); err != nil {
log.WithError(err).Panicf("startup: failed to start room server consumer")
}
diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go
index 7db4d4ca..7e9e4c12 100644
--- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go
+++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go
@@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
+ "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/consumers"
"github.com/matrix-org/dendrite/syncapi/routing"
"github.com/matrix-org/dendrite/syncapi/storage"
@@ -31,6 +32,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/Sirupsen/logrus"
+ sarama "gopkg.in/Shopify/sarama.v1"
)
var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
@@ -48,7 +50,7 @@ func main() {
log.Fatalf("Invalid config file: %s", err)
}
- log.Info("config: ", cfg)
+ queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
db, err := storage.NewSyncServerDatabase(string(cfg.Database.SyncAPI))
if err != nil {
@@ -74,17 +76,20 @@ func main() {
if err = n.Load(db); err != nil {
log.Panicf("startup: failed to set up notifier: %s", err)
}
- roomConsumer, err := consumers.NewOutputRoomEvent(cfg, n, db)
+
+ kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
if err != nil {
- log.Panicf("startup: failed to create room server consumer: %s", err)
+ log.WithFields(log.Fields{
+ log.ErrorKey: err,
+ "addresses": cfg.Kafka.Addresses,
+ }).Panic("Failed to setup kafka consumers")
}
+
+ roomConsumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, n, db, queryAPI)
if err = roomConsumer.Start(); err != nil {
log.Panicf("startup: failed to start room server consumer: %s", err)
}
- clientConsumer, err := consumers.NewOutputClientData(cfg, n, db)
- if err != nil {
- log.Panicf("startup: failed to create client API server consumer: %s", err)
- }
+ clientConsumer := consumers.NewOutputClientData(cfg, kafkaConsumer, n, db)
if err = clientConsumer.Start(); err != nil {
log.Panicf("startup: failed to start client API server consumer: %s", err)
}
diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go
index 324561f6..ae0fe62c 100644
--- a/src/github.com/matrix-org/dendrite/common/config/config.go
+++ b/src/github.com/matrix-org/dendrite/common/config/config.go
@@ -94,6 +94,11 @@ type Dendrite struct {
Kafka struct {
// A list of kafka addresses to connect to.
Addresses []string `yaml:"addresses"`
+ // Whether to use naffka instead of kafka.
+ // Naffka can only be used when running dendrite as a single monolithic server.
+ // Kafka can be used both with a monolithic server and when running the
+ // components as separate servers.
+ UseNaffka bool `yaml:"use_naffka,omitempty"`
// The names of the topics to use when reading and writing from kafka.
Topics struct {
// Topic for roomserver/api.OutputRoomEvent events.
@@ -169,7 +174,10 @@ type ThumbnailSize struct {
ResizeMethod string `yaml:"method,omitempty"`
}
-// Load a yaml config file
+// Load a yaml config file for a server run as multiple processes.
+// Checks the config to ensure that it is valid.
+// The checks are different if the server is run as a monolithic process instead
+// of being split into multiple components
func Load(configPath string) (*Dendrite, error) {
configData, err := ioutil.ReadFile(configPath)
if err != nil {
@@ -181,7 +189,27 @@ func Load(configPath string) (*Dendrite, error) {
}
// Pass the current working directory and ioutil.ReadFile so that they can
// be mocked in the tests
- return loadConfig(basePath, configData, ioutil.ReadFile)
+ monolithic := false
+ return loadConfig(basePath, configData, ioutil.ReadFile, monolithic)
+}
+
+// LoadMonolithic loads a yaml config file for a server run as a single monolith.
+// Checks the config to ensure that it is valid.
+// The checks are different if the server is run as a monolithic process instead
+// of being split into multiple components
+func LoadMonolithic(configPath string) (*Dendrite, error) {
+ configData, err := ioutil.ReadFile(configPath)
+ if err != nil {
+ return nil, err
+ }
+ basePath, err := filepath.Abs(".")
+ if err != nil {
+ return nil, err
+ }
+ // Pass the current working directory and ioutil.ReadFile so that they can
+ // be mocked in the tests
+ monolithic := true
+ return loadConfig(basePath, configData, ioutil.ReadFile, monolithic)
}
// An Error indicates a problem parsing the config.
@@ -194,6 +222,7 @@ func loadConfig(
basePath string,
configData []byte,
readFile func(string) ([]byte, error),
+ monolithic bool,
) (*Dendrite, error) {
var config Dendrite
var err error
@@ -203,7 +232,7 @@ func loadConfig(
config.setDefaults()
- if err = config.check(); err != nil {
+ if err = config.check(monolithic); err != nil {
return nil, err
}
@@ -259,7 +288,7 @@ func (e Error) Error() string {
)
}
-func (config *Dendrite) check() error {
+func (config *Dendrite) check(monolithic bool) error {
var problems []string
if config.Version != Version {
@@ -297,21 +326,32 @@ func (config *Dendrite) check() error {
checkPositive(fmt.Sprintf("media.thumbnail_sizes[%d].width", i), int64(size.Width))
checkPositive(fmt.Sprintf("media.thumbnail_sizes[%d].height", i), int64(size.Height))
}
-
- checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses)))
+ if config.Kafka.UseNaffka {
+ if !monolithic {
+ problems = append(problems, fmt.Sprintf("naffka can only be used in a monolithic server"))
+ }
+ } else {
+ // If we aren't using naffka then we need to have at least one kafka
+ // server to talk to.
+ checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses)))
+ }
checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent))
checkNotEmpty("kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData))
+ checkNotEmpty("kafka.topics.user_updates", string(config.Kafka.Topics.UserUpdates))
checkNotEmpty("database.account", string(config.Database.Account))
checkNotEmpty("database.device", string(config.Database.Device))
checkNotEmpty("database.server_key", string(config.Database.ServerKey))
checkNotEmpty("database.media_api", string(config.Database.MediaAPI))
checkNotEmpty("database.sync_api", string(config.Database.SyncAPI))
checkNotEmpty("database.room_server", string(config.Database.RoomServer))
- checkNotEmpty("listen.media_api", string(config.Listen.MediaAPI))
- checkNotEmpty("listen.client_api", string(config.Listen.ClientAPI))
- checkNotEmpty("listen.federation_api", string(config.Listen.FederationAPI))
- checkNotEmpty("listen.sync_api", string(config.Listen.SyncAPI))
- checkNotEmpty("listen.room_server", string(config.Listen.RoomServer))
+
+ if !monolithic {
+ checkNotEmpty("listen.media_api", string(config.Listen.MediaAPI))
+ checkNotEmpty("listen.client_api", string(config.Listen.ClientAPI))
+ checkNotEmpty("listen.federation_api", string(config.Listen.FederationAPI))
+ checkNotEmpty("listen.sync_api", string(config.Listen.SyncAPI))
+ checkNotEmpty("listen.room_server", string(config.Listen.RoomServer))
+ }
if problems != nil {
return Error{problems}
diff --git a/src/github.com/matrix-org/dendrite/common/config/config_test.go b/src/github.com/matrix-org/dendrite/common/config/config_test.go
index 4275e3d4..24b0dfc1 100644
--- a/src/github.com/matrix-org/dendrite/common/config/config_test.go
+++ b/src/github.com/matrix-org/dendrite/common/config/config_test.go
@@ -25,6 +25,7 @@ func TestLoadConfigRelative(t *testing.T) {
"/my/config/dir/matrix_key.pem": testKey,
"/my/config/dir/tls_cert.pem": testCert,
}.readFile,
+ false,
)
if err != nil {
t.Error("failed to load config:", err)
@@ -42,9 +43,9 @@ media:
kafka:
addresses: ["localhost:9092"]
topics:
- input_room_event: input.room
output_room_event: output.room
output_client_data: output.client
+ user_updates: output.user
database:
media_api: "postgresql:///media_api"
account: "postgresql:///account"
diff --git a/src/github.com/matrix-org/dendrite/common/test/config.go b/src/github.com/matrix-org/dendrite/common/test/config.go
index a28a08d5..948c60f1 100644
--- a/src/github.com/matrix-org/dendrite/common/test/config.go
+++ b/src/github.com/matrix-org/dendrite/common/test/config.go
@@ -83,6 +83,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
// Make this configurable somehow?
cfg.Kafka.Topics.OutputRoomEvent = "test.room.output"
cfg.Kafka.Topics.OutputClientData = "test.clientapi.output"
+ cfg.Kafka.Topics.UserUpdates = "test.user.output"
// TODO: Use different databases for the different schemas.
// Using the same database for every schema currently works because
diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go
index 7f133d30..4ebc36c7 100644
--- a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go
+++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go
@@ -38,13 +38,13 @@ type OutputRoomEvent struct {
}
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
-func NewOutputRoomEvent(cfg *config.Dendrite, queues *queue.OutgoingQueues, store *storage.Database) (*OutputRoomEvent, error) {
- kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
- if err != nil {
- return nil, err
- }
- roomServerURL := cfg.RoomServerURL()
-
+func NewOutputRoomEvent(
+ cfg *config.Dendrite,
+ kafkaConsumer sarama.Consumer,
+ queues *queue.OutgoingQueues,
+ store *storage.Database,
+ queryAPI api.RoomserverQueryAPI,
+) *OutputRoomEvent {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
Consumer: kafkaConsumer,
@@ -54,11 +54,11 @@ func NewOutputRoomEvent(cfg *config.Dendrite, queues *queue.OutgoingQueues, stor
roomServerConsumer: &consumer,
db: store,
queues: queues,
- query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil),
+ query: queryAPI,
}
consumer.ProcessMessage = s.onMessage
- return s, nil
+ return s
}
// Start consuming from room servers
diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go
index a2a240ff..7cc38b81 100644
--- a/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go
+++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go
@@ -33,11 +33,12 @@ type OutputClientData struct {
}
// NewOutputClientData creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
-func NewOutputClientData(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputClientData, error) {
- kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
- if err != nil {
- return nil, err
- }
+func NewOutputClientData(
+ cfg *config.Dendrite,
+ kafkaConsumer sarama.Consumer,
+ n *sync.Notifier,
+ store *storage.SyncServerDatabase,
+) *OutputClientData {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputClientData),
@@ -51,7 +52,7 @@ func NewOutputClientData(cfg *config.Dendrite, n *sync.Notifier, store *storage.
}
consumer.ProcessMessage = s.onMessage
- return s, nil
+ return s
}
// Start consuming from room servers
diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go
index c846705f..37357789 100644
--- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go
+++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go
@@ -44,12 +44,13 @@ type prevEventRef struct {
}
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
-func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) {
- kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
- if err != nil {
- return nil, err
- }
- roomServerURL := cfg.RoomServerURL()
+func NewOutputRoomEvent(
+ cfg *config.Dendrite,
+ kafkaConsumer sarama.Consumer,
+ n *sync.Notifier,
+ store *storage.SyncServerDatabase,
+ queryAPI api.RoomserverQueryAPI,
+) *OutputRoomEvent {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
@@ -60,11 +61,11 @@ func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.S
roomServerConsumer: &consumer,
db: store,
notifier: n,
- query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil),
+ query: queryAPI,
}
consumer.ProcessMessage = s.onMessage
- return s, nil
+ return s
}
// Start consuming from room servers