diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-05-09 14:15:24 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-09 14:15:24 +0100 |
commit | 09d754cfbf9268044d0f59fbe509640b8d71e011 (patch) | |
tree | 23922c3b718c3317651fcd95da85f6f0765662f4 /setup/jetstream/nats.go | |
parent | 79e2fbc66368d8f4754b9fff8005d3e77969fcc4 (diff) |
One NATS instance per `BaseDendrite` (#2438)
* One NATS instance per `BaseDendrite`
* Fix roomserver
Diffstat (limited to 'setup/jetstream/nats.go')
-rw-r--r-- | setup/jetstream/nats.go | 36 |
1 files changed, 14 insertions, 22 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 8d528969..426f02bb 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -13,31 +13,23 @@ import ( "github.com/sirupsen/logrus" natsserver "github.com/nats-io/nats-server/v2/server" - "github.com/nats-io/nats.go" natsclient "github.com/nats-io/nats.go" ) -var natsServer *natsserver.Server -var natsServerMutex sync.Mutex - -func PrepareForTests() (*process.ProcessContext, nats.JetStreamContext, *nats.Conn) { - cfg := &config.Dendrite{} - cfg.Defaults(true) - cfg.Global.JetStream.InMemory = true - pc := process.NewProcessContext() - js, jc := Prepare(pc, &cfg.Global.JetStream) - return pc, js, jc +type NATSInstance struct { + *natsserver.Server + sync.Mutex } -func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { +func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { // check if we need an in-process NATS Server if len(cfg.Addresses) != 0 { return setupNATS(process, cfg, nil) } - natsServerMutex.Lock() - if natsServer == nil { + s.Lock() + if s.Server == nil { var err error - natsServer, err = natsserver.NewServer(&natsserver.Options{ + s.Server, err = natsserver.NewServer(&natsserver.Options{ ServerName: "monolith", DontListen: true, JetStream: true, @@ -49,23 +41,23 @@ func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient if err != nil { panic(err) } - natsServer.ConfigureLogger() + s.ConfigureLogger() go func() { process.ComponentStarted() - natsServer.Start() + s.Start() }() go func() { <-process.WaitForShutdown() - natsServer.Shutdown() - natsServer.WaitForShutdown() + s.Shutdown() + s.WaitForShutdown() process.ComponentFinished() }() } - natsServerMutex.Unlock() - if !natsServer.ReadyForConnections(time.Second * 10) { + s.Unlock() + if !s.ReadyForConnections(time.Second * 10) { logrus.Fatalln("NATS did not start in time") } - nc, err := natsclient.Connect("", natsclient.InProcessServer(natsServer)) + nc, err := natsclient.Connect("", natsclient.InProcessServer(s)) if err != nil { logrus.Fatalln("Failed to create NATS client") } |