diff options
author | Kegsay <kegan@matrix.org> | 2020-08-26 12:03:09 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-26 12:03:09 +0100 |
commit | abd16ff4a0fe6e67fdddf31edd61d1ced797c7b8 (patch) | |
tree | 870bb878de2f1d1ee8eaf465d550112d879e3c20 /keyserver | |
parent | 3205b9212d76f61301efae3c554bf9c3fbfc94c8 (diff) |
Modify DeviceListUpdater to retry requests according to RetryAfter (#1342)
* Modify DeviceListUpdater to retry requests according to RetryAfter
* Reduce wait time for sytest test pollution
Diffstat (limited to 'keyserver')
-rw-r--r-- | keyserver/internal/device_list_update.go | 17 |
1 files changed, 11 insertions, 6 deletions
diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index 36918256..3fbf31f1 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -310,24 +310,25 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { } } lastProcessed[serverName] = time.Now() - shouldRetry := u.processServer(serverName) + waitTime, shouldRetry := u.processServer(serverName) if shouldRetry { - scheduledRetries[serverName] = time.Now().Add(cooloffPeriod) - go inject(serverName, cooloffPeriod) + scheduledRetries[serverName] = time.Now().Add(waitTime) + go inject(serverName, waitTime) } } } -func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) bool { +func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) (time.Duration, bool) { requestTimeout := time.Minute // max amount of time we want to spend on each request ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) defer cancel() logger := util.GetLogger(ctx).WithField("server_name", serverName) + waitTime := 2 * time.Second // fetch stale device lists userIDs, err := u.db.StaleDeviceLists(ctx, []gomatrixserverlib.ServerName{serverName}) if err != nil { logger.WithError(err).Error("failed to load stale device lists") - return true + return waitTime, true } hasFailures := false for _, userID := range userIDs { @@ -339,6 +340,10 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam res, err := u.fedClient.GetUserDevices(ctx, serverName, userID) if err != nil { logger.WithError(err).WithField("user_id", userID).Error("failed to query device keys for user") + fcerr, ok := err.(*fedsenderapi.FederationClientError) + if ok && fcerr.RetryAfter > 0 { + waitTime = fcerr.RetryAfter + } hasFailures = true continue } @@ -352,7 +357,7 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam // always clear the channel to unblock Update calls regardless of success/failure u.clearChannel(userID) } - return hasFailures + return waitTime, hasFailures } func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error { |