aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--federationapi/queue/queue.go41
-rw-r--r--keyserver/internal/device_list_update.go9
2 files changed, 30 insertions, 20 deletions
diff --git a/federationapi/queue/queue.go b/federationapi/queue/queue.go
index dcd09085..5b548127 100644
--- a/federationapi/queue/queue.go
+++ b/federationapi/queue/queue.go
@@ -104,28 +104,31 @@ func NewOutgoingQueues(
}
// Look up which servers we have pending items for and then rehydrate those queues.
if !disabled {
- time.AfterFunc(time.Second*5, func() {
- serverNames := map[gomatrixserverlib.ServerName]struct{}{}
- if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil {
- for _, serverName := range names {
- serverNames[serverName] = struct{}{}
- }
- } else {
- log.WithError(err).Error("Failed to get PDU server names for destination queue hydration")
+ serverNames := map[gomatrixserverlib.ServerName]struct{}{}
+ if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil {
+ for _, serverName := range names {
+ serverNames[serverName] = struct{}{}
}
- if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil {
- for _, serverName := range names {
- serverNames[serverName] = struct{}{}
- }
- } else {
- log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
+ } else {
+ log.WithError(err).Error("Failed to get PDU server names for destination queue hydration")
+ }
+ if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil {
+ for _, serverName := range names {
+ serverNames[serverName] = struct{}{}
}
- for serverName := range serverNames {
- if queue := queues.getQueue(serverName); queue != nil {
- queue.wakeQueueIfNeeded()
- }
+ } else {
+ log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
+ }
+ offset, step := time.Second*5, time.Second
+ if max := len(serverNames); max > 120 {
+ step = (time.Second * 120) / time.Duration(max)
+ }
+ for serverName := range serverNames {
+ if queue := queues.getQueue(serverName); queue != nil {
+ time.AfterFunc(offset, queue.wakeQueueIfNeeded)
+ offset += step
}
- })
+ }
}
return queues
}
diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go
index 4b2b8c18..561c9a16 100644
--- a/keyserver/internal/device_list_update.go
+++ b/keyserver/internal/device_list_update.go
@@ -157,8 +157,15 @@ func (u *DeviceListUpdater) Start() error {
if err != nil {
return err
}
+ offset, step := time.Second*10, time.Second
+ if max := len(staleLists); max > 120 {
+ step = (time.Second * 120) / time.Duration(max)
+ }
for _, userID := range staleLists {
- u.notifyWorkers(userID)
+ time.AfterFunc(offset, func() {
+ u.notifyWorkers(userID)
+ })
+ offset += step
}
return nil
}