aboutsummaryrefslogtreecommitdiff
path: root/roomserver
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-01-07 13:41:53 +0000
committerNeil Alexander <neilalexander@users.noreply.github.com>2022-01-07 13:41:53 +0000
commita42232143526de360309b112b57cf0d95adf47cb (patch)
tree3d2b9fff1b184fb0960019d73fb8f0ea7e149964 /roomserver
parent173b1e8d3e800c0029725bcda321a240b5352f7d (diff)
Fix panic at startup if roomserver was not given federation API reference by the time NATS consumes an event, tweak backpressure metrics
Diffstat (limited to 'roomserver')
-rw-r--r--roomserver/internal/api.go7
-rw-r--r--roomserver/internal/input/input.go9
2 files changed, 10 insertions, 6 deletions
diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go
index 5cfe68da..e370f7e4 100644
--- a/roomserver/internal/api.go
+++ b/roomserver/internal/api.go
@@ -72,9 +72,6 @@ func NewRoomserverAPI(
},
// perform-er structs get initialised when we have a federation sender to use
}
- if err := a.Inputer.Start(); err != nil {
- logrus.WithError(err).Panic("failed to start roomserver input API")
- }
return a
}
@@ -140,6 +137,10 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
r.Forgetter = &perform.Forgetter{
DB: r.DB,
}
+
+ if err := r.Inputer.Start(); err != nil {
+ logrus.WithError(err).Panic("failed to start roomserver input API")
+ }
}
func (r *RoomserverInternalAPI) SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI) {
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index 1eab6780..dbff5fdd 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -59,14 +59,15 @@ func (r *Inputer) Start() error {
// later, possibly with an error response to the inputter if synchronous.
func(msg *nats.Msg) {
roomID := msg.Header.Get("room_id")
- defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
var inputRoomEvent api.InputRoomEvent
if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
_ = msg.Term()
return
}
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
+ roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
inbox.(*phony.Inbox).Act(nil, func() {
+ defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil {
sentry.CaptureException(err)
} else {
@@ -111,15 +112,17 @@ func (r *Inputer) InputRoomEvents(
if _, err = r.JetStream.PublishMsg(msg); err != nil {
return
}
- roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
}
} else {
responses := make(chan error, len(request.InputRoomEvents))
defer close(responses)
for _, e := range request.InputRoomEvents {
inputRoomEvent := e
- inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{})
+ roomID := inputRoomEvent.Event.RoomID()
+ inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
+ roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
inbox.(*phony.Inbox).Act(nil, func() {
+ defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
err := r.processRoomEvent(context.TODO(), &inputRoomEvent)
if err != nil {
sentry.CaptureException(err)