aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go.mod18
-rw-r--r--go.sum36
-rw-r--r--roomserver/internal/input/input.go92
-rw-r--r--roomserver/internal/input/input_events.go6
-rw-r--r--roomserver/roomserver_test.go53
5 files changed, 158 insertions, 47 deletions
diff --git a/go.mod b/go.mod
index 91d8cd3b..d8b319d1 100644
--- a/go.mod
+++ b/go.mod
@@ -26,8 +26,8 @@ require (
github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7
github.com/matrix-org/util v0.0.0-20221111132719-399730281e66
github.com/mattn/go-sqlite3 v1.14.17
- github.com/nats-io/nats-server/v2 v2.9.23
- github.com/nats-io/nats.go v1.28.0
+ github.com/nats-io/nats-server/v2 v2.10.7
+ github.com/nats-io/nats.go v1.31.0
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
github.com/opentracing/opentracing-go v1.2.0
@@ -42,12 +42,12 @@ require (
github.com/uber/jaeger-lib v2.4.1+incompatible
github.com/yggdrasil-network/yggdrasil-go v0.4.6
go.uber.org/atomic v1.10.0
- golang.org/x/crypto v0.14.0
+ golang.org/x/crypto v0.16.0
golang.org/x/exp v0.0.0-20230809150735-7b3493d9a819
golang.org/x/image v0.10.0
golang.org/x/mobile v0.0.0-20221020085226-b36e6246172e
golang.org/x/sync v0.3.0
- golang.org/x/term v0.13.0
+ golang.org/x/term v0.15.0
gopkg.in/h2non/bimg.v1 v1.1.9
gopkg.in/yaml.v2 v2.4.0
gotest.tools/v3 v3.4.0
@@ -94,7 +94,7 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/juju/errors v1.0.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
- github.com/klauspost/compress v1.16.7 // indirect
+ github.com/klauspost/compress v1.17.4 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
@@ -104,7 +104,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/mschoch/smat v0.2.0 // indirect
- github.com/nats-io/jwt/v2 v2.5.0 // indirect
+ github.com/nats-io/jwt/v2 v2.5.3 // indirect
github.com/nats-io/nkeys v0.4.6 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/onsi/ginkgo/v2 v2.11.0 // indirect
@@ -124,9 +124,9 @@ require (
go.etcd.io/bbolt v1.3.6 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.17.0 // indirect
- golang.org/x/sys v0.13.0 // indirect
- golang.org/x/text v0.13.0 // indirect
- golang.org/x/time v0.3.0 // indirect
+ golang.org/x/sys v0.15.0 // indirect
+ golang.org/x/text v0.14.0 // indirect
+ golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.12.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/macaroon.v2 v2.1.0 // indirect
diff --git a/go.sum b/go.sum
index fba25076..543b8b4a 100644
--- a/go.sum
+++ b/go.sum
@@ -190,8 +190,8 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:C
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
-github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
-github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
+github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
+github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@@ -242,12 +242,12 @@ github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7P
github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
-github.com/nats-io/jwt/v2 v2.5.0 h1:WQQ40AAlqqfx+f6ku+i0pOVm+ASirD4fUh+oQsiE9Ak=
-github.com/nats-io/jwt/v2 v2.5.0/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
-github.com/nats-io/nats-server/v2 v2.9.23 h1:6Wj6H6QpP9FMlpCyWUaNu2yeZ/qGj+mdRkZ1wbikExU=
-github.com/nats-io/nats-server/v2 v2.9.23/go.mod h1:wEjrEy9vnqIGE4Pqz4/c75v9Pmaq7My2IgFmnykc4C0=
-github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c=
-github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
+github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
+github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
+github.com/nats-io/nats-server/v2 v2.10.7 h1:f5VDy+GMu7JyuFA0Fef+6TfulfCs5nBTgq7MMkFJx5Y=
+github.com/nats-io/nats-server/v2 v2.10.7/go.mod h1:V2JHOvPiPdtfDXTuEUsthUnCvSDeFrK4Xn9hRo6du7c=
+github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
+github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
@@ -354,8 +354,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
-golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
-golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
+golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
+golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -422,24 +422,24 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
-golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
+golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
-golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
-golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
+golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4=
+golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
-golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
-golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
+golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
+golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
-golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
-golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
+golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index 40475153..20d2cfc7 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -119,9 +119,14 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
w.Lock()
defer w.Unlock()
if !loaded || w.subscription == nil {
+ streamName := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent)
consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID))
subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID))
+ logger := logrus.WithFields(logrus.Fields{
+ "stream_name": streamName,
+ "consumer": consumer,
+ })
// Create the consumer. We do this as a specific step rather than
// letting PullSubscribe create it for us because we need the consumer
// to outlive the subscription. If we do it this way, we can Bind in the
@@ -135,21 +140,62 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
// before it. This is necessary because otherwise our consumer will never
// acknowledge things we filtered out for other subjects and therefore they
// will linger around forever.
- if _, err := w.r.JetStream.AddConsumer(
- r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
- &nats.ConsumerConfig{
- Durable: consumer,
- AckPolicy: nats.AckAllPolicy,
- DeliverPolicy: nats.DeliverAllPolicy,
- FilterSubject: subject,
- AckWait: MaximumMissingProcessingTime + (time.Second * 10),
- InactiveThreshold: inactiveThreshold,
- },
- ); err != nil {
- logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
+
+ info, err := w.r.JetStream.ConsumerInfo(streamName, consumer)
+ if err != nil && !errors.Is(err, nats.ErrConsumerNotFound) {
+ // log and return, we will retry anyway
+ logger.WithError(err).Errorf("failed to get consumer info")
return
}
+ consumerConfig := &nats.ConsumerConfig{
+ Durable: consumer,
+ AckPolicy: nats.AckExplicitPolicy,
+ DeliverPolicy: nats.DeliverAllPolicy,
+ FilterSubject: subject,
+ AckWait: MaximumMissingProcessingTime + (time.Second * 10),
+ InactiveThreshold: inactiveThreshold,
+ }
+
+ // The consumer already exists, try to update if necessary.
+ if info != nil {
+ // Not using reflect.DeepEqual here, since consumerConfig does not explicitly set
+ // e.g. the consumerName, which is added by NATS later. So this would result
+ // in constantly updating/recreating the consumer.
+ switch {
+ case info.Config.AckWait.Nanoseconds() != consumerConfig.AckWait.Nanoseconds():
+ // Initially we had a AckWait of 2m 10s, now we have 5m 10s, so we need to update
+ // existing consumers.
+ fallthrough
+ case info.Config.AckPolicy != consumerConfig.AckPolicy:
+ // We've changed the AckPolicy from AckAll to AckExplicit, this needs a
+ // recreation of the consumer. (Note: Only a few changes actually need a recreat)
+ logger.Warn("Consumer already exists, trying to update it.")
+ // Try updating the consumer first
+ if _, err = w.r.JetStream.UpdateConsumer(streamName, consumerConfig); err != nil {
+ // We failed to update the consumer, recreate it
+ logger.WithError(err).Warn("Unable to update consumer, recreating...")
+ if err = w.r.JetStream.DeleteConsumer(streamName, consumer); err != nil {
+ logger.WithError(err).Fatal("Unable to delete consumer")
+ return
+ }
+ // Set info to nil, so it can be recreated with the correct config.
+ info = nil
+ }
+ }
+ }
+
+ if info == nil {
+ // Create the consumer with the correct config
+ if _, err = w.r.JetStream.AddConsumer(
+ r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
+ consumerConfig,
+ ); err != nil {
+ logger.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
+ return
+ }
+ }
+
// Bind to our durable consumer. We want to receive all messages waiting
// for this subject and we want to manually acknowledge them, so that we
// can ensure they are only cleaned up when we are done processing them.
@@ -162,7 +208,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
nats.InactiveThreshold(inactiveThreshold),
)
if err != nil {
- logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
+ logger.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
return
}
@@ -263,21 +309,23 @@ func (w *worker) _next() {
return
}
+ // Since we either Ack() or Term() the message at this point, we can defer decrementing the room backpressure
+ defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
+
// Try to unmarshal the input room event. If the JSON unmarshalling
// fails then we'll terminate the message — this notifies NATS that
// we are done with the message and never want to see it again.
msg := msgs[0]
var inputRoomEvent api.InputRoomEvent
if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
- _ = msg.Term()
+ // using AckWait here makes the call synchronous; 5 seconds is the default value used by NATS
+ _ = msg.Term(nats.AckWait(time.Second * 5))
return
}
if scope := sentry.CurrentHub().Scope(); scope != nil {
scope.SetTag("event_id", inputRoomEvent.Event.EventID())
}
- roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Inc()
- defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
// Process the room event. If something goes wrong then we'll tell
// NATS to terminate the message. We'll store the error result as
@@ -307,10 +355,15 @@ func (w *worker) _next() {
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to process event")
}
- _ = msg.Term()
+ // Even though we failed to process this message (e.g. due to Dendrite restarting and receiving a context canceled),
+ // the message may already have been queued for redelivery or will be, so this makes sure that we still reprocess the msg
+ // after restarting. We only Ack if the context was not yet canceled.
+ if w.r.ProcessContext.Context().Err() == nil {
+ _ = msg.AckSync()
+ }
errString = err.Error()
} else {
- _ = msg.Ack()
+ _ = msg.AckSync()
}
// If it was a synchronous input request then the "sync" field
@@ -381,6 +434,9 @@ func (r *Inputer) queueInputRoomEvents(
}).Error("Roomserver failed to queue async event")
return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err)
}
+
+ // Now that the event is queued, increment the room backpressure
+ roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
}
return
}
diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go
index 520f82a8..1d920843 100644
--- a/roomserver/internal/input/input_events.go
+++ b/roomserver/internal/input/input_events.go
@@ -48,8 +48,10 @@ import (
"github.com/matrix-org/dendrite/roomserver/types"
)
-// TODO: Does this value make sense?
-const MaximumMissingProcessingTime = time.Minute * 2
+// MaximumMissingProcessingTime is the maximum time we allow "processRoomEvent" to fetch
+// e.g. missing auth/prev events. This duration is used for AckWait, and if it is exceeded
+// NATS queues the event for redelivery.
+const MaximumMissingProcessingTime = time.Minute * 5
var processRoomEventDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go
index e9cd926d..88e33571 100644
--- a/roomserver/roomserver_test.go
+++ b/roomserver/roomserver_test.go
@@ -12,7 +12,9 @@ import (
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/gomatrixserverlib/spec"
+ "github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"github.com/tidwall/gjson"
@@ -1231,3 +1233,54 @@ func TestNewServerACLs(t *testing.T) {
assert.Equal(t, false, banned)
})
}
+
+// Validate that changing the AckPolicy/AckWait of room consumers
+// results in their recreation
+func TestRoomConsumerRecreation(t *testing.T) {
+
+ alice := test.NewUser(t)
+ room := test.NewRoom(t, alice)
+
+ // As this is DB unrelated, just use SQLite
+ cfg, processCtx, closeDB := testrig.CreateConfig(t, test.DBTypeSQLite)
+ defer closeDB()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ natsInstance := &jetstream.NATSInstance{}
+
+ // Prepare a stream and consumer using the old configuration
+ jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+
+ streamName := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent)
+ consumer := cfg.Global.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(room.ID))
+ subject := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEventSubj(room.ID))
+
+ consumerConfig := &nats.ConsumerConfig{
+ Durable: consumer,
+ AckPolicy: nats.AckAllPolicy,
+ DeliverPolicy: nats.DeliverAllPolicy,
+ FilterSubject: subject,
+ AckWait: (time.Minute * 2) + (time.Second * 10),
+ InactiveThreshold: time.Hour * 24,
+ }
+
+ // Create the consumer with the old config
+ _, err := jsCtx.AddConsumer(streamName, consumerConfig)
+ assert.NoError(t, err)
+
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
+ // start JetStream listeners
+ rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
+ rsAPI.SetFederationAPI(nil, nil)
+
+ // let the RS create the events, this also recreates the Consumers
+ err = api.SendEvents(context.Background(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false)
+ assert.NoError(t, err)
+
+ // Validate that AckPolicy and AckWait has changed
+ info, err := jsCtx.ConsumerInfo(streamName, consumer)
+ assert.NoError(t, err)
+ assert.Equal(t, nats.AckExplicitPolicy, info.Config.AckPolicy)
+
+ wantAckWait := input.MaximumMissingProcessingTime + (time.Second * 10)
+ assert.Equal(t, wantAckWait, info.Config.AckWait)
+}