about | summary | refs | log | tree | commit | diff
path: root/federationapi
diff options
context:
space:
mode:
author: Neil Alexander <neilalexander@users.noreply.github.com> 2020-10-13 11:53:20 +0100
committer: GitHub <noreply@github.com> 2020-10-13 11:53:20 +0100
commit: 9d6b77c58afd64be07ae82f05e11f5671d936ba8 (patch)
tree: 025036bb573650ee399bff5804d0e1e4ee25f7e3 /federationapi
parent: d7ea814fa80ea2aba671be17c7d985d1191fbf6a (diff)
Try to retrieve missing auth events from multiple servers (#1516)
* Recursively fetch auth events if needed
* Fix processEvent call
* Ask more servers in lookupEvent
* Don't panic!
* Panic at the Disco
* Find servers more aggressively
* Add getServers
* Fix number of servers to 5, don't bail making RespState if auth events missing
* Fix panic
* Ignore missing state events too
* Report number of servers correctly
* Don't reuse request context for /send_join
* Update federation API tests
* Don't recurse processEvents
* Implement getEvents differently
Diffstat (limited to 'federationapi')
-rw-r--r-- federationapi/routing/send.go 189
-rw-r--r-- federationapi/routing/send_test.go 2
2 files changed, 119 insertions, 72 deletions
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index a6e39821..24e29a18 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -246,9 +246,6 @@ func isProcessingErrorFatal(err error) bool {
type roomNotFoundError struct {
roomID string
}
-type unmarshalError struct {
- err error
-}
type verifySigError struct {
eventID string
err error
@@ -259,7 +256,6 @@ type missingPrevEventsError struct {
}
func (e roomNotFoundError) Error() string { return fmt.Sprintf("room %q not found", e.roomID) }
-func (e unmarshalError) Error() string { return fmt.Sprintf("unable to parse event: %s", e.err) }
func (e verifySigError) Error() string {
return fmt.Sprintf("unable to verify signature of event %q: %s", e.eventID, e.err)
}
@@ -338,6 +334,19 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
}
}
+func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserverlib.ServerName {
+ servers := []gomatrixserverlib.ServerName{t.Origin}
+ serverReq := &api.QueryServerJoinedToRoomRequest{
+ RoomID: roomID,
+ }
+ serverRes := &api.QueryServerJoinedToRoomResponse{}
+ if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
+ servers = append(servers, serverRes.ServerNames...)
+ util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(servers), roomID)
+ }
+ return servers
+}
+
func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event) error {
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
@@ -354,7 +363,7 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event) er
}
var stateResp api.QueryMissingAuthPrevEventsResponse
if err := t.rsAPI.QueryMissingAuthPrevEvents(ctx, &stateReq, &stateResp); err != nil {
- return err
+ return fmt.Errorf("t.rsAPI.QueryMissingAuthPrevEvents: %w", err)
}
if !stateResp.RoomExists {
@@ -369,46 +378,8 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event) er
if len(stateResp.MissingAuthEventIDs) > 0 {
logger.Infof("Event refers to %d unknown auth_events", len(stateResp.MissingAuthEventIDs))
-
- servers := []gomatrixserverlib.ServerName{t.Origin}
- serverReq := &api.QueryServerJoinedToRoomRequest{
- RoomID: e.RoomID(),
- }
- serverRes := &api.QueryServerJoinedToRoomResponse{}
- if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
- servers = append(servers, serverRes.ServerNames...)
- logger.Infof("Found %d server(s) to query for missing events", len(servers))
- }
-
- getAuthEvent:
- for _, missingAuthEventID := range stateResp.MissingAuthEventIDs {
- for _, server := range servers {
- logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server)
- tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID)
- if err != nil {
- continue // try the next server
- }
- ev, err := gomatrixserverlib.NewEventFromUntrustedJSON(tx.PDUs[0], stateResp.RoomVersion)
- if err != nil {
- logger.WithError(err).Errorf("Failed to unmarshal auth event %q", missingAuthEventID)
- continue // try the next server
- }
- if err = api.SendInputRoomEvents(
- context.Background(),
- t.rsAPI,
- []api.InputRoomEvent{
- {
- Kind: api.KindOutlier,
- Event: ev.Headered(stateResp.RoomVersion),
- AuthEventIDs: ev.AuthEventIDs(),
- SendAsServer: api.DoNotSendToOtherServers,
- },
- },
- ); err != nil {
- logger.WithError(err).Errorf("Failed to send auth event %q to roomserver", missingAuthEventID)
- continue getAuthEvent // move onto the next event
- }
- }
+ if err := t.retrieveMissingAuthEvents(ctx, e, &stateResp); err != nil {
+ return fmt.Errorf("t.retrieveMissingAuthEvents: %w", err)
}
}
@@ -431,6 +402,60 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event) er
)
}
+func (t *txnReq) retrieveMissingAuthEvents(
+ ctx context.Context, e gomatrixserverlib.Event, stateResp *api.QueryMissingAuthPrevEventsResponse,
+) error {
+ logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
+
+ missingAuthEvents := make(map[string]struct{})
+ for _, missingAuthEventID := range stateResp.MissingAuthEventIDs {
+ missingAuthEvents[missingAuthEventID] = struct{}{}
+ }
+
+ servers := t.getServers(ctx, e.RoomID())
+ if len(servers) > 5 {
+ servers = servers[:5]
+ }
+withNextEvent:
+ for missingAuthEventID := range missingAuthEvents {
+ withNextServer:
+ for _, server := range servers {
+ logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server)
+ tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID)
+ if err != nil {
+ logger.WithError(err).Warnf("Failed to retrieve auth event %q", missingAuthEventID)
+ continue withNextServer
+ }
+ ev, err := gomatrixserverlib.NewEventFromUntrustedJSON(tx.PDUs[0], stateResp.RoomVersion)
+ if err != nil {
+ logger.WithError(err).Warnf("Failed to unmarshal auth event %q", missingAuthEventID)
+ continue withNextServer
+ }
+ if err = api.SendInputRoomEvents(
+ context.Background(),
+ t.rsAPI,
+ []api.InputRoomEvent{
+ {
+ Kind: api.KindOutlier,
+ Event: ev.Headered(stateResp.RoomVersion),
+ AuthEventIDs: ev.AuthEventIDs(),
+ SendAsServer: api.DoNotSendToOtherServers,
+ },
+ },
+ ); err != nil {
+ return fmt.Errorf("api.SendEvents: %w", err)
+ }
+ delete(missingAuthEvents, missingAuthEventID)
+ continue withNextEvent
+ }
+ }
+
+ if missing := len(missingAuthEvents); missing > 0 {
+ return fmt.Errorf("Event refers to %d auth_events which we failed to fetch", missing)
+ }
+ return nil
+}
+
func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error {
authUsingState := gomatrixserverlib.NewAuthEvents(nil)
for i := range stateEvents {
@@ -557,18 +582,23 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
respState, err := t.lookupStateBeforeEvent(ctx, roomVersion, roomID, eventID)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("t.lookupStateBeforeEvent: %w", err)
+ }
+
+ servers := t.getServers(ctx, roomID)
+ if len(servers) > 5 {
+ servers = servers[:5]
}
// fetch the event we're missing and add it to the pile
- h, err := t.lookupEvent(ctx, roomVersion, eventID, false)
+ h, err := t.lookupEvent(ctx, roomVersion, eventID, false, servers)
switch err.(type) {
case verifySigError:
return respState, nil
case nil:
// do nothing
default:
- return nil, err
+ return nil, fmt.Errorf("t.lookupEvent: %w", err)
}
t.haveEvents[h.EventID()] = h
if h.StateKey() != nil {
@@ -669,7 +699,11 @@ retryAllowedState:
if err = checkAllowedByState(*backwardsExtremity, resolvedStateEvents); err != nil {
switch missing := err.(type) {
case gomatrixserverlib.MissingAuthEventError:
- h, err2 := t.lookupEvent(ctx, roomVersion, missing.AuthEventID, true)
+ servers := t.getServers(ctx, backwardsExtremity.RoomID())
+ if len(servers) > 5 {
+ servers = servers[:5]
+ }
+ h, err2 := t.lookupEvent(ctx, roomVersion, missing.AuthEventID, true, servers)
switch err2.(type) {
case verifySigError:
return &gomatrixserverlib.RespState{
@@ -874,6 +908,12 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
"concurrent_requests": concurrentRequests,
}).Info("Fetching missing state at event")
+ // Get a list of servers to fetch from.
+ servers := t.getServers(ctx, roomID)
+ if len(servers) > 5 {
+ servers = servers[:5]
+ }
+
// Create a queue containing all of the missing event IDs that we want
// to retrieve.
pending := make(chan string, missingCount)
@@ -899,7 +939,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
// Define what we'll do in order to fetch the missing event ID.
fetch := func(missingEventID string) {
var h *gomatrixserverlib.HeaderedEvent
- h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false)
+ h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false, servers)
switch err.(type) {
case verifySigError:
return
@@ -937,26 +977,25 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
}
func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs) (
- *gomatrixserverlib.RespState, error) {
+ *gomatrixserverlib.RespState, error) { // nolint:unparam
// create a RespState response using the response to /state_ids as a guide
- respState := gomatrixserverlib.RespState{
- AuthEvents: make([]gomatrixserverlib.Event, len(stateIDs.AuthEventIDs)),
- StateEvents: make([]gomatrixserverlib.Event, len(stateIDs.StateEventIDs)),
- }
+ respState := gomatrixserverlib.RespState{}
for i := range stateIDs.StateEventIDs {
ev, ok := t.haveEvents[stateIDs.StateEventIDs[i]]
if !ok {
- return nil, fmt.Errorf("missing state event %s", stateIDs.StateEventIDs[i])
+ logrus.Warnf("Missing state event in createRespStateFromStateIDs: %s", stateIDs.StateEventIDs[i])
+ continue
}
- respState.StateEvents[i] = ev.Unwrap()
+ respState.StateEvents = append(respState.StateEvents, ev.Unwrap())
}
for i := range stateIDs.AuthEventIDs {
ev, ok := t.haveEvents[stateIDs.AuthEventIDs[i]]
if !ok {
- return nil, fmt.Errorf("missing auth event %s", stateIDs.AuthEventIDs[i])
+ logrus.Warnf("Missing auth event in createRespStateFromStateIDs: %s", stateIDs.AuthEventIDs[i])
+ continue
}
- respState.AuthEvents[i] = ev.Unwrap()
+ respState.AuthEvents = append(respState.AuthEvents, ev.Unwrap())
}
// We purposefully do not do auth checks on the returned events, as they will still
// be processed in the exact same way, just as a 'rejected' event
@@ -964,7 +1003,7 @@ func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStat
return &respState, nil
}
-func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) {
+func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool, servers []gomatrixserverlib.ServerName) (*gomatrixserverlib.HeaderedEvent, error) {
if localFirst {
// fetch from the roomserver
queryReq := api.QueryEventsByIDRequest{
@@ -977,19 +1016,27 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.
return &queryRes.Events[0], nil
}
}
- txn, err := t.federation.GetEvent(ctx, t.Origin, missingEventID)
- if err != nil || len(txn.PDUs) == 0 {
- util.GetLogger(ctx).WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID")
- return nil, err
- }
- pdu := txn.PDUs[0]
var event gomatrixserverlib.Event
- event, err = gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
- if err != nil {
- util.GetLogger(ctx).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID())
- return nil, unmarshalError{err}
+ found := false
+ for _, serverName := range servers {
+ txn, err := t.federation.GetEvent(ctx, serverName, missingEventID)
+ if err != nil || len(txn.PDUs) == 0 {
+ util.GetLogger(ctx).WithError(err).WithField("event_id", missingEventID).Warn("Failed to get missing /event for event ID")
+ continue
+ }
+ event, err = gomatrixserverlib.NewEventFromUntrustedJSON(txn.PDUs[0], roomVersion)
+ if err != nil {
+ util.GetLogger(ctx).WithError(err).WithField("event_id", missingEventID).Warnf("Transaction: Failed to parse event JSON of event")
+ continue
+ }
+ found = true
+ break
+ }
+ if !found {
+ util.GetLogger(ctx).WithField("event_id", missingEventID).Warnf("Failed to get missing /event for event ID from %d server(s)", len(servers))
+ return nil, fmt.Errorf("wasn't able to find event via %d server(s)", len(servers))
}
- if err = gomatrixserverlib.VerifyAllEventSignatures(ctx, []gomatrixserverlib.Event{event}, t.keys); err != nil {
+ if err := gomatrixserverlib.VerifyAllEventSignatures(ctx, []gomatrixserverlib.Event{event}, t.keys); err != nil {
util.GetLogger(ctx).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
return nil, verifySigError{event.EventID(), err}
}
diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go
index d7e42247..0a462433 100644
--- a/federationapi/routing/send_test.go
+++ b/federationapi/routing/send_test.go
@@ -491,7 +491,7 @@ func TestTransactionFailAuthChecks(t *testing.T) {
queryMissingAuthPrevEvents: func(req *api.QueryMissingAuthPrevEventsRequest) api.QueryMissingAuthPrevEventsResponse {
return api.QueryMissingAuthPrevEventsResponse{
RoomExists: true,
- MissingAuthEventIDs: []string{"create_event"},
+ MissingAuthEventIDs: []string{},
MissingPrevEventIDs: []string{},
}
},