aboutsummaryrefslogtreecommitdiff
path: root/federationapi/routing/send.go
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-05-12 16:24:28 +0100
committerGitHub <noreply@github.com>2020-05-12 16:24:28 +0100
commitce5dfbebf98a55c0efe4f86ba30956916bf0b3ad (patch)
tree0dfe3d988bbd5cb02754dc9fb2a7e1028e8474e8 /federationapi/routing/send.go
parent32624697fd2d74d4a6e23549b647d323b210fb2a (diff)
Implement /get_missing_events (#1022)
* WIP get_missing_events work * More WIP get_missing_events work * First working /get_missing_events implementation Flakey currently due to racing between /sync and /send * Final tweaks * Remove log lines * Linting * go mod tidy * Clamp min depth to 0 * sort events by depth because sytest makes me sad Specifically I think it's https://github.com/matrix-org/sytest/blob/4172585c2521ec6d640b4b580080276da1ab5353/lib/SyTest/Federation/Client.pm#L265 to blame here.
Diffstat (limited to 'federationapi/routing/send.go')
-rw-r--r--federationapi/routing/send.go493
1 files changed, 400 insertions, 93 deletions
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index e6f91d94..99022074 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -48,6 +48,8 @@ func Send(
eduProducer: eduProducer,
keys: keys,
federation: federation,
+ haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
+ newEvents: make(map[string]bool),
}
var txnEvents struct {
@@ -105,6 +107,11 @@ type txnReq struct {
eduProducer *producers.EDUServerProducer
keys gomatrixserverlib.JSONVerifier
federation txnFederationClient
+ // local cache of events for auth checks, etc - this may include events
+ // which the roomserver is unaware of.
+ haveEvents map[string]*gomatrixserverlib.HeaderedEvent
+ // new events which the roomserver does not know about
+ newEvents map[string]bool
}
// A subset of FederationClient functionality that txn requires. Useful for testing.
@@ -114,6 +121,8 @@ type txnFederationClient interface {
)
LookupStateIDs(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string) (res gomatrixserverlib.RespStateIDs, err error)
GetEvent(ctx context.Context, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error)
+ LookupMissingEvents(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents,
+ roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error)
}
func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
@@ -148,7 +157,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
// Process the events.
for _, e := range pdus {
- err := t.processEvent(e.Unwrap())
+ err := t.processEvent(e.Unwrap(), true)
if err != nil {
// If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the
@@ -168,7 +177,9 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
switch err.(type) {
case roomNotFoundError:
case *gomatrixserverlib.NotAllowed:
+ case missingPrevEventsError:
default:
+ util.GetLogger(t.context).Warnf("Processing %s failed: %s", e.EventID(), err)
// Any other error should be the result of a temporary error in
// our server so we should bail processing the transaction entirely.
return nil, err
@@ -197,12 +208,30 @@ type verifySigError struct {
eventID string
err error
}
+// missingPrevEventsError reports that the prev_events of the event identified
+// by eventID could not be obtained; err holds the underlying cause.
+type missingPrevEventsError struct {
+ eventID string
+ err error
+}
func (e roomNotFoundError) Error() string { return fmt.Sprintf("room %q not found", e.roomID) }
func (e unmarshalError) Error() string { return fmt.Sprintf("unable to parse event: %s", e.err) }
func (e verifySigError) Error() string {
return fmt.Sprintf("unable to verify signature of event %q: %s", e.eventID, e.err)
}
+func (e missingPrevEventsError) Error() string {
+ return fmt.Sprintf("unable to get prev_events for event %q: %s", e.eventID, e.err)
+}
+
+// haveEventIDs returns the set of IDs of every event in the local cache
+// (t.haveEvents) that is not flagged in t.newEvents, i.e. it leaves out the
+// events recorded as new to the roomserver.
+func (t *txnReq) haveEventIDs() map[string]bool {
+	known := make(map[string]bool, len(t.haveEvents))
+	for id := range t.haveEvents {
+		if !t.newEvents[id] {
+			known[id] = true
+		}
+	}
+	return known
+}
func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
for _, e := range edus {
@@ -227,7 +256,7 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
}
}
-func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
+func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) error {
prevEventIDs := e.PrevEventIDs()
// Fetch the state needed to authenticate the event.
@@ -253,21 +282,14 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
}
if !stateResp.PrevEventsExist {
- return t.processEventWithMissingState(e, stateResp.RoomVersion)
+ return t.processEventWithMissingState(e, stateResp.RoomVersion, isInboundTxn)
}
// Check that the event is allowed by the state at the event.
- var events []gomatrixserverlib.Event
- for _, headeredEvent := range stateResp.StateEvents {
- events = append(events, headeredEvent.Unwrap())
- }
- if err := checkAllowedByState(e, events); err != nil {
+ if err := checkAllowedByState(e, gomatrixserverlib.UnwrapEventHeaders(stateResp.StateEvents)); err != nil {
return err
}
- // TODO: Check that the roomserver has a copy of all of the auth_events.
- // TODO: Check that the event is allowed by its auth_events.
-
// pass the event to the roomserver
_, err := t.producer.SendEvents(
t.context,
@@ -291,7 +313,7 @@ func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserver
return gomatrixserverlib.Allowed(e, &authUsingState)
}
-func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error {
+func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) error {
// We are missing the previous events for this event.
// This means that there is a gap in our view of the history of the
// room. There are two ways that we can handle such a gap:
@@ -306,49 +328,315 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer
// event ids and then use /event to fetch the individual events.
// However not all versions of synapse support /state_ids so you may
// need to fallback to /state.
- // TODO: Attempt to fill in the gap using /get_missing_events
- // Attempt to fetch the missing state using /state_ids and /events
- respState, haveEventIDs, err := t.lookupMissingStateViaStateIDs(e, roomVersion)
+ // Attempt to fill in the gap using /get_missing_events
+ // This will either:
+ // - fill in the gap completely then process event `e` returning no backwards extremity
+ // - fail to fill in the gap and tell us to terminate the transaction err=not nil
+ // - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
+ backwardsExtremity, err := t.getMissingEvents(e, roomVersion, isInboundTxn)
if err != nil {
- // Fallback to /state
- util.GetLogger(t.context).WithError(err).Warn("processEventWithMissingState failed to /state_ids, falling back to /state")
- respState, err = t.lookupMissingStateViaState(e, roomVersion)
+ return err
+ }
+ if backwardsExtremity == nil {
+ // we filled in the gap!
+ return nil
+ }
+
+ // at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
+ // security: we have to do state resolution on the new backwards extremity (TODO: WHY)
+ // Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query
+ // the state AFTER all the prev_events for this event, then mix in our current room state and apply state resolution
+ // to that to get the state before the event.
+ var states []*gomatrixserverlib.RespState
+ needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{*backwardsExtremity}).Tuples()
+ for _, prevEventID := range backwardsExtremity.PrevEventIDs() {
+ var prevState *gomatrixserverlib.RespState
+ prevState, err = t.lookupStateAfterEvent(roomVersion, backwardsExtremity.RoomID(), prevEventID, needed)
if err != nil {
+ util.GetLogger(t.context).WithError(err).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
return err
}
+ states = append(states, prevState)
+ }
+ // mix in the current room state
+ currState, err := t.lookupCurrentState(backwardsExtremity)
+ if err != nil {
+ util.GetLogger(t.context).WithError(err).Errorf("Failed to lookup current room state")
+ return err
+ }
+ states = append(states, currState)
+ resolvedState, err := t.resolveStatesAndCheck(roomVersion, states, backwardsExtremity)
+ if err != nil {
+ util.GetLogger(t.context).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())
+ return err
+ }
+
+ // pass the event along with the state to the roomserver using a background context so we don't
+ // needlessly expire
+ return t.producer.SendEventWithState(context.Background(), resolvedState, e.Headered(roomVersion), t.haveEventIDs())
+}
+
+// lookupStateAfterEvent returns the room state after `eventID`: the state
+// before the event with the event itself mixed in when it is a state event.
+// The roomserver is consulted first; only when it cannot answer do we ask the
+// origin server for the state before the event and for the event itself.
+//
+// `needed` restricts the state fetched locally. An error is returned when the
+// state or the event cannot be obtained.
+func (t *txnReq) lookupStateAfterEvent(roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string, needed []gomatrixserverlib.StateKeyTuple) (*gomatrixserverlib.RespState, error) {
+	// try doing all this locally before we resort to querying federation
+	if respState := t.lookupStateAfterEventLocally(roomID, eventID, needed); respState != nil {
+		return respState, nil
+	}
+
+	respState, err := t.lookupStateBeforeEvent(roomVersion, roomID, eventID)
+	if err != nil {
+		return nil, err
+	}
+
+	// fetch the event we're missing and add it to the pile
+	h, err := t.lookupEvent(roomVersion, eventID, false)
+	if err != nil {
+		return nil, err
+	}
+	if h == nil {
+		// Guard against a nil event arriving with a nil error: dereferencing
+		// it below would panic, so report it as an error instead.
+		return nil, fmt.Errorf("lookupStateAfterEvent: no event returned for %s", eventID)
+	}
+	t.haveEvents[h.EventID()] = h
+	if h.StateKey() == nil {
+		// not a state event, so the state after it is the state before it
+		return respState, nil
+	}
+
+	// replace the existing entry for this (type, state_key) tuple, or append
+	// the event when the tuple is not present yet
+	for i := range respState.StateEvents {
+		se := respState.StateEvents[i]
+		if se.Type() == h.Type() && se.StateKeyEquals(*h.StateKey()) {
+			respState.StateEvents[i] = h.Unwrap()
+			return respState, nil
+		}
+	}
+	respState.StateEvents = append(respState.StateEvents, h.Unwrap())
+	return respState, nil
+}
+
+// lookupStateAfterEventLocally asks the roomserver for the state after
+// `eventID`, restricted to the `needed` tuples, together with the auth events
+// referenced by that state. Every event obtained is remembered in
+// t.haveEvents. It returns nil when the roomserver cannot answer (a query
+// failed or the event is unknown to it).
+func (t *txnReq) lookupStateAfterEventLocally(roomID, eventID string, needed []gomatrixserverlib.StateKeyTuple) *gomatrixserverlib.RespState {
+	logger := util.GetLogger(t.context)
+
+	var res api.QueryStateAfterEventsResponse
+	err := t.rsAPI.QueryStateAfterEvents(t.context, &api.QueryStateAfterEventsRequest{
+		RoomID:       roomID,
+		PrevEventIDs: []string{eventID},
+		StateToFetch: needed,
+	}, &res)
+	if err != nil {
+		logger.WithError(err).Warnf("failed to query state after %s locally", eventID)
+		return nil
+	}
+	if !res.PrevEventsExist {
+		// not a failure: the roomserver simply does not know this event
+		logger.Infof("roomserver does not have %s, cannot query state after it locally", eventID)
+		return nil
+	}
+	for i := range res.StateEvents {
+		t.haveEvents[res.StateEvents[i].EventID()] = &res.StateEvents[i]
+	}
+
+	// Collect each referenced auth event exactly once: several state events
+	// can reference the same auth event and duplicates only bloat the result.
+	var authEvents []gomatrixserverlib.Event
+	var missingEventList []string
+	seenAuthEvents := make(map[string]bool)
+	for i := range res.StateEvents {
+		for _, ae := range res.StateEvents[i].AuthEventIDs() {
+			if seenAuthEvents[ae] {
+				continue
+			}
+			seenAuthEvents[ae] = true
+			if aev, ok := t.haveEvents[ae]; ok {
+				authEvents = append(authEvents, aev.Unwrap())
+			} else {
+				missingEventList = append(missingEventList, ae)
+			}
+		}
+	}
+
+	// QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't
+	// have stored the event. Skip the round trip entirely when nothing is missing.
+	if len(missingEventList) > 0 {
+		logger.Infof("Fetching missing auth events: %v", missingEventList)
+		queryReq := api.QueryEventsByIDRequest{
+			EventIDs: missingEventList,
+		}
+		var queryRes api.QueryEventsByIDResponse
+		if err = t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil {
+			logger.WithError(err).Warnf("failed to query auth events for state after %s locally", eventID)
+			return nil
+		}
+		for i := range queryRes.Events {
+			t.haveEvents[queryRes.Events[i].EventID()] = &queryRes.Events[i]
+			authEvents = append(authEvents, queryRes.Events[i].Unwrap())
+		}
+	}
+
+	return &gomatrixserverlib.RespState{
+		StateEvents: gomatrixserverlib.UnwrapEventHeaders(res.StateEvents),
+		AuthEvents:  authEvents,
+	}
+}
+
+// lookupCurrentState asks the roomserver for the latest room state entries
+// needed to authenticate `newEvent`. The same events are returned as both the
+// state events and the auth events of the result.
+func (t *txnReq) lookupCurrentState(newEvent *gomatrixserverlib.Event) (*gomatrixserverlib.RespState, error) {
+	// Ask the roomserver for information about this room
+	needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{*newEvent})
+	request := api.QueryLatestEventsAndStateRequest{
+		RoomID:       newEvent.RoomID(),
+		StateToFetch: needed.Tuples(),
+	}
+	var response api.QueryLatestEventsAndStateResponse
+	err := t.rsAPI.QueryLatestEventsAndState(t.context, &request, &response)
+	if err != nil {
+		return nil, fmt.Errorf("lookupCurrentState rsAPI.QueryLatestEventsAndState: %w", err)
+	}
+	current := gomatrixserverlib.UnwrapEventHeaders(response.StateEvents)
+	state := gomatrixserverlib.RespState{
+		StateEvents: current,
+		AuthEvents:  current,
+	}
+	return &state, nil
+}
+
+// lookupStateBeforeEvent returns the room state before the event `eventID`. It tries /state_ids first
+// and falls back to /state when that attempt fails.
+func (t *txnReq) lookupStateBeforeEvent(roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (
+	respState *gomatrixserverlib.RespState, err error) {
+
+	logger := util.GetLogger(t.context)
+	logger.Infof("lookupStateBeforeEvent %s", eventID)
+
+	// Attempt to fetch the missing state using /state_ids and /events
+	viaIDs, idsErr := t.lookupMissingStateViaStateIDs(roomID, eventID, roomVersion)
+	if idsErr == nil {
+		return viaIDs, nil
+	}
+
+	// Fallback to /state
+	logger.WithError(idsErr).Warn("lookupStateBeforeEvent failed to /state_ids, falling back to /state")
+	return t.lookupMissingStateViaState(roomID, eventID, roomVersion)
+}
- // Check that the event is allowed by the state.
+// resolveStatesAndCheck pools the state and auth events of every entry in `states`, runs
+// gomatrixserverlib.ResolveConflicts over them, and then checks that `backwardsExtremity` is allowed
+// by the resolved state. When the check reports a missing auth event, that event is looked up
+// (roomserver first, since localFirst=true), appended to the resolved state and the check is retried.
+// On success it returns the pooled auth events together with the resolved state events.
+func (t *txnReq) resolveStatesAndCheck(roomVersion gomatrixserverlib.RoomVersion, states []*gomatrixserverlib.RespState, backwardsExtremity *gomatrixserverlib.Event) (*gomatrixserverlib.RespState, error) {
+ var authEventList []gomatrixserverlib.Event
+ var stateEventList []gomatrixserverlib.Event
+ for _, state := range states {
+ authEventList = append(authEventList, state.AuthEvents...)
+ stateEventList = append(stateEventList, state.StateEvents...)
+ }
+ resolvedStateEvents, err := gomatrixserverlib.ResolveConflicts(roomVersion, stateEventList, authEventList)
+ if err != nil {
+ return nil, err
+ }
+ // apply the current event
retryAllowedState:
- if err := checkAllowedByState(e, respState.StateEvents); err != nil {
+ if err = checkAllowedByState(*backwardsExtremity, resolvedStateEvents); err != nil {
switch missing := err.(type) {
case gomatrixserverlib.MissingAuthEventError:
- // An auth event was missing so let's look up that event over federation
- for _, s := range respState.StateEvents {
- if s.EventID() != missing.AuthEventID {
- continue
- }
- err = t.processEventWithMissingState(s, roomVersion)
- // If there was no error retrieving the event from federation then
- // we assume that it succeeded, so retry the original state check
- if err == nil {
- goto retryAllowedState
- }
+ h, err2 := t.lookupEvent(roomVersion, missing.AuthEventID, true)
+ if err2 != nil {
+ return nil, fmt.Errorf("missing auth event %s and failed to look it up: %w", missing.AuthEventID, err2)
}
+ util.GetLogger(t.context).Infof("fetched event %s", missing.AuthEventID)
+ // NOTE(review): h is dereferenced without a nil check - confirm lookupEvent never returns a nil
+ // event together with a nil error.
+ resolvedStateEvents = append(resolvedStateEvents, h.Unwrap())
+ // NOTE(review): nothing bounds this retry - confirm the same auth event cannot be reported
+ // missing again once the fetched event has been appended.
+ goto retryAllowedState
default:
}
- return err
+ return nil, err
}
+ return &gomatrixserverlib.RespState{
+ AuthEvents: authEventList,
+ StateEvents: resolvedStateEvents,
+ }, nil
+}
- // pass the event along with the state to the roomserver using a background context so we don't
- // needlessly expire
- return t.producer.SendEventWithState(context.Background(), respState, e.Headered(roomVersion), haveEventIDs)
+// getMissingEvents returns a nil backwardsExtremity if missing events were fetched and handled, else returns the new backwards extremity which we should
+// begin from. Returns an error only if we should terminate the transaction which initiated /get_missing_events
+// This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns.
+// This means that we may recursively call this function, as we spider back up prev_events to the min depth.
+// When isInboundTxn is false no request is made and `e` itself is returned as the backwards extremity.
+// If the origin fails the request, or returns none of e's prev_events, a missingPrevEventsError is returned.
+func (t *txnReq) getMissingEvents(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) (backwardsExtremity *gomatrixserverlib.Event, err error) {
+ if !isInboundTxn {
+ // we've recursed here, so just take a state snapshot please!
+ return &e, nil
+ }
+ logger := util.GetLogger(t.context).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
+ needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e})
+ // query latest events (our trusted forward extremities)
+ req := api.QueryLatestEventsAndStateRequest{
+ RoomID: e.RoomID(),
+ StateToFetch: needed.Tuples(),
+ }
+ var res api.QueryLatestEventsAndStateResponse
+ if err = t.rsAPI.QueryLatestEventsAndState(t.context, &req, &res); err != nil {
+ // a roomserver failure is not fatal: hand `e` back so the caller takes a state snapshot instead
+ logger.WithError(err).Warn("Failed to query latest events")
+ return &e, nil
+ }
+ latestEvents := make([]string, len(res.LatestEvents))
+ for i := range res.LatestEvents {
+ latestEvents[i] = res.LatestEvents[i].EventID
+ }
+ // this server just sent us an event for which we do not know its prev_events - ask that server for those prev_events.
+ // the minimum depth is 20 below the depth reported by the roomserver, clamped at 0
+ minDepth := int(res.Depth) - 20
+ if minDepth < 0 {
+ minDepth = 0
+ }
+ missingResp, err := t.federation.LookupMissingEvents(t.context, t.Origin, e.RoomID(), gomatrixserverlib.MissingEvents{
+ Limit: 20,
+ // synapse uses the min depth they've ever seen in that room
+ MinDepth: minDepth,
+ // The latest event IDs that the sender already has. These are skipped when retrieving the previous events of latest_events.
+ EarliestEvents: latestEvents,
+ // The event IDs to retrieve the previous events for.
+ LatestEvents: []string{e.EventID()},
+ }, roomVersion)
+
+ // security: how we handle failures depends on whether or not this event will become the new forward extremity for the room.
+ // There's 2 scenarios to consider:
+ // - Case A: We got pushed an event and are now fetching missing prev_events. (isInboundTxn=true)
+ // - Case B: We are fetching missing prev_events already and now fetching some more (isInboundTxn=false)
+ // In Case B, we know for sure that the event we are currently processing will not become the new forward extremity for the room,
+ // as it was called in response to an inbound txn which had it as a prev_event.
+ // In Case A, the event is a forward extremity, and could eventually become the _only_ forward extremity in the room. This is bad
+ // because it means we would trust the state at that event to be the state for the entire room, and allows rooms to be hijacked.
+ // https://github.com/matrix-org/synapse/pull/3456
+ // https://github.com/matrix-org/synapse/blob/229eb81498b0fe1da81e9b5b333a0285acde9446/synapse/handlers/federation.py#L335
+ // For now, we do not allow Case B, so reject the event.
+ // NOTE(review): this point is only reached with isInboundTxn=true (Case A); isInboundTxn=false returned at the top of the
+ // function. "Case B" in the line above may be intended to read "Case A" - confirm.
+ if err != nil {
+ logger.WithError(err).Errorf(
+ "%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
+ t.Origin,
+ )
+ return nil, missingPrevEventsError{
+ eventID: e.EventID(),
+ err: err,
+ }
+ }
+ logger.Infof("get_missing_events returned %d events", len(missingResp.Events))
+
+ // topologically sort and sanity check that we are making forward progress
+ newEvents := gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents)
+ shouldHaveSomeEventIDs := e.PrevEventIDs()
+ hasPrevEvent := false
+ // the response is accepted as soon as one of e's prev_events is found among the returned events
+Event:
+ for _, pe := range shouldHaveSomeEventIDs {
+ for _, ev := range newEvents {
+ if ev.EventID() == pe {
+ hasPrevEvent = true
+ break Event
+ }
+ }
+ }
+ if !hasPrevEvent {
+ err = fmt.Errorf("called /get_missing_events but server %s didn't return any prev_events with IDs %v", t.Origin, shouldHaveSomeEventIDs)
+ logger.WithError(err).Errorf(
+ "%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
+ t.Origin,
+ )
+ return nil, missingPrevEventsError{
+ eventID: e.EventID(),
+ err: err,
+ }
+ }
+ // process the missing events then the event which started this whole thing
+ // isInboundTxn=false is passed down, so a nested getMissingEvents call returns immediately instead of querying again
+ for _, ev := range append(newEvents, e) {
+ err := t.processEvent(ev, false)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ // we processed everything!
+ return nil, nil
}
-func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (
+func (t *txnReq) lookupMissingStateViaState(roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
respState *gomatrixserverlib.RespState, err error) {
- state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID(), roomVersion)
+ state, err := t.federation.LookupState(t.context, t.Origin, roomID, eventID, roomVersion)
if err != nil {
return nil, err
}
@@ -359,78 +647,64 @@ func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersi
return &state, nil
}
-func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (
- *gomatrixserverlib.RespState, map[string]bool, error) {
-
+func (t *txnReq) lookupMissingStateViaStateIDs(roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
+ *gomatrixserverlib.RespState, error) {
+ util.GetLogger(t.context).Infof("lookupMissingStateViaStateIDs %s", eventID)
// fetch the state event IDs at the time of the event
- stateIDs, err := t.federation.LookupStateIDs(t.context, t.Origin, e.RoomID(), e.EventID())
+ stateIDs, err := t.federation.LookupStateIDs(t.context, t.Origin, roomID, eventID)
if err != nil {
- return nil, nil, err
- }
-
- // fetch as many as we can from the roomserver, do them as 2 calls rather than
- // 1 to try to reduce the number of parameters in the bulk query this will use
- haveEventMap := make(map[string]*gomatrixserverlib.HeaderedEvent, len(stateIDs.StateEventIDs))
- haveEventIDs := make(map[string]bool)
- for _, eventList := range [][]string{stateIDs.StateEventIDs, stateIDs.AuthEventIDs} {
- queryReq := api.QueryEventsByIDRequest{
- EventIDs: eventList,
- }
- var queryRes api.QueryEventsByIDResponse
- if err = t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil {
- return nil, nil, err
- }
- // allow indexing of current state by event ID
- for i := range queryRes.Events {
- haveEventMap[queryRes.Events[i].EventID()] = &queryRes.Events[i]
- haveEventIDs[queryRes.Events[i].EventID()] = true
- }
+ return nil, err
}
-
// work out which auth/state IDs are missing
wantIDs := append(stateIDs.StateEventIDs, stateIDs.AuthEventIDs...)
missing := make(map[string]bool)
+ var missingEventList []string
for _, sid := range wantIDs {
- if _, ok := haveEventMap[sid]; !ok {
- missing[sid] = true
+ if _, ok := t.haveEvents[sid]; !ok {
+ if !missing[sid] {
+ missing[sid] = true
+ missingEventList = append(missingEventList, sid)
+ }
+ }
+ }
+
+ // fetch as many as we can from the roomserver
+ queryReq := api.QueryEventsByIDRequest{
+ EventIDs: missingEventList,
+ }
+ var queryRes api.QueryEventsByIDResponse
+ if err = t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil {
+ return nil, err
+ }
+ for i := range queryRes.Events {
+ evID := queryRes.Events[i].EventID()
+ t.haveEvents[evID] = &queryRes.Events[i]
+ if missing[evID] {
+ delete(missing, evID)
}
}
+
util.GetLogger(t.context).WithFields(logrus.Fields{
"missing": len(missing),
- "event_id": e.EventID(),
- "room_id": e.RoomID(),
- "already_have": len(haveEventMap),
+ "event_id": eventID,
+ "room_id": roomID,
"total_state": len(stateIDs.StateEventIDs),
"total_auth_events": len(stateIDs.AuthEventIDs),
}).Info("Fetching missing state at event")
for missingEventID := range missing {
- var txn gomatrixserverlib.Transaction
- txn, err = t.federation.GetEvent(t.context, t.Origin, missingEventID)
+ var h *gomatrixserverlib.HeaderedEvent
+ h, err = t.lookupEvent(roomVersion, missingEventID, false)
if err != nil {
- util.GetLogger(t.context).WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID")
- return nil, nil, err
- }
- for _, pdu := range txn.PDUs {
- var event gomatrixserverlib.Event
- event, err = gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
- if err != nil {
- util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID())
- return nil, nil, unmarshalError{err}
- }
- if err = gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil {
- util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
- return nil, nil, verifySigError{event.EventID(), err}
- }
- h := event.Headered(roomVersion)
- haveEventMap[event.EventID()] = &h
+ return nil, err
}
+ t.haveEvents[h.EventID()] = h
}
- resp, err := t.createRespStateFromStateIDs(stateIDs, haveEventMap)
- return resp, haveEventIDs, err
+ resp, err := t.createRespStateFromStateIDs(stateIDs)
+ return resp, err
}
-func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs, haveEventMap map[string]*gomatrixserverlib.HeaderedEvent) (
+func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs) (
*gomatrixserverlib.RespState, error) {
// create a RespState response using the response to /state_ids as a guide
respState := gomatrixserverlib.RespState{
@@ -439,22 +713,55 @@ func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStat
}
for i := range stateIDs.StateEventIDs {
- ev, ok := haveEventMap[stateIDs.StateEventIDs[i]]
+ ev, ok := t.haveEvents[stateIDs.StateEventIDs[i]]
if !ok {
return nil, fmt.Errorf("missing state event %s", stateIDs.StateEventIDs[i])
}
respState.StateEvents[i] = ev.Unwrap()
}
for i := range stateIDs.AuthEventIDs {
- ev, ok := haveEventMap[stateIDs.AuthEventIDs[i]]
+ ev, ok := t.haveEvents[stateIDs.AuthEventIDs[i]]
if !ok {
return nil, fmt.Errorf("missing auth event %s", stateIDs.AuthEventIDs[i])
}
respState.AuthEvents[i] = ev.Unwrap()
}
- // Check that the returned state is valid.
- if err := respState.Check(t.context, t.keys); err != nil {
+ // We purposefully do not do auth checks on the returned events, as they will still
+ // be processed in the exact same way, just as a 'rejected' event
+ // TODO: Add a field to HeaderedEvent to indicate if the event is rejected.
+ return &respState, nil
+}
+
+// lookupEvent returns the event with ID `missingEventID`.
+//
+// When localFirst is true the roomserver is asked first and its copy is
+// returned if it has exactly one match; a roomserver failure only logs a
+// warning. Otherwise (or when the roomserver has no match) the event is
+// requested from the origin server via /event, parsed as untrusted JSON for
+// `roomVersion` and signature-checked. Events obtained over federation are
+// flagged in t.newEvents.
+//
+// A non-nil error is always returned when no event can be produced: an empty
+// response from the origin is an error rather than a (nil, nil) result, and an
+// event whose ID differs from the one requested is rejected.
+func (t *txnReq) lookupEvent(roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) {
+	logger := util.GetLogger(t.context)
+	if localFirst {
+		// fetch from the roomserver
+		queryReq := api.QueryEventsByIDRequest{
+			EventIDs: []string{missingEventID},
+		}
+		var queryRes api.QueryEventsByIDResponse
+		if err := t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil {
+			logger.Warnf("Failed to query roomserver for missing event %s: %s - falling back to remote", missingEventID, err)
+		} else if len(queryRes.Events) == 1 {
+			return &queryRes.Events[0], nil
+		}
+	}
+	txn, err := t.federation.GetEvent(t.context, t.Origin, missingEventID)
+	if err == nil && len(txn.PDUs) == 0 {
+		// Without this the function would return (nil, nil) and callers
+		// would dereference a nil event.
+		err = fmt.Errorf("server %s returned no PDUs for event %s", t.Origin, missingEventID)
+	}
+	if err != nil {
+		logger.WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID")
return nil, err
}
- return &respState, nil
+	pdu := txn.PDUs[0]
+	event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
+	if err != nil {
+		// the returned event is not meaningful when parsing fails, so log the ID we asked for
+		logger.WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", missingEventID)
+		return nil, unmarshalError{err}
+	}
+	if event.EventID() != missingEventID {
+		// The response is untrusted input: never accept a different event
+		// in place of the one that was requested.
+		err = fmt.Errorf("asked %s for event %s but it returned event %s", t.Origin, missingEventID, event.EventID())
+		logger.WithError(err).Warn("Transaction: remote server returned the wrong event")
+		return nil, err
+	}
+	if err = gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil {
+		logger.WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
+		return nil, verifySigError{event.EventID(), err}
+	}
+	h := event.Headered(roomVersion)
+	t.newEvents[h.EventID()] = true
+	return &h, nil
}