aboutsummaryrefslogtreecommitdiff
path: root/federationsender/queue/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'federationsender/queue/queue.go')
-rw-r--r--federationsender/queue/queue.go16
1 files changed, 16 insertions, 0 deletions
diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go
index aae6c53a..386a3397 100644
--- a/federationsender/queue/queue.go
+++ b/federationsender/queue/queue.go
@@ -52,6 +52,12 @@ func NewOutgoingQueues(
}
}
+func (oqs *OutgoingQueues) getQueueIfExists(destination gomatrixserverlib.ServerName) *destinationQueue {
+ oqs.queuesMutex.Lock()
+ defer oqs.queuesMutex.Unlock()
+ return oqs.queues[destination]
+}
+
func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue {
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
@@ -66,6 +72,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
incomingPDUs: make(chan *gomatrixserverlib.HeaderedEvent, 128),
incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
+ retryServerCh: make(chan bool),
}
oqs.queues[destination] = oq
}
@@ -160,6 +167,15 @@ func (oqs *OutgoingQueues) SendEDU(
return nil
}
+// RetryServer attempts to resend events to the given server if we had given up.
+func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) {
+ q := oqs.getQueueIfExists(srv)
+ if q == nil {
+ return
+ }
+ q.retry()
+}
+
// filterAndDedupeDests removes our own server from the list of destinations
// and deduplicates any servers in the list that may appear more than once.
func filterAndDedupeDests(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) (