diff options
Diffstat (limited to 'federationsender/queue')
-rw-r--r-- | federationsender/queue/destinationqueue.go | 46 |
1 files changed, 11 insertions, 35 deletions
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index aedaeab1..9ccfbace 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -262,15 +262,13 @@ func (oq *destinationQueue) backgroundSend() { // If we are backing off this server then wait for the // backoff duration to complete first, or until explicitly // told to retry. - if backoff, duration := oq.statistics.BackoffDuration(); backoff { - log.WithField("duration", duration).Debugf("Backing off %s", oq.destination) - oq.backingOff.Store(true) - select { - case <-time.After(duration): - case <-oq.interruptBackoff: - log.Debugf("Interrupting backoff for %q", oq.destination) - } - oq.backingOff.Store(false) + if _, giveUp := oq.statistics.BackoffIfRequired(oq.backingOff, oq.interruptBackoff); giveUp { + // It's been suggested that we should give up because the backoff + // has exceeded a maximum allowable value. Clean up the in-memory + // buffers at this point. The PDU clean-up is already on a defer. + oq.cleanPendingInvites() + log.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination) + return } // If we have pending PDUs or EDUs then construct a transaction. @@ -278,24 +276,8 @@ func (oq *destinationQueue) backgroundSend() { // Try sending the next transaction and see what happens. transaction, terr := oq.nextTransaction() if terr != nil { - // We failed to send the transaction. - if giveUp := oq.statistics.Failure(); giveUp { - // It's been suggested that we should give up because the backoff - // has exceeded a maximum allowable value. Clean up the in-memory - // buffers at this point. The PDU clean-up is already on a defer. - oq.cleanPendingInvites() - log.Infof("Blacklisting %q due to errors", oq.destination) - return - } else { - // We haven't been told to give up terminally yet but we still have - // PDUs waiting to be sent. By sending a message into the wake chan, - // the next loop iteration will try processing these PDUs again, - // subject to the backoff. - select { - case oq.notifyPDUs <- true: - default: - } - } + // We failed to send the transaction. Mark it as a failure. + oq.statistics.Failure() } else if transaction { // If we successfully sent the transaction then clear out // the pending events and EDUs, and wipe our transaction ID. @@ -307,14 +289,8 @@ func (oq *destinationQueue) backgroundSend() { if len(oq.pendingInvites) > 0 { sent, ierr := oq.nextInvites(oq.pendingInvites) if ierr != nil { - // We failed to send the transaction so increase the - // backoff and give it another go shortly. - if giveUp := oq.statistics.Failure(); giveUp { - // It's been suggested that we should give up because - // the backoff has exceeded a maximum allowable value. - log.Infof("Blacklisting %q due to errors", oq.destination) - return - } + // We failed to send the transaction. Mark it as a failure. + oq.statistics.Failure() } else if sent > 0 { // If we successfully sent the invites then clear out // the pending invites. |