diff options
author | Kegsay <kegan@matrix.org> | 2020-08-13 16:43:27 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-13 16:43:27 +0100 |
commit | 20c8f252a7930e07a113e24acc59964e5e19e708 (patch) | |
tree | a8d4b101f8a4c920c4bc774d2abe21a1a295896b /keyserver | |
parent | 4c4732a9c95e03011ee85a311da13594922b58f0 (diff) |
Make 'Device list doesn't change if remote server is down' pass (#1268)
- As a last resort, query the DB when exhausting all possible remote query
endpoints, but keep the field in `failures` so clients can detect that this
is stale data.
- Unblock `DeviceListUpdater.Update` on failures rather than timing out.
- Use a mutex when writing directly to `res`, not just for failures.
Diffstat (limited to 'keyserver')
-rw-r--r-- | keyserver/internal/device_list_update.go | 6 | ||||
-rw-r--r-- | keyserver/internal/internal.go | 129 |
2 files changed, 77 insertions, 58 deletions
diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index 573285e8..c27e291f 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -342,10 +342,12 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam if err != nil { logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it") hasFailures = true - } else { - u.clearChannel(userID) } } + for _, userID := range userIDs { + // always clear the channel to unblock Update calls regardless of success/failure + u.clearChannel(userID) + } return hasFailures } diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index ef52d014..8904d463 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -318,65 +318,12 @@ func (a *KeyInternalAPI) queryRemoteKeys( // allows us to wait until all federation servers have been poked var wg sync.WaitGroup wg.Add(len(domainToDeviceKeys)) - // mutex for failures - var failMu sync.Mutex + // mutex for writing directly to res (e.g failures) + var respMu sync.Mutex // fan out for domain, deviceKeys := range domainToDeviceKeys { - go func(serverName string, devKeys map[string][]string) { - defer wg.Done() - fedCtx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - // for users who we do not have any knowledge about, try to start doing device list updates for them - // by hitting /users/devices - otherwise fallback to /keys/query which has nicer bulk properties but - // lack a stream ID. - var userIDsForAllDevices []string - for userID, deviceIDs := range devKeys { - if len(deviceIDs) == 0 { - userIDsForAllDevices = append(userIDsForAllDevices, userID) - delete(devKeys, userID) - } - } - for _, userID := range userIDsForAllDevices { - err := a.Updater.ManualUpdate(context.Background(), gomatrixserverlib.ServerName(serverName), userID) - if err != nil { - logrus.WithFields(logrus.Fields{ - logrus.ErrorKey: err, - "user_id": userID, - "server": serverName, - }).Error("Failed to manually update device lists for user") - // try to do it via /keys/query - devKeys[userID] = []string{} - continue - } - // refresh entries from DB: unlike remoteKeysFromDatabase we know we previously had no device info for this - // user so the fact that we're populating all devices here isn't a problem so long as we have devices. - err = a.populateResponseWithDeviceKeysFromDatabase(ctx, res, userID, nil) - if err != nil { - logrus.WithFields(logrus.Fields{ - logrus.ErrorKey: err, - "user_id": userID, - "server": serverName, - }).Error("Failed to manually update device lists for user") - // try to do it via /keys/query - devKeys[userID] = []string{} - continue - } - } - if len(devKeys) == 0 { - return - } - queryKeysResp, err := a.FedClient.QueryKeys(fedCtx, gomatrixserverlib.ServerName(serverName), devKeys) - if err != nil { - failMu.Lock() - res.Failures[serverName] = map[string]interface{}{ - "message": err.Error(), - } - failMu.Unlock() - return - } - resultCh <- &queryKeysResp - }(domain, deviceKeys) + go a.queryRemoteKeysOnServer(ctx, domain, deviceKeys, &wg, &respMu, timeout, resultCh, res) } // Close the result channel when the goroutines have quit so the for .. range exits @@ -399,6 +346,76 @@ func (a *KeyInternalAPI) queryRemoteKeys( } } +func (a *KeyInternalAPI) queryRemoteKeysOnServer( + ctx context.Context, serverName string, devKeys map[string][]string, wg *sync.WaitGroup, + respMu *sync.Mutex, timeout time.Duration, resultCh chan<- *gomatrixserverlib.RespQueryKeys, + res *api.QueryKeysResponse, +) { + defer wg.Done() + fedCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + // for users who we do not have any knowledge about, try to start doing device list updates for them + // by hitting /users/devices - otherwise fallback to /keys/query which has nicer bulk properties but + // lack a stream ID. + var userIDsForAllDevices []string + for userID, deviceIDs := range devKeys { + if len(deviceIDs) == 0 { + userIDsForAllDevices = append(userIDsForAllDevices, userID) + delete(devKeys, userID) + } + } + for _, userID := range userIDsForAllDevices { + err := a.Updater.ManualUpdate(context.Background(), gomatrixserverlib.ServerName(serverName), userID) + if err != nil { + logrus.WithFields(logrus.Fields{ + logrus.ErrorKey: err, + "user_id": userID, + "server": serverName, + }).Error("Failed to manually update device lists for user") + // try to do it via /keys/query + devKeys[userID] = []string{} + continue + } + // refresh entries from DB: unlike remoteKeysFromDatabase we know we previously had no device info for this + // user so the fact that we're populating all devices here isn't a problem so long as we have devices. + respMu.Lock() + err = a.populateResponseWithDeviceKeysFromDatabase(ctx, res, userID, nil) + respMu.Unlock() + if err != nil { + logrus.WithFields(logrus.Fields{ + logrus.ErrorKey: err, + "user_id": userID, + "server": serverName, + }).Error("Failed to manually update device lists for user") + // try to do it via /keys/query + devKeys[userID] = []string{} + continue + } + } + if len(devKeys) == 0 { + return + } + queryKeysResp, err := a.FedClient.QueryKeys(fedCtx, gomatrixserverlib.ServerName(serverName), devKeys) + if err == nil { + resultCh <- &queryKeysResp + return + } + respMu.Lock() + res.Failures[serverName] = map[string]interface{}{ + "message": err.Error(), + } + + // last ditch, use the cache only. This is good for when clients hit /keys/query and the remote server + // is down, better to return something than nothing at all. Clients can know about the failure by + // inspecting the failures map though so they can know it's a cached response. + for userID, dkeys := range devKeys { + // drop the error as it's already a failure at this point + _ = a.populateResponseWithDeviceKeysFromDatabase(ctx, res, userID, dkeys) + } + respMu.Unlock() + +} + func (a *KeyInternalAPI) populateResponseWithDeviceKeysFromDatabase( ctx context.Context, res *api.QueryKeysResponse, userID string, deviceIDs []string, ) error { |