aboutsummaryrefslogtreecommitdiff
path: root/federationapi
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-05-05 15:48:37 +0100
committerGitHub <noreply@github.com>2020-05-05 15:48:37 +0100
commit31d3b0d4a52aa52b05f85069ae3f08f9fc8b64ab (patch)
tree0dbfe18d64d14bc81e424a3afa4e15467a646cbf /federationapi
parent9d15312ef641e871a2958e2f7037c50902dbfcfe (diff)
Prefer /state_ids when missing state across federation (#1008)
* Prefer /state_ids when missing state across federation * Linting * Better logging
Diffstat (limited to 'federationapi')
-rw-r--r--federationapi/routing/send.go134
1 files changed, 124 insertions, 10 deletions
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index 88411b81..a6072c66 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
+ "github.com/sirupsen/logrus"
)
// Send implements /_matrix/federation/v1/send/{txnID}
@@ -297,22 +298,27 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer
// However not all version of synapse support /state_ids so you may
// need to fallback to /state.
// TODO: Attempt to fill in the gap using /get_missing_events
- // TODO: Attempt to fetch the state using /state_ids and /events
- state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID(), roomVersion)
+
+ // Attempt to fetch the missing state using /state_ids and /events
+ var respState *gomatrixserverlib.RespState
+ var err error
+ respState, err = t.lookupMissingStateViaStateIDs(e, roomVersion)
if err != nil {
- return err
- }
- // Check that the returned state is valid.
- if err := state.Check(t.context, t.keys); err != nil {
- return err
+ // Fallback to /state
+ util.GetLogger(t.context).WithError(err).Warn("processEventWithMissingState failed to /state_ids, falling back to /state")
+ respState, err = t.lookupMissingStateViaState(e, roomVersion)
+ if err != nil {
+ return err
+ }
}
+
// Check that the event is allowed by the state.
retryAllowedState:
- if err := checkAllowedByState(e, state.StateEvents); err != nil {
+ if err := checkAllowedByState(e, respState.StateEvents); err != nil {
switch missing := err.(type) {
case gomatrixserverlib.MissingAuthEventError:
// An auth event was missing so let's look up that event over federation
- for _, s := range state.StateEvents {
+ for _, s := range respState.StateEvents {
if s.EventID() != missing.AuthEventID {
continue
}
@@ -329,5 +335,113 @@ retryAllowedState:
}
// pass the event along with the state to the roomserver
- return t.producer.SendEventWithState(t.context, state, e.Headered(roomVersion))
+ return t.producer.SendEventWithState(t.context, respState, e.Headered(roomVersion))
+}
+
+func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (
+ respState *gomatrixserverlib.RespState, err error) {
+ state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID(), roomVersion)
+ if err != nil {
+ return nil, err
+ }
+ // Check that the returned state is valid.
+ if err := state.Check(t.context, t.keys); err != nil {
+ return nil, err
+ }
+ return &state, nil
+}
+
+func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (
+ *gomatrixserverlib.RespState, error) {
+
+ // fetch all the state events we do know about: the current state
+ queryReq := api.QueryLatestEventsAndStateRequest{
+ RoomID: e.RoomID(),
+ StateToFetch: nil, // fetch all state
+ }
+ var queryRes api.QueryLatestEventsAndStateResponse
+ if err := t.rsAPI.QueryLatestEventsAndState(t.context, &queryReq, &queryRes); err != nil {
+ return nil, err
+ }
+ if !queryRes.RoomExists {
+ return nil, fmt.Errorf("room %s doesn't exist", e.RoomID())
+ }
+ // allow indexing of current state by event ID
+ haveEventMap := make(map[string]*gomatrixserverlib.HeaderedEvent, len(queryRes.StateEvents))
+ for i := range queryRes.StateEvents {
+ haveEventMap[queryRes.StateEvents[i].EventID()] = &queryRes.StateEvents[i]
+ }
+
+ // fetch the state event IDs at the time of the event
+ stateIDs, err := t.federation.LookupStateIDs(t.context, t.Origin, e.RoomID(), e.EventID())
+ if err != nil {
+ return nil, err
+ }
+ // work out which auth/state IDs are missing
+ wantIDs := append(stateIDs.StateEventIDs, stateIDs.AuthEventIDs...)
+ missing := make(map[string]bool)
+ for _, sid := range wantIDs {
+ if _, ok := haveEventMap[sid]; !ok {
+ missing[sid] = true
+ }
+ }
+ util.GetLogger(t.context).WithFields(logrus.Fields{
+ "missing": len(missing),
+ "event_id": e.EventID(),
+ "room_id": e.RoomID(),
+ "already_have": len(queryRes.StateEvents),
+ "total_state": len(stateIDs.StateEventIDs),
+ "total_auth_events": len(stateIDs.AuthEventIDs),
+ }).Info("Fetching missing state at event")
+
+ for missingEventID := range missing {
+ txn, err := t.federation.GetEvent(t.context, t.Origin, missingEventID)
+ if err != nil {
+ util.GetLogger(t.context).WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID")
+ return nil, err
+ }
+ for _, pdu := range txn.PDUs {
+ event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
+ if err != nil {
+ util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID())
+ return nil, unmarshalError{err}
+ }
+ if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil {
+ util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
+ return nil, verifySigError{event.EventID(), err}
+ }
+ h := event.Headered(roomVersion)
+ haveEventMap[event.EventID()] = &h
+ }
+ }
+ return t.createRespStateFromStateIDs(stateIDs, haveEventMap)
+}
+
+func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs, haveEventMap map[string]*gomatrixserverlib.HeaderedEvent) (
+ *gomatrixserverlib.RespState, error) {
+ // create a RespState response using the response to /state_ids as a guide
+ respState := gomatrixserverlib.RespState{
+ AuthEvents: make([]gomatrixserverlib.Event, len(stateIDs.AuthEventIDs)),
+ StateEvents: make([]gomatrixserverlib.Event, len(stateIDs.StateEventIDs)),
+ }
+
+ for i := range stateIDs.StateEventIDs {
+ ev, ok := haveEventMap[stateIDs.StateEventIDs[i]]
+ if !ok {
+ return nil, fmt.Errorf("missing state event %s", stateIDs.StateEventIDs[i])
+ }
+ respState.StateEvents[i] = ev.Unwrap()
+ }
+ for i := range stateIDs.AuthEventIDs {
+ ev, ok := haveEventMap[stateIDs.AuthEventIDs[i]]
+ if !ok {
+ return nil, fmt.Errorf("missing auth event %s", stateIDs.AuthEventIDs[i])
+ }
+ respState.AuthEvents[i] = ev.Unwrap()
+ }
+ // Check that the returned state is valid.
+ if err := respState.Check(t.context, t.keys); err != nil {
+ return nil, err
+ }
+ return &respState, nil
}