aboutsummaryrefslogtreecommitdiff
path: root/keyserver/internal
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-08-26 12:03:09 +0100
committerGitHub <noreply@github.com>2020-08-26 12:03:09 +0100
commitabd16ff4a0fe6e67fdddf31edd61d1ced797c7b8 (patch)
tree870bb878de2f1d1ee8eaf465d550112d879e3c20 /keyserver/internal
parent3205b9212d76f61301efae3c554bf9c3fbfc94c8 (diff)
Modify DeviceListUpdater to retry requests according to RetryAfter (#1342)
* Modify DeviceListUpdater to retry requests according to RetryAfter * Reduce wait time for sytest test pollution
Diffstat (limited to 'keyserver/internal')
-rw-r--r--keyserver/internal/device_list_update.go17
1 files changed, 11 insertions, 6 deletions
diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go
index 36918256..3fbf31f1 100644
--- a/keyserver/internal/device_list_update.go
+++ b/keyserver/internal/device_list_update.go
@@ -310,24 +310,25 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
}
}
lastProcessed[serverName] = time.Now()
- shouldRetry := u.processServer(serverName)
+ waitTime, shouldRetry := u.processServer(serverName)
if shouldRetry {
- scheduledRetries[serverName] = time.Now().Add(cooloffPeriod)
- go inject(serverName, cooloffPeriod)
+ scheduledRetries[serverName] = time.Now().Add(waitTime)
+ go inject(serverName, waitTime)
}
}
}
-func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) bool {
+func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) (time.Duration, bool) {
requestTimeout := time.Minute // max amount of time we want to spend on each request
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
logger := util.GetLogger(ctx).WithField("server_name", serverName)
+ waitTime := 2 * time.Second
// fetch stale device lists
userIDs, err := u.db.StaleDeviceLists(ctx, []gomatrixserverlib.ServerName{serverName})
if err != nil {
logger.WithError(err).Error("failed to load stale device lists")
- return true
+ return waitTime, true
}
hasFailures := false
for _, userID := range userIDs {
@@ -339,6 +340,10 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
res, err := u.fedClient.GetUserDevices(ctx, serverName, userID)
if err != nil {
logger.WithError(err).WithField("user_id", userID).Error("failed to query device keys for user")
+ fcerr, ok := err.(*fedsenderapi.FederationClientError)
+ if ok && fcerr.RetryAfter > 0 {
+ waitTime = fcerr.RetryAfter
+ }
hasFailures = true
continue
}
@@ -352,7 +357,7 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
// always clear the channel to unblock Update calls regardless of success/failure
u.clearChannel(userID)
}
- return hasFailures
+ return waitTime, hasFailures
}
func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error {