aboutsummaryrefslogtreecommitdiff
path: root/federationapi/routing/send.go
diff options
context:
space:
mode:
Diffstat (limited to 'federationapi/routing/send.go')
-rw-r--r--federationapi/routing/send.go90
1 files changed, 63 insertions, 27 deletions
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index fe429521..a6e39821 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -183,7 +183,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
// Process the events.
for _, e := range pdus {
- if err := t.processEvent(ctx, e.Unwrap(), true); err != nil {
+ if err := t.processEvent(ctx, e.Unwrap()); err != nil {
// If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the
// sender knows that we have skipped processing it.
@@ -338,11 +338,15 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
}
}
-func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event, isInboundTxn bool) error {
+func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event) error {
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
// Work out if the roomserver knows everything it needs to know to auth
- // the event.
+ // the event. This includes the prev_events and auth_events.
+ // NOTE! This is going to include prev_events that have an empty state
+ // snapshot. This is because we will need to re-request the event, and
+ // it's /state_ids, in order for it to exist in the roomserver correctly
+ // before the roomserver tries to work out
stateReq := api.QueryMissingAuthPrevEventsRequest{
RoomID: e.RoomID(),
AuthEventIDs: e.AuthEventIDs(),
@@ -410,7 +414,7 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event, is
if len(stateResp.MissingPrevEventIDs) > 0 {
logger.Infof("Event refers to %d unknown prev_events", len(stateResp.MissingPrevEventIDs))
- return t.processEventWithMissingState(ctx, e, stateResp.RoomVersion, isInboundTxn)
+ return t.processEventWithMissingState(ctx, e, stateResp.RoomVersion)
}
// pass the event to the roomserver which will do auth checks
@@ -438,7 +442,7 @@ func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserver
return gomatrixserverlib.Allowed(e, &authUsingState)
}
-func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) error {
+func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error {
// Do this with a fresh context, so that we keep working even if the
// original request times out. With any luck, by the time the remote
// side retries, we'll have fetched the missing state.
@@ -464,39 +468,82 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser
// - fill in the gap completely then process event `e` returning no backwards extremity
// - fail to fill in the gap and tell us to terminate the transaction err=not nil
// - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
- backwardsExtremity, err := t.getMissingEvents(gmectx, e, roomVersion, isInboundTxn)
+ newEvents, err := t.getMissingEvents(gmectx, e, roomVersion)
if err != nil {
return err
}
- if backwardsExtremity == nil {
- // we filled in the gap!
+ if len(newEvents) == 0 {
return nil
}
+ backwardsExtremity := &newEvents[0]
+ newEvents = newEvents[1:]
+
// at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
- // security: we have to do state resolution on the new backwards extremity (TODO: WHY)
// Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query
// the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event.
var states []*gomatrixserverlib.RespState
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{*backwardsExtremity}).Tuples()
for _, prevEventID := range backwardsExtremity.PrevEventIDs() {
+ // Look up what the state is after the backward extremity. This will either
+ // come from the roomserver, if we know all the required events, or it will
+ // come from a remote server via /state_ids if not.
var prevState *gomatrixserverlib.RespState
prevState, err = t.lookupStateAfterEvent(gmectx, roomVersion, backwardsExtremity.RoomID(), prevEventID, needed)
if err != nil {
util.GetLogger(ctx).WithError(err).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
return err
}
+ // Append the state onto the collected state. We'll run this through the
+ // state resolution next.
states = append(states, prevState)
}
+
+ // Now that we have collected all of the state from the prev_events, we'll
+ // run the state through the appropriate state resolution algorithm for the
+ // room. This does a couple of things:
+ // 1. Ensures that the state is deduplicated fully for each state-key tuple
+ // 2. Ensures that we pick the latest events from both sets, in the case that
+ // one of the prev_events is quite a bit older than the others
resolvedState, err := t.resolveStatesAndCheck(gmectx, roomVersion, states, backwardsExtremity)
if err != nil {
util.GetLogger(ctx).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())
return err
}
- // pass the event along with the state to the roomserver using a background context so we don't
- // needlessly expire
- return api.SendEventWithState(context.Background(), t.rsAPI, resolvedState, e.Headered(roomVersion), t.haveEventIDs())
+ // First of all, send the backward extremity into the roomserver with the
+ // newly resolved state. This marks the "oldest" point in the backfill and
+ // sets the baseline state for any new events after this.
+ err = api.SendEventWithState(
+ context.Background(),
+ t.rsAPI,
+ resolvedState,
+ backwardsExtremity.Headered(roomVersion),
+ t.haveEventIDs(),
+ )
+ if err != nil {
+ return fmt.Errorf("api.SendEventWithState: %w", err)
+ }
+
+ // Then send all of the newer backfilled events, of which will all be newer
+ // than the backward extremity, into the roomserver without state. This way
+ // they will automatically fast-forward based on the room state at the
+ // extremity in the last step.
+ headeredNewEvents := make([]gomatrixserverlib.HeaderedEvent, len(newEvents))
+ for i, newEvent := range newEvents {
+ headeredNewEvents[i] = newEvent.Headered(roomVersion)
+ }
+ if err = api.SendEvents(
+ context.Background(),
+ t.rsAPI,
+ append(headeredNewEvents, e.Headered(roomVersion)),
+ api.DoNotSendToOtherServers,
+ nil,
+ ); err != nil {
+ return fmt.Errorf("api.SendEvents: %w", err)
+ }
+
+ return nil
}
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
@@ -652,11 +699,7 @@ retryAllowedState:
// This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns.
// This means that we may recursively call this function, as we spider back up prev_events.
// nolint:gocyclo
-func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) (backwardsExtremity *gomatrixserverlib.Event, err error) {
- if !isInboundTxn {
- // we've recursed here, so just take a state snapshot please!
- return &e, nil
- }
+func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []gomatrixserverlib.Event, err error) {
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e})
// query latest events (our trusted forward extremities)
@@ -667,7 +710,7 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event
var res api.QueryLatestEventsAndStateResponse
if err = t.rsAPI.QueryLatestEventsAndState(ctx, &req, &res); err != nil {
logger.WithError(err).Warn("Failed to query latest events")
- return &e, nil
+ return nil, err
}
latestEvents := make([]string, len(res.LatestEvents))
for i := range res.LatestEvents {
@@ -726,7 +769,7 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event
logger.Infof("get_missing_events returned %d events", len(missingResp.Events))
// topologically sort and sanity check that we are making forward progress
- newEvents := gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents)
+ newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents)
shouldHaveSomeEventIDs := e.PrevEventIDs()
hasPrevEvent := false
Event:
@@ -749,16 +792,9 @@ Event:
err: err,
}
}
- // process the missing events then the event which started this whole thing
- for _, ev := range append(newEvents, e) {
- err := t.processEvent(ctx, ev, false)
- if err != nil {
- return nil, err
- }
- }
// we processed everything!
- return nil, nil
+ return newEvents, nil
}
func (t *txnReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (