aboutsummaryrefslogtreecommitdiff
path: root/test/jetstream.go
diff options
context:
space:
mode:
Diffstat (limited to 'test/jetstream.go')
-rw-r--r--test/jetstream.go35
1 files changed, 35 insertions, 0 deletions
diff --git a/test/jetstream.go b/test/jetstream.go
new file mode 100644
index 00000000..488c22be
--- /dev/null
+++ b/test/jetstream.go
@@ -0,0 +1,35 @@
+package test
+
+import (
+ "encoding/json"
+ "testing"
+
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/base"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/nats-io/nats.go"
+)
+
+func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Msg) {
+ t.Helper()
+ for _, msg := range msgs {
+ if _, err := jsctx.PublishMsg(msg); err != nil {
+ t.Fatalf("MustPublishMsgs: failed to publish message: %s", err)
+ }
+ }
+}
+
+func NewOutputEventMsg(t *testing.T, base *base.BaseDendrite, roomID string, update api.OutputEvent) *nats.Msg {
+ t.Helper()
+ msg := &nats.Msg{
+ Subject: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent),
+ Header: nats.Header{},
+ }
+ msg.Header.Set(jetstream.RoomID, roomID)
+ var err error
+ msg.Data, err = json.Marshal(update)
+ if err != nil {
+ t.Fatalf("failed to marshal update: %s", err)
+ }
+ return msg
+}