diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-10-15 13:27:13 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-15 13:27:13 +0100 |
commit | 49abe359e6a2b0c3f214190b73404c5cf9a0e051 (patch) | |
tree | ade4613526d0f6a306cd7117c8f77ab30b151ea0 /internal | |
parent | 10f1beb0de7a52ccdd122b05b4adffdbdab4ea2e (diff) |
Start Kafka connections for each component that needs them (#1527)
* Start Kafka connection for each component that needs one
* Fix roomserver unit tests
* Rename to naffkaInstance (@Kegsay review comment)
* Fix import cycle
Diffstat (limited to 'internal')
-rw-r--r-- | internal/setup/base.go | 46 | ||||
-rw-r--r-- | internal/setup/kafka/kafka.go | 53 | ||||
-rw-r--r-- | internal/setup/monolith.go | 17 |
3 files changed, 62 insertions, 54 deletions
diff --git a/internal/setup/base.go b/internal/setup/base.go index 24a0d6aa..8bc4ae17 100644 --- a/internal/setup/base.go +++ b/internal/setup/base.go @@ -26,13 +26,9 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/matrix-org/naffka" - naffkaStorage "github.com/matrix-org/naffka/storage" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/userapi/storage/accounts" - "github.com/Shopify/sarama" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" @@ -73,8 +69,8 @@ type BaseDendrite struct { httpClient *http.Client Cfg *config.Dendrite Caches *caching.Caches - KafkaConsumer sarama.Consumer - KafkaProducer sarama.SyncProducer + // KafkaConsumer sarama.Consumer + // KafkaProducer sarama.SyncProducer } const HTTPServerTimeout = time.Minute * 5 @@ -106,14 +102,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo logrus.WithError(err).Panicf("failed to start opentracing") } - var kafkaConsumer sarama.Consumer - var kafkaProducer sarama.SyncProducer - if cfg.Global.Kafka.UseNaffka { - kafkaConsumer, kafkaProducer = setupNaffka(cfg) - } else { - kafkaConsumer, kafkaProducer = setupKafka(cfg) - } - cache, err := caching.NewInMemoryLRUCache(true) if err != nil { logrus.WithError(err).Warnf("Failed to create cache") @@ -152,8 +140,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(), apiHttpClient: &apiClient, httpClient: &client, - KafkaConsumer: kafkaConsumer, - KafkaProducer: kafkaProducer, } } @@ -334,31 +320,3 @@ func (b *BaseDendrite) SetupAndServeHTTP( select {} } - -// setupKafka creates kafka consumer/producer pair from the config. -func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { - consumer, err := sarama.NewConsumer(cfg.Global.Kafka.Addresses, nil) - if err != nil { - logrus.WithError(err).Panic("failed to start kafka consumer") - } - - producer, err := sarama.NewSyncProducer(cfg.Global.Kafka.Addresses, nil) - if err != nil { - logrus.WithError(err).Panic("failed to setup kafka producers") - } - - return consumer, producer -} - -// setupNaffka creates kafka consumer/producer pair from the config. -func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { - naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Global.Kafka.Database.ConnectionString)) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } - naff, err := naffka.New(naffkaDB) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka") - } - return naff, naff -} diff --git a/internal/setup/kafka/kafka.go b/internal/setup/kafka/kafka.go new file mode 100644 index 00000000..9855ae15 --- /dev/null +++ b/internal/setup/kafka/kafka.go @@ -0,0 +1,53 @@ +package kafka + +import ( + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/naffka" + naffkaStorage "github.com/matrix-org/naffka/storage" + "github.com/sirupsen/logrus" +) + +func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + if cfg.UseNaffka { + return setupNaffka(cfg) + } + return setupKafka(cfg) +} + +// setupKafka creates kafka consumer/producer pair from the config. +func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + consumer, err := sarama.NewConsumer(cfg.Addresses, nil) + if err != nil { + logrus.WithError(err).Panic("failed to start kafka consumer") + } + + producer, err := sarama.NewSyncProducer(cfg.Addresses, nil) + if err != nil { + logrus.WithError(err).Panic("failed to setup kafka producers") + } + + return consumer, producer +} + +// In monolith mode with Naffka, we don't have the same constraints about +// consuming the same topic from more than one place like we do with Kafka. +// Therefore, we will only open one Naffka connection in case Naffka is +// running on SQLite. +var naffkaInstance *naffka.Naffka + +// setupNaffka creates kafka consumer/producer pair from the config. +func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + if naffkaInstance != nil { + return naffkaInstance, naffkaInstance + } + naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString)) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka database") + } + naffkaInstance, err = naffka.New(naffkaDB) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka") + } + return naffkaInstance, naffkaInstance +} diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go index a0675d61..9d3625d2 100644 --- a/internal/setup/monolith.go +++ b/internal/setup/monolith.go @@ -15,7 +15,6 @@ package setup import ( - "github.com/Shopify/sarama" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi" @@ -38,13 +37,11 @@ import ( // Monolith represents an instantiation of all dependencies required to build // all components of Dendrite, for use in monolith mode. type Monolith struct { - Config *config.Dendrite - AccountDB accounts.Database - KeyRing *gomatrixserverlib.KeyRing - Client *gomatrixserverlib.Client - FedClient *gomatrixserverlib.FederationClient - KafkaConsumer sarama.Consumer - KafkaProducer sarama.SyncProducer + Config *config.Dendrite + AccountDB accounts.Database + KeyRing *gomatrixserverlib.KeyRing + Client *gomatrixserverlib.Client + FedClient *gomatrixserverlib.FederationClient AppserviceAPI appserviceAPI.AppServiceQueryAPI EDUInternalAPI eduServerAPI.EDUServerInputAPI @@ -61,7 +58,7 @@ type Monolith struct { // AddAllPublicRoutes attaches all public paths to the given router func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router) { clientapi.AddPublicRoutes( - csMux, &m.Config.ClientAPI, m.KafkaProducer, m.AccountDB, + csMux, &m.Config.ClientAPI, m.AccountDB, m.FedClient, m.RoomserverAPI, m.EDUInternalAPI, m.AppserviceAPI, transactions.New(), m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider, @@ -73,7 +70,7 @@ func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router ) mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client) syncapi.AddPublicRoutes( - csMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, + csMux, m.UserAPI, m.RoomserverAPI, m.KeyAPI, m.FedClient, &m.Config.SyncAPI, ) } |