aboutsummaryrefslogtreecommitdiff
path: root/roomserver/roomserver_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'roomserver/roomserver_test.go')
-rw-r--r--roomserver/roomserver_test.go53
1 files changed, 53 insertions, 0 deletions
diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go
index e9cd926d..88e33571 100644
--- a/roomserver/roomserver_test.go
+++ b/roomserver/roomserver_test.go
@@ -12,7 +12,9 @@ import (
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/gomatrixserverlib/spec"
+ "github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"github.com/tidwall/gjson"
@@ -1231,3 +1233,54 @@ func TestNewServerACLs(t *testing.T) {
assert.Equal(t, false, banned)
})
}
+
+// Validate that changing the AckPolicy/AckWait of room consumers
+// results in their recreation
+func TestRoomConsumerRecreation(t *testing.T) {
+
+ alice := test.NewUser(t)
+ room := test.NewRoom(t, alice)
+
+ // As this is DB unrelated, just use SQLite
+ cfg, processCtx, closeDB := testrig.CreateConfig(t, test.DBTypeSQLite)
+ defer closeDB()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ natsInstance := &jetstream.NATSInstance{}
+
+ // Prepare a stream and consumer using the old configuration
+ jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+
+ streamName := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent)
+ consumer := cfg.Global.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(room.ID))
+ subject := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEventSubj(room.ID))
+
+ consumerConfig := &nats.ConsumerConfig{
+ Durable: consumer,
+ AckPolicy: nats.AckAllPolicy,
+ DeliverPolicy: nats.DeliverAllPolicy,
+ FilterSubject: subject,
+ AckWait: (time.Minute * 2) + (time.Second * 10),
+ InactiveThreshold: time.Hour * 24,
+ }
+
+ // Create the consumer with the old config
+ _, err := jsCtx.AddConsumer(streamName, consumerConfig)
+ assert.NoError(t, err)
+
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
+ // start JetStream listeners
+ rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
+ rsAPI.SetFederationAPI(nil, nil)
+
+ // let the RS create the events, this also recreates the Consumers
+ err = api.SendEvents(context.Background(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false)
+ assert.NoError(t, err)
+
+ // Validate that AckPolicy and AckWait has changed
+ info, err := jsCtx.ConsumerInfo(streamName, consumer)
+ assert.NoError(t, err)
+ assert.Equal(t, nats.AckExplicitPolicy, info.Config.AckPolicy)
+
+ wantAckWait := input.MaximumMissingProcessingTime + (time.Second * 10)
+ assert.Equal(t, wantAckWait, info.Config.AckWait)
+}