diff options
author | Till <2353100+S7evinK@users.noreply.github.com> | 2023-12-19 08:25:47 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-19 08:25:47 +0100 |
commit | f93d1c4790cf2f9b6d5fd838f113e8d5f3da8cb1 (patch) | |
tree | 8746e503c010bdefd0662a6c06f1329e73143fb9 | |
parent | d65449c7822e89b506bf2caa7a098e38970f6f27 (diff) |
Use `AckExplicitPolicy` instead of `AckAllPolicy` (#3288)
Fixes https://github.com/matrix-org/dendrite/issues/3240 and potentially
a root cause for state resets.
While testing, I've had added some more debug logging:
```
time="2023-12-16T18:13:11.319458084Z" level=warning msg="already processed event" event_id="$qFYMl_F2vb1N0yxmvlFAMhqhGhLKq4kA-o_YCQKH7tQ" kind=KindNew times=2
time="2023-12-16T18:13:14.537389126Z" level=warning msg="already processed event" event_id="$EU-LTsKErT6Mt1k12-p_3xOHfiLaK6gtwVDlZ35lSuo" kind=KindNew times=5
time="2023-12-16T18:13:16.789551206Z" level=warning msg="already processed event" event_id="$dIPuAfTL5x0VyG873LKPslQeljCSxFT1WKxUtjIMUGE" kind=KindNew times=5
time="2023-12-16T18:13:17.383838767Z" level=warning msg="already processed event" event_id="$7noSZiCkzerpkz_UBO3iatpRnaOiPx-3IXc0GPDQVGE" kind=KindNew times=2
time="2023-12-16T18:13:22.091946597Z" level=warning msg="already processed event" event_id="$3Lvo3Wbi2ol9-nNbQ93N-E2MuGQCJZo5397KkFH-W6E" kind=KindNew times=1
time="2023-12-16T18:13:23.026417446Z" level=warning msg="already processed event" event_id="$lj1xS46zsLBCChhKOLJEG-bu7z-_pq9i_Y2DUIjzGy4" kind=KindNew times=4
```
So we did receive the same event over and over again. Given they are
`KindNew`, we don't short circuit if we already processed them, which
potentially caused the state to be calculated with a now wrong state
snapshot.
Also fixes the back pressure metric. We now correctly increment the
counter once we sent the message to NATS and decrement it once we
actually processed an event.
-rw-r--r-- | go.mod | 18 | ||||
-rw-r--r-- | go.sum | 36 | ||||
-rw-r--r-- | roomserver/internal/input/input.go | 92 | ||||
-rw-r--r-- | roomserver/internal/input/input_events.go | 6 | ||||
-rw-r--r-- | roomserver/roomserver_test.go | 53 |
5 files changed, 158 insertions, 47 deletions
@@ -26,8 +26,8 @@ require ( github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7 github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 github.com/mattn/go-sqlite3 v1.14.17 - github.com/nats-io/nats-server/v2 v2.9.23 - github.com/nats-io/nats.go v1.28.0 + github.com/nats-io/nats-server/v2 v2.10.7 + github.com/nats-io/nats.go v1.31.0 github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 github.com/opentracing/opentracing-go v1.2.0 @@ -42,12 +42,12 @@ require ( github.com/uber/jaeger-lib v2.4.1+incompatible github.com/yggdrasil-network/yggdrasil-go v0.4.6 go.uber.org/atomic v1.10.0 - golang.org/x/crypto v0.14.0 + golang.org/x/crypto v0.16.0 golang.org/x/exp v0.0.0-20230809150735-7b3493d9a819 golang.org/x/image v0.10.0 golang.org/x/mobile v0.0.0-20221020085226-b36e6246172e golang.org/x/sync v0.3.0 - golang.org/x/term v0.13.0 + golang.org/x/term v0.15.0 gopkg.in/h2non/bimg.v1 v1.1.9 gopkg.in/yaml.v2 v2.4.0 gotest.tools/v3 v3.4.0 @@ -94,7 +94,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/juju/errors v1.0.0 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect - github.com/klauspost/compress v1.16.7 // indirect + github.com/klauspost/compress v1.17.4 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.17 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect @@ -104,7 +104,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/mschoch/smat v0.2.0 // indirect - github.com/nats-io/jwt/v2 v2.5.0 // indirect + github.com/nats-io/jwt/v2 v2.5.3 // indirect github.com/nats-io/nkeys v0.4.6 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/onsi/ginkgo/v2 v2.11.0 // indirect @@ -124,9 +124,9 @@ require ( go.etcd.io/bbolt v1.3.6 // indirect golang.org/x/mod v0.12.0 // indirect golang.org/x/net v0.17.0 // indirect - golang.org/x/sys v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect - golang.org/x/time v0.3.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect + golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.12.0 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/macaroon.v2 v2.1.0 // indirect @@ -190,8 +190,8 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:C github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= -github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= +github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -242,12 +242,12 @@ github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7P github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= -github.com/nats-io/jwt/v2 v2.5.0 h1:WQQ40AAlqqfx+f6ku+i0pOVm+ASirD4fUh+oQsiE9Ak= -github.com/nats-io/jwt/v2 v2.5.0/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/nats-server/v2 v2.9.23 h1:6Wj6H6QpP9FMlpCyWUaNu2yeZ/qGj+mdRkZ1wbikExU= -github.com/nats-io/nats-server/v2 v2.9.23/go.mod h1:wEjrEy9vnqIGE4Pqz4/c75v9Pmaq7My2IgFmnykc4C0= -github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c= -github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= +github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo= +github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4= +github.com/nats-io/nats-server/v2 v2.10.7 h1:f5VDy+GMu7JyuFA0Fef+6TfulfCs5nBTgq7MMkFJx5Y= +github.com/nats-io/nats-server/v2 v2.10.7/go.mod h1:V2JHOvPiPdtfDXTuEUsthUnCvSDeFrK4Xn9hRo6du7c= +github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= +github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -354,8 +354,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= +golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -422,24 +422,24 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= +golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= -golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 40475153..20d2cfc7 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -119,9 +119,14 @@ func (r *Inputer) startWorkerForRoom(roomID string) { w.Lock() defer w.Unlock() if !loaded || w.subscription == nil { + streamName := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent) consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID)) subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID)) + logger := logrus.WithFields(logrus.Fields{ + "stream_name": streamName, + "consumer": consumer, + }) // Create the consumer. We do this as a specific step rather than // letting PullSubscribe create it for us because we need the consumer // to outlive the subscription. If we do it this way, we can Bind in the @@ -135,21 +140,62 @@ func (r *Inputer) startWorkerForRoom(roomID string) { // before it. This is necessary because otherwise our consumer will never // acknowledge things we filtered out for other subjects and therefore they // will linger around forever. - if _, err := w.r.JetStream.AddConsumer( - r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), - &nats.ConsumerConfig{ - Durable: consumer, - AckPolicy: nats.AckAllPolicy, - DeliverPolicy: nats.DeliverAllPolicy, - FilterSubject: subject, - AckWait: MaximumMissingProcessingTime + (time.Second * 10), - InactiveThreshold: inactiveThreshold, - }, - ); err != nil { - logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID) + + info, err := w.r.JetStream.ConsumerInfo(streamName, consumer) + if err != nil && !errors.Is(err, nats.ErrConsumerNotFound) { + // log and return, we will retry anyway + logger.WithError(err).Errorf("failed to get consumer info") return } + consumerConfig := &nats.ConsumerConfig{ + Durable: consumer, + AckPolicy: nats.AckExplicitPolicy, + DeliverPolicy: nats.DeliverAllPolicy, + FilterSubject: subject, + AckWait: MaximumMissingProcessingTime + (time.Second * 10), + InactiveThreshold: inactiveThreshold, + } + + // The consumer already exists, try to update if necessary. + if info != nil { + // Not using reflect.DeepEqual here, since consumerConfig does not explicitly set + // e.g. the consumerName, which is added by NATS later. So this would result + // in constantly updating/recreating the consumer. + switch { + case info.Config.AckWait.Nanoseconds() != consumerConfig.AckWait.Nanoseconds(): + // Initially we had a AckWait of 2m 10s, now we have 5m 10s, so we need to update + // existing consumers. + fallthrough + case info.Config.AckPolicy != consumerConfig.AckPolicy: + // We've changed the AckPolicy from AckAll to AckExplicit, this needs a + // recreation of the consumer. (Note: Only a few changes actually need a recreat) + logger.Warn("Consumer already exists, trying to update it.") + // Try updating the consumer first + if _, err = w.r.JetStream.UpdateConsumer(streamName, consumerConfig); err != nil { + // We failed to update the consumer, recreate it + logger.WithError(err).Warn("Unable to update consumer, recreating...") + if err = w.r.JetStream.DeleteConsumer(streamName, consumer); err != nil { + logger.WithError(err).Fatal("Unable to delete consumer") + return + } + // Set info to nil, so it can be recreated with the correct config. + info = nil + } + } + } + + if info == nil { + // Create the consumer with the correct config + if _, err = w.r.JetStream.AddConsumer( + r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), + consumerConfig, + ); err != nil { + logger.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID) + return + } + } + // Bind to our durable consumer. We want to receive all messages waiting // for this subject and we want to manually acknowledge them, so that we // can ensure they are only cleaned up when we are done processing them. @@ -162,7 +208,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) { nats.InactiveThreshold(inactiveThreshold), ) if err != nil { - logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID) + logger.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID) return } @@ -263,21 +309,23 @@ func (w *worker) _next() { return } + // Since we either Ack() or Term() the message at this point, we can defer decrementing the room backpressure + defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() + // Try to unmarshal the input room event. If the JSON unmarshalling // fails then we'll terminate the message — this notifies NATS that // we are done with the message and never want to see it again. msg := msgs[0] var inputRoomEvent api.InputRoomEvent if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { - _ = msg.Term() + // using AckWait here makes the call synchronous; 5 seconds is the default value used by NATS + _ = msg.Term(nats.AckWait(time.Second * 5)) return } if scope := sentry.CurrentHub().Scope(); scope != nil { scope.SetTag("event_id", inputRoomEvent.Event.EventID()) } - roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Inc() - defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() // Process the room event. If something goes wrong then we'll tell // NATS to terminate the message. We'll store the error result as @@ -307,10 +355,15 @@ func (w *worker) _next() { "type": inputRoomEvent.Event.Type(), }).Warn("Roomserver failed to process event") } - _ = msg.Term() + // Even though we failed to process this message (e.g. due to Dendrite restarting and receiving a context canceled), + // the message may already have been queued for redelivery or will be, so this makes sure that we still reprocess the msg + // after restarting. We only Ack if the context was not yet canceled. + if w.r.ProcessContext.Context().Err() == nil { + _ = msg.AckSync() + } errString = err.Error() } else { - _ = msg.Ack() + _ = msg.AckSync() } // If it was a synchronous input request then the "sync" field @@ -381,6 +434,9 @@ func (r *Inputer) queueInputRoomEvents( }).Error("Roomserver failed to queue async event") return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err) } + + // Now that the event is queued, increment the room backpressure + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() } return } diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 520f82a8..1d920843 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -48,8 +48,10 @@ import ( "github.com/matrix-org/dendrite/roomserver/types" ) -// TODO: Does this value make sense? -const MaximumMissingProcessingTime = time.Minute * 2 +// MaximumMissingProcessingTime is the maximum time we allow "processRoomEvent" to fetch +// e.g. missing auth/prev events. This duration is used for AckWait, and if it is exceeded +// NATS queues the event for redelivery. +const MaximumMissingProcessingTime = time.Minute * 5 var processRoomEventDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index e9cd926d..88e33571 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -12,7 +12,9 @@ import ( "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/gomatrixserverlib/spec" + "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/tidwall/gjson" @@ -1231,3 +1233,54 @@ func TestNewServerACLs(t *testing.T) { assert.Equal(t, false, banned) }) } + +// Validate that changing the AckPolicy/AckWait of room consumers +// results in their recreation +func TestRoomConsumerRecreation(t *testing.T) { + + alice := test.NewUser(t) + room := test.NewRoom(t, alice) + + // As this is DB unrelated, just use SQLite + cfg, processCtx, closeDB := testrig.CreateConfig(t, test.DBTypeSQLite) + defer closeDB() + cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) + natsInstance := &jetstream.NATSInstance{} + + // Prepare a stream and consumer using the old configuration + jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) + + streamName := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent) + consumer := cfg.Global.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(room.ID)) + subject := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEventSubj(room.ID)) + + consumerConfig := &nats.ConsumerConfig{ + Durable: consumer, + AckPolicy: nats.AckAllPolicy, + DeliverPolicy: nats.DeliverAllPolicy, + FilterSubject: subject, + AckWait: (time.Minute * 2) + (time.Second * 10), + InactiveThreshold: time.Hour * 24, + } + + // Create the consumer with the old config + _, err := jsCtx.AddConsumer(streamName, consumerConfig) + assert.NoError(t, err) + + caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) + // start JetStream listeners + rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics) + rsAPI.SetFederationAPI(nil, nil) + + // let the RS create the events, this also recreates the Consumers + err = api.SendEvents(context.Background(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false) + assert.NoError(t, err) + + // Validate that AckPolicy and AckWait has changed + info, err := jsCtx.ConsumerInfo(streamName, consumer) + assert.NoError(t, err) + assert.Equal(t, nats.AckExplicitPolicy, info.Config.AckPolicy) + + wantAckWait := input.MaximumMissingProcessingTime + (time.Second * 10) + assert.Equal(t, wantAckWait, info.Config.AckWait) +} |