diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-01-07 13:41:53 +0000 |
---|---|---|
committer | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-01-07 13:41:53 +0000 |
commit | a42232143526de360309b112b57cf0d95adf47cb (patch) | |
tree | 3d2b9fff1b184fb0960019d73fb8f0ea7e149964 /roomserver | |
parent | 173b1e8d3e800c0029725bcda321a240b5352f7d (diff) |
Fix panic at startup if roomserver was not given federation API reference by the time NATS consumes an event, tweak backpressure metrics
Diffstat (limited to 'roomserver')
-rw-r--r-- | roomserver/internal/api.go | 7 | ||||
-rw-r--r-- | roomserver/internal/input/input.go | 9 |
2 files changed, 10 insertions, 6 deletions
diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 5cfe68da..e370f7e4 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -72,9 +72,6 @@ func NewRoomserverAPI( }, // perform-er structs get initialised when we have a federation sender to use } - if err := a.Inputer.Start(); err != nil { - logrus.WithError(err).Panic("failed to start roomserver input API") - } return a } @@ -140,6 +137,10 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA r.Forgetter = &perform.Forgetter{ DB: r.DB, } + + if err := r.Inputer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start roomserver input API") + } } func (r *RoomserverInternalAPI) SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI) { diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 1eab6780..dbff5fdd 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -59,14 +59,15 @@ func (r *Inputer) Start() error { // later, possibly with an error response to the inputter if synchronous. func(msg *nats.Msg) { roomID := msg.Header.Get("room_id") - defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() var inputRoomEvent api.InputRoomEvent if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { _ = msg.Term() return } inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() inbox.(*phony.Inbox).Act(nil, func() { + defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { sentry.CaptureException(err) } else { @@ -111,15 +112,17 @@ func (r *Inputer) InputRoomEvents( if _, err = r.JetStream.PublishMsg(msg); err != nil { return } - roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() } } else { responses := make(chan error, len(request.InputRoomEvents)) defer close(responses) for _, e := range request.InputRoomEvents { inputRoomEvent := e - inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{}) + roomID := inputRoomEvent.Event.RoomID() + inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() inbox.(*phony.Inbox).Act(nil, func() { + defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() err := r.processRoomEvent(context.TODO(), &inputRoomEvent) if err != nil { sentry.CaptureException(err) |