aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-10-15 13:27:13 +0100
committerGitHub <noreply@github.com>2020-10-15 13:27:13 +0100
commit49abe359e6a2b0c3f214190b73404c5cf9a0e051 (patch)
treeade4613526d0f6a306cd7117c8f77ab30b151ea0
parent10f1beb0de7a52ccdd122b05b4adffdbdab4ea2e (diff)
Start Kafka connections for each component that needs them (#1527)
* Start Kafka connection for each component that needs one * Fix roomserver unit tests * Rename to naffkaInstance (@Kegsay review comment) * Fix import cycle
-rw-r--r--appservice/appservice.go5
-rw-r--r--build/docker/docker-compose.deps.yml2
-rw-r--r--build/gobind/monolith.go14
-rw-r--r--clientapi/clientapi.go5
-rw-r--r--cmd/dendrite-client-api-server/main.go2
-rw-r--r--cmd/dendrite-demo-libp2p/main.go14
-rw-r--r--cmd/dendrite-demo-yggdrasil/main.go14
-rw-r--r--cmd/dendrite-key-server/main.go2
-rw-r--r--cmd/dendrite-monolith-server/main.go14
-rw-r--r--cmd/dendrite-sync-api-server/main.go3
-rw-r--r--cmd/dendritejs/main.go14
-rw-r--r--eduserver/eduserver.go6
-rw-r--r--federationsender/federationsender.go9
-rw-r--r--internal/setup/base.go46
-rw-r--r--internal/setup/kafka/kafka.go53
-rw-r--r--internal/setup/monolith.go17
-rw-r--r--keyserver/keyserver.go6
-rw-r--r--roomserver/roomserver.go5
-rw-r--r--roomserver/roomserver_test.go22
-rw-r--r--syncapi/syncapi.go5
20 files changed, 143 insertions, 115 deletions
diff --git a/appservice/appservice.go b/appservice/appservice.go
index e356f68e..cf9a47b7 100644
--- a/appservice/appservice.go
+++ b/appservice/appservice.go
@@ -30,6 +30,7 @@ import (
"github.com/matrix-org/dendrite/appservice/workers"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/setup"
+ "github.com/matrix-org/dendrite/internal/setup/kafka"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/sirupsen/logrus"
@@ -47,6 +48,8 @@ func NewInternalAPI(
userAPI userapi.UserInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
) appserviceAPI.AppServiceQueryAPI {
+ consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka)
+
// Create a connection to the appservice postgres DB
appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database)
if err != nil {
@@ -86,7 +89,7 @@ func NewInternalAPI(
// We can't add ASes at runtime so this is safe to do.
if len(workerStates) > 0 {
consumer := consumers.NewOutputRoomEventConsumer(
- base.Cfg, base.KafkaConsumer, appserviceDB,
+ base.Cfg, consumer, appserviceDB,
rsAPI, workerStates,
)
if err := consumer.Start(); err != nil {
diff --git a/build/docker/docker-compose.deps.yml b/build/docker/docker-compose.deps.yml
index afc572d0..74e478a8 100644
--- a/build/docker/docker-compose.deps.yml
+++ b/build/docker/docker-compose.deps.yml
@@ -29,6 +29,8 @@ services:
KAFKA_ADVERTISED_HOST_NAME: "kafka"
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
+ ports:
+ - 9092:9092
depends_on:
- zookeeper
networks:
diff --git a/build/gobind/monolith.go b/build/gobind/monolith.go
index 7d10b87e..fd010809 100644
--- a/build/gobind/monolith.go
+++ b/build/gobind/monolith.go
@@ -112,7 +112,7 @@ func (m *DendriteMonolith) Start() {
serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()
- keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer)
+ keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
keyAPI.SetUserAPI(userAPI)
@@ -146,13 +146,11 @@ func (m *DendriteMonolith) Start() {
rsAPI.SetFederationSenderAPI(fsAPI)
monolith := setup.Monolith{
- Config: base.Cfg,
- AccountDB: accountDB,
- Client: ygg.CreateClient(base),
- FedClient: federation,
- KeyRing: keyRing,
- KafkaConsumer: base.KafkaConsumer,
- KafkaProducer: base.KafkaProducer,
+ Config: base.Cfg,
+ AccountDB: accountDB,
+ Client: ygg.CreateClient(base),
+ FedClient: federation,
+ KeyRing: keyRing,
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go
index 2ab92ed4..ebe55aec 100644
--- a/clientapi/clientapi.go
+++ b/clientapi/clientapi.go
@@ -15,7 +15,6 @@
package clientapi
import (
- "github.com/Shopify/sarama"
"github.com/gorilla/mux"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/api"
@@ -24,6 +23,7 @@ import (
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/dendrite/internal/setup/kafka"
"github.com/matrix-org/dendrite/internal/transactions"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
@@ -36,7 +36,6 @@ import (
func AddPublicRoutes(
router *mux.Router,
cfg *config.ClientAPI,
- producer sarama.SyncProducer,
accountsDB accounts.Database,
federation *gomatrixserverlib.FederationClient,
rsAPI roomserverAPI.RoomserverInternalAPI,
@@ -48,6 +47,8 @@ func AddPublicRoutes(
keyAPI keyserverAPI.KeyInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider,
) {
+ _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
+
syncProducer := &producers.SyncAPIProducer{
Producer: producer,
Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData),
diff --git a/cmd/dendrite-client-api-server/main.go b/cmd/dendrite-client-api-server/main.go
index 0fdc6679..0061de74 100644
--- a/cmd/dendrite-client-api-server/main.go
+++ b/cmd/dendrite-client-api-server/main.go
@@ -37,7 +37,7 @@ func main() {
keyAPI := base.KeyServerHTTPClient()
clientapi.AddPublicRoutes(
- base.PublicClientAPIMux, &base.Cfg.ClientAPI, base.KafkaProducer, accountDB, federation,
+ base.PublicClientAPIMux, &base.Cfg.ClientAPI, accountDB, federation,
rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil,
)
diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go
index b5386325..61fdd801 100644
--- a/cmd/dendrite-demo-libp2p/main.go
+++ b/cmd/dendrite-demo-libp2p/main.go
@@ -139,7 +139,7 @@ func main() {
accountDB := base.Base.CreateAccountsDB()
federation := createFederationClient(base)
- keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation, base.Base.KafkaProducer)
+ keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
keyAPI.SetUserAPI(userAPI)
@@ -169,13 +169,11 @@ func main() {
}
monolith := setup.Monolith{
- Config: base.Base.Cfg,
- AccountDB: accountDB,
- Client: createClient(base),
- FedClient: federation,
- KeyRing: keyRing,
- KafkaConsumer: base.Base.KafkaConsumer,
- KafkaProducer: base.Base.KafkaProducer,
+ Config: base.Base.Cfg,
+ AccountDB: accountDB,
+ Client: createClient(base),
+ FedClient: federation,
+ KeyRing: keyRing,
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,
diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go
index 5e8b9231..a4097363 100644
--- a/cmd/dendrite-demo-yggdrasil/main.go
+++ b/cmd/dendrite-demo-yggdrasil/main.go
@@ -96,7 +96,7 @@ func main() {
serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()
- keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer)
+ keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
keyAPI.SetUserAPI(userAPI)
@@ -129,13 +129,11 @@ func main() {
rsComponent.SetFederationSenderAPI(fsAPI)
monolith := setup.Monolith{
- Config: base.Cfg,
- AccountDB: accountDB,
- Client: ygg.CreateClient(base),
- FedClient: federation,
- KeyRing: keyRing,
- KafkaConsumer: base.KafkaConsumer,
- KafkaProducer: base.KafkaProducer,
+ Config: base.Cfg,
+ AccountDB: accountDB,
+ Client: ygg.CreateClient(base),
+ FedClient: federation,
+ KeyRing: keyRing,
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,
diff --git a/cmd/dendrite-key-server/main.go b/cmd/dendrite-key-server/main.go
index 92d18ac3..ff5b2223 100644
--- a/cmd/dendrite-key-server/main.go
+++ b/cmd/dendrite-key-server/main.go
@@ -24,7 +24,7 @@ func main() {
base := setup.NewBaseDendrite(cfg, "KeyServer", true)
defer base.Close() // nolint: errcheck
- intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, base.CreateFederationClient(), base.KafkaProducer)
+ intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, base.CreateFederationClient())
intAPI.SetUserAPI(base.UserAPIClient())
keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI)
diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go
index 0fe70ca8..e935805f 100644
--- a/cmd/dendrite-monolith-server/main.go
+++ b/cmd/dendrite-monolith-server/main.go
@@ -108,7 +108,7 @@ func main() {
// This is different to rsAPI which can be the http client which doesn't need this dependency
rsImpl.SetFederationSenderAPI(fsAPI)
- keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI, base.KafkaProducer)
+ keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
keyAPI.SetUserAPI(userAPI)
@@ -127,13 +127,11 @@ func main() {
}
monolith := setup.Monolith{
- Config: base.Cfg,
- AccountDB: accountDB,
- Client: base.CreateClient(),
- FedClient: federation,
- KeyRing: keyRing,
- KafkaConsumer: base.KafkaConsumer,
- KafkaProducer: base.KafkaProducer,
+ Config: base.Cfg,
+ AccountDB: accountDB,
+ Client: base.CreateClient(),
+ FedClient: federation,
+ KeyRing: keyRing,
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,
diff --git a/cmd/dendrite-sync-api-server/main.go b/cmd/dendrite-sync-api-server/main.go
index b879f842..351dbc5f 100644
--- a/cmd/dendrite-sync-api-server/main.go
+++ b/cmd/dendrite-sync-api-server/main.go
@@ -21,6 +21,7 @@ import (
func main() {
cfg := setup.ParseFlags(false)
+
base := setup.NewBaseDendrite(cfg, "SyncAPI", true)
defer base.Close() // nolint: errcheck
@@ -30,7 +31,7 @@ func main() {
rsAPI := base.RoomserverHTTPClient()
syncapi.AddPublicRoutes(
- base.PublicClientAPIMux, base.KafkaConsumer, userAPI, rsAPI,
+ base.PublicClientAPIMux, userAPI, rsAPI,
base.KeyServerHTTPClient(),
federation, &cfg.SyncAPI,
)
diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go
index 2d7f8b02..85cc8a9f 100644
--- a/cmd/dendritejs/main.go
+++ b/cmd/dendritejs/main.go
@@ -190,7 +190,7 @@ func main() {
accountDB := base.CreateAccountsDB()
federation := createFederationClient(cfg, node)
- keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer)
+ keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
keyAPI.SetUserAPI(userAPI)
@@ -212,13 +212,11 @@ func main() {
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation)
monolith := setup.Monolith{
- Config: base.Cfg,
- AccountDB: accountDB,
- Client: createClient(node),
- FedClient: federation,
- KeyRing: &keyRing,
- KafkaConsumer: base.KafkaConsumer,
- KafkaProducer: base.KafkaProducer,
+ Config: base.Cfg,
+ AccountDB: accountDB,
+ Client: createClient(node),
+ FedClient: federation,
+ KeyRing: &keyRing,
AppserviceAPI: asQuery,
EDUInternalAPI: eduInputAPI,
diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go
index b6196c26..098ac024 100644
--- a/eduserver/eduserver.go
+++ b/eduserver/eduserver.go
@@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/eduserver/inthttp"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/setup"
+ "github.com/matrix-org/dendrite/internal/setup/kafka"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
@@ -41,10 +42,13 @@ func NewInternalAPI(
userAPI userapi.UserInternalAPI,
) api.EDUServerInputAPI {
cfg := &base.Cfg.EDUServer
+
+ _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
+
return &input.EDUServerInputAPI{
Cache: eduCache,
UserAPI: userAPI,
- Producer: base.KafkaProducer,
+ Producer: producer,
OutputTypingEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)),
OutputSendToDeviceEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)),
ServerName: cfg.Matrix.ServerName,
diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go
index 2f122328..78791140 100644
--- a/federationsender/federationsender.go
+++ b/federationsender/federationsender.go
@@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/statistics"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/internal/setup"
+ "github.com/matrix-org/dendrite/internal/setup/kafka"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
@@ -55,6 +56,8 @@ func NewInternalAPI(
FailuresUntilBlacklist: cfg.FederationMaxRetries,
}
+ consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
+
queues := queue.NewOutgoingQueues(
federationSenderDB, cfg.Matrix.ServerName, federation,
rsAPI, stats,
@@ -66,7 +69,7 @@ func NewInternalAPI(
)
rsConsumer := consumers.NewOutputRoomEventConsumer(
- cfg, base.KafkaConsumer, queues,
+ cfg, consumer, queues,
federationSenderDB, rsAPI,
)
if err = rsConsumer.Start(); err != nil {
@@ -74,13 +77,13 @@ func NewInternalAPI(
}
tsConsumer := consumers.NewOutputEDUConsumer(
- cfg, base.KafkaConsumer, queues, federationSenderDB,
+ cfg, consumer, queues, federationSenderDB,
)
if err := tsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start typing server consumer")
}
keyConsumer := consumers.NewKeyChangeConsumer(
- &base.Cfg.KeyServer, base.KafkaConsumer, queues, federationSenderDB, rsAPI,
+ &base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI,
)
if err := keyConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start key server consumer")
diff --git a/internal/setup/base.go b/internal/setup/base.go
index 24a0d6aa..8bc4ae17 100644
--- a/internal/setup/base.go
+++ b/internal/setup/base.go
@@ -26,13 +26,9 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus/promhttp"
- "github.com/matrix-org/naffka"
- naffkaStorage "github.com/matrix-org/naffka/storage"
-
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
- "github.com/Shopify/sarama"
"github.com/gorilla/mux"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
@@ -73,8 +69,8 @@ type BaseDendrite struct {
httpClient *http.Client
Cfg *config.Dendrite
Caches *caching.Caches
- KafkaConsumer sarama.Consumer
- KafkaProducer sarama.SyncProducer
+ // KafkaConsumer sarama.Consumer
+ // KafkaProducer sarama.SyncProducer
}
const HTTPServerTimeout = time.Minute * 5
@@ -106,14 +102,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
logrus.WithError(err).Panicf("failed to start opentracing")
}
- var kafkaConsumer sarama.Consumer
- var kafkaProducer sarama.SyncProducer
- if cfg.Global.Kafka.UseNaffka {
- kafkaConsumer, kafkaProducer = setupNaffka(cfg)
- } else {
- kafkaConsumer, kafkaProducer = setupKafka(cfg)
- }
-
cache, err := caching.NewInMemoryLRUCache(true)
if err != nil {
logrus.WithError(err).Warnf("Failed to create cache")
@@ -152,8 +140,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(),
apiHttpClient: &apiClient,
httpClient: &client,
- KafkaConsumer: kafkaConsumer,
- KafkaProducer: kafkaProducer,
}
}
@@ -334,31 +320,3 @@ func (b *BaseDendrite) SetupAndServeHTTP(
select {}
}
-
-// setupKafka creates kafka consumer/producer pair from the config.
-func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
- consumer, err := sarama.NewConsumer(cfg.Global.Kafka.Addresses, nil)
- if err != nil {
- logrus.WithError(err).Panic("failed to start kafka consumer")
- }
-
- producer, err := sarama.NewSyncProducer(cfg.Global.Kafka.Addresses, nil)
- if err != nil {
- logrus.WithError(err).Panic("failed to setup kafka producers")
- }
-
- return consumer, producer
-}
-
-// setupNaffka creates kafka consumer/producer pair from the config.
-func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
- naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Global.Kafka.Database.ConnectionString))
- if err != nil {
- logrus.WithError(err).Panic("Failed to setup naffka database")
- }
- naff, err := naffka.New(naffkaDB)
- if err != nil {
- logrus.WithError(err).Panic("Failed to setup naffka")
- }
- return naff, naff
-}
diff --git a/internal/setup/kafka/kafka.go b/internal/setup/kafka/kafka.go
new file mode 100644
index 00000000..9855ae15
--- /dev/null
+++ b/internal/setup/kafka/kafka.go
@@ -0,0 +1,53 @@
+package kafka
+
+import (
+ "github.com/Shopify/sarama"
+ "github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/naffka"
+ naffkaStorage "github.com/matrix-org/naffka/storage"
+ "github.com/sirupsen/logrus"
+)
+
+func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
+ if cfg.UseNaffka {
+ return setupNaffka(cfg)
+ }
+ return setupKafka(cfg)
+}
+
+// setupKafka creates kafka consumer/producer pair from the config.
+func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
+ consumer, err := sarama.NewConsumer(cfg.Addresses, nil)
+ if err != nil {
+ logrus.WithError(err).Panic("failed to start kafka consumer")
+ }
+
+ producer, err := sarama.NewSyncProducer(cfg.Addresses, nil)
+ if err != nil {
+ logrus.WithError(err).Panic("failed to setup kafka producers")
+ }
+
+ return consumer, producer
+}
+
+// In monolith mode with Naffka, we don't have the same constraints about
+// consuming the same topic from more than one place like we do with Kafka.
+// Therefore, we will only open one Naffka connection in case Naffka is
+// running on SQLite.
+var naffkaInstance *naffka.Naffka
+
+// setupNaffka creates kafka consumer/producer pair from the config.
+func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
+ if naffkaInstance != nil {
+ return naffkaInstance, naffkaInstance
+ }
+ naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString))
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to setup naffka database")
+ }
+ naffkaInstance, err = naffka.New(naffkaDB)
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to setup naffka")
+ }
+ return naffkaInstance, naffkaInstance
+}
diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go
index a0675d61..9d3625d2 100644
--- a/internal/setup/monolith.go
+++ b/internal/setup/monolith.go
@@ -15,7 +15,6 @@
package setup
import (
- "github.com/Shopify/sarama"
"github.com/gorilla/mux"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi"
@@ -38,13 +37,11 @@ import (
// Monolith represents an instantiation of all dependencies required to build
// all components of Dendrite, for use in monolith mode.
type Monolith struct {
- Config *config.Dendrite
- AccountDB accounts.Database
- KeyRing *gomatrixserverlib.KeyRing
- Client *gomatrixserverlib.Client
- FedClient *gomatrixserverlib.FederationClient
- KafkaConsumer sarama.Consumer
- KafkaProducer sarama.SyncProducer
+ Config *config.Dendrite
+ AccountDB accounts.Database
+ KeyRing *gomatrixserverlib.KeyRing
+ Client *gomatrixserverlib.Client
+ FedClient *gomatrixserverlib.FederationClient
AppserviceAPI appserviceAPI.AppServiceQueryAPI
EDUInternalAPI eduServerAPI.EDUServerInputAPI
@@ -61,7 +58,7 @@ type Monolith struct {
// AddAllPublicRoutes attaches all public paths to the given router
func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router) {
clientapi.AddPublicRoutes(
- csMux, &m.Config.ClientAPI, m.KafkaProducer, m.AccountDB,
+ csMux, &m.Config.ClientAPI, m.AccountDB,
m.FedClient, m.RoomserverAPI,
m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider,
@@ -73,7 +70,7 @@ func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router
)
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client)
syncapi.AddPublicRoutes(
- csMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI,
+ csMux, m.UserAPI, m.RoomserverAPI,
m.KeyAPI, m.FedClient, &m.Config.SyncAPI,
)
}
diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go
index 78420db1..6c54d2a0 100644
--- a/keyserver/keyserver.go
+++ b/keyserver/keyserver.go
@@ -15,10 +15,10 @@
package keyserver
import (
- "github.com/Shopify/sarama"
"github.com/gorilla/mux"
fedsenderapi "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/dendrite/internal/setup/kafka"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/keyserver/inthttp"
@@ -36,8 +36,10 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
// NewInternalAPI returns a concerete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI(
- cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, producer sarama.SyncProducer,
+ cfg *config.KeyServer, fedClient fedsenderapi.FederationClient,
) api.KeyInternalAPI {
+ _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
+
db, err := storage.NewDatabase(&cfg.Database)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to key server database")
diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go
index 4c138116..b2cc0728 100644
--- a/roomserver/roomserver.go
+++ b/roomserver/roomserver.go
@@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/setup"
+ "github.com/matrix-org/dendrite/internal/setup/kafka"
"github.com/matrix-org/dendrite/roomserver/internal"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/sirupsen/logrus"
@@ -41,6 +42,8 @@ func NewInternalAPI(
) api.RoomserverInternalAPI {
cfg := &base.Cfg.RoomServer
+ _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
+
var perspectiveServerNames []gomatrixserverlib.ServerName
for _, kp := range base.Cfg.SigningKeyServer.KeyPerspectives {
perspectiveServerNames = append(perspectiveServerNames, kp.ServerName)
@@ -52,7 +55,7 @@ func NewInternalAPI(
}
return internal.NewRoomserverAPI(
- cfg, roomserverDB, base.KafkaProducer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
+ cfg, roomserverDB, producer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
base.Caches, keyRing, perspectiveServerNames,
)
}
diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go
index 2a03195c..1b692a09 100644
--- a/roomserver/roomserver_test.go
+++ b/roomserver/roomserver_test.go
@@ -17,7 +17,10 @@ import (
"github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/internal/test"
"github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/roomserver/internal"
+ "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/sirupsen/logrus"
)
const (
@@ -160,7 +163,9 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro
cfg.Defaults()
cfg.Global.ServerName = testOrigin
cfg.Global.Kafka.UseNaffka = true
- cfg.RoomServer.Database.ConnectionString = config.DataSource(roomserverDBFileURI)
+ cfg.RoomServer.Database = config.DatabaseOptions{
+ ConnectionString: roomserverDBFileURI,
+ }
dp := &dummyProducer{
topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent),
}
@@ -169,12 +174,17 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro
t.Fatalf("failed to make caches: %s", err)
}
base := &setup.BaseDendrite{
- KafkaProducer: dp,
- Caches: cache,
- Cfg: cfg,
+ Caches: cache,
+ Cfg: cfg,
}
-
- return NewInternalAPI(base, &test.NopJSONVerifier{}), dp
+ roomserverDB, err := storage.Open(&cfg.RoomServer.Database, base.Caches)
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to connect to room server db")
+ }
+ return internal.NewRoomserverAPI(
+ &cfg.RoomServer, roomserverDB, dp, string(cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent)),
+ base.Caches, &test.NopJSONVerifier{}, nil,
+ ), dp
}
func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []json.RawMessage) (api.RoomserverInternalAPI, *dummyProducer, []gomatrixserverlib.HeaderedEvent) {
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 43e2455b..de0bb434 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -17,11 +17,11 @@ package syncapi
import (
"context"
- "github.com/Shopify/sarama"
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/dendrite/internal/setup/kafka"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
@@ -37,13 +37,14 @@ import (
// component.
func AddPublicRoutes(
router *mux.Router,
- consumer sarama.Consumer,
userAPI userapi.UserInternalAPI,
rsAPI api.RoomserverInternalAPI,
keyAPI keyapi.KeyInternalAPI,
federation *gomatrixserverlib.FederationClient,
cfg *config.SyncAPI,
) {
+ consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
+
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to sync db")