aboutsummaryrefslogtreecommitdiff
path: root/federationapi
diff options
context:
space:
mode:
Diffstat (limited to 'federationapi')
-rw-r--r--federationapi/routing/send.go108
1 files changed, 44 insertions, 64 deletions
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index 96b5355e..02683aea 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -102,11 +102,13 @@ func Send(
type txnReq struct {
gomatrixserverlib.Transaction
- rsAPI api.RoomserverInternalAPI
- eduAPI eduserverAPI.EDUServerInputAPI
- keyAPI keyapi.KeyInternalAPI
- keys gomatrixserverlib.JSONVerifier
- federation txnFederationClient
+ rsAPI api.RoomserverInternalAPI
+ eduAPI eduserverAPI.EDUServerInputAPI
+ keyAPI keyapi.KeyInternalAPI
+ keys gomatrixserverlib.JSONVerifier
+ federation txnFederationClient
+ servers []gomatrixserverlib.ServerName
+ serversMutex sync.RWMutex
// local cache of events for auth checks, etc - this may include events
// which the roomserver is unaware of.
haveEvents map[string]*gomatrixserverlib.HeaderedEvent
@@ -404,16 +406,21 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
}
func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserverlib.ServerName {
- servers := []gomatrixserverlib.ServerName{t.Origin}
+ t.serversMutex.Lock()
+ defer t.serversMutex.Unlock()
+ if t.servers != nil {
+ return t.servers
+ }
+ t.servers = []gomatrixserverlib.ServerName{t.Origin}
serverReq := &api.QueryServerJoinedToRoomRequest{
RoomID: roomID,
}
serverRes := &api.QueryServerJoinedToRoomResponse{}
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
- servers = append(servers, serverRes.ServerNames...)
- util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(servers), roomID)
+ t.servers = append(t.servers, serverRes.ServerNames...)
+ util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(t.servers), roomID)
}
- return servers
+ return t.servers
}
func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error {
@@ -482,14 +489,10 @@ func (t *txnReq) retrieveMissingAuthEvents(
missingAuthEvents[missingAuthEventID] = struct{}{}
}
- servers := t.getServers(ctx, e.RoomID())
- if len(servers) > 5 {
- servers = servers[:5]
- }
withNextEvent:
for missingAuthEventID := range missingAuthEvents {
withNextServer:
- for _, server := range servers {
+ for _, server := range t.getServers(ctx, e.RoomID()) {
logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server)
tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID)
if err != nil {
@@ -692,13 +695,8 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
return nil, false, fmt.Errorf("t.lookupStateBeforeEvent: %w", err)
}
- servers := t.getServers(ctx, roomID)
- if len(servers) > 5 {
- servers = servers[:5]
- }
-
// fetch the event we're missing and add it to the pile
- h, err := t.lookupEvent(ctx, roomVersion, eventID, false, servers)
+ h, err := t.lookupEvent(ctx, roomVersion, roomID, eventID, false)
switch err.(type) {
case verifySigError:
return respState, false, nil
@@ -740,11 +738,10 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
t.haveEvents[ev.EventID()] = res.StateEvents[i]
}
var authEvents []*gomatrixserverlib.Event
- missingAuthEvents := make(map[string]bool)
+ missingAuthEvents := map[string]bool{}
for _, ev := range res.StateEvents {
for _, ae := range ev.AuthEventIDs() {
- aev, ok := t.haveEvents[ae]
- if ok {
+ if aev, ok := t.haveEvents[ae]; ok {
authEvents = append(authEvents, aev.Unwrap())
} else {
missingAuthEvents[ae] = true
@@ -753,27 +750,28 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
}
// QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't
// have stored the event.
- var missingEventList []string
- for evID := range missingAuthEvents {
- missingEventList = append(missingEventList, evID)
- }
- queryReq := api.QueryEventsByIDRequest{
- EventIDs: missingEventList,
- }
- util.GetLogger(ctx).Infof("Fetching missing auth events: %v", missingEventList)
- var queryRes api.QueryEventsByIDResponse
- if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
- return nil
- }
- for i := range queryRes.Events {
- evID := queryRes.Events[i].EventID()
- t.haveEvents[evID] = queryRes.Events[i]
- authEvents = append(authEvents, queryRes.Events[i].Unwrap())
+ if len(missingAuthEvents) > 0 {
+ var missingEventList []string
+ for evID := range missingAuthEvents {
+ missingEventList = append(missingEventList, evID)
+ }
+ queryReq := api.QueryEventsByIDRequest{
+ EventIDs: missingEventList,
+ }
+ util.GetLogger(ctx).Infof("Fetching missing auth events: %v", missingEventList)
+ var queryRes api.QueryEventsByIDResponse
+ if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
+ return nil
+ }
+ for i := range queryRes.Events {
+ evID := queryRes.Events[i].EventID()
+ t.haveEvents[evID] = queryRes.Events[i]
+ authEvents = append(authEvents, queryRes.Events[i].Unwrap())
+ }
}
- evs := gomatrixserverlib.UnwrapEventHeaders(res.StateEvents)
return &gomatrixserverlib.RespState{
- StateEvents: evs,
+ StateEvents: gomatrixserverlib.UnwrapEventHeaders(res.StateEvents),
AuthEvents: authEvents,
}
}
@@ -805,11 +803,7 @@ retryAllowedState:
if err = checkAllowedByState(backwardsExtremity, resolvedStateEvents); err != nil {
switch missing := err.(type) {
case gomatrixserverlib.MissingAuthEventError:
- servers := t.getServers(ctx, backwardsExtremity.RoomID())
- if len(servers) > 5 {
- servers = servers[:5]
- }
- h, err2 := t.lookupEvent(ctx, roomVersion, missing.AuthEventID, true, servers)
+ h, err2 := t.lookupEvent(ctx, roomVersion, backwardsExtremity.RoomID(), missing.AuthEventID, true)
switch err2.(type) {
case verifySigError:
return &gomatrixserverlib.RespState{
@@ -857,17 +851,8 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even
latestEvents[i] = res.LatestEvents[i].EventID
}
- servers := []gomatrixserverlib.ServerName{t.Origin}
- serverReq := &api.QueryServerJoinedToRoomRequest{
- RoomID: e.RoomID(),
- }
- serverRes := &api.QueryServerJoinedToRoomResponse{}
- if err = t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
- servers = append(servers, serverRes.ServerNames...)
- logger.Infof("Found %d server(s) to query for missing events", len(servers))
- }
-
var missingResp *gomatrixserverlib.RespMissingEvents
+ servers := t.getServers(ctx, e.RoomID())
for _, server := range servers {
var m gomatrixserverlib.RespMissingEvents
if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{
@@ -1015,12 +1000,6 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
"concurrent_requests": concurrentRequests,
}).Info("Fetching missing state at event")
- // Get a list of servers to fetch from.
- servers := t.getServers(ctx, roomID)
- if len(servers) > 5 {
- servers = servers[:5]
- }
-
// Create a queue containing all of the missing event IDs that we want
// to retrieve.
pending := make(chan string, missingCount)
@@ -1046,7 +1025,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
// Define what we'll do in order to fetch the missing event ID.
fetch := func(missingEventID string) {
var h *gomatrixserverlib.HeaderedEvent
- h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false, servers)
+ h, err = t.lookupEvent(ctx, roomVersion, roomID, missingEventID, false)
switch err.(type) {
case verifySigError:
return
@@ -1112,7 +1091,7 @@ func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStat
return &respState, nil
}
-func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool, servers []gomatrixserverlib.ServerName) (*gomatrixserverlib.HeaderedEvent, error) {
+func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) {
if localFirst {
// fetch from the roomserver
queryReq := api.QueryEventsByIDRequest{
@@ -1127,6 +1106,7 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.
}
var event *gomatrixserverlib.Event
found := false
+ servers := t.getServers(ctx, roomID)
for _, serverName := range servers {
txn, err := t.federation.GetEvent(ctx, serverName, missingEventID)
if err != nil || len(txn.PDUs) == 0 {