aboutsummaryrefslogtreecommitdiff
path: root/roomserver
diff options
context:
space:
mode:
authorkegsay <kegan@matrix.org>2022-02-09 20:31:24 +0000
committerGitHub <noreply@github.com>2022-02-09 20:31:24 +0000
commitaa5c3b88dea207410461820ee480b002d185aa54 (patch)
tree7249679bd4c8d84f796dbd11198074dd98a0ee53 /roomserver
parentcc688a9a386f48e38687a697b50f9be7d2b06fb0 (diff)
Unmarshal events at the Dendrite level not GMSL level (#2164)
* Use new event json types in gmsl * Fix EventJSON to actually unmarshal events * Update GMSL * Bump GMSL and improve error messages * Send back the correct RespState * Update GMSL
Diffstat (limited to 'roomserver')
-rw-r--r--roomserver/api/wrapper.go9
-rw-r--r--roomserver/internal/input/input_events.go2
-rw-r--r--roomserver/internal/input/input_missing.go79
3 files changed, 57 insertions, 33 deletions
diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go
index e9b94e48..012094c6 100644
--- a/roomserver/api/wrapper.go
+++ b/roomserver/api/wrapper.go
@@ -51,7 +51,7 @@ func SendEventWithState(
state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent,
origin gomatrixserverlib.ServerName, haveEventIDs map[string]bool, async bool,
) error {
- outliers, err := state.Events()
+ outliers, err := state.Events(event.RoomVersion)
if err != nil {
return err
}
@@ -68,9 +68,10 @@ func SendEventWithState(
})
}
- stateEventIDs := make([]string, len(state.StateEvents))
- for i := range state.StateEvents {
- stateEventIDs[i] = state.StateEvents[i].EventID()
+ stateEvents := state.StateEvents.UntrustedEvents(event.RoomVersion)
+ stateEventIDs := make([]string, len(stateEvents))
+ for i := range stateEvents {
+ stateEventIDs[i] = stateEvents[i].EventID()
}
ires = append(ires, InputRoomEvent{
diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go
index bb35ade9..774e71dd 100644
--- a/roomserver/internal/input/input_events.go
+++ b/roomserver/internal/input/input_events.go
@@ -438,7 +438,7 @@ func (r *Inputer) fetchAuthEvents(
isRejected := false
nextAuthEvent:
for _, authEvent := range gomatrixserverlib.ReverseTopologicalOrdering(
- res.AuthEvents,
+ res.AuthEvents.UntrustedEvents(event.RoomVersion),
gomatrixserverlib.TopologicalOrderByAuthEvents,
) {
// If we already know about this event from the database then we don't
diff --git a/roomserver/internal/input/input_missing.go b/roomserver/internal/input/input_missing.go
index 7a72b038..497c049d 100644
--- a/roomserver/internal/input/input_missing.go
+++ b/roomserver/internal/input/input_missing.go
@@ -18,6 +18,11 @@ import (
"github.com/sirupsen/logrus"
)
+type parsedRespState struct {
+ AuthEvents []*gomatrixserverlib.Event
+ StateEvents []*gomatrixserverlib.Event
+}
+
type missingStateReq struct {
origin gomatrixserverlib.ServerName
db *shared.RoomUpdater
@@ -98,7 +103,7 @@ func (t *missingStateReq) processEventWithMissingState(
// That's because the state will have been through state resolution once
// already in QueryStateAfterEvent.
trustworthy bool
- *gomatrixserverlib.RespState
+ *parsedRespState
}
// at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
@@ -125,7 +130,7 @@ func (t *missingStateReq) processEventWithMissingState(
// 1. Ensures that the state is deduplicated fully for each state-key tuple
// 2. Ensures that we pick the latest events from both sets, in the case that
// one of the prev_events is quite a bit older than the others
- resolvedState := &gomatrixserverlib.RespState{}
+ resolvedState := &parsedRespState{}
switch len(states) {
case 0:
extremityIsCreate := backwardsExtremity.Type() == gomatrixserverlib.MRoomCreate && backwardsExtremity.StateKeyEquals("")
@@ -140,16 +145,16 @@ func (t *missingStateReq) processEventWithMissingState(
// local state snapshot which will already have been through state res),
// use it as-is. There's no point in resolving it again.
if states[0].trustworthy {
- resolvedState = states[0].RespState
+ resolvedState = states[0].parsedRespState
break
}
// Otherwise, if it isn't trustworthy (came from federation), run it through
// state resolution anyway for safety, in case there are duplicates.
fallthrough
default:
- respStates := make([]*gomatrixserverlib.RespState, len(states))
+ respStates := make([]*parsedRespState, len(states))
for i := range states {
- respStates[i] = states[i].RespState
+ respStates[i] = states[i].parsedRespState
}
// There's more than one previous state - run them all through state res
t.roomsMu.Lock(e.RoomID())
@@ -169,7 +174,7 @@ func (t *missingStateReq) processEventWithMissingState(
t.hadEventsMutex.Unlock()
// Send outliers first so we can send the new backwards extremity without causing errors
- outliers, err := resolvedState.Events()
+ outliers, err := gomatrixserverlib.OrderAuthAndStateEvents(resolvedState.AuthEvents, resolvedState.StateEvents, roomVersion)
if err != nil {
return err
}
@@ -239,7 +244,7 @@ func (t *missingStateReq) processEventWithMissingState(
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
// added into the mix.
-func (t *missingStateReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*gomatrixserverlib.RespState, bool, error) {
+func (t *missingStateReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*parsedRespState, bool, error) {
// try doing all this locally before we resort to querying federation
respState := t.lookupStateAfterEventLocally(ctx, roomID, eventID)
if respState != nil {
@@ -290,7 +295,7 @@ func (t *missingStateReq) cacheAndReturn(ev *gomatrixserverlib.HeaderedEvent) *g
return ev
}
-func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *gomatrixserverlib.RespState {
+func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *parsedRespState {
var res api.QueryStateAfterEventsResponse
err := t.queryer.QueryStateAfterEvents(ctx, &api.QueryStateAfterEventsRequest{
RoomID: roomID,
@@ -345,7 +350,7 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, room
queryRes.Events = nil
}
- return &gomatrixserverlib.RespState{
+ return &parsedRespState{
StateEvents: gomatrixserverlib.UnwrapEventHeaders(stateEvents),
AuthEvents: authEvents,
}
@@ -354,13 +359,13 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, room
// lookuptStateBeforeEvent returns the room state before the event e, which is just /state_ids and/or /state depending on what
// the server supports.
func (t *missingStateReq) lookupStateBeforeEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (
- *gomatrixserverlib.RespState, error) {
+ *parsedRespState, error) {
// Attempt to fetch the missing state using /state_ids and /events
return t.lookupMissingStateViaStateIDs(ctx, roomID, eventID, roomVersion)
}
-func (t *missingStateReq) resolveStatesAndCheck(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, states []*gomatrixserverlib.RespState, backwardsExtremity *gomatrixserverlib.Event) (*gomatrixserverlib.RespState, error) {
+func (t *missingStateReq) resolveStatesAndCheck(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, states []*parsedRespState, backwardsExtremity *gomatrixserverlib.Event) (*parsedRespState, error) {
var authEventList []*gomatrixserverlib.Event
var stateEventList []*gomatrixserverlib.Event
for _, state := range states {
@@ -379,7 +384,7 @@ retryAllowedState:
h, err2 := t.lookupEvent(ctx, roomVersion, backwardsExtremity.RoomID(), missing.AuthEventID, true)
switch err2.(type) {
case verifySigError:
- return &gomatrixserverlib.RespState{
+ return &parsedRespState{
AuthEvents: authEventList,
StateEvents: resolvedStateEvents,
}, nil
@@ -395,7 +400,7 @@ retryAllowedState:
}
return nil, err
}
- return &gomatrixserverlib.RespState{
+ return &parsedRespState{
AuthEvents: authEventList,
StateEvents: resolvedStateEvents,
}, nil
@@ -452,12 +457,21 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
// Make sure events from the missingResp are using the cache - missing events
// will be added and duplicates will be removed.
logger.Debugf("get_missing_events returned %d events", len(missingResp.Events))
- for i, ev := range missingResp.Events {
- missingResp.Events[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
+ missingEvents := make([]*gomatrixserverlib.Event, len(missingResp.Events))
+ for i, evJSON := range missingResp.Events {
+ ev, err := gomatrixserverlib.NewEventFromUntrustedJSON(evJSON, roomVersion)
+ if err != nil {
+ logger.WithError(err).WithField("event", string(evJSON)).Warn("NewEventFromUntrustedJSON: failed")
+ return nil, false, missingPrevEventsError{
+ eventID: e.EventID(),
+ err: err,
+ }
+ }
+ missingEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
}
// topologically sort and sanity check that we are making forward progress
- newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents)
+ newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingEvents, gomatrixserverlib.TopologicalOrderByPrevEvents)
shouldHaveSomeEventIDs := e.PrevEventIDs()
hasPrevEvent := false
Event:
@@ -498,29 +512,37 @@ Event:
return newEvents, true, nil
}
-func (t *missingStateReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
- respState *gomatrixserverlib.RespState, err error) {
+func (t *missingStateReq) lookupMissingStateViaState(
+ ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion,
+) (respState *parsedRespState, err error) {
state, err := t.federation.LookupState(ctx, t.origin, roomID, eventID, roomVersion)
if err != nil {
return nil, err
}
// Check that the returned state is valid.
- if err := state.Check(ctx, t.keys, nil); err != nil {
+ if err := state.Check(ctx, roomVersion, t.keys, nil); err != nil {
return nil, err
}
+ parsedState := &parsedRespState{
+ AuthEvents: make([]*gomatrixserverlib.Event, len(state.AuthEvents)),
+ StateEvents: make([]*gomatrixserverlib.Event, len(state.StateEvents)),
+ }
// Cache the results of this state lookup and deduplicate anything we already
// have in the cache, freeing up memory.
- for i, ev := range state.AuthEvents {
- state.AuthEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
+ // We load these as trusted as we called state.Check before which loaded them as untrusted.
+ for i, evJSON := range state.AuthEvents {
+ ev, _ := gomatrixserverlib.NewEventFromTrustedJSON(evJSON, false, roomVersion)
+ parsedState.AuthEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
}
- for i, ev := range state.StateEvents {
- state.StateEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
+ for i, evJSON := range state.StateEvents {
+ ev, _ := gomatrixserverlib.NewEventFromTrustedJSON(evJSON, false, roomVersion)
+ parsedState.StateEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
}
- return &state, nil
+ return parsedState, nil
}
func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
- *gomatrixserverlib.RespState, error) {
+ *parsedRespState, error) {
util.GetLogger(ctx).WithField("room_id", roomID).Infof("lookupMissingStateViaStateIDs %s", eventID)
// fetch the state event IDs at the time of the event
stateIDs, err := t.federation.LookupStateIDs(ctx, t.origin, roomID, eventID)
@@ -652,13 +674,14 @@ func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roo
return resp, err
}
-func (t *missingStateReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs) (
- *gomatrixserverlib.RespState, error) { // nolint:unparam
+func (t *missingStateReq) createRespStateFromStateIDs(
+ stateIDs gomatrixserverlib.RespStateIDs,
+) (*parsedRespState, error) { // nolint:unparam
t.haveEventsMutex.Lock()
defer t.haveEventsMutex.Unlock()
// create a RespState response using the response to /state_ids as a guide
- respState := gomatrixserverlib.RespState{}
+ respState := parsedRespState{}
for i := range stateIDs.StateEventIDs {
ev, ok := t.haveEvents[stateIDs.StateEventIDs[i]]