about | summary | refs | log | tree | commit | diff
path: root/roomserver/internal/input
diff options
context:
space:
mode:
author: Neil Alexander <neilalexander@users.noreply.github.com> 2022-03-16 14:21:11 +0000
committer: GitHub <noreply@github.com> 2022-03-16 14:21:11 +0000
commit: e30aa38fb0d4ebe4ccc2adcbcdf3211b9a1d3ec7 (patch)
tree: cd4d41b6d6113dcc9f12f42da7350dec24731dc8 /roomserver/internal/input
parent: 485367fcfa2fe25bf7ba3edab2a1f099ad4dd867 (diff)
Stream tweaks, use same codepath for sync vs async input room events, wait for error response via NATS messages (#2283)
Diffstat (limited to 'roomserver/internal/input')
-rw-r--r-- roomserver/internal/input/input.go 129
1 file changed, 64 insertions, 65 deletions
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index 178533de..c6e35461 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -48,6 +48,7 @@ var keyContentFields = map[string]string{
type Inputer struct {
ProcessContext *process.ProcessContext
DB storage.Database
+ NATSClient *nats.Conn
JetStream nats.JetStreamContext
Durable nats.SubOpt
ServerName gomatrixserverlib.ServerName
@@ -103,6 +104,7 @@ func (r *Inputer) Start() error {
_ = msg.InProgress() // resets the acknowledgement wait timer
defer eventsInProgress.Delete(index)
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
+ var errString string
if err := r.processRoomEvent(r.ProcessContext.Context(), &inputRoomEvent); err != nil {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
sentry.CaptureException(err)
@@ -113,9 +115,19 @@ func (r *Inputer) Start() error {
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to process async event")
_ = msg.Term()
+ errString = err.Error()
} else {
_ = msg.Ack()
}
+ if replyTo := msg.Header.Get("sync"); replyTo != "" {
+ if err := r.NATSClient.Publish(replyTo, []byte(errString)); err != nil {
+ logrus.WithError(err).WithFields(logrus.Fields{
+ "room_id": roomID,
+ "event_id": inputRoomEvent.Event.EventID(),
+ "type": inputRoomEvent.Event.Type(),
+ }).Warn("Roomserver failed to respond for sync event")
+ }
+ }
})
},
// NATS wants to acknowledge automatically by default when the message is
@@ -131,6 +143,9 @@ func (r *Inputer) Start() error {
// Ensure that NATS doesn't try to resend us something that wasn't done
// within the period of time that we might still be processing it.
nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)),
+ // It is recommended to disable this for pull consumers as per the docs:
+ // https://docs.nats.io/nats-concepts/jetstream/consumers#note-about-push-and-pull-consumers
+ nats.MaxAckPending(-1),
)
return err
}
@@ -141,74 +156,58 @@ func (r *Inputer) InputRoomEvents(
request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse,
) {
- if request.Asynchronous {
+ var replyTo string
+ var replySub *nats.Subscription
+ if !request.Asynchronous {
var err error
- for _, e := range request.InputRoomEvents {
- msg := &nats.Msg{
- Subject: r.InputRoomEventTopic,
- Header: nats.Header{},
- }
- roomID := e.Event.RoomID()
- msg.Header.Set("room_id", roomID)
- msg.Data, err = json.Marshal(e)
- if err != nil {
- response.ErrMsg = err.Error()
- return
- }
- if _, err = r.JetStream.PublishMsg(msg); err != nil {
- logrus.WithError(err).WithFields(logrus.Fields{
- "room_id": roomID,
- "event_id": e.Event.EventID(),
- }).Error("Roomserver failed to queue async event")
- return
- }
+ replyTo = nats.NewInbox()
+ replySub, err = r.NATSClient.SubscribeSync(replyTo)
+ if err != nil {
+ response.ErrMsg = err.Error()
+ return
}
- } else {
- responses := make(chan error, len(request.InputRoomEvents))
- for _, e := range request.InputRoomEvents {
- inputRoomEvent := e
- roomID := inputRoomEvent.Event.RoomID()
- index := roomID + "\000" + inputRoomEvent.Event.EventID()
- if _, ok := eventsInProgress.LoadOrStore(index, struct{}{}); ok {
- // We're already waiting to deal with this event, so there's no
- // point in queuing it up again. We've notified NATS that we're
- // working on the message still, so that will have deferred the
- // redelivery by a bit.
- return
- }
- roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
- worker := r.workerForRoom(roomID)
- worker.Act(nil, func() {
- defer eventsInProgress.Delete(index)
- defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
- err := r.processRoomEvent(ctx, &inputRoomEvent)
- if err != nil {
- if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
- sentry.CaptureException(err)
- }
- logrus.WithError(err).WithFields(logrus.Fields{
- "room_id": roomID,
- "event_id": inputRoomEvent.Event.EventID(),
- }).Warn("Roomserver failed to process sync event")
- }
- select {
- case <-ctx.Done():
- default:
- responses <- err
- }
- })
+ }
+
+ var err error
+ for _, e := range request.InputRoomEvents {
+ msg := &nats.Msg{
+ Subject: r.InputRoomEventTopic,
+ Header: nats.Header{},
+ Reply: replyTo,
}
- for i := 0; i < len(request.InputRoomEvents); i++ {
- select {
- case <-ctx.Done():
- response.ErrMsg = context.DeadlineExceeded.Error()
- return
- case err := <-responses:
- if err != nil {
- response.ErrMsg = err.Error()
- return
- }
- }
+ roomID := e.Event.RoomID()
+ msg.Header.Set("room_id", roomID)
+ if replyTo != "" {
+ msg.Header.Set("sync", replyTo)
+ }
+ msg.Data, err = json.Marshal(e)
+ if err != nil {
+ response.ErrMsg = err.Error()
+ return
+ }
+ if _, err = r.JetStream.PublishMsg(msg); err != nil {
+ logrus.WithError(err).WithFields(logrus.Fields{
+ "room_id": roomID,
+ "event_id": e.Event.EventID(),
+ }).Error("Roomserver failed to queue async event")
+ return
+ }
+ }
+
+ if request.Asynchronous || replySub == nil {
+ return
+ }
+
+ defer replySub.Drain() // nolint:errcheck
+ for i := 0; i < len(request.InputRoomEvents); i++ {
+ msg, err := replySub.NextMsgWithContext(ctx)
+ if err != nil {
+ response.ErrMsg = err.Error()
+ return
+ }
+ if len(msg.Data) > 0 {
+ response.ErrMsg = string(msg.Data)
+ return
}
}
}