diff options
Diffstat (limited to 'roomserver/roomserver_test.go')
-rw-r--r-- | roomserver/roomserver_test.go | 53 |
1 files changed, 53 insertions, 0 deletions
diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index e9cd926d..88e33571 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -12,7 +12,9 @@ import ( "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/gomatrixserverlib/spec" + "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/tidwall/gjson" @@ -1231,3 +1233,54 @@ func TestNewServerACLs(t *testing.T) { assert.Equal(t, false, banned) }) } + +// Validate that changing the AckPolicy/AckWait of room consumers +// results in their recreation +func TestRoomConsumerRecreation(t *testing.T) { + + alice := test.NewUser(t) + room := test.NewRoom(t, alice) + + // As this is DB unrelated, just use SQLite + cfg, processCtx, closeDB := testrig.CreateConfig(t, test.DBTypeSQLite) + defer closeDB() + cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) + natsInstance := &jetstream.NATSInstance{} + + // Prepare a stream and consumer using the old configuration + jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) + + streamName := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent) + consumer := cfg.Global.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(room.ID)) + subject := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEventSubj(room.ID)) + + consumerConfig := &nats.ConsumerConfig{ + Durable: consumer, + AckPolicy: nats.AckAllPolicy, + DeliverPolicy: nats.DeliverAllPolicy, + FilterSubject: subject, + AckWait: (time.Minute * 2) + (time.Second * 10), + InactiveThreshold: time.Hour * 24, + } + + // Create the consumer with the old config + _, err := jsCtx.AddConsumer(streamName, consumerConfig) + assert.NoError(t, err) + + caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) + // start JetStream listeners + rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics) + rsAPI.SetFederationAPI(nil, nil) + + // let the RS create the events, this also recreates the Consumers + err = api.SendEvents(context.Background(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false) + assert.NoError(t, err) + + // Validate that AckPolicy and AckWait has changed + info, err := jsCtx.ConsumerInfo(streamName, consumer) + assert.NoError(t, err) + assert.Equal(t, nats.AckExplicitPolicy, info.Config.AckPolicy) + + wantAckWait := input.MaximumMissingProcessingTime + (time.Second * 10) + assert.Equal(t, wantAckWait, info.Config.AckWait) +} |