diff options
author | Kegsay <kegan@matrix.org> | 2021-04-08 13:50:39 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-08 13:50:39 +0100 |
commit | b769d5a25ee1dd298899ab9fec0c31e834f9b99e (patch) | |
tree | 53c2e72af0b5b8339aa706485fde04c2fc72c605 /keyserver | |
parent | 5ade348d142012367e6cf4b8c2c65d6fbf357af6 (diff) |
Optimise memory usage when calling /g_m_e (#1819)
* Optimise memory usage when calling /g_m_e
* cache more events
* refactor handling of device list update pokes
* Sigh
Diffstat (limited to 'keyserver')
-rw-r--r-- | keyserver/internal/device_list_update.go | 68 |
1 files changed, 31 insertions, 37 deletions
diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index bd563ef3..47bfb72c 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -72,10 +72,8 @@ func init() { // we guarantee we will get around to it. Also, more users on a given server does not increase the number of requests // (as /keys/query allows multiple users to be specified) so being stuck behind matrix.org won't materially be any worse // than being stuck behind foo.bar -// In the event that the query fails, the worker spins up a short-lived goroutine whose sole purpose is to inject the server -// name back into the channel after a certain amount of time. If in the interim the device lists have been updated, then -// the database query will return no stale lists. Reinjection into the channel continues until success or the server terminates, -// when it will be reloaded on startup. +// In the event that the query fails, a lock is acquired and the server name along with the time to wait before retrying is +// set in a map. A restarter goroutine periodically probes this map and injects servers which are ready to be retried. type DeviceListUpdater struct { // A map from user_id to a mutex. Used when we are missing prev IDs so we don't make more than 1 // request to the remote server and race. @@ -297,42 +295,38 @@ func (u *DeviceListUpdater) clearChannel(userID string) { } func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { - // It's possible to get many of the same server name in the channel, so in order - // to prevent processing the same server over and over we keep track of when we - // last made a request to the server. If we get the server name during the cooloff - // period, we'll ignore the poke. - lastProcessed := make(map[gomatrixserverlib.ServerName]time.Time) - // this can't be too long else sytest will give up trying to do a test - cooloffPeriod := 500 * time.Millisecond - shouldProcess := func(srv gomatrixserverlib.ServerName) bool { - // we should process requests when now is after the last process time + cooloff - return time.Now().After(lastProcessed[srv].Add(cooloffPeriod)) - } - - // on failure, spin up a short-lived goroutine to inject the server name again. - scheduledRetries := make(map[gomatrixserverlib.ServerName]time.Time) - inject := func(srv gomatrixserverlib.ServerName, duration time.Duration) { - time.Sleep(duration) - ch <- srv - } - - for serverName := range ch { - if !shouldProcess(serverName) { - if time.Now().Before(scheduledRetries[serverName]) { - // do not inject into the channel as we know there will be a sleeping goroutine - // which will do it after the cooloff period expires - continue - } else { - scheduledRetries[serverName] = time.Now().Add(cooloffPeriod) - go inject(serverName, cooloffPeriod) - continue + retries := make(map[gomatrixserverlib.ServerName]time.Time) + retriesMu := &sync.Mutex{} + // restarter goroutine which will inject failed servers into ch when it is time + go func() { + for { + var serversToRetry []gomatrixserverlib.ServerName + time.Sleep(time.Second) + retriesMu.Lock() + now := time.Now() + for srv, retryAt := range retries { + if now.After(retryAt) { + serversToRetry = append(serversToRetry, srv) + } + } + for _, srv := range serversToRetry { + delete(retries, srv) + } + retriesMu.Unlock() + for _, srv := range serversToRetry { + ch <- srv } } - lastProcessed[serverName] = time.Now() + }() + for serverName := range ch { waitTime, shouldRetry := u.processServer(serverName) if shouldRetry { - scheduledRetries[serverName] = time.Now().Add(waitTime) - go inject(serverName, waitTime) + retriesMu.Lock() + _, exists := retries[serverName] + if !exists { + retries[serverName] = time.Now().Add(waitTime) + } + retriesMu.Unlock() } } } @@ -380,7 +374,7 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam } } if failCount > 0 { - logger.WithField("total", len(userIDs)).WithField("failed", failCount).Error("failed to query device keys for some users") + logger.WithField("total", len(userIDs)).WithField("failed", failCount).WithField("wait", waitTime).Error("failed to query device keys for some users") } for _, userID := range userIDs { // always clear the channel to unblock Update calls regardless of success/failure |