aboutsummaryrefslogtreecommitdiff
path: root/setup/jetstream/nats.go
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-05-09 14:15:24 +0100
committerGitHub <noreply@github.com>2022-05-09 14:15:24 +0100
commit09d754cfbf9268044d0f59fbe509640b8d71e011 (patch)
tree23922c3b718c3317651fcd95da85f6f0765662f4 /setup/jetstream/nats.go
parent79e2fbc66368d8f4754b9fff8005d3e77969fcc4 (diff)
One NATS instance per `BaseDendrite` (#2438)
* One NATS instance per `BaseDendrite` * Fix roomserver
Diffstat (limited to 'setup/jetstream/nats.go')
-rw-r--r--setup/jetstream/nats.go36
1 files changed, 14 insertions, 22 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index 8d528969..426f02bb 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -13,31 +13,23 @@ import (
"github.com/sirupsen/logrus"
natsserver "github.com/nats-io/nats-server/v2/server"
- "github.com/nats-io/nats.go"
natsclient "github.com/nats-io/nats.go"
)
-var natsServer *natsserver.Server
-var natsServerMutex sync.Mutex
-
-func PrepareForTests() (*process.ProcessContext, nats.JetStreamContext, *nats.Conn) {
- cfg := &config.Dendrite{}
- cfg.Defaults(true)
- cfg.Global.JetStream.InMemory = true
- pc := process.NewProcessContext()
- js, jc := Prepare(pc, &cfg.Global.JetStream)
- return pc, js, jc
+type NATSInstance struct {
+ *natsserver.Server
+ sync.Mutex
}
-func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
+func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
// check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 {
return setupNATS(process, cfg, nil)
}
- natsServerMutex.Lock()
- if natsServer == nil {
+ s.Lock()
+ if s.Server == nil {
var err error
- natsServer, err = natsserver.NewServer(&natsserver.Options{
+ s.Server, err = natsserver.NewServer(&natsserver.Options{
ServerName: "monolith",
DontListen: true,
JetStream: true,
@@ -49,23 +41,23 @@ func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient
if err != nil {
panic(err)
}
- natsServer.ConfigureLogger()
+ s.ConfigureLogger()
go func() {
process.ComponentStarted()
- natsServer.Start()
+ s.Start()
}()
go func() {
<-process.WaitForShutdown()
- natsServer.Shutdown()
- natsServer.WaitForShutdown()
+ s.Shutdown()
+ s.WaitForShutdown()
process.ComponentFinished()
}()
}
- natsServerMutex.Unlock()
- if !natsServer.ReadyForConnections(time.Second * 10) {
+ s.Unlock()
+ if !s.ReadyForConnections(time.Second * 10) {
logrus.Fatalln("NATS did not start in time")
}
- nc, err := natsclient.Connect("", natsclient.InProcessServer(natsServer))
+ nc, err := natsclient.Connect("", natsclient.InProcessServer(s))
if err != nil {
logrus.Fatalln("Failed to create NATS client")
}