aboutsummaryrefslogtreecommitdiff
path: root/federationapi/queue/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'federationapi/queue/queue.go')
-rw-r--r--federationapi/queue/queue.go26
1 files changed, 15 insertions, 11 deletions
diff --git a/federationapi/queue/queue.go b/federationapi/queue/queue.go
index 8245aa5b..68f789e3 100644
--- a/federationapi/queue/queue.go
+++ b/federationapi/queue/queue.go
@@ -162,23 +162,25 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
if !ok || oq == nil {
destinationQueueTotal.Inc()
oq = &destinationQueue{
- queues: oqs,
- db: oqs.db,
- process: oqs.process,
- rsAPI: oqs.rsAPI,
- origin: oqs.origin,
- destination: destination,
- client: oqs.client,
- statistics: oqs.statistics.ForServer(destination),
- notify: make(chan struct{}, 1),
- interruptBackoff: make(chan bool),
- signing: oqs.signing,
+ queues: oqs,
+ db: oqs.db,
+ process: oqs.process,
+ rsAPI: oqs.rsAPI,
+ origin: oqs.origin,
+ destination: destination,
+ client: oqs.client,
+ statistics: oqs.statistics.ForServer(destination),
+ notify: make(chan struct{}, 1),
+ signing: oqs.signing,
}
+ oq.statistics.AssignBackoffNotifier(oq.handleBackoffNotifier)
oqs.queues[destination] = oq
}
return oq
}
+// clearQueue removes the queue for the provided destination from the
+// set of destination queues.
func (oqs *OutgoingQueues) clearQueue(oq *destinationQueue) {
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
@@ -332,7 +334,9 @@ func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) {
if oqs.disabled {
return
}
+ oqs.statistics.ForServer(srv).RemoveBlacklist()
if queue := oqs.getQueue(srv); queue != nil {
+ queue.statistics.ClearBackoff()
queue.wakeQueueIfNeeded()
}
}