diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-08-07 18:50:29 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-07 18:50:29 +0100 |
commit | 58998e98746c7d8610fa1e1c3c11de0445393454 (patch) | |
tree | f2936930940fa4451049ad7ab320e8973efa54e4 /federationsender/queue | |
parent | 366fd975c8ad9ad654845d562d9b87749f1f8e37 (diff) |
Backoff fixes (#1250)
* Backoff fixes
* Update comments
* Fix destination queue
* Log why we're blacklisting
* Fix logic fail
* Logging level
* Fix bug
* Maybe fix that bug after all
* Fix debug output
* Fix tests
Diffstat (limited to 'federationsender/queue')
-rw-r--r-- | federationsender/queue/destinationqueue.go | 46 |
1 files changed, 11 insertions, 35 deletions
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index aedaeab1..9ccfbace 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -262,15 +262,13 @@ func (oq *destinationQueue) backgroundSend() { // If we are backing off this server then wait for the // backoff duration to complete first, or until explicitly // told to retry. - if backoff, duration := oq.statistics.BackoffDuration(); backoff { - log.WithField("duration", duration).Debugf("Backing off %s", oq.destination) - oq.backingOff.Store(true) - select { - case <-time.After(duration): - case <-oq.interruptBackoff: - log.Debugf("Interrupting backoff for %q", oq.destination) - } - oq.backingOff.Store(false) + if _, giveUp := oq.statistics.BackoffIfRequired(oq.backingOff, oq.interruptBackoff); giveUp { + // It's been suggested that we should give up because the backoff + // has exceeded a maximum allowable value. Clean up the in-memory + // buffers at this point. The PDU clean-up is already on a defer. + oq.cleanPendingInvites() + log.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination) + return } // If we have pending PDUs or EDUs then construct a transaction. @@ -278,24 +276,8 @@ func (oq *destinationQueue) backgroundSend() { // Try sending the next transaction and see what happens. transaction, terr := oq.nextTransaction() if terr != nil { - // We failed to send the transaction. - if giveUp := oq.statistics.Failure(); giveUp { - // It's been suggested that we should give up because the backoff - // has exceeded a maximum allowable value. Clean up the in-memory - // buffers at this point. The PDU clean-up is already on a defer. - oq.cleanPendingInvites() - log.Infof("Blacklisting %q due to errors", oq.destination) - return - } else { - // We haven't been told to give up terminally yet but we still have - // PDUs waiting to be sent. By sending a message into the wake chan, - // the next loop iteration will try processing these PDUs again, - // subject to the backoff. - select { - case oq.notifyPDUs <- true: - default: - } - } + // We failed to send the transaction. Mark it as a failure. + oq.statistics.Failure() } else if transaction { // If we successfully sent the transaction then clear out // the pending events and EDUs, and wipe our transaction ID. @@ -307,14 +289,8 @@ func (oq *destinationQueue) backgroundSend() { if len(oq.pendingInvites) > 0 { sent, ierr := oq.nextInvites(oq.pendingInvites) if ierr != nil { - // We failed to send the transaction so increase the - // backoff and give it another go shortly. - if giveUp := oq.statistics.Failure(); giveUp { - // It's been suggested that we should give up because - // the backoff has exceeded a maximum allowable value. - log.Infof("Blacklisting %q due to errors", oq.destination) - return - } + // We failed to send the transaction. Mark it as a failure. + oq.statistics.Failure() } else if sent > 0 { // If we successfully sent the invites then clear out // the pending invites. |