diff options
Diffstat (limited to 'roomserver/internal/input/input.go')
-rw-r--r-- | roomserver/internal/input/input.go | 57 |
1 files changed, 54 insertions, 3 deletions
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 7834e2ed..5bdec0a2 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -19,6 +19,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "sync" "time" @@ -38,6 +39,19 @@ import ( "github.com/tidwall/gjson" ) +type retryAction int +type commitAction int + +const ( + doNotRetry retryAction = iota + retryLater +) + +const ( + commitTransaction commitAction = iota + rollbackTransaction +) + var keyContentFields = map[string]string{ "m.room.join_rules": "join_rule", "m.room.history_visibility": "history_visibility", @@ -101,7 +115,8 @@ func (r *Inputer) Start() error { _ = msg.InProgress() // resets the acknowledgement wait timer defer eventsInProgress.Delete(index) defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() - if err := r.processRoomEvent(context.Background(), &inputRoomEvent); err != nil { + action, err := r.processRoomEventUsingUpdater(context.Background(), roomID, &inputRoomEvent) + if err != nil { if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { sentry.CaptureException(err) } @@ -111,7 +126,12 @@ func (r *Inputer) Start() error { "type": inputRoomEvent.Event.Type(), }).Warn("Roomserver failed to process async event") } - _ = msg.Ack() + switch action { + case retryLater: + _ = msg.Nak() + case doNotRetry: + _ = msg.Ack() + } }) }, // NATS wants to acknowledge automatically by default when the message is @@ -131,6 +151,37 @@ func (r *Inputer) Start() error { return err } +// processRoomEventUsingUpdater opens up a room updater and tries to +// process the event. It returns whether or not we should positively +// or negatively acknowledge the event (i.e. for NATS) and an error +// if it occurred. +func (r *Inputer) processRoomEventUsingUpdater( + ctx context.Context, + roomID string, + inputRoomEvent *api.InputRoomEvent, +) (retryAction, error) { + roomInfo, err := r.DB.RoomInfo(ctx, roomID) + if err != nil { + return doNotRetry, fmt.Errorf("r.DB.RoomInfo: %w", err) + } + updater, err := r.DB.GetRoomUpdater(ctx, roomInfo) + if err != nil { + return retryLater, fmt.Errorf("r.DB.GetRoomUpdater: %w", err) + } + action, err := r.processRoomEvent(ctx, updater, inputRoomEvent) + switch action { + case commitTransaction: + if cerr := updater.Commit(); cerr != nil { + return retryLater, fmt.Errorf("updater.Commit: %w", cerr) + } + case rollbackTransaction: + if rerr := updater.Rollback(); rerr != nil { + return retryLater, fmt.Errorf("updater.Rollback: %w", rerr) + } + } + return doNotRetry, err +} + // InputRoomEvents implements api.RoomserverInternalAPI func (r *Inputer) InputRoomEvents( ctx context.Context, @@ -177,7 +228,7 @@ func (r *Inputer) InputRoomEvents( worker.Act(nil, func() { defer eventsInProgress.Delete(index) defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() - err := r.processRoomEvent(ctx, &inputRoomEvent) + _, err := r.processRoomEventUsingUpdater(ctx, roomID, &inputRoomEvent) if err != nil { if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { sentry.CaptureException(err) |