diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2021-02-16 17:12:17 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-02-16 17:12:17 +0000 |
commit | 5d74a1757f652f1e367a036f931e71bd3da612dd (patch) | |
tree | 1eba4d67889667ce00575be5751c305e4759a9e5 /federationapi | |
parent | f448e8972a1a6974916afc3ab686b342eaf568e2 (diff) |
Don't query for servers so often in /send (#1766)
* Look up servers less often, don't hit API for missing auth events unless there are actually missing auth events
* Remove ResolveConflictsAdhoc (since it is already in GMSL), other tweaks
* Update gomatrixserverlib to matrix-org/gomatrixserverlib#254
* Fix resolve-state
* Initialise t.servers on first use
Diffstat (limited to 'federationapi')
-rw-r--r-- | federationapi/routing/send.go | 108 |
1 files changed, 44 insertions, 64 deletions
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 96b5355e..02683aea 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -102,11 +102,13 @@ func Send( type txnReq struct { gomatrixserverlib.Transaction - rsAPI api.RoomserverInternalAPI - eduAPI eduserverAPI.EDUServerInputAPI - keyAPI keyapi.KeyInternalAPI - keys gomatrixserverlib.JSONVerifier - federation txnFederationClient + rsAPI api.RoomserverInternalAPI + eduAPI eduserverAPI.EDUServerInputAPI + keyAPI keyapi.KeyInternalAPI + keys gomatrixserverlib.JSONVerifier + federation txnFederationClient + servers []gomatrixserverlib.ServerName + serversMutex sync.RWMutex // local cache of events for auth checks, etc - this may include events // which the roomserver is unaware of. haveEvents map[string]*gomatrixserverlib.HeaderedEvent @@ -404,16 +406,21 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli } func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserverlib.ServerName { - servers := []gomatrixserverlib.ServerName{t.Origin} + t.serversMutex.Lock() + defer t.serversMutex.Unlock() + if t.servers != nil { + return t.servers + } + t.servers = []gomatrixserverlib.ServerName{t.Origin} serverReq := &api.QueryServerJoinedToRoomRequest{ RoomID: roomID, } serverRes := &api.QueryServerJoinedToRoomResponse{} if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil { - servers = append(servers, serverRes.ServerNames...) - util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(servers), roomID) + t.servers = append(t.servers, serverRes.ServerNames...) + util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(t.servers), roomID) } - return servers + return t.servers } func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error { @@ -482,14 +489,10 @@ func (t *txnReq) retrieveMissingAuthEvents( missingAuthEvents[missingAuthEventID] = struct{}{} } - servers := t.getServers(ctx, e.RoomID()) - if len(servers) > 5 { - servers = servers[:5] - } withNextEvent: for missingAuthEventID := range missingAuthEvents { withNextServer: - for _, server := range servers { + for _, server := range t.getServers(ctx, e.RoomID()) { logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server) tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID) if err != nil { @@ -692,13 +695,8 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix return nil, false, fmt.Errorf("t.lookupStateBeforeEvent: %w", err) } - servers := t.getServers(ctx, roomID) - if len(servers) > 5 { - servers = servers[:5] - } - // fetch the event we're missing and add it to the pile - h, err := t.lookupEvent(ctx, roomVersion, eventID, false, servers) + h, err := t.lookupEvent(ctx, roomVersion, roomID, eventID, false) switch err.(type) { case verifySigError: return respState, false, nil @@ -740,11 +738,10 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event t.haveEvents[ev.EventID()] = res.StateEvents[i] } var authEvents []*gomatrixserverlib.Event - missingAuthEvents := make(map[string]bool) + missingAuthEvents := map[string]bool{} for _, ev := range res.StateEvents { for _, ae := range ev.AuthEventIDs() { - aev, ok := t.haveEvents[ae] - if ok { + if aev, ok := t.haveEvents[ae]; ok { authEvents = append(authEvents, aev.Unwrap()) } else { missingAuthEvents[ae] = true @@ -753,27 +750,28 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event } // QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't // have stored the event. - var missingEventList []string - for evID := range missingAuthEvents { - missingEventList = append(missingEventList, evID) - } - queryReq := api.QueryEventsByIDRequest{ - EventIDs: missingEventList, - } - util.GetLogger(ctx).Infof("Fetching missing auth events: %v", missingEventList) - var queryRes api.QueryEventsByIDResponse - if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil { - return nil - } - for i := range queryRes.Events { - evID := queryRes.Events[i].EventID() - t.haveEvents[evID] = queryRes.Events[i] - authEvents = append(authEvents, queryRes.Events[i].Unwrap()) + if len(missingAuthEvents) > 0 { + var missingEventList []string + for evID := range missingAuthEvents { + missingEventList = append(missingEventList, evID) + } + queryReq := api.QueryEventsByIDRequest{ + EventIDs: missingEventList, + } + util.GetLogger(ctx).Infof("Fetching missing auth events: %v", missingEventList) + var queryRes api.QueryEventsByIDResponse + if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil { + return nil + } + for i := range queryRes.Events { + evID := queryRes.Events[i].EventID() + t.haveEvents[evID] = queryRes.Events[i] + authEvents = append(authEvents, queryRes.Events[i].Unwrap()) + } } - evs := gomatrixserverlib.UnwrapEventHeaders(res.StateEvents) return &gomatrixserverlib.RespState{ - StateEvents: evs, + StateEvents: gomatrixserverlib.UnwrapEventHeaders(res.StateEvents), AuthEvents: authEvents, } } @@ -805,11 +803,7 @@ retryAllowedState: if err = checkAllowedByState(backwardsExtremity, resolvedStateEvents); err != nil { switch missing := err.(type) { case gomatrixserverlib.MissingAuthEventError: - servers := t.getServers(ctx, backwardsExtremity.RoomID()) - if len(servers) > 5 { - servers = servers[:5] - } - h, err2 := t.lookupEvent(ctx, roomVersion, missing.AuthEventID, true, servers) + h, err2 := t.lookupEvent(ctx, roomVersion, backwardsExtremity.RoomID(), missing.AuthEventID, true) switch err2.(type) { case verifySigError: return &gomatrixserverlib.RespState{ @@ -857,17 +851,8 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even latestEvents[i] = res.LatestEvents[i].EventID } - servers := []gomatrixserverlib.ServerName{t.Origin} - serverReq := &api.QueryServerJoinedToRoomRequest{ - RoomID: e.RoomID(), - } - serverRes := &api.QueryServerJoinedToRoomResponse{} - if err = t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil { - servers = append(servers, serverRes.ServerNames...) - logger.Infof("Found %d server(s) to query for missing events", len(servers)) - } - var missingResp *gomatrixserverlib.RespMissingEvents + servers := t.getServers(ctx, e.RoomID()) for _, server := range servers { var m gomatrixserverlib.RespMissingEvents if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{ @@ -1015,12 +1000,6 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even "concurrent_requests": concurrentRequests, }).Info("Fetching missing state at event") - // Get a list of servers to fetch from. - servers := t.getServers(ctx, roomID) - if len(servers) > 5 { - servers = servers[:5] - } - // Create a queue containing all of the missing event IDs that we want // to retrieve. pending := make(chan string, missingCount) @@ -1046,7 +1025,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even // Define what we'll do in order to fetch the missing event ID. fetch := func(missingEventID string) { var h *gomatrixserverlib.HeaderedEvent - h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false, servers) + h, err = t.lookupEvent(ctx, roomVersion, roomID, missingEventID, false) switch err.(type) { case verifySigError: return @@ -1112,7 +1091,7 @@ func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStat return &respState, nil } -func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool, servers []gomatrixserverlib.ServerName) (*gomatrixserverlib.HeaderedEvent, error) { +func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) { if localFirst { // fetch from the roomserver queryReq := api.QueryEventsByIDRequest{ @@ -1127,6 +1106,7 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib. } var event *gomatrixserverlib.Event found := false + servers := t.getServers(ctx, roomID) for _, serverName := range servers { txn, err := t.federation.GetEvent(ctx, serverName, missingEventID) if err != nil || len(txn.PDUs) == 0 { |