aboutsummaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-10-15 13:27:13 +0100
committerGitHub <noreply@github.com>2020-10-15 13:27:13 +0100
commit49abe359e6a2b0c3f214190b73404c5cf9a0e051 (patch)
treeade4613526d0f6a306cd7117c8f77ab30b151ea0 /internal
parent10f1beb0de7a52ccdd122b05b4adffdbdab4ea2e (diff)
Start Kafka connections for each component that needs them (#1527)
* Start Kafka connection for each component that needs one * Fix roomserver unit tests * Rename to naffkaInstance (@Kegsay review comment) * Fix import cycle
Diffstat (limited to 'internal')
-rw-r--r--internal/setup/base.go46
-rw-r--r--internal/setup/kafka/kafka.go53
-rw-r--r--internal/setup/monolith.go17
3 files changed, 62 insertions, 54 deletions
diff --git a/internal/setup/base.go b/internal/setup/base.go
index 24a0d6aa..8bc4ae17 100644
--- a/internal/setup/base.go
+++ b/internal/setup/base.go
@@ -26,13 +26,9 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus/promhttp"
- "github.com/matrix-org/naffka"
- naffkaStorage "github.com/matrix-org/naffka/storage"
-
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
- "github.com/Shopify/sarama"
"github.com/gorilla/mux"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
@@ -73,8 +69,8 @@ type BaseDendrite struct {
httpClient *http.Client
Cfg *config.Dendrite
Caches *caching.Caches
- KafkaConsumer sarama.Consumer
- KafkaProducer sarama.SyncProducer
+ // KafkaConsumer sarama.Consumer
+ // KafkaProducer sarama.SyncProducer
}
const HTTPServerTimeout = time.Minute * 5
@@ -106,14 +102,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
logrus.WithError(err).Panicf("failed to start opentracing")
}
- var kafkaConsumer sarama.Consumer
- var kafkaProducer sarama.SyncProducer
- if cfg.Global.Kafka.UseNaffka {
- kafkaConsumer, kafkaProducer = setupNaffka(cfg)
- } else {
- kafkaConsumer, kafkaProducer = setupKafka(cfg)
- }
-
cache, err := caching.NewInMemoryLRUCache(true)
if err != nil {
logrus.WithError(err).Warnf("Failed to create cache")
@@ -152,8 +140,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(),
apiHttpClient: &apiClient,
httpClient: &client,
- KafkaConsumer: kafkaConsumer,
- KafkaProducer: kafkaProducer,
}
}
@@ -334,31 +320,3 @@ func (b *BaseDendrite) SetupAndServeHTTP(
select {}
}
-
-// setupKafka creates kafka consumer/producer pair from the config.
-func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
- consumer, err := sarama.NewConsumer(cfg.Global.Kafka.Addresses, nil)
- if err != nil {
- logrus.WithError(err).Panic("failed to start kafka consumer")
- }
-
- producer, err := sarama.NewSyncProducer(cfg.Global.Kafka.Addresses, nil)
- if err != nil {
- logrus.WithError(err).Panic("failed to setup kafka producers")
- }
-
- return consumer, producer
-}
-
-// setupNaffka creates kafka consumer/producer pair from the config.
-func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
- naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Global.Kafka.Database.ConnectionString))
- if err != nil {
- logrus.WithError(err).Panic("Failed to setup naffka database")
- }
- naff, err := naffka.New(naffkaDB)
- if err != nil {
- logrus.WithError(err).Panic("Failed to setup naffka")
- }
- return naff, naff
-}
diff --git a/internal/setup/kafka/kafka.go b/internal/setup/kafka/kafka.go
new file mode 100644
index 00000000..9855ae15
--- /dev/null
+++ b/internal/setup/kafka/kafka.go
@@ -0,0 +1,53 @@
+package kafka
+
+import (
+ "github.com/Shopify/sarama"
+ "github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/naffka"
+ naffkaStorage "github.com/matrix-org/naffka/storage"
+ "github.com/sirupsen/logrus"
+)
+
+func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
+ if cfg.UseNaffka {
+ return setupNaffka(cfg)
+ }
+ return setupKafka(cfg)
+}
+
+// setupKafka creates kafka consumer/producer pair from the config.
+func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
+ consumer, err := sarama.NewConsumer(cfg.Addresses, nil)
+ if err != nil {
+ logrus.WithError(err).Panic("failed to start kafka consumer")
+ }
+
+ producer, err := sarama.NewSyncProducer(cfg.Addresses, nil)
+ if err != nil {
+ logrus.WithError(err).Panic("failed to setup kafka producers")
+ }
+
+ return consumer, producer
+}
+
+// In monolith mode with Naffka, we don't have the same constraints about
+// consuming the same topic from more than one place like we do with Kafka.
+// Therefore, we will only open one Naffka connection in case Naffka is
+// running on SQLite.
+var naffkaInstance *naffka.Naffka
+
+// setupNaffka creates kafka consumer/producer pair from the config.
+func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
+ if naffkaInstance != nil {
+ return naffkaInstance, naffkaInstance
+ }
+ naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString))
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to setup naffka database")
+ }
+ naffkaInstance, err = naffka.New(naffkaDB)
+ if err != nil {
+ logrus.WithError(err).Panic("Failed to setup naffka")
+ }
+ return naffkaInstance, naffkaInstance
+}
diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go
index a0675d61..9d3625d2 100644
--- a/internal/setup/monolith.go
+++ b/internal/setup/monolith.go
@@ -15,7 +15,6 @@
package setup
import (
- "github.com/Shopify/sarama"
"github.com/gorilla/mux"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi"
@@ -38,13 +37,11 @@ import (
// Monolith represents an instantiation of all dependencies required to build
// all components of Dendrite, for use in monolith mode.
type Monolith struct {
- Config *config.Dendrite
- AccountDB accounts.Database
- KeyRing *gomatrixserverlib.KeyRing
- Client *gomatrixserverlib.Client
- FedClient *gomatrixserverlib.FederationClient
- KafkaConsumer sarama.Consumer
- KafkaProducer sarama.SyncProducer
+ Config *config.Dendrite
+ AccountDB accounts.Database
+ KeyRing *gomatrixserverlib.KeyRing
+ Client *gomatrixserverlib.Client
+ FedClient *gomatrixserverlib.FederationClient
AppserviceAPI appserviceAPI.AppServiceQueryAPI
EDUInternalAPI eduServerAPI.EDUServerInputAPI
@@ -61,7 +58,7 @@ type Monolith struct {
// AddAllPublicRoutes attaches all public paths to the given router
func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router) {
clientapi.AddPublicRoutes(
- csMux, &m.Config.ClientAPI, m.KafkaProducer, m.AccountDB,
+ csMux, &m.Config.ClientAPI, m.AccountDB,
m.FedClient, m.RoomserverAPI,
m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider,
@@ -73,7 +70,7 @@ func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router
)
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client)
syncapi.AddPublicRoutes(
- csMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI,
+ csMux, m.UserAPI, m.RoomserverAPI,
m.KeyAPI, m.FedClient, &m.Config.SyncAPI,
)
}