aboutsummaryrefslogtreecommitdiff
path: root/roomserver
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-03-23 10:20:18 +0000
committerGitHub <noreply@github.com>2022-03-23 10:20:18 +0000
commit98a5e410d7ecae49f525ddabc55a86d8a6731f22 (patch)
treeb957d21ae71c0d298add10417df8ae53808aaae3 /roomserver
parent9572f5ed19abc0b635092108aa6956eaebc60578 (diff)
Per-room consumers (#2293)
* Roomserver input refactoring — again! * Ensure the actor runs again * Preserve consumer after unsubscribe * Another sprinkling of magic * Rename `TopicFor` to `Prefixed` * Recreate the stream if the config is bad * Check streams too * Prefix subjects, preserve inboxes * Recreate if subjects wrong * Remove stream subject * Reconstruct properly * Fix mutex unlock * Comments * Fix tests * Don't drop events * Review comments * Separate `queueInputRoomEvents` function * Re-jig control flow a bit
Diffstat (limited to 'roomserver')
-rw-r--r--roomserver/internal/api.go1
-rw-r--r--roomserver/internal/input/input.go349
-rw-r--r--roomserver/roomserver.go4
3 files changed, 256 insertions, 98 deletions
diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go
index 91001e41..f96cefcb 100644
--- a/roomserver/internal/api.go
+++ b/roomserver/internal/api.go
@@ -90,6 +90,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
r.KeyRing = keyRing
r.Inputer = &input.Inputer{
+ Cfg: r.Cfg,
ProcessContext: r.ProcessContext,
DB: r.DB,
InputRoomEventTopic: r.InputRoomEventTopic,
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index c6e35461..6a8ae6d0 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -19,6 +19,7 @@ import (
"context"
"encoding/json"
"errors"
+ "fmt"
"sync"
"time"
@@ -29,6 +30,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/storage"
+ "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
@@ -45,7 +47,35 @@ var keyContentFields = map[string]string{
"m.room.member": "membership",
}
+// Inputer is responsible for consuming from the roomserver input
+// streams and processing the events. All input events are queued
+// into a single NATS stream and the order is preserved strictly.
+// The `room_id` message header will contain the room ID which will
+// be used to assign the pending event to a per-room worker.
+//
+// The input API maintains an ephemeral headers-only consumer. It
+// will speed through the stream working out which room IDs are
+// pending and create durable consumers for them. The durable
+// consumer will then be used for each room worker goroutine to
+// fetch events one by one and process them. Each room having a
+// durable consumer of its own means there is no head-of-line
+// blocking between rooms. Filtering ensures that each durable
+// consumer only receives events for the room it is interested in.
+//
+// The ephemeral consumer closely tracks the newest events. The
+// per-room durable consumers will only progress through the stream
+// as events are processed.
+//
+//       A BC *  -> positions of each consumer (* = ephemeral)
+//       ⌄ ⌄⌄ ⌄
+// ABAABCAABCAA  -> newest (letter = subject for each message)
+//
+// In this example, A is still processing an event but has two
+// pending events to process afterwards. Both B and C are caught
+// up, so they will do nothing until a new event comes in for B
+// or C.
type Inputer struct {
+ Cfg *config.RoomServer
ProcessContext *process.ProcessContext
DB storage.Database
NATSClient *nats.Conn
@@ -57,147 +87,275 @@ type Inputer struct {
ACLs *acls.ServerACLs
InputRoomEventTopic string
OutputRoomEventTopic string
- workers sync.Map // room ID -> *phony.Inbox
+ workers sync.Map // room ID -> *worker
Queryer *query.Queryer
}
-func (r *Inputer) workerForRoom(roomID string) *phony.Inbox {
- inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
- return inbox.(*phony.Inbox)
+type worker struct {
+ phony.Inbox
+ sync.Mutex
+ r *Inputer
+ roomID string
+ subscription *nats.Subscription
}
-// eventsInProgress is an in-memory map to keep a track of which events we have
-// queued up for processing. If we get a redelivery from NATS and we still have
-// the queued up item then we won't do anything with the redelivered message. If
-// we've restarted Dendrite and now this map is empty then it means that we will
-// reload pending work from NATS.
-var eventsInProgress sync.Map
+func (r *Inputer) startWorkerForRoom(roomID string) {
+ v, loaded := r.workers.LoadOrStore(roomID, &worker{
+ r: r,
+ roomID: roomID,
+ })
+ w := v.(*worker)
+ w.Lock()
+ defer w.Unlock()
+ if !loaded || w.subscription == nil {
+ consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID))
+ subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID))
-// onMessage is called when a new event arrives in the roomserver input stream.
-func (r *Inputer) Start() error {
- _, err := r.JetStream.Subscribe(
- r.InputRoomEventTopic,
- // We specifically don't use jetstream.WithJetStreamMessage here because we
- // queue the task off to a room-specific queue and the ACK needs to be sent
- // later, possibly with an error response to the inputter if synchronous.
- func(msg *nats.Msg) {
- roomID := msg.Header.Get("room_id")
- var inputRoomEvent api.InputRoomEvent
- if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
- _ = msg.Term()
- return
- }
+ // Create the consumer. We do this as a specific step rather than
+ // letting PullSubscribe create it for us because we need the consumer
+ // to outlive the subscription. If we do it this way, we can Bind in the
+ // next step, and when we Unsubscribe, the consumer continues to live. If
+ // we leave PullSubscribe to create the durable consumer, Unsubscribe will
+ // delete it because it thinks it "owns" it, which in turn breaks the
+ // interest-based retention storage policy.
+ // If the durable consumer already exists, this is effectively a no-op.
+ // Another interesting tidbit here: the ACK policy is set to "all" so that
+ // if we acknowledge a message, we also acknowledge everything that comes
+ // before it. This is necessary because otherwise our consumer will never
+ // acknowledge things we filtered out for other subjects and therefore they
+ // will linger around forever.
+ if _, err := w.r.JetStream.AddConsumer(
+ r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
+ &nats.ConsumerConfig{
+ Durable: consumer,
+ AckPolicy: nats.AckAllPolicy,
+ DeliverPolicy: nats.DeliverAllPolicy,
+ FilterSubject: subject,
+ AckWait: MaximumMissingProcessingTime + (time.Second * 10),
+ },
+ ); err != nil {
+ logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
+ return
+ }
- _ = msg.InProgress()
- index := roomID + "\000" + inputRoomEvent.Event.EventID()
- if _, ok := eventsInProgress.LoadOrStore(index, struct{}{}); ok {
- // We're already waiting to deal with this event, so there's no
- // point in queuing it up again. We've notified NATS that we're
- // working on the message still, so that will have deferred the
- // redelivery by a bit.
- return
- }
+ // Bind to our durable consumer. We want to receive all messages waiting
+ // for this subject and we want to manually acknowledge them, so that we
+ // can ensure they are only cleaned up when we are done processing them.
+ sub, err := w.r.JetStream.PullSubscribe(
+ subject, consumer,
+ nats.ManualAck(),
+ nats.DeliverAll(),
+ nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)),
+ nats.Bind(r.InputRoomEventTopic, consumer),
+ )
+ if err != nil {
+ logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
+ return
+ }
- roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
- r.workerForRoom(roomID).Act(nil, func() {
- _ = msg.InProgress() // resets the acknowledgement wait timer
- defer eventsInProgress.Delete(index)
- defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
- var errString string
- if err := r.processRoomEvent(r.ProcessContext.Context(), &inputRoomEvent); err != nil {
- if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
- sentry.CaptureException(err)
- }
- logrus.WithError(err).WithFields(logrus.Fields{
- "room_id": roomID,
- "event_id": inputRoomEvent.Event.EventID(),
- "type": inputRoomEvent.Event.Type(),
- }).Warn("Roomserver failed to process async event")
- _ = msg.Term()
- errString = err.Error()
- } else {
- _ = msg.Ack()
- }
- if replyTo := msg.Header.Get("sync"); replyTo != "" {
- if err := r.NATSClient.Publish(replyTo, []byte(errString)); err != nil {
- logrus.WithError(err).WithFields(logrus.Fields{
- "room_id": roomID,
- "event_id": inputRoomEvent.Event.EventID(),
- "type": inputRoomEvent.Event.Type(),
- }).Warn("Roomserver failed to respond for sync event")
- }
- }
- })
+ // Go and start pulling messages off the queue.
+ w.subscription = sub
+ w.Act(nil, w._next)
+ }
+}
+
+// Start creates an ephemeral non-durable consumer on the roomserver
+// input topic. It is configured to deliver us headers only because we
+// don't actually care about the contents of the message at this point,
+// we only care about the `room_id` field. Once a message arrives, we
+// will look to see if we have a worker for that room which has its
+// own consumer. If we don't, we'll start one.
+func (r *Inputer) Start() error {
+ _, err := r.JetStream.Subscribe(
+ "", // This is blank because we specified it in BindStream.
+ func(m *nats.Msg) {
+ roomID := m.Header.Get(jetstream.RoomID)
+ r.startWorkerForRoom(roomID)
+ _ = m.Ack()
},
- // NATS wants to acknowledge automatically by default when the message is
- // read from the stream, but we want to override that behaviour by making
- // sure that we only acknowledge when we're happy we've done everything we
- // can. This ensures we retry things when it makes sense to do so.
- nats.ManualAck(),
- // Use a durable named consumer.
- r.Durable,
- // If we've missed things in the stream, e.g. we restarted, then replay
- // all of the queued messages that were waiting for us.
+ nats.HeadersOnly(),
nats.DeliverAll(),
- // Ensure that NATS doesn't try to resend us something that wasn't done
- // within the period of time that we might still be processing it.
- nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)),
- // It is recommended to disable this for pull consumers as per the docs:
- // https://docs.nats.io/nats-concepts/jetstream/consumers#note-about-push-and-pull-consumers
- nats.MaxAckPending(-1),
+ nats.AckAll(),
+ nats.BindStream(r.InputRoomEventTopic),
)
return err
}
-// InputRoomEvents implements api.RoomserverInternalAPI
-func (r *Inputer) InputRoomEvents(
+// _next is called by the worker for the room. It must only be called
+// by the actor embedded into the worker.
+func (w *worker) _next() {
+ // Look up what the next event is that's waiting to be processed.
+ ctx, cancel := context.WithTimeout(w.r.ProcessContext.Context(), time.Minute)
+ defer cancel()
+ msgs, err := w.subscription.Fetch(1, nats.Context(ctx))
+ switch err {
+ case nil:
+ // Make sure that once we're done here, we queue up another call
+ // to _next in the inbox.
+ defer w.Act(nil, w._next)
+
+ // If no error was reported, but we didn't get exactly one message,
+ // then skip over this and try again on the next iteration.
+ if len(msgs) != 1 {
+ return
+ }
+
+ case context.DeadlineExceeded:
+ // The context deadline was exceeded, so we've been waiting for more than a
+ // minute for activity in this room. At this point we will shut
+ // down the subscriber to free up resources. It'll get started
+ // again if new activity happens.
+ if err = w.subscription.Unsubscribe(); err != nil {
+ logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
+ }
+ w.Lock()
+ w.subscription = nil
+ w.Unlock()
+ return
+
+ default:
+ // Something went wrong while trying to fetch the next event
+ // from the queue. In which case, we'll shut down the subscriber
+ // and wait to be notified about new room activity again. Maybe
+ // the problem will be corrected by then.
+ logrus.WithError(err).Errorf("Failed to get next stream message for room %q", w.roomID)
+ if err = w.subscription.Unsubscribe(); err != nil {
+ logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
+ }
+ w.Lock()
+ w.subscription = nil
+ w.Unlock()
+ return
+ }
+
+ // Try to unmarshal the input room event. If the JSON unmarshalling
+ // fails then we'll terminate the message — this notifies NATS that
+ // we are done with the message and never want to see it again.
+ msg := msgs[0]
+ var inputRoomEvent api.InputRoomEvent
+ if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
+ _ = msg.Term()
+ return
+ }
+
+ roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Inc()
+ defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
+
+ // Process the room event. If something goes wrong then we'll tell
+ // NATS to terminate the message. We'll store the error result as
+ // a string, because we might want to return that to the caller if
+ // it was a synchronous request.
+ var errString string
+ if err = w.r.processRoomEvent(w.r.ProcessContext.Context(), &inputRoomEvent); err != nil {
+ if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
+ sentry.CaptureException(err)
+ }
+ logrus.WithError(err).WithFields(logrus.Fields{
+ "room_id": w.roomID,
+ "event_id": inputRoomEvent.Event.EventID(),
+ "type": inputRoomEvent.Event.Type(),
+ }).Warn("Roomserver failed to process async event")
+ _ = msg.Term()
+ errString = err.Error()
+ } else {
+ _ = msg.Ack()
+ }
+
+ // If it was a synchronous input request then the "sync" header
+ // will be present in the message. That means that someone is
+ // waiting for a response. The temporary inbox name is present in
+ // that header, so send back the error string (if any). If there
+ // was no error then we'll return a blank message, which means
+ // that everything was OK.
+ if replyTo := msg.Header.Get("sync"); replyTo != "" {
+ if err = w.r.NATSClient.Publish(replyTo, []byte(errString)); err != nil {
+ logrus.WithError(err).WithFields(logrus.Fields{
+ "room_id": w.roomID,
+ "event_id": inputRoomEvent.Event.EventID(),
+ "type": inputRoomEvent.Event.Type(),
+ }).Warn("Roomserver failed to respond for sync event")
+ }
+ }
+}
+
+// queueInputRoomEvents queues events into the roomserver input
+// stream in NATS.
+func (r *Inputer) queueInputRoomEvents(
ctx context.Context,
request *api.InputRoomEventsRequest,
- response *api.InputRoomEventsResponse,
-) {
+) (replySub *nats.Subscription, err error) {
+ // If the request is synchronous then we need to create a
+ // temporary inbox to wait for responses on, and then create
+ // a subscription to it. If it's asynchronous then we won't
+ // bother, so these values will remain empty.
var replyTo string
- var replySub *nats.Subscription
if !request.Asynchronous {
- var err error
replyTo = nats.NewInbox()
replySub, err = r.NATSClient.SubscribeSync(replyTo)
if err != nil {
- response.ErrMsg = err.Error()
- return
+ return nil, fmt.Errorf("r.NATSClient.SubscribeSync: %w", err)
+ }
+ if replySub == nil {
+ // This shouldn't ever happen, but it doesn't hurt to check
+ // because we can potentially avoid a nil pointer panic later
+ // if it did for some reason.
+ return nil, fmt.Errorf("expected a subscription to the temporary inbox")
}
}
- var err error
+ // For each event, marshal the input room event and then
+ // send it into the input queue.
for _, e := range request.InputRoomEvents {
+ roomID := e.Event.RoomID()
+ subj := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(roomID))
msg := &nats.Msg{
- Subject: r.InputRoomEventTopic,
+ Subject: subj,
Header: nats.Header{},
- Reply: replyTo,
}
- roomID := e.Event.RoomID()
msg.Header.Set("room_id", roomID)
if replyTo != "" {
msg.Header.Set("sync", replyTo)
}
msg.Data, err = json.Marshal(e)
if err != nil {
- response.ErrMsg = err.Error()
- return
+ return nil, fmt.Errorf("json.Marshal: %w", err)
}
- if _, err = r.JetStream.PublishMsg(msg); err != nil {
+ if _, err = r.JetStream.PublishMsg(msg, nats.Context(ctx)); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": roomID,
"event_id": e.Event.EventID(),
+ "subj": subj,
}).Error("Roomserver failed to queue async event")
- return
+ return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err)
}
}
+ return
+}
- if request.Asynchronous || replySub == nil {
+// InputRoomEvents implements api.RoomserverInternalAPI
+func (r *Inputer) InputRoomEvents(
+ ctx context.Context,
+ request *api.InputRoomEventsRequest,
+ response *api.InputRoomEventsResponse,
+) {
+ // Queue up the event into the roomserver.
+ replySub, err := r.queueInputRoomEvents(ctx, request)
+ if err != nil {
+ response.ErrMsg = err.Error()
return
}
+ // If we aren't waiting for synchronous responses then we can
+ // return here, as there is nothing further to do.
+ if replySub == nil {
+ return
+ }
+
+ // Otherwise, we'll want to sit and wait for the responses
+ // from the roomserver. There will be one response for every
+ // input we submitted. The last error value we receive will
+ // be the one returned as the error string.
defer replySub.Drain() // nolint:errcheck
for i := 0; i < len(request.InputRoomEvents); i++ {
msg, err := replySub.NextMsgWithContext(ctx)
@@ -207,7 +365,6 @@ func (r *Inputer) InputRoomEvents(
}
if len(msg.Data) > 0 {
response.ErrMsg = string(msg.Data)
- return
}
}
}
diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go
index 896773ba..36e3c526 100644
--- a/roomserver/roomserver.go
+++ b/roomserver/roomserver.go
@@ -54,8 +54,8 @@ func NewInternalAPI(
return internal.NewRoomserverAPI(
base.ProcessContext, cfg, roomserverDB, js, nc,
- cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent),
- cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
+ cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
+ cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
base.Caches, perspectiveServerNames,
)
}