aboutsummaryrefslogtreecommitdiff
path: root/roomserver/internal/input/input.go
diff options
context:
space:
mode:
Diffstat (limited to 'roomserver/internal/input/input.go')
-rw-r--r--roomserver/internal/input/input.go57
1 files changed, 54 insertions, 3 deletions
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index 7834e2ed..5bdec0a2 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -19,6 +19,7 @@ import (
"context"
"encoding/json"
"errors"
+ "fmt"
"sync"
"time"
@@ -38,6 +39,19 @@ import (
"github.com/tidwall/gjson"
)
+type retryAction int
+type commitAction int
+
+const (
+ doNotRetry retryAction = iota
+ retryLater
+)
+
+const (
+ commitTransaction commitAction = iota
+ rollbackTransaction
+)
+
var keyContentFields = map[string]string{
"m.room.join_rules": "join_rule",
"m.room.history_visibility": "history_visibility",
@@ -101,7 +115,8 @@ func (r *Inputer) Start() error {
_ = msg.InProgress() // resets the acknowledgement wait timer
defer eventsInProgress.Delete(index)
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
- if err := r.processRoomEvent(context.Background(), &inputRoomEvent); err != nil {
+ action, err := r.processRoomEventUsingUpdater(context.Background(), roomID, &inputRoomEvent)
+ if err != nil {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
sentry.CaptureException(err)
}
@@ -111,7 +126,12 @@ func (r *Inputer) Start() error {
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to process async event")
}
- _ = msg.Ack()
+ switch action {
+ case retryLater:
+ _ = msg.Nak()
+ case doNotRetry:
+ _ = msg.Ack()
+ }
})
},
// NATS wants to acknowledge automatically by default when the message is
@@ -131,6 +151,37 @@ func (r *Inputer) Start() error {
return err
}
+// processRoomEventUsingUpdater opens up a room updater and tries to
+// process the event. It returns whether or not we should positively
+// or negatively acknowledge the event (i.e. for NATS) and an error
+// if it occurred.
+func (r *Inputer) processRoomEventUsingUpdater(
+ ctx context.Context,
+ roomID string,
+ inputRoomEvent *api.InputRoomEvent,
+) (retryAction, error) {
+ roomInfo, err := r.DB.RoomInfo(ctx, roomID)
+ if err != nil {
+ return doNotRetry, fmt.Errorf("r.DB.RoomInfo: %w", err)
+ }
+ updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)
+ if err != nil {
+ return retryLater, fmt.Errorf("r.DB.GetRoomUpdater: %w", err)
+ }
+ action, err := r.processRoomEvent(ctx, updater, inputRoomEvent)
+ switch action {
+ case commitTransaction:
+ if cerr := updater.Commit(); cerr != nil {
+ return retryLater, fmt.Errorf("updater.Commit: %w", cerr)
+ }
+ case rollbackTransaction:
+ if rerr := updater.Rollback(); rerr != nil {
+ return retryLater, fmt.Errorf("updater.Rollback: %w", rerr)
+ }
+ }
+ return doNotRetry, err
+}
+
// InputRoomEvents implements api.RoomserverInternalAPI
func (r *Inputer) InputRoomEvents(
ctx context.Context,
@@ -177,7 +228,7 @@ func (r *Inputer) InputRoomEvents(
worker.Act(nil, func() {
defer eventsInProgress.Delete(index)
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
- err := r.processRoomEvent(ctx, &inputRoomEvent)
+ _, err := r.processRoomEventUsingUpdater(ctx, roomID, &inputRoomEvent)
if err != nil {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
sentry.CaptureException(err)