diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-03-16 14:21:11 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-16 14:21:11 +0000 |
commit | e30aa38fb0d4ebe4ccc2adcbcdf3211b9a1d3ec7 (patch) | |
tree | cd4d41b6d6113dcc9f12f42da7350dec24731dc8 /roomserver/internal/input | |
parent | 485367fcfa2fe25bf7ba3edab2a1f099ad4dd867 (diff) |
Stream tweaks, use same codepath for sync vs async input room events, wait for error response via NATS messages (#2283)
Diffstat (limited to 'roomserver/internal/input')
-rw-r--r-- | roomserver/internal/input/input.go | 129 |
1 file changed, 64 insertions, 65 deletions
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 178533de..c6e35461 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -48,6 +48,7 @@ var keyContentFields = map[string]string{ type Inputer struct { ProcessContext *process.ProcessContext DB storage.Database + NATSClient *nats.Conn JetStream nats.JetStreamContext Durable nats.SubOpt ServerName gomatrixserverlib.ServerName @@ -103,6 +104,7 @@ func (r *Inputer) Start() error { _ = msg.InProgress() // resets the acknowledgement wait timer defer eventsInProgress.Delete(index) defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() + var errString string if err := r.processRoomEvent(r.ProcessContext.Context(), &inputRoomEvent); err != nil { if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { sentry.CaptureException(err) @@ -113,9 +115,19 @@ func (r *Inputer) Start() error { "type": inputRoomEvent.Event.Type(), }).Warn("Roomserver failed to process async event") _ = msg.Term() + errString = err.Error() } else { _ = msg.Ack() } + if replyTo := msg.Header.Get("sync"); replyTo != "" { + if err := r.NATSClient.Publish(replyTo, []byte(errString)); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "room_id": roomID, + "event_id": inputRoomEvent.Event.EventID(), + "type": inputRoomEvent.Event.Type(), + }).Warn("Roomserver failed to respond for sync event") + } + } }) }, // NATS wants to acknowledge automatically by default when the message is @@ -131,6 +143,9 @@ func (r *Inputer) Start() error { // Ensure that NATS doesn't try to resend us something that wasn't done // within the period of time that we might still be processing it. 
nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)), + // It is recommended to disable this for pull consumers as per the docs: + // https://docs.nats.io/nats-concepts/jetstream/consumers#note-about-push-and-pull-consumers + nats.MaxAckPending(-1), ) return err } @@ -141,74 +156,58 @@ func (r *Inputer) InputRoomEvents( request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, ) { - if request.Asynchronous { + var replyTo string + var replySub *nats.Subscription + if !request.Asynchronous { var err error - for _, e := range request.InputRoomEvents { - msg := &nats.Msg{ - Subject: r.InputRoomEventTopic, - Header: nats.Header{}, - } - roomID := e.Event.RoomID() - msg.Header.Set("room_id", roomID) - msg.Data, err = json.Marshal(e) - if err != nil { - response.ErrMsg = err.Error() - return - } - if _, err = r.JetStream.PublishMsg(msg); err != nil { - logrus.WithError(err).WithFields(logrus.Fields{ - "room_id": roomID, - "event_id": e.Event.EventID(), - }).Error("Roomserver failed to queue async event") - return - } + replyTo = nats.NewInbox() + replySub, err = r.NATSClient.SubscribeSync(replyTo) + if err != nil { + response.ErrMsg = err.Error() + return } - } else { - responses := make(chan error, len(request.InputRoomEvents)) - for _, e := range request.InputRoomEvents { - inputRoomEvent := e - roomID := inputRoomEvent.Event.RoomID() - index := roomID + "\000" + inputRoomEvent.Event.EventID() - if _, ok := eventsInProgress.LoadOrStore(index, struct{}{}); ok { - // We're already waiting to deal with this event, so there's no - // point in queuing it up again. We've notified NATS that we're - // working on the message still, so that will have deferred the - // redelivery by a bit. 
- return - } - roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() - worker := r.workerForRoom(roomID) - worker.Act(nil, func() { - defer eventsInProgress.Delete(index) - defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() - err := r.processRoomEvent(ctx, &inputRoomEvent) - if err != nil { - if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { - sentry.CaptureException(err) - } - logrus.WithError(err).WithFields(logrus.Fields{ - "room_id": roomID, - "event_id": inputRoomEvent.Event.EventID(), - }).Warn("Roomserver failed to process sync event") - } - select { - case <-ctx.Done(): - default: - responses <- err - } - }) + } + + var err error + for _, e := range request.InputRoomEvents { + msg := &nats.Msg{ + Subject: r.InputRoomEventTopic, + Header: nats.Header{}, + Reply: replyTo, } - for i := 0; i < len(request.InputRoomEvents); i++ { - select { - case <-ctx.Done(): - response.ErrMsg = context.DeadlineExceeded.Error() - return - case err := <-responses: - if err != nil { - response.ErrMsg = err.Error() - return - } - } + roomID := e.Event.RoomID() + msg.Header.Set("room_id", roomID) + if replyTo != "" { + msg.Header.Set("sync", replyTo) + } + msg.Data, err = json.Marshal(e) + if err != nil { + response.ErrMsg = err.Error() + return + } + if _, err = r.JetStream.PublishMsg(msg); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "room_id": roomID, + "event_id": e.Event.EventID(), + }).Error("Roomserver failed to queue async event") + return + } + } + + if request.Asynchronous || replySub == nil { + return + } + + defer replySub.Drain() // nolint:errcheck + for i := 0; i < len(request.InputRoomEvents); i++ { + msg, err := replySub.NextMsgWithContext(ctx) + if err != nil { + response.ErrMsg = err.Error() + return + } + if len(msg.Data) > 0 { + response.ErrMsg = string(msg.Data) + return } } } |