about | summary | refs | log | tree | commit | diff
path: root/federationsender/queue
diff options
context:
space:
mode:
author: Neil Alexander <neilalexander@users.noreply.github.com> 2020-08-07 18:50:29 +0100
committer: GitHub <noreply@github.com> 2020-08-07 18:50:29 +0100
commit: 58998e98746c7d8610fa1e1c3c11de0445393454 (patch)
tree: f2936930940fa4451049ad7ab320e8973efa54e4 /federationsender/queue
parent: 366fd975c8ad9ad654845d562d9b87749f1f8e37 (diff)
Backoff fixes (#1250)
* Backoff fixes
* Update comments
* Fix destination queue
* Log why we're blacklisting
* Fix logic fail
* Logging level
* Fix bug
* Maybe fix that bug after all
* Fix debug output
* Fix tests
Diffstat (limited to 'federationsender/queue')
-rw-r--r-- federationsender/queue/destinationqueue.go 46
1 file changed, 11 insertions, 35 deletions
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go
index aedaeab1..9ccfbace 100644
--- a/federationsender/queue/destinationqueue.go
+++ b/federationsender/queue/destinationqueue.go
@@ -262,15 +262,13 @@ func (oq *destinationQueue) backgroundSend() {
// If we are backing off this server then wait for the
// backoff duration to complete first, or until explicitly
// told to retry.
- if backoff, duration := oq.statistics.BackoffDuration(); backoff {
- log.WithField("duration", duration).Debugf("Backing off %s", oq.destination)
- oq.backingOff.Store(true)
- select {
- case <-time.After(duration):
- case <-oq.interruptBackoff:
- log.Debugf("Interrupting backoff for %q", oq.destination)
- }
- oq.backingOff.Store(false)
+ if _, giveUp := oq.statistics.BackoffIfRequired(oq.backingOff, oq.interruptBackoff); giveUp {
+ // It's been suggested that we should give up because the backoff
+ // has exceeded a maximum allowable value. Clean up the in-memory
+ // buffers at this point. The PDU clean-up is already on a defer.
+ oq.cleanPendingInvites()
+ log.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination)
+ return
}
// If we have pending PDUs or EDUs then construct a transaction.
@@ -278,24 +276,8 @@ func (oq *destinationQueue) backgroundSend() {
// Try sending the next transaction and see what happens.
transaction, terr := oq.nextTransaction()
if terr != nil {
- // We failed to send the transaction.
- if giveUp := oq.statistics.Failure(); giveUp {
- // It's been suggested that we should give up because the backoff
- // has exceeded a maximum allowable value. Clean up the in-memory
- // buffers at this point. The PDU clean-up is already on a defer.
- oq.cleanPendingInvites()
- log.Infof("Blacklisting %q due to errors", oq.destination)
- return
- } else {
- // We haven't been told to give up terminally yet but we still have
- // PDUs waiting to be sent. By sending a message into the wake chan,
- // the next loop iteration will try processing these PDUs again,
- // subject to the backoff.
- select {
- case oq.notifyPDUs <- true:
- default:
- }
- }
+ // We failed to send the transaction. Mark it as a failure.
+ oq.statistics.Failure()
} else if transaction {
// If we successfully sent the transaction then clear out
// the pending events and EDUs, and wipe our transaction ID.
@@ -307,14 +289,8 @@ func (oq *destinationQueue) backgroundSend() {
if len(oq.pendingInvites) > 0 {
sent, ierr := oq.nextInvites(oq.pendingInvites)
if ierr != nil {
- // We failed to send the transaction so increase the
- // backoff and give it another go shortly.
- if giveUp := oq.statistics.Failure(); giveUp {
- // It's been suggested that we should give up because
- // the backoff has exceeded a maximum allowable value.
- log.Infof("Blacklisting %q due to errors", oq.destination)
- return
- }
+ // We failed to send the transaction. Mark it as a failure.
+ oq.statistics.Failure()
} else if sent > 0 {
// If we successfully sent the invites then clear out
// the pending invites.