aboutsummaryrefslogtreecommitdiff
path: root/roomserver/internal/input
diff options
context:
space:
mode:
Diffstat (limited to 'roomserver/internal/input')
-rw-r--r--roomserver/internal/input/input.go57
-rw-r--r--roomserver/internal/input/input_events.go84
-rw-r--r--roomserver/internal/input/input_latest_events.go18
-rw-r--r--roomserver/internal/input/input_membership.go4
-rw-r--r--roomserver/internal/input/input_missing.go12
5 files changed, 112 insertions, 63 deletions
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index 7834e2ed..5bdec0a2 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -19,6 +19,7 @@ import (
"context"
"encoding/json"
"errors"
+ "fmt"
"sync"
"time"
@@ -38,6 +39,19 @@ import (
"github.com/tidwall/gjson"
)
+type retryAction int
+type commitAction int
+
+const (
+ doNotRetry retryAction = iota
+ retryLater
+)
+
+const (
+ commitTransaction commitAction = iota
+ rollbackTransaction
+)
+
var keyContentFields = map[string]string{
"m.room.join_rules": "join_rule",
"m.room.history_visibility": "history_visibility",
@@ -101,7 +115,8 @@ func (r *Inputer) Start() error {
_ = msg.InProgress() // resets the acknowledgement wait timer
defer eventsInProgress.Delete(index)
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
- if err := r.processRoomEvent(context.Background(), &inputRoomEvent); err != nil {
+ action, err := r.processRoomEventUsingUpdater(context.Background(), roomID, &inputRoomEvent)
+ if err != nil {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
sentry.CaptureException(err)
}
@@ -111,7 +126,12 @@ func (r *Inputer) Start() error {
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to process async event")
}
- _ = msg.Ack()
+ switch action {
+ case retryLater:
+ _ = msg.Nak()
+ case doNotRetry:
+ _ = msg.Ack()
+ }
})
},
// NATS wants to acknowledge automatically by default when the message is
@@ -131,6 +151,37 @@ func (r *Inputer) Start() error {
return err
}
+// processRoomEventUsingUpdater opens up a room updater and tries to
+// process the event. It returns whether or not we should positively
+// or negatively acknowledge the event (i.e. for NATS) and an error
+// if it occurred.
+func (r *Inputer) processRoomEventUsingUpdater(
+ ctx context.Context,
+ roomID string,
+ inputRoomEvent *api.InputRoomEvent,
+) (retryAction, error) {
+ roomInfo, err := r.DB.RoomInfo(ctx, roomID)
+ if err != nil {
+ return doNotRetry, fmt.Errorf("r.DB.RoomInfo: %w", err)
+ }
+ updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)
+ if err != nil {
+ return retryLater, fmt.Errorf("r.DB.GetRoomUpdater: %w", err)
+ }
+ action, err := r.processRoomEvent(ctx, updater, inputRoomEvent)
+ switch action {
+ case commitTransaction:
+ if cerr := updater.Commit(); cerr != nil {
+ return retryLater, fmt.Errorf("updater.Commit: %w", cerr)
+ }
+ case rollbackTransaction:
+ if rerr := updater.Rollback(); rerr != nil {
+ return retryLater, fmt.Errorf("updater.Rollback: %w", rerr)
+ }
+ }
+ return doNotRetry, err
+}
+
// InputRoomEvents implements api.RoomserverInternalAPI
func (r *Inputer) InputRoomEvents(
ctx context.Context,
@@ -177,7 +228,7 @@ func (r *Inputer) InputRoomEvents(
worker.Act(nil, func() {
defer eventsInProgress.Delete(index)
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
- err := r.processRoomEvent(ctx, &inputRoomEvent)
+ _, err := r.processRoomEventUsingUpdater(ctx, roomID, &inputRoomEvent)
if err != nil {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
sentry.CaptureException(err)
diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go
index 16703616..f3fa83d8 100644
--- a/roomserver/internal/input/input_events.go
+++ b/roomserver/internal/input/input_events.go
@@ -29,6 +29,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
"github.com/matrix-org/dendrite/roomserver/state"
+ "github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@@ -67,14 +68,15 @@ var processRoomEventDuration = prometheus.NewHistogramVec(
// nolint:gocyclo
func (r *Inputer) processRoomEvent(
ctx context.Context,
+ updater *shared.RoomUpdater,
input *api.InputRoomEvent,
-) (err error) {
+) (commitAction, error) {
select {
case <-ctx.Done():
// Before we do anything, make sure the context hasn't expired for this pending task.
// If it has then we'll give up straight away — it's probably a synchronous input
// request and the caller has already given up, but the inbox task was still queued.
- return context.DeadlineExceeded
+ return rollbackTransaction, context.DeadlineExceeded
default:
}
@@ -107,7 +109,7 @@ func (r *Inputer) processRoomEvent(
// if we have already got this event then do not process it again, if the input kind is an outlier.
// Outliers contain no extra information which may warrant a re-processing.
if input.Kind == api.KindOutlier {
- evs, err2 := r.DB.EventsFromIDs(ctx, []string{event.EventID()})
+ evs, err2 := updater.EventsFromIDs(ctx, []string{event.EventID()})
if err2 == nil && len(evs) == 1 {
// check hash matches if we're on early room versions where the event ID was a random string
idFormat, err2 := headered.RoomVersion.EventIDFormat()
@@ -116,11 +118,11 @@ func (r *Inputer) processRoomEvent(
case gomatrixserverlib.EventIDFormatV1:
if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) {
logger.Debugf("Already processed event; ignoring")
- return nil
+ return rollbackTransaction, nil
}
default:
logger.Debugf("Already processed event; ignoring")
- return nil
+ return rollbackTransaction, nil
}
}
}
@@ -134,8 +136,8 @@ func (r *Inputer) processRoomEvent(
AuthEventIDs: event.AuthEventIDs(),
PrevEventIDs: event.PrevEventIDs(),
}
- if err = r.Queryer.QueryMissingAuthPrevEvents(ctx, missingReq, missingRes); err != nil {
- return fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err)
+ if err := r.Queryer.QueryMissingAuthPrevEvents(ctx, missingReq, missingRes); err != nil {
+ return rollbackTransaction, fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err)
}
}
missingAuth := len(missingRes.MissingAuthEventIDs) > 0
@@ -146,8 +148,8 @@ func (r *Inputer) processRoomEvent(
RoomID: event.RoomID(),
ExcludeSelf: true,
}
- if err = r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil {
- return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
+ if err := r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil {
+ return rollbackTransaction, fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
}
// Sort all of the servers into a map so that we can randomise
// their order. Then make sure that the input origin and the
@@ -176,8 +178,8 @@ func (r *Inputer) processRoomEvent(
isRejected := false
authEvents := gomatrixserverlib.NewAuthEvents(nil)
knownEvents := map[string]*types.Event{}
- if err = r.fetchAuthEvents(ctx, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
- return fmt.Errorf("r.fetchAuthEvents: %w", err)
+ if err := r.fetchAuthEvents(ctx, updater, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
+ return rollbackTransaction, fmt.Errorf("r.fetchAuthEvents: %w", err)
}
// Check if the event is allowed by its auth events. If it isn't then
@@ -193,7 +195,7 @@ func (r *Inputer) processRoomEvent(
authEventNIDs := make([]types.EventNID, 0, len(authEventIDs))
for _, authEventID := range authEventIDs {
if _, ok := knownEvents[authEventID]; !ok {
- return fmt.Errorf("missing auth event %s", authEventID)
+ return rollbackTransaction, fmt.Errorf("missing auth event %s", authEventID)
}
authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID)
}
@@ -202,7 +204,8 @@ func (r *Inputer) processRoomEvent(
if input.Kind == api.KindNew {
// Check that the event passes authentication checks based on the
// current room state.
- softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs)
+ var err error
+ softfail, err = helpers.CheckForSoftFail(ctx, updater, headered, input.StateEventIDs)
if err != nil {
logger.WithError(err).Warn("Error authing soft-failed event")
}
@@ -227,7 +230,7 @@ func (r *Inputer) processRoomEvent(
origin: input.Origin,
inputer: r,
queryer: r.Queryer,
- db: r.DB,
+ db: updater,
federation: r.FSAPI,
keys: r.KeyRing,
roomsMu: internal.NewMutexByRoom(),
@@ -235,7 +238,7 @@ func (r *Inputer) processRoomEvent(
hadEvents: map[string]bool{},
haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{},
}
- if err = missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil {
+ if err := missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil {
isRejected = true
rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err)
} else {
@@ -248,16 +251,16 @@ func (r *Inputer) processRoomEvent(
}
// Store the event.
- _, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
+ _, _, stateAtEvent, redactionEvent, redactedEventID, err := updater.StoreEvent(ctx, event, authEventNIDs, isRejected)
if err != nil {
- return fmt.Errorf("r.DB.StoreEvent: %w", err)
+ return rollbackTransaction, fmt.Errorf("updater.StoreEvent: %w", err)
}
// if storing this event results in it being redacted then do so.
if !isRejected && redactedEventID == event.EventID() {
r, rerr := eventutil.RedactEvent(redactionEvent, event)
if rerr != nil {
- return fmt.Errorf("eventutil.RedactEvent: %w", rerr)
+ return rollbackTransaction, fmt.Errorf("eventutil.RedactEvent: %w", rerr)
}
event = r
}
@@ -268,23 +271,23 @@ func (r *Inputer) processRoomEvent(
if input.Kind == api.KindOutlier {
logger.Debug("Stored outlier")
hooks.Run(hooks.KindNewEventPersisted, headered)
- return nil
+ return commitTransaction, nil
}
- roomInfo, err := r.DB.RoomInfo(ctx, event.RoomID())
+ roomInfo, err := updater.RoomInfo(ctx, event.RoomID())
if err != nil {
- return fmt.Errorf("r.DB.RoomInfo: %w", err)
+ return rollbackTransaction, fmt.Errorf("updater.RoomInfo: %w", err)
}
if roomInfo == nil {
- return fmt.Errorf("r.DB.RoomInfo missing for room %s", event.RoomID())
+ return rollbackTransaction, fmt.Errorf("updater.RoomInfo missing for room %s", event.RoomID())
}
if !missingPrev && stateAtEvent.BeforeStateSnapshotNID == 0 {
// We haven't calculated a state for this event yet.
// Lets calculate one.
- err = r.calculateAndSetState(ctx, input, roomInfo, &stateAtEvent, event, isRejected)
+ err = r.calculateAndSetState(ctx, updater, input, roomInfo, &stateAtEvent, event, isRejected)
if err != nil {
- return fmt.Errorf("r.calculateAndSetState: %w", err)
+ return rollbackTransaction, fmt.Errorf("r.calculateAndSetState: %w", err)
}
}
@@ -294,13 +297,14 @@ func (r *Inputer) processRoomEvent(
"soft_fail": softfail,
"missing_prev": missingPrev,
}).Warn("Stored rejected event")
- return rejectionErr
+ return commitTransaction, rejectionErr
}
switch input.Kind {
case api.KindNew:
if err = r.updateLatestEvents(
ctx, // context
+ updater, // room updater
roomInfo, // room info for the room being updated
stateAtEvent, // state at event (below)
event, // event
@@ -308,7 +312,7 @@ func (r *Inputer) processRoomEvent(
input.TransactionID, // transaction ID
input.HasState, // rewrites state?
); err != nil {
- return fmt.Errorf("r.updateLatestEvents: %w", err)
+ return rollbackTransaction, fmt.Errorf("r.updateLatestEvents: %w", err)
}
case api.KindOld:
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
@@ -320,7 +324,7 @@ func (r *Inputer) processRoomEvent(
},
})
if err != nil {
- return fmt.Errorf("r.WriteOutputEvents (old): %w", err)
+ return rollbackTransaction, fmt.Errorf("r.WriteOutputEvents (old): %w", err)
}
}
@@ -339,14 +343,14 @@ func (r *Inputer) processRoomEvent(
},
})
if err != nil {
- return fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
+ return rollbackTransaction, fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
}
}
// Everything was OK — the latest events updater didn't error and
// we've sent output events. Finally, generate a hook call.
hooks.Run(hooks.KindNewEventPersisted, headered)
- return nil
+ return commitTransaction, nil
}
// fetchAuthEvents will check to see if any of the
@@ -358,6 +362,7 @@ func (r *Inputer) processRoomEvent(
// they are now in the database.
func (r *Inputer) fetchAuthEvents(
ctx context.Context,
+ updater *shared.RoomUpdater,
logger *logrus.Entry,
event *gomatrixserverlib.HeaderedEvent,
auth *gomatrixserverlib.AuthEvents,
@@ -375,7 +380,7 @@ func (r *Inputer) fetchAuthEvents(
}
for _, authEventID := range authEventIDs {
- authEvents, err := r.DB.EventsFromIDs(ctx, []string{authEventID})
+ authEvents, err := updater.EventsFromIDs(ctx, []string{authEventID})
if err != nil || len(authEvents) == 0 || authEvents[0].Event == nil {
unknown[authEventID] = struct{}{}
continue
@@ -454,9 +459,9 @@ func (r *Inputer) fetchAuthEvents(
}
// Finally, store the event in the database.
- eventNID, _, _, _, _, err := r.DB.StoreEvent(ctx, authEvent, authEventNIDs, isRejected)
+ eventNID, _, _, _, _, err := updater.StoreEvent(ctx, authEvent, authEventNIDs, isRejected)
if err != nil {
- return fmt.Errorf("r.DB.StoreEvent: %w", err)
+ return fmt.Errorf("updater.StoreEvent: %w", err)
}
// Now we know about this event, it was stored and the signatures were OK.
@@ -471,6 +476,7 @@ func (r *Inputer) fetchAuthEvents(
func (r *Inputer) calculateAndSetState(
ctx context.Context,
+ updater *shared.RoomUpdater,
input *api.InputRoomEvent,
roomInfo *types.RoomInfo,
stateAtEvent *types.StateAtEvent,
@@ -478,14 +484,14 @@ func (r *Inputer) calculateAndSetState(
isRejected bool,
) error {
var err error
- roomState := state.NewStateResolution(r.DB, roomInfo)
+ roomState := state.NewStateResolution(updater, roomInfo)
if input.HasState {
// Check here if we think we're in the room already.
stateAtEvent.Overwrite = true
var joinEventNIDs []types.EventNID
// Request join memberships only for local users only.
- if joinEventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, roomInfo.RoomNID, true, true); err == nil {
+ if joinEventNIDs, err = updater.GetMembershipEventNIDsForRoom(ctx, roomInfo.RoomNID, true, true); err == nil {
// If we have no local users that are joined to the room then any state about
// the room that we have is quite possibly out of date. Therefore in that case
// we should overwrite it rather than merge it.
@@ -495,13 +501,13 @@ func (r *Inputer) calculateAndSetState(
// We've been told what the state at the event is so we don't need to calculate it.
// Check that those state events are in the database and store the state.
var entries []types.StateEntry
- if entries, err = r.DB.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil {
- return fmt.Errorf("r.DB.StateEntriesForEventIDs: %w", err)
+ if entries, err = updater.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil {
+ return fmt.Errorf("updater.StateEntriesForEventIDs: %w", err)
}
entries = types.DeduplicateStateEntries(entries)
- if stateAtEvent.BeforeStateSnapshotNID, err = r.DB.AddState(ctx, roomInfo.RoomNID, nil, entries); err != nil {
- return fmt.Errorf("r.DB.AddState: %w", err)
+ if stateAtEvent.BeforeStateSnapshotNID, err = updater.AddState(ctx, roomInfo.RoomNID, nil, entries); err != nil {
+ return fmt.Errorf("updater.AddState: %w", err)
}
} else {
stateAtEvent.Overwrite = false
@@ -512,7 +518,7 @@ func (r *Inputer) calculateAndSetState(
}
}
- err = r.DB.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID)
+ err = updater.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID)
if err != nil {
return fmt.Errorf("r.DB.SetState: %w", err)
}
diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go
index 6137941e..5173d3ab 100644
--- a/roomserver/internal/input/input_latest_events.go
+++ b/roomserver/internal/input/input_latest_events.go
@@ -20,7 +20,6 @@ import (
"context"
"fmt"
- "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage/shared"
@@ -48,6 +47,7 @@ import (
// Can only be called once at a time
func (r *Inputer) updateLatestEvents(
ctx context.Context,
+ updater *shared.RoomUpdater,
roomInfo *types.RoomInfo,
stateAtEvent types.StateAtEvent,
event *gomatrixserverlib.Event,
@@ -55,13 +55,6 @@ func (r *Inputer) updateLatestEvents(
transactionID *api.TransactionID,
rewritesState bool,
) (err error) {
- updater, err := r.DB.GetLatestEventsForUpdate(ctx, *roomInfo)
- if err != nil {
- return fmt.Errorf("r.DB.GetLatestEventsForUpdate: %w", err)
- }
- succeeded := false
- defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err)
-
u := latestEventsUpdater{
ctx: ctx,
api: r,
@@ -78,7 +71,6 @@ func (r *Inputer) updateLatestEvents(
return fmt.Errorf("u.doUpdateLatestEvents: %w", err)
}
- succeeded = true
return
}
@@ -89,7 +81,7 @@ func (r *Inputer) updateLatestEvents(
type latestEventsUpdater struct {
ctx context.Context
api *Inputer
- updater *shared.LatestEventsUpdater
+ updater *shared.RoomUpdater
roomInfo *types.RoomInfo
stateAtEvent types.StateAtEvent
event *gomatrixserverlib.Event
@@ -199,7 +191,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
func (u *latestEventsUpdater) latestState() error {
var err error
- roomState := state.NewStateResolution(u.api.DB, u.roomInfo)
+ roomState := state.NewStateResolution(u.updater, u.roomInfo)
// Work out if the state at the extremities has actually changed
// or not. If they haven't then we won't bother doing all of the
@@ -413,7 +405,7 @@ func (u *latestEventsUpdater) extraEventsForIDs(roomVersion gomatrixserverlib.Ro
if len(extraEventIDs) == 0 {
return nil, nil
}
- extraEvents, err := u.api.DB.EventsFromIDs(u.ctx, extraEventIDs)
+ extraEvents, err := u.updater.EventsFromIDs(u.ctx, extraEventIDs)
if err != nil {
return nil, err
}
@@ -436,7 +428,7 @@ func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error)
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))]
- return u.api.DB.EventIDs(u.ctx, stateEventNIDs)
+ return u.updater.EventIDs(u.ctx, stateEventNIDs)
}
type eventNIDSorter []types.EventNID
diff --git a/roomserver/internal/input/input_membership.go b/roomserver/internal/input/input_membership.go
index 2511097d..ff3ed7e5 100644
--- a/roomserver/internal/input/input_membership.go
+++ b/roomserver/internal/input/input_membership.go
@@ -31,7 +31,7 @@ import (
// consumers about the invites added or retired by the change in current state.
func (r *Inputer) updateMemberships(
ctx context.Context,
- updater *shared.LatestEventsUpdater,
+ updater *shared.RoomUpdater,
removed, added []types.StateEntry,
) ([]api.OutputEvent, error) {
changes := membershipChanges(removed, added)
@@ -79,7 +79,7 @@ func (r *Inputer) updateMemberships(
}
func (r *Inputer) updateMembership(
- updater *shared.LatestEventsUpdater,
+ updater *shared.RoomUpdater,
targetUserNID types.EventStateKeyNID,
remove, add *gomatrixserverlib.Event,
updates []api.OutputEvent,
diff --git a/roomserver/internal/input/input_missing.go b/roomserver/internal/input/input_missing.go
index d401fa0e..4cd2b3de 100644
--- a/roomserver/internal/input/input_missing.go
+++ b/roomserver/internal/input/input_missing.go
@@ -11,7 +11,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/query"
- "github.com/matrix-org/dendrite/roomserver/storage"
+ "github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
@@ -19,7 +19,7 @@ import (
type missingStateReq struct {
origin gomatrixserverlib.ServerName
- db storage.Database
+ db *shared.RoomUpdater
inputer *Inputer
queryer *query.Queryer
keys gomatrixserverlib.JSONVerifier
@@ -78,7 +78,7 @@ func (t *missingStateReq) processEventWithMissingState(
// we can just inject all the newEvents as new as we may have only missed 1 or 2 events and have filled
// in the gap in the DAG
for _, newEvent := range newEvents {
- err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
+ _, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
Kind: api.KindNew,
Event: newEvent.Headered(roomVersion),
Origin: t.origin,
@@ -187,7 +187,7 @@ func (t *missingStateReq) processEventWithMissingState(
}
// TODO: we could do this concurrently?
for _, ire := range outlierRoomEvents {
- if err = t.inputer.processRoomEvent(ctx, &ire); err != nil {
+ if _, err = t.inputer.processRoomEvent(ctx, t.db, &ire); err != nil {
return fmt.Errorf("t.inputer.processRoomEvent[outlier]: %w", err)
}
}
@@ -200,7 +200,7 @@ func (t *missingStateReq) processEventWithMissingState(
stateIDs = append(stateIDs, event.EventID())
}
- err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
+ _, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
Kind: api.KindOld,
Event: backwardsExtremity.Headered(roomVersion),
Origin: t.origin,
@@ -217,7 +217,7 @@ func (t *missingStateReq) processEventWithMissingState(
// they will automatically fast-forward based on the room state at the
// extremity in the last step.
for _, newEvent := range newEvents {
- err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
+ _, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
Kind: api.KindOld,
Event: newEvent.Headered(roomVersion),
Origin: t.origin,