author     Kegsay <kegan@matrix.org>    2021-04-08 13:50:39 +0100
committer  GitHub <noreply@github.com>  2021-04-08 13:50:39 +0100
commit     b769d5a25ee1dd298899ab9fec0c31e834f9b99e (patch)
tree       53c2e72af0b5b8339aa706485fde04c2fc72c605 /keyserver
parent     5ade348d142012367e6cf4b8c2c65d6fbf357af6 (diff)
Optimise memory usage when calling /g_m_e (#1819)
* Optimise memory usage when calling /g_m_e
* cache more events
* refactor handling of device list update pokes
* Sigh
Diffstat (limited to 'keyserver')
-rw-r--r--  keyserver/internal/device_list_update.go  68
1 file changed, 31 insertions(+), 37 deletions(-)
diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go
index bd563ef3..47bfb72c 100644
--- a/keyserver/internal/device_list_update.go
+++ b/keyserver/internal/device_list_update.go
@@ -72,10 +72,8 @@ func init() {
// we guarantee we will get around to it. Also, more users on a given server does not increase the number of requests
// (as /keys/query allows multiple users to be specified) so being stuck behind matrix.org won't materially be any worse
// than being stuck behind foo.bar
-// In the event that the query fails, the worker spins up a short-lived goroutine whose sole purpose is to inject the server
-// name back into the channel after a certain amount of time. If in the interim the device lists have been updated, then
-// the database query will return no stale lists. Reinjection into the channel continues until success or the server terminates,
-// when it will be reloaded on startup.
+// In the event that the query fails, a lock is acquired and the server name along with the time to wait before retrying is
+// set in a map. A restarter goroutine periodically probes this map and injects servers which are ready to be retried.
type DeviceListUpdater struct {
    // A map from user_id to a mutex. Used when we are missing prev IDs so we don't make more than 1
    // request to the remote server and race.
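
For illustration, here is a minimal, self-contained sketch of the retry pattern the new comment describes, with a plain string standing in for gomatrixserverlib.ServerName. The retrier type and its Add/run methods are hypothetical names for this sketch, not code from the commit:

// retry_sketch.go — hypothetical, for illustration only.
package main

import (
    "fmt"
    "sync"
    "time"
)

type retrier struct {
    mu      sync.Mutex
    retries map[string]time.Time // server name -> earliest time to retry
    ch      chan string          // the channel the worker drains
}

// Add schedules srv for retry after wait, unless a retry is already pending.
func (r *retrier) Add(srv string, wait time.Duration) {
    r.mu.Lock()
    defer r.mu.Unlock()
    if _, exists := r.retries[srv]; !exists {
        r.retries[srv] = time.Now().Add(wait)
    }
}

// run periodically probes the map and reinjects servers whose retry time
// has passed, mirroring the restarter goroutine in the diff below.
func (r *retrier) run() {
    for {
        time.Sleep(time.Second)
        var due []string
        r.mu.Lock()
        now := time.Now()
        for srv, at := range r.retries {
            if now.After(at) {
                due = append(due, srv)
            }
        }
        for _, srv := range due {
            delete(r.retries, srv)
        }
        r.mu.Unlock()
        // send outside the lock so a blocked channel cannot hold up Add callers
        for _, srv := range due {
            r.ch <- srv
        }
    }
}

func main() {
    r := &retrier{retries: make(map[string]time.Time), ch: make(chan string)}
    go r.run()
    r.Add("remote.example.org", 500*time.Millisecond)
    fmt.Println("ready to retry:", <-r.ch) // unblocks once the restarter reinjects
}

The key property is that a failed server costs one map entry rather than one sleeping goroutine, so repeated failures cannot accumulate goroutines.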
@@ -297,42 +295,38 @@ func (u *DeviceListUpdater) clearChannel(userID string) {
}
func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
-    // It's possible to get many of the same server name in the channel, so in order
-    // to prevent processing the same server over and over we keep track of when we
-    // last made a request to the server. If we get the server name during the cooloff
-    // period, we'll ignore the poke.
-    lastProcessed := make(map[gomatrixserverlib.ServerName]time.Time)
-    // this can't be too long else sytest will give up trying to do a test
-    cooloffPeriod := 500 * time.Millisecond
-    shouldProcess := func(srv gomatrixserverlib.ServerName) bool {
-        // we should process requests when now is after the last process time + cooloff
-        return time.Now().After(lastProcessed[srv].Add(cooloffPeriod))
-    }
-
-    // on failure, spin up a short-lived goroutine to inject the server name again.
-    scheduledRetries := make(map[gomatrixserverlib.ServerName]time.Time)
-    inject := func(srv gomatrixserverlib.ServerName, duration time.Duration) {
-        time.Sleep(duration)
-        ch <- srv
-    }
-
-    for serverName := range ch {
-        if !shouldProcess(serverName) {
-            if time.Now().Before(scheduledRetries[serverName]) {
-                // do not inject into the channel as we know there will be a sleeping goroutine
-                // which will do it after the cooloff period expires
-                continue
-            } else {
-                scheduledRetries[serverName] = time.Now().Add(cooloffPeriod)
-                go inject(serverName, cooloffPeriod)
-                continue
+    retries := make(map[gomatrixserverlib.ServerName]time.Time)
+    retriesMu := &sync.Mutex{}
+    // restarter goroutine which will inject failed servers into ch when it is time to retry them
+    go func() {
+        for {
+            var serversToRetry []gomatrixserverlib.ServerName
+            time.Sleep(time.Second)
+            retriesMu.Lock()
+            now := time.Now()
+            for srv, retryAt := range retries {
+                if now.After(retryAt) {
+                    serversToRetry = append(serversToRetry, srv)
+                }
+            }
+            for _, srv := range serversToRetry {
+                delete(retries, srv)
+            }
+            retriesMu.Unlock()
+            for _, srv := range serversToRetry {
+                ch <- srv
            }
        }
-        lastProcessed[serverName] = time.Now()
+    }()
+    for serverName := range ch {
        waitTime, shouldRetry := u.processServer(serverName)
        if shouldRetry {
-            scheduledRetries[serverName] = time.Now().Add(waitTime)
-            go inject(serverName, waitTime)
+            retriesMu.Lock()
+            _, exists := retries[serverName]
+            if !exists {
+                retries[serverName] = time.Now().Add(waitTime)
+            }
+            retriesMu.Unlock()
        }
    }
}
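
A note on the design above, with a tiny hypothetical demonstration: the exists check means only the first failure while a retry is pending sets the retry time, so later failures for the same server neither extend the wait nor add state; at most one map entry is held per failing server, where the previous approach held one sleeping goroutine per failure.

// first_failure_wins.go — hypothetical, for illustration only.
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    retries := make(map[string]time.Time)
    var mu sync.Mutex

    // schedule mirrors the worker's behaviour when processServer asks for a retry
    schedule := func(srv string, wait time.Duration) {
        mu.Lock()
        defer mu.Unlock()
        if _, exists := retries[srv]; !exists { // first failure wins
            retries[srv] = time.Now().Add(wait)
        }
    }

    schedule("remote.example.org", time.Second)
    first := retries["remote.example.org"]
    schedule("remote.example.org", time.Hour) // ignored: a retry is already pending
    fmt.Println(retries["remote.example.org"].Equal(first)) // true
}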
@@ -380,7 +374,7 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
        }
    }
    if failCount > 0 {
-        logger.WithField("total", len(userIDs)).WithField("failed", failCount).Error("failed to query device keys for some users")
+        logger.WithField("total", len(userIDs)).WithField("failed", failCount).WithField("wait", waitTime).Error("failed to query device keys for some users")
    }
    for _, userID := range userIDs {
        // always clear the channel to unblock Update calls regardless of success/failure
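
The hunk cuts off here. For context, the "clear the channel to unblock Update calls" idea referenced in the last comment can be sketched generically; clearChannel's real body is not shown in this diff, so every name below is hypothetical:

// clear_channel_sketch.go — hypothetical, for illustration only.
package main

import "sync"

type notifier struct {
    mu    sync.Mutex
    chans map[string]chan struct{}
}

// waiterChan returns the channel a caller blocks on for userID.
func (n *notifier) waiterChan(userID string) chan struct{} {
    n.mu.Lock()
    defer n.mu.Unlock()
    ch, ok := n.chans[userID]
    if !ok {
        ch = make(chan struct{})
        n.chans[userID] = ch
    }
    return ch
}

// clear releases every waiter for userID at once, whether the
// device list query succeeded or failed, by closing the channel.
func (n *notifier) clear(userID string) {
    n.mu.Lock()
    defer n.mu.Unlock()
    if ch, ok := n.chans[userID]; ok {
        close(ch)
        delete(n.chans, userID)
    }
}

func main() {
    n := &notifier{chans: make(map[string]chan struct{})}
    done := n.waiterChan("@alice:example.org")
    go n.clear("@alice:example.org") // success or failure, waiters are released
    <-done
}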