aboutsummaryrefslogtreecommitdiff
path: root/setup/jetstream
diff options
context:
space:
mode:
authorBrian Meek <brian@hntlabs.com>2022-08-05 01:19:33 -0700
committerGitHub <noreply@github.com>2022-08-05 09:19:33 +0100
commitde78eab63a99653edf68f783e263688ad4b701d8 (patch)
treedbc9170d76689f97bb684745e253c01248630d1e /setup/jetstream
parent9a655cb5e7f8b96a0b02203e69d866dfb1a184e2 (diff)
Add race testing to tests, and fix a few small race conditions in the tests (#2587)
* Add race testing to tests, and fix a few small race conditions in the tests * Enable run-sytest on MacOS * Remove deadlock detecting mutex, per code review feedback * Remove autoformatting related changes and a closure that is not needed * Adjust to importing nats client as 'natsclient' Signed-off-by: Brian Meek <brian@hntlabs.com> * Clarify the use of gooseMutex to proect goose internal state Signed-off-by: Brian Meek <brian@hntlabs.com> * Remove no longer needed mutex for guarding goose Signed-off-by: Brian Meek <brian@hntlabs.com>
Diffstat (limited to 'setup/jetstream')
-rw-r--r--setup/jetstream/nats.go14
1 files changed, 7 insertions, 7 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index be216a02..051d55a3 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -14,16 +14,16 @@ import (
"github.com/sirupsen/logrus"
natsserver "github.com/nats-io/nats-server/v2/server"
- "github.com/nats-io/nats.go"
natsclient "github.com/nats-io/nats.go"
)
type NATSInstance struct {
*natsserver.Server
- sync.Mutex
}
-func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) {
+var natsLock sync.Mutex
+
+func DeleteAllStreams(js natsclient.JetStreamContext, cfg *config.JetStream) {
for _, stream := range streams { // streams are defined in streams.go
name := cfg.Prefixed(stream.Name)
_ = js.DeleteStream(name)
@@ -31,11 +31,12 @@ func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) {
}
func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
+ natsLock.Lock()
+ defer natsLock.Unlock()
// check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 {
return setupNATS(process, cfg, nil)
}
- s.Lock()
if s.Server == nil {
var err error
s.Server, err = natsserver.NewServer(&natsserver.Options{
@@ -63,7 +64,6 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
process.ComponentFinished()
}()
}
- s.Unlock()
if !s.ReadyForConnections(time.Second * 10) {
logrus.Fatalln("NATS did not start in time")
}
@@ -77,9 +77,9 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
if nc == nil {
var err error
- opts := []nats.Option{}
+ opts := []natsclient.Option{}
if cfg.DisableTLSValidation {
- opts = append(opts, nats.Secure(&tls.Config{
+ opts = append(opts, natsclient.Secure(&tls.Config{
InsecureSkipVerify: true,
}))
}