diff options
-rw-r--r-- | go.mod | 18 | ||||
-rw-r--r-- | go.sum | 36 | ||||
-rw-r--r-- | roomserver/internal/input/input.go | 92 | ||||
-rw-r--r-- | roomserver/internal/input/input_events.go | 6 | ||||
-rw-r--r-- | roomserver/roomserver_test.go | 53 |
5 files changed, 158 insertions, 47 deletions
@@ -26,8 +26,8 @@ require ( github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7 github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 github.com/mattn/go-sqlite3 v1.14.17 - github.com/nats-io/nats-server/v2 v2.9.23 - github.com/nats-io/nats.go v1.28.0 + github.com/nats-io/nats-server/v2 v2.10.7 + github.com/nats-io/nats.go v1.31.0 github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 github.com/opentracing/opentracing-go v1.2.0 @@ -42,12 +42,12 @@ require ( github.com/uber/jaeger-lib v2.4.1+incompatible github.com/yggdrasil-network/yggdrasil-go v0.4.6 go.uber.org/atomic v1.10.0 - golang.org/x/crypto v0.14.0 + golang.org/x/crypto v0.16.0 golang.org/x/exp v0.0.0-20230809150735-7b3493d9a819 golang.org/x/image v0.10.0 golang.org/x/mobile v0.0.0-20221020085226-b36e6246172e golang.org/x/sync v0.3.0 - golang.org/x/term v0.13.0 + golang.org/x/term v0.15.0 gopkg.in/h2non/bimg.v1 v1.1.9 gopkg.in/yaml.v2 v2.4.0 gotest.tools/v3 v3.4.0 @@ -94,7 +94,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/juju/errors v1.0.0 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect - github.com/klauspost/compress v1.16.7 // indirect + github.com/klauspost/compress v1.17.4 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.17 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect @@ -104,7 +104,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/mschoch/smat v0.2.0 // indirect - github.com/nats-io/jwt/v2 v2.5.0 // indirect + github.com/nats-io/jwt/v2 v2.5.3 // indirect github.com/nats-io/nkeys v0.4.6 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/onsi/ginkgo/v2 v2.11.0 // indirect @@ -124,9 +124,9 @@ require ( go.etcd.io/bbolt v1.3.6 // indirect golang.org/x/mod v0.12.0 // indirect golang.org/x/net v0.17.0 // indirect - golang.org/x/sys v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect - golang.org/x/time v0.3.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect + golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.12.0 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/macaroon.v2 v2.1.0 // indirect @@ -190,8 +190,8 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:C github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= -github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= +github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -242,12 +242,12 @@ github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7P github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= -github.com/nats-io/jwt/v2 v2.5.0 h1:WQQ40AAlqqfx+f6ku+i0pOVm+ASirD4fUh+oQsiE9Ak= -github.com/nats-io/jwt/v2 v2.5.0/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/nats-server/v2 v2.9.23 h1:6Wj6H6QpP9FMlpCyWUaNu2yeZ/qGj+mdRkZ1wbikExU= -github.com/nats-io/nats-server/v2 v2.9.23/go.mod h1:wEjrEy9vnqIGE4Pqz4/c75v9Pmaq7My2IgFmnykc4C0= -github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c= -github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= +github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo= +github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4= +github.com/nats-io/nats-server/v2 v2.10.7 h1:f5VDy+GMu7JyuFA0Fef+6TfulfCs5nBTgq7MMkFJx5Y= +github.com/nats-io/nats-server/v2 v2.10.7/go.mod h1:V2JHOvPiPdtfDXTuEUsthUnCvSDeFrK4Xn9hRo6du7c= +github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= +github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -354,8 +354,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= +golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -422,24 +422,24 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= +golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= -golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 40475153..20d2cfc7 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -119,9 +119,14 @@ func (r *Inputer) startWorkerForRoom(roomID string) { w.Lock() defer w.Unlock() if !loaded || w.subscription == nil { + streamName := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent) consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID)) subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID)) + logger := logrus.WithFields(logrus.Fields{ + "stream_name": streamName, + "consumer": consumer, + }) // Create the consumer. We do this as a specific step rather than // letting PullSubscribe create it for us because we need the consumer // to outlive the subscription. If we do it this way, we can Bind in the @@ -135,21 +140,62 @@ func (r *Inputer) startWorkerForRoom(roomID string) { // before it. This is necessary because otherwise our consumer will never // acknowledge things we filtered out for other subjects and therefore they // will linger around forever. - if _, err := w.r.JetStream.AddConsumer( - r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), - &nats.ConsumerConfig{ - Durable: consumer, - AckPolicy: nats.AckAllPolicy, - DeliverPolicy: nats.DeliverAllPolicy, - FilterSubject: subject, - AckWait: MaximumMissingProcessingTime + (time.Second * 10), - InactiveThreshold: inactiveThreshold, - }, - ); err != nil { - logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID) + + info, err := w.r.JetStream.ConsumerInfo(streamName, consumer) + if err != nil && !errors.Is(err, nats.ErrConsumerNotFound) { + // log and return, we will retry anyway + logger.WithError(err).Errorf("failed to get consumer info") return } + consumerConfig := &nats.ConsumerConfig{ + Durable: consumer, + AckPolicy: nats.AckExplicitPolicy, + DeliverPolicy: nats.DeliverAllPolicy, + FilterSubject: subject, + AckWait: MaximumMissingProcessingTime + (time.Second * 10), + InactiveThreshold: inactiveThreshold, + } + + // The consumer already exists, try to update if necessary. + if info != nil { + // Not using reflect.DeepEqual here, since consumerConfig does not explicitly set + // e.g. the consumerName, which is added by NATS later. So this would result + // in constantly updating/recreating the consumer. + switch { + case info.Config.AckWait.Nanoseconds() != consumerConfig.AckWait.Nanoseconds(): + // Initially we had a AckWait of 2m 10s, now we have 5m 10s, so we need to update + // existing consumers. + fallthrough + case info.Config.AckPolicy != consumerConfig.AckPolicy: + // We've changed the AckPolicy from AckAll to AckExplicit, this needs a + // recreation of the consumer. (Note: Only a few changes actually need a recreat) + logger.Warn("Consumer already exists, trying to update it.") + // Try updating the consumer first + if _, err = w.r.JetStream.UpdateConsumer(streamName, consumerConfig); err != nil { + // We failed to update the consumer, recreate it + logger.WithError(err).Warn("Unable to update consumer, recreating...") + if err = w.r.JetStream.DeleteConsumer(streamName, consumer); err != nil { + logger.WithError(err).Fatal("Unable to delete consumer") + return + } + // Set info to nil, so it can be recreated with the correct config. + info = nil + } + } + } + + if info == nil { + // Create the consumer with the correct config + if _, err = w.r.JetStream.AddConsumer( + r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), + consumerConfig, + ); err != nil { + logger.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID) + return + } + } + // Bind to our durable consumer. We want to receive all messages waiting // for this subject and we want to manually acknowledge them, so that we // can ensure they are only cleaned up when we are done processing them. @@ -162,7 +208,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) { nats.InactiveThreshold(inactiveThreshold), ) if err != nil { - logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID) + logger.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID) return } @@ -263,21 +309,23 @@ func (w *worker) _next() { return } + // Since we either Ack() or Term() the message at this point, we can defer decrementing the room backpressure + defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() + // Try to unmarshal the input room event. If the JSON unmarshalling // fails then we'll terminate the message — this notifies NATS that // we are done with the message and never want to see it again. msg := msgs[0] var inputRoomEvent api.InputRoomEvent if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { - _ = msg.Term() + // using AckWait here makes the call synchronous; 5 seconds is the default value used by NATS + _ = msg.Term(nats.AckWait(time.Second * 5)) return } if scope := sentry.CurrentHub().Scope(); scope != nil { scope.SetTag("event_id", inputRoomEvent.Event.EventID()) } - roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Inc() - defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() // Process the room event. If something goes wrong then we'll tell // NATS to terminate the message. We'll store the error result as @@ -307,10 +355,15 @@ func (w *worker) _next() { "type": inputRoomEvent.Event.Type(), }).Warn("Roomserver failed to process event") } - _ = msg.Term() + // Even though we failed to process this message (e.g. due to Dendrite restarting and receiving a context canceled), + // the message may already have been queued for redelivery or will be, so this makes sure that we still reprocess the msg + // after restarting. We only Ack if the context was not yet canceled. + if w.r.ProcessContext.Context().Err() == nil { + _ = msg.AckSync() + } errString = err.Error() } else { - _ = msg.Ack() + _ = msg.AckSync() } // If it was a synchronous input request then the "sync" field @@ -381,6 +434,9 @@ func (r *Inputer) queueInputRoomEvents( }).Error("Roomserver failed to queue async event") return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err) } + + // Now that the event is queued, increment the room backpressure + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() } return } diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 520f82a8..1d920843 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -48,8 +48,10 @@ import ( "github.com/matrix-org/dendrite/roomserver/types" ) -// TODO: Does this value make sense? -const MaximumMissingProcessingTime = time.Minute * 2 +// MaximumMissingProcessingTime is the maximum time we allow "processRoomEvent" to fetch +// e.g. missing auth/prev events. This duration is used for AckWait, and if it is exceeded +// NATS queues the event for redelivery. +const MaximumMissingProcessingTime = time.Minute * 5 var processRoomEventDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index e9cd926d..88e33571 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -12,7 +12,9 @@ import ( "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/gomatrixserverlib/spec" + "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/tidwall/gjson" @@ -1231,3 +1233,54 @@ func TestNewServerACLs(t *testing.T) { assert.Equal(t, false, banned) }) } + +// Validate that changing the AckPolicy/AckWait of room consumers +// results in their recreation +func TestRoomConsumerRecreation(t *testing.T) { + + alice := test.NewUser(t) + room := test.NewRoom(t, alice) + + // As this is DB unrelated, just use SQLite + cfg, processCtx, closeDB := testrig.CreateConfig(t, test.DBTypeSQLite) + defer closeDB() + cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) + natsInstance := &jetstream.NATSInstance{} + + // Prepare a stream and consumer using the old configuration + jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) + + streamName := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent) + consumer := cfg.Global.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(room.ID)) + subject := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEventSubj(room.ID)) + + consumerConfig := &nats.ConsumerConfig{ + Durable: consumer, + AckPolicy: nats.AckAllPolicy, + DeliverPolicy: nats.DeliverAllPolicy, + FilterSubject: subject, + AckWait: (time.Minute * 2) + (time.Second * 10), + InactiveThreshold: time.Hour * 24, + } + + // Create the consumer with the old config + _, err := jsCtx.AddConsumer(streamName, consumerConfig) + assert.NoError(t, err) + + caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) + // start JetStream listeners + rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics) + rsAPI.SetFederationAPI(nil, nil) + + // let the RS create the events, this also recreates the Consumers + err = api.SendEvents(context.Background(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false) + assert.NoError(t, err) + + // Validate that AckPolicy and AckWait has changed + info, err := jsCtx.ConsumerInfo(streamName, consumer) + assert.NoError(t, err) + assert.Equal(t, nats.AckExplicitPolicy, info.Config.AckPolicy) + + wantAckWait := input.MaximumMissingProcessingTime + (time.Second * 10) + assert.Equal(t, wantAckWait, info.Config.AckWait) +} |