diff options
-rw-r--r-- | federationapi/queue/queue.go | 41 | ||||
-rw-r--r-- | keyserver/internal/device_list_update.go | 9 |
2 files changed, 30 insertions, 20 deletions
diff --git a/federationapi/queue/queue.go b/federationapi/queue/queue.go index dcd09085..5b548127 100644 --- a/federationapi/queue/queue.go +++ b/federationapi/queue/queue.go @@ -104,28 +104,31 @@ func NewOutgoingQueues( } // Look up which servers we have pending items for and then rehydrate those queues. if !disabled { - time.AfterFunc(time.Second*5, func() { - serverNames := map[gomatrixserverlib.ServerName]struct{}{} - if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil { - for _, serverName := range names { - serverNames[serverName] = struct{}{} - } - } else { - log.WithError(err).Error("Failed to get PDU server names for destination queue hydration") + serverNames := map[gomatrixserverlib.ServerName]struct{}{} + if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil { + for _, serverName := range names { + serverNames[serverName] = struct{}{} } - if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil { - for _, serverName := range names { - serverNames[serverName] = struct{}{} - } - } else { - log.WithError(err).Error("Failed to get EDU server names for destination queue hydration") + } else { + log.WithError(err).Error("Failed to get PDU server names for destination queue hydration") + } + if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil { + for _, serverName := range names { + serverNames[serverName] = struct{}{} } - for serverName := range serverNames { - if queue := queues.getQueue(serverName); queue != nil { - queue.wakeQueueIfNeeded() - } + } else { + log.WithError(err).Error("Failed to get EDU server names for destination queue hydration") + } + offset, step := time.Second*5, time.Second + if max := len(serverNames); max > 120 { + step = (time.Second * 120) / time.Duration(max) + } + for serverName := range serverNames { + if queue := queues.getQueue(serverName); queue != nil { + time.AfterFunc(offset, queue.wakeQueueIfNeeded) + offset += step } - }) + } } return queues } diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index 4b2b8c18..561c9a16 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -157,8 +157,15 @@ func (u *DeviceListUpdater) Start() error { if err != nil { return err } + offset, step := time.Second*10, time.Second + if max := len(staleLists); max > 120 { + step = (time.Second * 120) / time.Duration(max) + } for _, userID := range staleLists { - u.notifyWorkers(userID) + time.AfterFunc(offset, func() { + u.notifyWorkers(userID) + }) + offset += step } return nil } |