diff options
author | kegsay <kegan@matrix.org> | 2022-02-09 20:31:24 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-09 20:31:24 +0000 |
commit | aa5c3b88dea207410461820ee480b002d185aa54 (patch) | |
tree | 7249679bd4c8d84f796dbd11198074dd98a0ee53 /roomserver | |
parent | cc688a9a386f48e38687a697b50f9be7d2b06fb0 (diff) |
Unmarshal events at the Dendrite level not GMSL level (#2164)
* Use new event json types in gmsl
* Fix EventJSON to actually unmarshal events
* Update GMSL
* Bump GMSL and improve error messages
* Send back the correct RespState
* Update GMSL
Diffstat (limited to 'roomserver')
-rw-r--r-- | roomserver/api/wrapper.go | 9 | ||||
-rw-r--r-- | roomserver/internal/input/input_events.go | 2 | ||||
-rw-r--r-- | roomserver/internal/input/input_missing.go | 79 |
3 files changed, 57 insertions, 33 deletions
diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go index e9b94e48..012094c6 100644 --- a/roomserver/api/wrapper.go +++ b/roomserver/api/wrapper.go @@ -51,7 +51,7 @@ func SendEventWithState( state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent, origin gomatrixserverlib.ServerName, haveEventIDs map[string]bool, async bool, ) error { - outliers, err := state.Events() + outliers, err := state.Events(event.RoomVersion) if err != nil { return err } @@ -68,9 +68,10 @@ func SendEventWithState( }) } - stateEventIDs := make([]string, len(state.StateEvents)) - for i := range state.StateEvents { - stateEventIDs[i] = state.StateEvents[i].EventID() + stateEvents := state.StateEvents.UntrustedEvents(event.RoomVersion) + stateEventIDs := make([]string, len(stateEvents)) + for i := range stateEvents { + stateEventIDs[i] = stateEvents[i].EventID() } ires = append(ires, InputRoomEvent{ diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index bb35ade9..774e71dd 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -438,7 +438,7 @@ func (r *Inputer) fetchAuthEvents( isRejected := false nextAuthEvent: for _, authEvent := range gomatrixserverlib.ReverseTopologicalOrdering( - res.AuthEvents, + res.AuthEvents.UntrustedEvents(event.RoomVersion), gomatrixserverlib.TopologicalOrderByAuthEvents, ) { // If we already know about this event from the database then we don't diff --git a/roomserver/internal/input/input_missing.go b/roomserver/internal/input/input_missing.go index 7a72b038..497c049d 100644 --- a/roomserver/internal/input/input_missing.go +++ b/roomserver/internal/input/input_missing.go @@ -18,6 +18,11 @@ import ( "github.com/sirupsen/logrus" ) +type parsedRespState struct { + AuthEvents []*gomatrixserverlib.Event + StateEvents []*gomatrixserverlib.Event +} + type missingStateReq struct { origin gomatrixserverlib.ServerName db *shared.RoomUpdater @@ -98,7 +103,7 @@ func (t *missingStateReq) processEventWithMissingState( // That's because the state will have been through state resolution once // already in QueryStateAfterEvent. trustworthy bool - *gomatrixserverlib.RespState + *parsedRespState } // at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity. @@ -125,7 +130,7 @@ func (t *missingStateReq) processEventWithMissingState( // 1. Ensures that the state is deduplicated fully for each state-key tuple // 2. Ensures that we pick the latest events from both sets, in the case that // one of the prev_events is quite a bit older than the others - resolvedState := &gomatrixserverlib.RespState{} + resolvedState := &parsedRespState{} switch len(states) { case 0: extremityIsCreate := backwardsExtremity.Type() == gomatrixserverlib.MRoomCreate && backwardsExtremity.StateKeyEquals("") @@ -140,16 +145,16 @@ func (t *missingStateReq) processEventWithMissingState( // local state snapshot which will already have been through state res), // use it as-is. There's no point in resolving it again. if states[0].trustworthy { - resolvedState = states[0].RespState + resolvedState = states[0].parsedRespState break } // Otherwise, if it isn't trustworthy (came from federation), run it through // state resolution anyway for safety, in case there are duplicates. fallthrough default: - respStates := make([]*gomatrixserverlib.RespState, len(states)) + respStates := make([]*parsedRespState, len(states)) for i := range states { - respStates[i] = states[i].RespState + respStates[i] = states[i].parsedRespState } // There's more than one previous state - run them all through state res t.roomsMu.Lock(e.RoomID()) @@ -169,7 +174,7 @@ func (t *missingStateReq) processEventWithMissingState( t.hadEventsMutex.Unlock() // Send outliers first so we can send the new backwards extremity without causing errors - outliers, err := resolvedState.Events() + outliers, err := gomatrixserverlib.OrderAuthAndStateEvents(resolvedState.AuthEvents, resolvedState.StateEvents, roomVersion) if err != nil { return err } @@ -239,7 +244,7 @@ func (t *missingStateReq) processEventWithMissingState( // lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event) // added into the mix. -func (t *missingStateReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*gomatrixserverlib.RespState, bool, error) { +func (t *missingStateReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*parsedRespState, bool, error) { // try doing all this locally before we resort to querying federation respState := t.lookupStateAfterEventLocally(ctx, roomID, eventID) if respState != nil { @@ -290,7 +295,7 @@ func (t *missingStateReq) cacheAndReturn(ev *gomatrixserverlib.HeaderedEvent) *g return ev } -func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *gomatrixserverlib.RespState { +func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *parsedRespState { var res api.QueryStateAfterEventsResponse err := t.queryer.QueryStateAfterEvents(ctx, &api.QueryStateAfterEventsRequest{ RoomID: roomID, @@ -345,7 +350,7 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, room queryRes.Events = nil } - return &gomatrixserverlib.RespState{ + return &parsedRespState{ StateEvents: gomatrixserverlib.UnwrapEventHeaders(stateEvents), AuthEvents: authEvents, } @@ -354,13 +359,13 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, room // lookuptStateBeforeEvent returns the room state before the event e, which is just /state_ids and/or /state depending on what // the server supports. func (t *missingStateReq) lookupStateBeforeEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) ( - *gomatrixserverlib.RespState, error) { + *parsedRespState, error) { // Attempt to fetch the missing state using /state_ids and /events return t.lookupMissingStateViaStateIDs(ctx, roomID, eventID, roomVersion) } -func (t *missingStateReq) resolveStatesAndCheck(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, states []*gomatrixserverlib.RespState, backwardsExtremity *gomatrixserverlib.Event) (*gomatrixserverlib.RespState, error) { +func (t *missingStateReq) resolveStatesAndCheck(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, states []*parsedRespState, backwardsExtremity *gomatrixserverlib.Event) (*parsedRespState, error) { var authEventList []*gomatrixserverlib.Event var stateEventList []*gomatrixserverlib.Event for _, state := range states { @@ -379,7 +384,7 @@ retryAllowedState: h, err2 := t.lookupEvent(ctx, roomVersion, backwardsExtremity.RoomID(), missing.AuthEventID, true) switch err2.(type) { case verifySigError: - return &gomatrixserverlib.RespState{ + return &parsedRespState{ AuthEvents: authEventList, StateEvents: resolvedStateEvents, }, nil @@ -395,7 +400,7 @@ retryAllowedState: } return nil, err } - return &gomatrixserverlib.RespState{ + return &parsedRespState{ AuthEvents: authEventList, StateEvents: resolvedStateEvents, }, nil @@ -452,12 +457,21 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve // Make sure events from the missingResp are using the cache - missing events // will be added and duplicates will be removed. logger.Debugf("get_missing_events returned %d events", len(missingResp.Events)) - for i, ev := range missingResp.Events { - missingResp.Events[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap() + missingEvents := make([]*gomatrixserverlib.Event, len(missingResp.Events)) + for i, evJSON := range missingResp.Events { + ev, err := gomatrixserverlib.NewEventFromUntrustedJSON(evJSON, roomVersion) + if err != nil { + logger.WithError(err).WithField("event", string(evJSON)).Warn("NewEventFromUntrustedJSON: failed") + return nil, false, missingPrevEventsError{ + eventID: e.EventID(), + err: err, + } + } + missingEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap() } // topologically sort and sanity check that we are making forward progress - newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents) + newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingEvents, gomatrixserverlib.TopologicalOrderByPrevEvents) shouldHaveSomeEventIDs := e.PrevEventIDs() hasPrevEvent := false Event: @@ -498,29 +512,37 @@ Event: return newEvents, true, nil } -func (t *missingStateReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) ( - respState *gomatrixserverlib.RespState, err error) { +func (t *missingStateReq) lookupMissingStateViaState( + ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion, +) (respState *parsedRespState, err error) { state, err := t.federation.LookupState(ctx, t.origin, roomID, eventID, roomVersion) if err != nil { return nil, err } // Check that the returned state is valid. - if err := state.Check(ctx, t.keys, nil); err != nil { + if err := state.Check(ctx, roomVersion, t.keys, nil); err != nil { return nil, err } + parsedState := &parsedRespState{ + AuthEvents: make([]*gomatrixserverlib.Event, len(state.AuthEvents)), + StateEvents: make([]*gomatrixserverlib.Event, len(state.StateEvents)), + } // Cache the results of this state lookup and deduplicate anything we already // have in the cache, freeing up memory. - for i, ev := range state.AuthEvents { - state.AuthEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap() + // We load these as trusted as we called state.Check before which loaded them as untrusted. + for i, evJSON := range state.AuthEvents { + ev, _ := gomatrixserverlib.NewEventFromTrustedJSON(evJSON, false, roomVersion) + parsedState.AuthEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap() } - for i, ev := range state.StateEvents { - state.StateEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap() + for i, evJSON := range state.StateEvents { + ev, _ := gomatrixserverlib.NewEventFromTrustedJSON(evJSON, false, roomVersion) + parsedState.StateEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap() } - return &state, nil + return parsedState, nil } func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) ( - *gomatrixserverlib.RespState, error) { + *parsedRespState, error) { util.GetLogger(ctx).WithField("room_id", roomID).Infof("lookupMissingStateViaStateIDs %s", eventID) // fetch the state event IDs at the time of the event stateIDs, err := t.federation.LookupStateIDs(ctx, t.origin, roomID, eventID) @@ -652,13 +674,14 @@ func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roo return resp, err } -func (t *missingStateReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs) ( - *gomatrixserverlib.RespState, error) { // nolint:unparam +func (t *missingStateReq) createRespStateFromStateIDs( + stateIDs gomatrixserverlib.RespStateIDs, +) (*parsedRespState, error) { // nolint:unparam t.haveEventsMutex.Lock() defer t.haveEventsMutex.Unlock() // create a RespState response using the response to /state_ids as a guide - respState := gomatrixserverlib.RespState{} + respState := parsedRespState{} for i := range stateIDs.StateEventIDs { ev, ok := t.haveEvents[stateIDs.StateEventIDs[i]] |