diff options
Diffstat (limited to 'federationsender/queue')
-rw-r--r-- | federationsender/queue/destinationqueue.go | 33 | ||||
-rw-r--r-- | federationsender/queue/queue.go | 16 |
2 files changed, 47 insertions, 2 deletions
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 09dac464..4ab610de 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -39,6 +39,7 @@ type destinationQueue struct { origin gomatrixserverlib.ServerName // origin of requests destination gomatrixserverlib.ServerName // destination of requests running atomic.Bool // is the queue worker running? + backingOff atomic.Bool // true if we're backing off statistics *types.ServerStatistics // statistics about this remote server incomingPDUs chan *gomatrixserverlib.HeaderedEvent // PDUs to send incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send @@ -47,6 +48,28 @@ type destinationQueue struct { pendingPDUs []*gomatrixserverlib.HeaderedEvent // owned by backgroundSend pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend + retryServerCh chan bool // interrupts backoff +} + +// retry will clear the blacklist state and attempt to send built up events to the server, +// resetting and interrupting any backoff timers. +func (oq *destinationQueue) retry() { + // TODO: We don't send all events in the case where the server has been blacklisted as we + // drop events instead then. This means we will send the oldest N events (chan size, currently 128) + // and then skip ahead a lot which feels non-ideal but equally we can't persist thousands of events + // in-memory to maybe-send it one day. Ideally we would just shove these pending events in a database + // so we can send a lot of events. + oq.statistics.Success() + // if we were backing off, swap to not backing off and interrupt the select. + // We need to use an atomic bool here to prevent multiple calls to retry() blocking on the channel + // as it is unbuffered. + if oq.backingOff.CAS(true, false) { + oq.retryServerCh <- true + } + if !oq.running.Load() { + log.Infof("Restarting queue for %s", oq.destination) + go oq.backgroundSend() + } } // Send event adds the event to the pending queue for the destination. @@ -155,9 +178,15 @@ func (oq *destinationQueue) backgroundSend() { } // If we are backing off this server then wait for the - // backoff duration to complete first. + // backoff duration to complete first, or until explicitly + // told to retry. if backoff, duration := oq.statistics.BackoffDuration(); backoff { - <-time.After(duration) + oq.backingOff.Store(true) + select { + case <-time.After(duration): + case <-oq.retryServerCh: + } + oq.backingOff.Store(false) } // How many things do we have waiting? diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index aae6c53a..386a3397 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -52,6 +52,12 @@ func NewOutgoingQueues( } } +func (oqs *OutgoingQueues) getQueueIfExists(destination gomatrixserverlib.ServerName) *destinationQueue { + oqs.queuesMutex.Lock() + defer oqs.queuesMutex.Unlock() + return oqs.queues[destination] +} + func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue { oqs.queuesMutex.Lock() defer oqs.queuesMutex.Unlock() @@ -66,6 +72,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d incomingPDUs: make(chan *gomatrixserverlib.HeaderedEvent, 128), incomingEDUs: make(chan *gomatrixserverlib.EDU, 128), incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128), + retryServerCh: make(chan bool), } oqs.queues[destination] = oq } @@ -160,6 +167,15 @@ func (oqs *OutgoingQueues) SendEDU( return nil } +// RetryServer attempts to resend events to the given server if we had given up. +func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) { + q := oqs.getQueueIfExists(srv) + if q == nil { + return + } + q.retry() +} + // filterAndDedupeDests removes our own server from the list of destinations // and deduplicates any servers in the list that may appear more than once. func filterAndDedupeDests(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) ( |