aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--roomserver/internal/input/input.go34
1 files changed, 29 insertions, 5 deletions
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index c47793f0..f5099ca1 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -89,6 +89,13 @@ type Inputer struct {
Queryer *query.Queryer
}
+// If a room consumer is inactive for a while then we will allow NATS
+// to clean it up. This stops us from holding onto durable consumers
+// indefinitely for rooms that might no longer be active, since they do
+// have an interest overhead in the NATS Server. If the room becomes
+// active again then we'll recreate the consumer anyway.
+const inactiveThreshold = time.Hour * 24
+
type worker struct {
phony.Inbox
sync.Mutex
@@ -125,11 +132,12 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
if _, err := w.r.JetStream.AddConsumer(
r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
&nats.ConsumerConfig{
- Durable: consumer,
- AckPolicy: nats.AckAllPolicy,
- DeliverPolicy: nats.DeliverAllPolicy,
- FilterSubject: subject,
- AckWait: MaximumMissingProcessingTime + (time.Second * 10),
+ Durable: consumer,
+ AckPolicy: nats.AckAllPolicy,
+ DeliverPolicy: nats.DeliverAllPolicy,
+ FilterSubject: subject,
+ AckWait: MaximumMissingProcessingTime + (time.Second * 10),
+ InactiveThreshold: inactiveThreshold,
},
); err != nil {
logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
@@ -145,6 +153,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
nats.DeliverAll(),
nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)),
nats.Bind(r.InputRoomEventTopic, consumer),
+ nats.InactiveThreshold(inactiveThreshold),
)
if err != nil {
logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
@@ -180,6 +189,21 @@ func (r *Inputer) Start() error {
nats.ReplayInstant(),
nats.BindStream(r.InputRoomEventTopic),
)
+
+ // Make sure that the room consumers have the right config.
+ stream := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent)
+ for consumer := range r.JetStream.Consumers(stream) {
+ switch {
+ case consumer.Config.Durable == "":
+ continue // Ignore ephemeral consumers
+ case consumer.Config.InactiveThreshold != inactiveThreshold:
+ consumer.Config.InactiveThreshold = inactiveThreshold
+ if _, cerr := r.JetStream.UpdateConsumer(stream, &consumer.Config); cerr != nil {
+ logrus.WithError(cerr).Warnf("Failed to update inactive threshold on consumer %q", consumer.Name)
+ }
+ }
+ }
+
return err
}