aboutsummaryrefslogtreecommitdiff
path: root/keyserver/internal
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-08-13 16:43:27 +0100
committerGitHub <noreply@github.com>2020-08-13 16:43:27 +0100
commit20c8f252a7930e07a113e24acc59964e5e19e708 (patch)
treea8d4b101f8a4c920c4bc774d2abe21a1a295896b /keyserver/internal
parent4c4732a9c95e03011ee85a311da13594922b58f0 (diff)
Make 'Device list doesn't change if remote server is down' pass (#1268)
- As a last resort, query the DB when exhausting all possible remote query endpoints, but keep the field in `failures` so clients can detect that this is stale data. - Unblock `DeviceListUpdater.Update` on failures rather than timing out. - Use a mutex when writing directly to `res`, not just for failures.
Diffstat (limited to 'keyserver/internal')
-rw-r--r--keyserver/internal/device_list_update.go6
-rw-r--r--keyserver/internal/internal.go129
2 files changed, 77 insertions, 58 deletions
diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go
index 573285e8..c27e291f 100644
--- a/keyserver/internal/device_list_update.go
+++ b/keyserver/internal/device_list_update.go
@@ -342,10 +342,12 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
if err != nil {
logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it")
hasFailures = true
- } else {
- u.clearChannel(userID)
}
}
+ for _, userID := range userIDs {
+ // always clear the channel to unblock Update calls regardless of success/failure
+ u.clearChannel(userID)
+ }
return hasFailures
}
diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go
index ef52d014..8904d463 100644
--- a/keyserver/internal/internal.go
+++ b/keyserver/internal/internal.go
@@ -318,65 +318,12 @@ func (a *KeyInternalAPI) queryRemoteKeys(
// allows us to wait until all federation servers have been poked
var wg sync.WaitGroup
wg.Add(len(domainToDeviceKeys))
- // mutex for failures
- var failMu sync.Mutex
+ // mutex for writing directly to res (e.g failures)
+ var respMu sync.Mutex
// fan out
for domain, deviceKeys := range domainToDeviceKeys {
- go func(serverName string, devKeys map[string][]string) {
- defer wg.Done()
- fedCtx, cancel := context.WithTimeout(ctx, timeout)
- defer cancel()
- // for users who we do not have any knowledge about, try to start doing device list updates for them
- // by hitting /users/devices - otherwise fallback to /keys/query which has nicer bulk properties but
- // lack a stream ID.
- var userIDsForAllDevices []string
- for userID, deviceIDs := range devKeys {
- if len(deviceIDs) == 0 {
- userIDsForAllDevices = append(userIDsForAllDevices, userID)
- delete(devKeys, userID)
- }
- }
- for _, userID := range userIDsForAllDevices {
- err := a.Updater.ManualUpdate(context.Background(), gomatrixserverlib.ServerName(serverName), userID)
- if err != nil {
- logrus.WithFields(logrus.Fields{
- logrus.ErrorKey: err,
- "user_id": userID,
- "server": serverName,
- }).Error("Failed to manually update device lists for user")
- // try to do it via /keys/query
- devKeys[userID] = []string{}
- continue
- }
- // refresh entries from DB: unlike remoteKeysFromDatabase we know we previously had no device info for this
- // user so the fact that we're populating all devices here isn't a problem so long as we have devices.
- err = a.populateResponseWithDeviceKeysFromDatabase(ctx, res, userID, nil)
- if err != nil {
- logrus.WithFields(logrus.Fields{
- logrus.ErrorKey: err,
- "user_id": userID,
- "server": serverName,
- }).Error("Failed to manually update device lists for user")
- // try to do it via /keys/query
- devKeys[userID] = []string{}
- continue
- }
- }
- if len(devKeys) == 0 {
- return
- }
- queryKeysResp, err := a.FedClient.QueryKeys(fedCtx, gomatrixserverlib.ServerName(serverName), devKeys)
- if err != nil {
- failMu.Lock()
- res.Failures[serverName] = map[string]interface{}{
- "message": err.Error(),
- }
- failMu.Unlock()
- return
- }
- resultCh <- &queryKeysResp
- }(domain, deviceKeys)
+ go a.queryRemoteKeysOnServer(ctx, domain, deviceKeys, &wg, &respMu, timeout, resultCh, res)
}
// Close the result channel when the goroutines have quit so the for .. range exits
@@ -399,6 +346,76 @@ func (a *KeyInternalAPI) queryRemoteKeys(
}
}
+func (a *KeyInternalAPI) queryRemoteKeysOnServer(
+ ctx context.Context, serverName string, devKeys map[string][]string, wg *sync.WaitGroup,
+ respMu *sync.Mutex, timeout time.Duration, resultCh chan<- *gomatrixserverlib.RespQueryKeys,
+ res *api.QueryKeysResponse,
+) {
+ defer wg.Done()
+ fedCtx, cancel := context.WithTimeout(ctx, timeout)
+ defer cancel()
+ // for users who we do not have any knowledge about, try to start doing device list updates for them
+ // by hitting /users/devices - otherwise fallback to /keys/query which has nicer bulk properties but
+ // lack a stream ID.
+ var userIDsForAllDevices []string
+ for userID, deviceIDs := range devKeys {
+ if len(deviceIDs) == 0 {
+ userIDsForAllDevices = append(userIDsForAllDevices, userID)
+ delete(devKeys, userID)
+ }
+ }
+ for _, userID := range userIDsForAllDevices {
+ err := a.Updater.ManualUpdate(context.Background(), gomatrixserverlib.ServerName(serverName), userID)
+ if err != nil {
+ logrus.WithFields(logrus.Fields{
+ logrus.ErrorKey: err,
+ "user_id": userID,
+ "server": serverName,
+ }).Error("Failed to manually update device lists for user")
+ // try to do it via /keys/query
+ devKeys[userID] = []string{}
+ continue
+ }
+ // refresh entries from DB: unlike remoteKeysFromDatabase we know we previously had no device info for this
+ // user so the fact that we're populating all devices here isn't a problem so long as we have devices.
+ respMu.Lock()
+ err = a.populateResponseWithDeviceKeysFromDatabase(ctx, res, userID, nil)
+ respMu.Unlock()
+ if err != nil {
+ logrus.WithFields(logrus.Fields{
+ logrus.ErrorKey: err,
+ "user_id": userID,
+ "server": serverName,
+ }).Error("Failed to manually update device lists for user")
+ // try to do it via /keys/query
+ devKeys[userID] = []string{}
+ continue
+ }
+ }
+ if len(devKeys) == 0 {
+ return
+ }
+ queryKeysResp, err := a.FedClient.QueryKeys(fedCtx, gomatrixserverlib.ServerName(serverName), devKeys)
+ if err == nil {
+ resultCh <- &queryKeysResp
+ return
+ }
+ respMu.Lock()
+ res.Failures[serverName] = map[string]interface{}{
+ "message": err.Error(),
+ }
+
+ // last ditch, use the cache only. This is good for when clients hit /keys/query and the remote server
+ // is down, better to return something than nothing at all. Clients can know about the failure by
+ // inspecting the failures map though so they can know it's a cached response.
+ for userID, dkeys := range devKeys {
+ // drop the error as it's already a failure at this point
+ _ = a.populateResponseWithDeviceKeysFromDatabase(ctx, res, userID, dkeys)
+ }
+ respMu.Unlock()
+
+}
+
func (a *KeyInternalAPI) populateResponseWithDeviceKeysFromDatabase(
ctx context.Context, res *api.QueryKeysResponse, userID string, deviceIDs []string,
) error {