aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-05-09 14:15:24 +0100
committerGitHub <noreply@github.com>2022-05-09 14:15:24 +0100
commit09d754cfbf9268044d0f59fbe509640b8d71e011 (patch)
tree23922c3b718c3317651fcd95da85f6f0765662f4
parent79e2fbc66368d8f4754b9fff8005d3e77969fcc4 (diff)
One NATS instance per `BaseDendrite` (#2438)
* One NATS instance per `BaseDendrite` * Fix roomserver
-rw-r--r--appservice/appservice.go3
-rw-r--r--clientapi/clientapi.go2
-rw-r--r--federationapi/federationapi.go4
-rw-r--r--keyserver/keyserver.go2
-rw-r--r--roomserver/internal/input/input_test.go12
-rw-r--r--roomserver/roomserver.go2
-rw-r--r--setup/base/base.go3
-rw-r--r--setup/jetstream/nats.go36
-rw-r--r--syncapi/syncapi.go2
-rw-r--r--test/base.go18
-rw-r--r--userapi/userapi.go2
11 files changed, 49 insertions, 37 deletions
diff --git a/appservice/appservice.go b/appservice/appservice.go
index bd292767..c5ae9ceb 100644
--- a/appservice/appservice.go
+++ b/appservice/appservice.go
@@ -32,7 +32,6 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -55,7 +54,7 @@ func NewInternalAPI(
gomatrixserverlib.WithSkipVerify(base.Cfg.AppServiceAPI.DisableTLSValidation),
)
- js, _ := jetstream.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
+ js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
// Create a connection to the appservice postgres DB
appserviceDB, err := storage.NewDatabase(base, &base.Cfg.AppServiceAPI.Database)
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go
index c1e86114..f550c29b 100644
--- a/clientapi/clientapi.go
+++ b/clientapi/clientapi.go
@@ -44,7 +44,7 @@ func AddPublicRoutes(
) {
cfg := &base.Cfg.ClientAPI
mscCfg := &base.Cfg.MSCs
- js, natsClient := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
+ js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
syncProducer := &producers.SyncAPIProducer{
JetStream: js,
diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go
index e52377c9..bec9ac77 100644
--- a/federationapi/federationapi.go
+++ b/federationapi/federationapi.go
@@ -56,7 +56,7 @@ func AddPublicRoutes(
) {
cfg := &base.Cfg.FederationAPI
mscCfg := &base.Cfg.MSCs
- js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
+ js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
producer := &producers.SyncAPIProducer{
JetStream: js,
TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
@@ -115,7 +115,7 @@ func NewInternalAPI(
FailuresUntilBlacklist: cfg.FederationMaxRetries,
}
- js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
+ js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
queues := queue.NewOutgoingQueues(
federationDB, base.ProcessContext,
diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go
index 007a48a5..47d7f57f 100644
--- a/keyserver/keyserver.go
+++ b/keyserver/keyserver.go
@@ -39,7 +39,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
func NewInternalAPI(
base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient,
) api.KeyInternalAPI {
- js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
+ js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
db, err := storage.NewDatabase(base, &cfg.Database)
if err != nil {
diff --git a/roomserver/internal/input/input_test.go b/roomserver/internal/input/input_test.go
index 5d34842b..a95c1355 100644
--- a/roomserver/internal/input/input_test.go
+++ b/roomserver/internal/input/input_test.go
@@ -10,9 +10,9 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/dendrite/roomserver/storage"
+ "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/dendrite/setup/jetstream"
- "github.com/matrix-org/dendrite/setup/process"
+ "github.com/matrix-org/dendrite/test"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
)
@@ -21,11 +21,11 @@ var js nats.JetStreamContext
var jc *nats.Conn
func TestMain(m *testing.M) {
- var pc *process.ProcessContext
- pc, js, jc = jetstream.PrepareForTests()
+ var b *base.BaseDendrite
+ b, js, jc = test.Base(nil)
code := m.Run()
- pc.ShutdownDendrite()
- pc.WaitForComponentsToFinish()
+ b.ShutdownDendrite()
+ b.WaitForComponentsToFinish()
os.Exit(code)
}
diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go
index 46261eb3..1480e894 100644
--- a/roomserver/roomserver.go
+++ b/roomserver/roomserver.go
@@ -50,7 +50,7 @@ func NewInternalAPI(
logrus.WithError(err).Panicf("failed to connect to room server db")
}
- js, nc := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
+ js, nc := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
return internal.NewRoomserverAPI(
base.ProcessContext, cfg, roomserverDB, js, nc,
diff --git a/setup/base/base.go b/setup/base/base.go
index ef449cc3..0e7528a0 100644
--- a/setup/base/base.go
+++ b/setup/base/base.go
@@ -41,6 +41,7 @@ import (
"golang.org/x/net/http2/h2c"
"github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/gorilla/mux"
@@ -77,6 +78,7 @@ type BaseDendrite struct {
InternalAPIMux *mux.Router
DendriteAdminMux *mux.Router
SynapseAdminMux *mux.Router
+ NATS *jetstream.NATSInstance
UseHTTPAPIs bool
apiHttpClient *http.Client
Cfg *config.Dendrite
@@ -240,6 +242,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base
InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(),
DendriteAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.DendriteAdminPathPrefix).Subrouter().UseEncodedPath(),
SynapseAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.SynapseAdminPathPrefix).Subrouter().UseEncodedPath(),
+ NATS: &jetstream.NATSInstance{},
apiHttpClient: &apiClient,
Database: db, // set if monolith with global connection pool only
DatabaseWriter: writer, // set if monolith with global connection pool only
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index 8d528969..426f02bb 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -13,31 +13,23 @@ import (
"github.com/sirupsen/logrus"
natsserver "github.com/nats-io/nats-server/v2/server"
- "github.com/nats-io/nats.go"
natsclient "github.com/nats-io/nats.go"
)
-var natsServer *natsserver.Server
-var natsServerMutex sync.Mutex
-
-func PrepareForTests() (*process.ProcessContext, nats.JetStreamContext, *nats.Conn) {
- cfg := &config.Dendrite{}
- cfg.Defaults(true)
- cfg.Global.JetStream.InMemory = true
- pc := process.NewProcessContext()
- js, jc := Prepare(pc, &cfg.Global.JetStream)
- return pc, js, jc
+type NATSInstance struct {
+ *natsserver.Server
+ sync.Mutex
}
-func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
+func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
// check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 {
return setupNATS(process, cfg, nil)
}
- natsServerMutex.Lock()
- if natsServer == nil {
+ s.Lock()
+ if s.Server == nil {
var err error
- natsServer, err = natsserver.NewServer(&natsserver.Options{
+ s.Server, err = natsserver.NewServer(&natsserver.Options{
ServerName: "monolith",
DontListen: true,
JetStream: true,
@@ -49,23 +41,23 @@ func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient
if err != nil {
panic(err)
}
- natsServer.ConfigureLogger()
+ s.ConfigureLogger()
go func() {
process.ComponentStarted()
- natsServer.Start()
+ s.Start()
}()
go func() {
<-process.WaitForShutdown()
- natsServer.Shutdown()
- natsServer.WaitForShutdown()
+ s.Shutdown()
+ s.WaitForShutdown()
process.ComponentFinished()
}()
}
- natsServerMutex.Unlock()
- if !natsServer.ReadyForConnections(time.Second * 10) {
+ s.Unlock()
+ if !s.ReadyForConnections(time.Second * 10) {
logrus.Fatalln("NATS did not start in time")
}
- nc, err := natsclient.Connect("", natsclient.InProcessServer(natsServer))
+ nc, err := natsclient.Connect("", natsclient.InProcessServer(s))
if err != nil {
logrus.Fatalln("Failed to create NATS client")
}
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 6da8ce6d..dbc6e240 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -45,7 +45,7 @@ func AddPublicRoutes(
) {
cfg := &base.Cfg.SyncAPI
- js, natsClient := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
+ js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database)
if err != nil {
diff --git a/test/base.go b/test/base.go
new file mode 100644
index 00000000..32fc8dc5
--- /dev/null
+++ b/test/base.go
@@ -0,0 +1,18 @@
+package test
+
+import (
+ "github.com/matrix-org/dendrite/setup/base"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/nats-io/nats.go"
+)
+
+func Base(cfg *config.Dendrite) (*base.BaseDendrite, nats.JetStreamContext, *nats.Conn) {
+ if cfg == nil {
+ cfg = &config.Dendrite{}
+ cfg.Defaults(true)
+ }
+ cfg.Global.JetStream.InMemory = true
+ base := base.NewBaseDendrite(cfg, "Tests")
+ js, jc := base.NATS.Prepare(base.ProcessContext, &cfg.Global.JetStream)
+ return base, js, jc
+}
diff --git a/userapi/userapi.go b/userapi/userapi.go
index 03a46807..603b416b 100644
--- a/userapi/userapi.go
+++ b/userapi/userapi.go
@@ -47,7 +47,7 @@ func NewInternalAPI(
appServices []config.ApplicationService, keyAPI keyapi.UserKeyAPI,
rsAPI rsapi.UserRoomserverAPI, pgClient pushgateway.Client,
) api.UserInternalAPI {
- js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
+ js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
db, err := storage.NewUserAPIDatabase(
base,