aboutsummaryrefslogtreecommitdiff
path: root/roomserver
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-10-14 15:14:29 +0100
committerGitHub <noreply@github.com>2022-10-14 15:14:29 +0100
commitcd8f7e125172629984f60546446e7c9d261b26c0 (patch)
tree1f024be4514de74f3328c2a069726a6c5ad4bf6c /roomserver
parenteac5678449f9134769344a6d7b3761e79eed1328 (diff)
Set inactivity threshold on durable consumers in the roomserver input API (#2795)
This prevents us from holding onto durable consumers indefinitely for rooms that have long since turned inactive, since they do have a bit of a processing overhead in the NATS Server. If we clear up a consumer and then a room becomes active again, the consumer gets recreated as needed. The threshold is set to 24 hours for now, we can tweak it later if needs be.
Diffstat (limited to 'roomserver')
-rw-r--r--roomserver/internal/input/input.go34
1 files changed, 29 insertions, 5 deletions
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index c47793f0..f5099ca1 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -89,6 +89,13 @@ type Inputer struct {
Queryer *query.Queryer
}
+// If a room consumer is inactive for a while then we will allow NATS
+// to clean it up. This stops us from holding onto durable consumers
+// indefinitely for rooms that might no longer be active, since they do
+// have an interest overhead in the NATS Server. If the room becomes
+// active again then we'll recreate the consumer anyway.
+const inactiveThreshold = time.Hour * 24
+
type worker struct {
phony.Inbox
sync.Mutex
@@ -125,11 +132,12 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
if _, err := w.r.JetStream.AddConsumer(
r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
&nats.ConsumerConfig{
- Durable: consumer,
- AckPolicy: nats.AckAllPolicy,
- DeliverPolicy: nats.DeliverAllPolicy,
- FilterSubject: subject,
- AckWait: MaximumMissingProcessingTime + (time.Second * 10),
+ Durable: consumer,
+ AckPolicy: nats.AckAllPolicy,
+ DeliverPolicy: nats.DeliverAllPolicy,
+ FilterSubject: subject,
+ AckWait: MaximumMissingProcessingTime + (time.Second * 10),
+ InactiveThreshold: inactiveThreshold,
},
); err != nil {
logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
@@ -145,6 +153,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
nats.DeliverAll(),
nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)),
nats.Bind(r.InputRoomEventTopic, consumer),
+ nats.InactiveThreshold(inactiveThreshold),
)
if err != nil {
logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
@@ -180,6 +189,21 @@ func (r *Inputer) Start() error {
nats.ReplayInstant(),
nats.BindStream(r.InputRoomEventTopic),
)
+
+ // Make sure that the room consumers have the right config.
+ stream := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent)
+ for consumer := range r.JetStream.Consumers(stream) {
+ switch {
+ case consumer.Config.Durable == "":
+ continue // Ignore ephemeral consumers
+ case consumer.Config.InactiveThreshold != inactiveThreshold:
+ consumer.Config.InactiveThreshold = inactiveThreshold
+ if _, cerr := r.JetStream.UpdateConsumer(stream, &consumer.Config); cerr != nil {
+ logrus.WithError(cerr).Warnf("Failed to update inactive threshold on consumer %q", consumer.Name)
+ }
+ }
+ }
+
return err
}