aboutsummaryrefslogtreecommitdiff
path: root/federationsender/queue/queue.go
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2021-02-17 15:16:35 +0000
committerGitHub <noreply@github.com>2021-02-17 15:16:35 +0000
commit8b5cd256cbd473822dc358b958f17c3a381b32c5 (patch)
treed94b89177a59abf65fde756245c50287a8b0bd1a /federationsender/queue/queue.go
parentda797c79982bfc5c88be576b8c5d3df60a9088ed (diff)
Don't hold destination queues in memory forever (#1769)
* Don't hold destination queues in memory forever * Close channels * Fix ordering * Clear more aggressively * clearQueue only called by defer so should be safe to delete queue in any case * Wake queue when created, otherwise cleanup doesn't get called in all cases * Clean up periodically, we hit a race condition otherwise * Tweaks * Don't create queues for blacklisted hosts * Check blacklist properly
Diffstat (limited to 'federationsender/queue/queue.go')
-rw-r--r--federationsender/queue/queue.go34
1 files changed, 25 insertions, 9 deletions
diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go
index 4453ddb0..f32ae20f 100644
--- a/federationsender/queue/queue.go
+++ b/federationsender/queue/queue.go
@@ -120,7 +120,7 @@ func NewOutgoingQueues(
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
}
for serverName := range serverNames {
- if queue := queues.getQueue(serverName); !queue.statistics.Blacklisted() {
+ if queue := queues.getQueue(serverName); queue != nil {
queue.wakeQueueIfNeeded()
}
}
@@ -148,12 +148,16 @@ type queuedEDU struct {
}
func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue {
+ if oqs.statistics.ForServer(destination).Blacklisted() {
+ return nil
+ }
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
- oq := oqs.queues[destination]
- if oq == nil {
+ oq, ok := oqs.queues[destination]
+ if !ok {
destinationQueueTotal.Inc()
oq = &destinationQueue{
+ queues: oqs,
db: oqs.db,
process: oqs.process,
rsAPI: oqs.rsAPI,
@@ -170,6 +174,16 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
return oq
}
+func (oqs *OutgoingQueues) clearQueue(oq *destinationQueue) {
+ oqs.queuesMutex.Lock()
+ defer oqs.queuesMutex.Unlock()
+
+ close(oq.notify)
+ close(oq.interruptBackoff)
+ delete(oqs.queues, oq.destination)
+ destinationQueueTotal.Dec()
+}
+
type ErrorFederationDisabled struct {
Message string
}
@@ -236,7 +250,9 @@ func (oqs *OutgoingQueues) SendEvent(
}
for destination := range destmap {
- oqs.getQueue(destination).sendEvent(ev, nid)
+ if queue := oqs.getQueue(destination); queue != nil {
+ queue.sendEvent(ev, nid)
+ }
}
return nil
@@ -306,7 +322,9 @@ func (oqs *OutgoingQueues) SendEDU(
}
for destination := range destmap {
- oqs.getQueue(destination).sendEDU(e, nid)
+ if queue := oqs.getQueue(destination); queue != nil {
+ queue.sendEDU(e, nid)
+ }
}
return nil
@@ -317,9 +335,7 @@ func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) {
if oqs.disabled {
return
}
- q := oqs.getQueue(srv)
- if q == nil {
- return
+ if queue := oqs.getQueue(srv); queue != nil {
+ queue.wakeQueueIfNeeded()
}
- q.wakeQueueIfNeeded()
}