Diffstat (limited to 'roomserver')
-rw-r--r--  roomserver/internal/input/input.go         |  92
-rw-r--r--  roomserver/internal/input/input_events.go  |   6
-rw-r--r--  roomserver/roomserver_test.go              |  53
3 files changed, 131 insertions(+), 20 deletions(-)
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index 40475153..20d2cfc7 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -119,9 +119,14 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
w.Lock()
defer w.Unlock()
if !loaded || w.subscription == nil {
+ streamName := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent)
consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID))
subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID))
+ logger := logrus.WithFields(logrus.Fields{
+ "stream_name": streamName,
+ "consumer": consumer,
+ })
// Create the consumer. We do this as a specific step rather than
// letting PullSubscribe create it for us because we need the consumer
// to outlive the subscription. If we do it this way, we can Bind in the
@@ -135,21 +140,62 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
// before it. This is necessary because otherwise our consumer will never
// acknowledge things we filtered out for other subjects and therefore they
// will linger around forever.
- if _, err := w.r.JetStream.AddConsumer(
- r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
- &nats.ConsumerConfig{
- Durable: consumer,
- AckPolicy: nats.AckAllPolicy,
- DeliverPolicy: nats.DeliverAllPolicy,
- FilterSubject: subject,
- AckWait: MaximumMissingProcessingTime + (time.Second * 10),
- InactiveThreshold: inactiveThreshold,
- },
- ); err != nil {
- logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
+
+ info, err := w.r.JetStream.ConsumerInfo(streamName, consumer)
+ if err != nil && !errors.Is(err, nats.ErrConsumerNotFound) {
+ // Log and return; we will retry anyway.
+ logger.WithError(err).Error("failed to get consumer info")
return
}
+ consumerConfig := &nats.ConsumerConfig{
+ Durable: consumer,
+ AckPolicy: nats.AckExplicitPolicy,
+ DeliverPolicy: nats.DeliverAllPolicy,
+ FilterSubject: subject,
+ AckWait: MaximumMissingProcessingTime + (time.Second * 10),
+ InactiveThreshold: inactiveThreshold,
+ }
+
+ // The consumer already exists, so update it if necessary.
+ if info != nil {
+ // We don't use reflect.DeepEqual here, since consumerConfig doesn't
+ // explicitly set fields like the consumer name, which NATS adds later.
+ // A deep comparison would therefore constantly update/recreate the consumer.
+ switch {
+ case info.Config.AckWait.Nanoseconds() != consumerConfig.AckWait.Nanoseconds():
+ // Initially we had an AckWait of 2m10s; it is now 5m10s, so we need to
+ // update existing consumers.
+ fallthrough
+ case info.Config.AckPolicy != consumerConfig.AckPolicy:
+ // We've changed the AckPolicy from AckAll to AckExplicit, which requires
+ // recreating the consumer. (Note: only a few config changes actually require recreation.)
+ logger.Warn("Consumer already exists, trying to update it.")
+ // Try updating the consumer first
+ if _, err = w.r.JetStream.UpdateConsumer(streamName, consumerConfig); err != nil {
+ // We failed to update the consumer, recreate it
+ logger.WithError(err).Warn("Unable to update consumer, recreating...")
+ if err = w.r.JetStream.DeleteConsumer(streamName, consumer); err != nil {
+ logger.WithError(err).Fatal("Unable to delete consumer")
+ return
+ }
+ // Set info to nil, so it can be recreated with the correct config.
+ info = nil
+ }
+ }
+ }
+
+ if info == nil {
+ // Create the consumer with the correct config
+ if _, err = w.r.JetStream.AddConsumer(
+ streamName,
+ consumerConfig,
+ ); err != nil {
+ logger.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
+ return
+ }
+ }
+
// Bind to our durable consumer. We want to receive all messages waiting
// for this subject and we want to manually acknowledge them, so that we
// can ensure they are only cleaned up when we are done processing them.
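A minimal sketch of the comparison step above, with a hypothetical helper name
that is not part of this patch. NATS fills in server-side fields (such as the
consumer name) on the ConsumerInfo it returns, so reflect.DeepEqual against our
own config would always report a difference; comparing only the fields we set
ourselves avoids a pointless update/recreate cycle:

    // consumerNeedsUpdate reports whether the existing consumer must be
    // updated or recreated to match the desired configuration.
    func consumerNeedsUpdate(existing, desired *nats.ConsumerConfig) bool {
        return existing.AckWait != desired.AckWait ||
            existing.AckPolicy != desired.AckPolicy
    }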
@@ -162,7 +208,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
nats.InactiveThreshold(inactiveThreshold),
)
if err != nil {
- logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
+ logger.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
return
}
@@ -263,21 +309,23 @@ func (w *worker) _next() {
return
}
+ // Since we either Ack() or Term() the message from this point on, we can defer decrementing the room backpressure metric.
+ defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
+
// Try to unmarshal the input room event. If the JSON unmarshalling
// fails then we'll terminate the message — this notifies NATS that
// we are done with the message and never want to see it again.
msg := msgs[0]
var inputRoomEvent api.InputRoomEvent
if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
- _ = msg.Term()
+ // Using AckWait here makes the call synchronous; 5 seconds is the default value used by NATS.
+ _ = msg.Term(nats.AckWait(time.Second * 5))
return
}
if scope := sentry.CurrentHub().Scope(); scope != nil {
scope.SetTag("event_id", inputRoomEvent.Event.EventID())
}
- roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Inc()
- defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
// Process the room event. If something goes wrong then we'll tell
// NATS to terminate the message. We'll store the error result as
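nats.AckWait doubles as an ack option in nats.go: passed to Term (or Ack), the
acknowledgement is sent as a request that waits up to the given duration for
the server's confirmation, rather than as a fire-and-forget publish. A short
sketch of the difference (a message can only be acknowledged once, so the two
calls below are alternatives, not a sequence):

    // Fire-and-forget: publishes the termination and returns immediately.
    _ = msg.Term()

    // Synchronous: blocks until the server confirms the termination, or
    // gives up after five seconds.
    if err := msg.Term(nats.AckWait(time.Second * 5)); err != nil {
        logrus.WithError(err).Warn("Term was not confirmed by the server")
    }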
@@ -307,10 +355,15 @@ func (w *worker) _next() {
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to process event")
}
- _ = msg.Term()
+ // If processing failed because Dendrite is shutting down (e.g. the context was
+ // canceled mid-restart), we skip the Ack so that NATS redelivers the message
+ // and we reprocess it after restarting. We only Ack if the context was not yet canceled.
+ if w.r.ProcessContext.Context().Err() == nil {
+ _ = msg.AckSync()
+ }
errString = err.Error()
} else {
- _ = msg.Ack()
+ _ = msg.AckSync()
}
// If it was a synchronous input request then the "sync" field
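Both branches now use AckSync, which, unlike the fire-and-forget Ack, waits for
the server to confirm the acknowledgement: a nil return means the message will
not be redelivered. The resulting decision table, as a sketch:

    // outcome of processRoomEvent       acknowledgement   effect
    // --------------------------------  ----------------  ----------------------------------
    // err == nil                        msg.AckSync()     done, no redelivery
    // err != nil, context still alive   msg.AckSync()     permanent failure, error recorded
    // err != nil, context canceled      none              redelivered after AckWait expires,
    //                                                     reprocessed after the restart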
@@ -381,6 +434,9 @@ func (r *Inputer) queueInputRoomEvents(
}).Error("Roomserver failed to queue async event")
return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err)
}
+
+ // Now that the event is queued, increment the room backpressure
+ roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
}
return
}
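Taken together, the two backpressure changes move the gauge from counting
events currently being processed to approximating the per-room queue depth: it
is incremented once per successfully published event and decremented exactly
once per fetched message, whether that message is then Ack'd or Term'd. A
self-contained sketch of the pairing, with an illustrative metric name:

    package main

    import "github.com/prometheus/client_golang/prometheus"

    // inputBackpressure is an illustrative stand-in for the existing
    // roomserverInputBackpressure gauge.
    var inputBackpressure = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{Name: "roomserver_input_backpressure"},
        []string{"room_id"},
    )

    // onQueued runs once per successful PublishMsg in queueInputRoomEvents.
    func onQueued(roomID string) {
        inputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
    }

    // onFetched runs (deferred) as soon as the worker has fetched a message.
    func onFetched(roomID string) {
        inputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
    }

    func main() {}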
diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go
index 520f82a8..1d920843 100644
--- a/roomserver/internal/input/input_events.go
+++ b/roomserver/internal/input/input_events.go
@@ -48,8 +48,10 @@ import (
"github.com/matrix-org/dendrite/roomserver/types"
)
-// TODO: Does this value make sense?
-const MaximumMissingProcessingTime = time.Minute * 2
+// MaximumMissingProcessingTime is the maximum time we allow "processRoomEvent" to
+// spend fetching e.g. missing auth/prev events. This duration is used for AckWait;
+// if it is exceeded, NATS queues the event for redelivery.
+const MaximumMissingProcessingTime = time.Minute * 5
var processRoomEventDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
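Both input.go above and the test below derive the consumer's AckWait from this
constant, so the bump moves the redelivery deadline from 2m10s to 5m10s:

    ackWait := MaximumMissingProcessingTime + (time.Second * 10) // was 2m10s, now 5m10s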
diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go
index e9cd926d..88e33571 100644
--- a/roomserver/roomserver_test.go
+++ b/roomserver/roomserver_test.go
@@ -12,7 +12,9 @@ import (
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/gomatrixserverlib/spec"
+ "github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"github.com/tidwall/gjson"
@@ -1231,3 +1233,54 @@ func TestNewServerACLs(t *testing.T) {
assert.Equal(t, false, banned)
})
}
+
+// Validate that changing the AckPolicy/AckWait of room consumers
+// results in their recreation
+func TestRoomConsumerRecreation(t *testing.T) {
+
+ alice := test.NewUser(t)
+ room := test.NewRoom(t, alice)
+
+ // As this test is database-agnostic, just use SQLite.
+ cfg, processCtx, closeDB := testrig.CreateConfig(t, test.DBTypeSQLite)
+ defer closeDB()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ natsInstance := &jetstream.NATSInstance{}
+
+ // Prepare a stream and consumer using the old configuration
+ jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+
+ streamName := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent)
+ consumer := cfg.Global.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(room.ID))
+ subject := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEventSubj(room.ID))
+
+ consumerConfig := &nats.ConsumerConfig{
+ Durable: consumer,
+ AckPolicy: nats.AckAllPolicy,
+ DeliverPolicy: nats.DeliverAllPolicy,
+ FilterSubject: subject,
+ AckWait: (time.Minute * 2) + (time.Second * 10),
+ InactiveThreshold: time.Hour * 24,
+ }
+
+ // Create the consumer with the old config
+ _, err := jsCtx.AddConsumer(streamName, consumerConfig)
+ assert.NoError(t, err)
+
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
+ // Start the JetStream listeners.
+ rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
+ rsAPI.SetFederationAPI(nil, nil)
+
+ // Let the roomserver create the events; this also recreates the consumer.
+ err = api.SendEvents(context.Background(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false)
+ assert.NoError(t, err)
+
+ // Validate that AckPolicy and AckWait have changed.
+ info, err := jsCtx.ConsumerInfo(streamName, consumer)
+ assert.NoError(t, err)
+ assert.Equal(t, nats.AckExplicitPolicy, info.Config.AckPolicy)
+
+ wantAckWait := input.MaximumMissingProcessingTime + (time.Second * 10)
+ assert.Equal(t, wantAckWait, info.Config.AckWait)
+}
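The new test can be run in isolation from the repository root with
"go test ./roomserver -run TestRoomConsumerRecreation".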