aboutsummaryrefslogtreecommitdiff
path: root/setup/jetstream
diff options
context:
space:
mode:
authorkegsay <kegan@matrix.org>2022-05-09 17:23:02 +0100
committerGitHub <noreply@github.com>2022-05-09 17:23:02 +0100
commit236b16aa6c97bc0894388dce7f6b420ef7a1fd88 (patch)
tree9b9879b068eb45541730da203d75fed0631800d1 /setup/jetstream
parenta443d1e5f3796942f68067741f4bdd482548bfd7 (diff)
Begin adding syncapi component tests (#2442)
* Add very basic syncapi tests * Add a way to inject jetstream messages * implement add_state_ids * bugfixes * Unbreak tests * Remove now un-needed API call * Linting
Diffstat (limited to 'setup/jetstream')
-rw-r--r--setup/jetstream/nats.go8
1 files changed, 8 insertions, 0 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index 426f02bb..248b0e65 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -13,6 +13,7 @@ import (
"github.com/sirupsen/logrus"
natsserver "github.com/nats-io/nats-server/v2/server"
+ "github.com/nats-io/nats.go"
natsclient "github.com/nats-io/nats.go"
)
@@ -21,6 +22,13 @@ type NATSInstance struct {
sync.Mutex
}
+func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) {
+ for _, stream := range streams { // streams are defined in streams.go
+ name := cfg.Prefixed(stream.Name)
+ _ = js.DeleteStream(name)
+ }
+}
+
func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
// check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 {