about | summary | refs | log | tree | commit | diff
path: root/federationsender/queue
diff options
context:
space:
mode:
Diffstat (limited to 'federationsender/queue')
-rw-r--r-- federationsender/queue/destinationqueue.go 33
-rw-r--r-- federationsender/queue/queue.go 16
2 files changed, 47 insertions, 2 deletions
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go
index 09dac464..4ab610de 100644
--- a/federationsender/queue/destinationqueue.go
+++ b/federationsender/queue/destinationqueue.go
@@ -39,6 +39,7 @@ type destinationQueue struct {
origin gomatrixserverlib.ServerName // origin of requests
destination gomatrixserverlib.ServerName // destination of requests
running atomic.Bool // is the queue worker running?
+ backingOff atomic.Bool // true if we're backing off
statistics *types.ServerStatistics // statistics about this remote server
incomingPDUs chan *gomatrixserverlib.HeaderedEvent // PDUs to send
incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
@@ -47,6 +48,28 @@ type destinationQueue struct {
pendingPDUs []*gomatrixserverlib.HeaderedEvent // owned by backgroundSend
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
+ retryServerCh chan bool // interrupts backoff
+}
+
+// retry will clear the blacklist state and attempt to send built up events to the server,
+// resetting and interrupting any backoff timers. The interrupt is signalled without blocking.
+func (oq *destinationQueue) retry() {
+	// TODO: Events are dropped while a server is blacklisted, so we only send the oldest N events
+	// (chan size, currently 128) and then skip ahead a lot. Ideally pending events would be
+	// persisted in a database rather than held in memory so we can send a lot of events.
+	oq.statistics.Success()
+	// The CAS lets a single caller signal each backoff. The send is non-blocking because the worker
+	// may already have left its select (timer fired) without yet clearing backingOff.
+	if oq.backingOff.CAS(true, false) {
+		select {
+		case oq.retryServerCh <- true:
+		default: // nobody is receiving right now; the backoff ends (or has ended) on its own
+		}
+	}
+	if !oq.running.Load() {
+		log.Infof("Restarting queue for %s", oq.destination)
+		go oq.backgroundSend()
+	}
}
// Send event adds the event to the pending queue for the destination.
@@ -155,9 +178,15 @@ func (oq *destinationQueue) backgroundSend() {
}
// If we are backing off this server then wait for the
- // backoff duration to complete first.
+ // backoff duration to complete first, or until explicitly
+ // told to retry.
if backoff, duration := oq.statistics.BackoffDuration(); backoff {
- <-time.After(duration)
+ oq.backingOff.Store(true)
+ select {
+ case <-time.After(duration):
+ case <-oq.retryServerCh:
+ }
+ oq.backingOff.Store(false)
}
// How many things do we have waiting?
diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go
index aae6c53a..386a3397 100644
--- a/federationsender/queue/queue.go
+++ b/federationsender/queue/queue.go
@@ -52,6 +52,12 @@ func NewOutgoingQueues(
}
}
+// getQueueIfExists looks up the queue registered for destination and returns it,
+// or nil when there is no entry. It only reads the map and never creates a queue.
+func (oqs *OutgoingQueues) getQueueIfExists(destination gomatrixserverlib.ServerName) *destinationQueue {
+	oqs.queuesMutex.Lock()
+	existing := oqs.queues[destination] // nil for an unknown destination
+	oqs.queuesMutex.Unlock()
+	return existing
+}
+
func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue {
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
@@ -66,6 +72,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
incomingPDUs: make(chan *gomatrixserverlib.HeaderedEvent, 128),
incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
+ retryServerCh: make(chan bool),
}
oqs.queues[destination] = oq
}
@@ -160,6 +167,15 @@ func (oqs *OutgoingQueues) SendEDU(
return nil
}
+// RetryServer attempts to resend events to the given server if we had given up.
+// It calls retry() on the queue for srv; when no queue exists for that server
+// the call does nothing.
+func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) {
+	if q := oqs.getQueueIfExists(srv); q != nil {
+		q.retry()
+	}
+}
+
// filterAndDedupeDests removes our own server from the list of destinations
// and deduplicates any servers in the list that may appear more than once.
func filterAndDedupeDests(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) (