aboutsummaryrefslogtreecommitdiff
path: root/setup/jetstream/nats.go
blob: 5d7937b5c8fe96d63921d2656563cd2a74261086 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package jetstream

import (
	"strings"
	"sync"
	"time"

	"github.com/Shopify/sarama"
	"github.com/matrix-org/dendrite/setup/config"
	"github.com/sirupsen/logrus"

	saramajs "github.com/S7evinK/saramajetstream"
	natsserver "github.com/nats-io/nats-server/v2/server"
	"github.com/nats-io/nats.go"
	natsclient "github.com/nats-io/nats.go"
)

var (
	// natsServer is the in-process NATS Server shared by all callers in this
	// process. It is nil until Prepare needs an in-process server.
	natsServer *natsserver.Server
	// natsServerMutex guards the lazy creation of natsServer.
	natsServerMutex sync.Mutex
)

// Prepare returns a JetStream context together with the Sarama-compatible
// consumer and producer built on top of it for the given configuration.
//
// If cfg.Addresses is non-empty, a client connection is made to those
// addresses. Otherwise an in-process NATS Server with JetStream enabled is
// created (at most once per process) and the client connects to it
// in-process.
//
// Failures are not returned to the caller: an error constructing the server
// panics, while a server that is not ready within 10 seconds or a failed
// client connection is logged at fatal level.
func Prepare(cfg *config.JetStream) (nats.JetStreamContext, sarama.Consumer, sarama.SyncProducer) {
	// check if we need an in-process NATS Server
	if len(cfg.Addresses) != 0 {
		return setupNATS(cfg, nil)
	}
	server := inProcessServer(cfg)
	if !server.ReadyForConnections(time.Second * 10) {
		logrus.Fatalln("NATS did not start in time")
	}
	nc, err := natsclient.Connect("", natsclient.InProcessServer(server))
	if err != nil {
		// Attach the underlying error so the failure can be diagnosed.
		logrus.WithError(err).Fatalln("Failed to create NATS client")
	}
	return setupNATS(cfg, nc)
}

// inProcessServer returns the shared in-process NATS Server, creating and
// starting it on first use. It panics if the server cannot be constructed.
func inProcessServer(cfg *config.JetStream) *natsserver.Server {
	natsServerMutex.Lock()
	// Deferred so the mutex is released even when construction panics;
	// otherwise a recovered panic would leave later callers blocked forever.
	defer natsServerMutex.Unlock()
	if natsServer != nil {
		return natsServer
	}
	server, err := natsserver.NewServer(&natsserver.Options{
		ServerName:       "monolith",
		DontListen:       true,
		JetStream:        true,
		StoreDir:         string(cfg.StoragePath),
		NoSystemAccount:  true,
		AllowNewAccounts: false,
		MaxPayload:       16 * 1024 * 1024,
	})
	if err != nil {
		panic(err)
	}
	server.ConfigureLogger()
	go server.Start()
	// Only publish the server once it has been constructed successfully.
	natsServer = server
	return server
}

// setupNATS obtains a JetStream context on the given connection, makes sure
// every stream listed in streams exists under its namespaced name, and
// returns the context along with a Sarama-compatible consumer and producer.
//
// If nc is nil, a new connection is made to the comma-joined cfg.Addresses.
// Connection and JetStream context errors panic via logrus; stream lookup and
// creation errors are logged at fatal level.
func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContext, sarama.Consumer, sarama.SyncProducer) {
	if nc == nil {
		var err error
		nc, err = nats.Connect(strings.Join(cfg.Addresses, ","))
		if err != nil {
			logrus.WithError(err).Panic("Unable to connect to NATS")
			return nil, nil, nil
		}
	}

	s, err := nc.JetStream()
	if err != nil {
		logrus.WithError(err).Panic("Unable to get JetStream context")
		return nil, nil, nil
	}

	for _, stream := range streams { // streams are defined in streams.go
		name := cfg.TopicFor(stream.Name)
		info, err := s.StreamInfo(name)
		if err != nil && err != natsclient.ErrStreamNotFound {
			logrus.WithError(err).WithField("stream", name).Fatal("Unable to get stream info")
		}
		if info != nil {
			// The stream already exists, nothing to create.
			continue
		}

		// Namespace the streams without modifying the original streams
		// array, otherwise we end up with namespaces on namespaces. All
		// per-config overrides are applied to the copy only, so they do
		// not leak into later calls made with a different configuration.
		namespaced := *stream
		namespaced.Name = name
		namespaced.Subjects = []string{name}

		// If we're trying to keep everything in memory (e.g. unit tests)
		// then overwrite the storage policy.
		if cfg.InMemory {
			namespaced.Storage = nats.MemoryStorage
		}

		if _, err = s.AddStream(&namespaced); err != nil {
			logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream")
		}
	}

	consumer := saramajs.NewJetStreamConsumer(nc, s, "")
	producer := saramajs.NewJetStreamProducer(nc, s, "")
	return s, consumer, producer
}