aboutsummaryrefslogtreecommitdiff
path: root/federationapi/queue/destinationqueue.go
diff options
context:
space:
mode:
Diffstat (limited to 'federationapi/queue/destinationqueue.go')
-rw-r--r--federationapi/queue/destinationqueue.go348
1 files changed, 227 insertions, 121 deletions
diff --git a/federationapi/queue/destinationqueue.go b/federationapi/queue/destinationqueue.go
index 5cb8cae1..00e02b2d 100644
--- a/federationapi/queue/destinationqueue.go
+++ b/federationapi/queue/destinationqueue.go
@@ -35,7 +35,7 @@ import (
const (
maxPDUsPerTransaction = 50
- maxEDUsPerTransaction = 50
+ maxEDUsPerTransaction = 100
maxPDUsInMemory = 128
maxEDUsInMemory = 128
queueIdleTimeout = time.Second * 30
@@ -64,7 +64,6 @@ type destinationQueue struct {
pendingPDUs []*queuedPDU // PDUs waiting to be sent
pendingEDUs []*queuedEDU // EDUs waiting to be sent
pendingMutex sync.RWMutex // protects pendingPDUs and pendingEDUs
- interruptBackoff chan bool // interrupts backoff
}
// Send event adds the event to the pending queue for the destination.
@@ -75,6 +74,7 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
logrus.Errorf("attempt to send nil PDU with destination %q", oq.destination)
return
}
+
// Create a database entry that associates the given PDU NID with
// this destination queue. We'll then be able to retrieve the PDU
// later.
@@ -102,12 +102,12 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
oq.overflowed.Store(true)
}
oq.pendingMutex.Unlock()
- // Wake up the queue if it's asleep.
- oq.wakeQueueIfNeeded()
- select {
- case oq.notify <- struct{}{}:
- default:
+
+ if !oq.backingOff.Load() {
+ oq.wakeQueueAndNotify()
}
+ } else {
+ oq.overflowed.Store(true)
}
}
@@ -147,12 +147,37 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
oq.overflowed.Store(true)
}
oq.pendingMutex.Unlock()
- // Wake up the queue if it's asleep.
- oq.wakeQueueIfNeeded()
- select {
- case oq.notify <- struct{}{}:
- default:
+
+ if !oq.backingOff.Load() {
+ oq.wakeQueueAndNotify()
}
+ } else {
+ oq.overflowed.Store(true)
+ }
+}
+
+// handleBackoffNotifier is registered as the backoff notification
+// callback with Statistics. It will wakeup and notify the queue
+// if the queue is currently backing off.
+func (oq *destinationQueue) handleBackoffNotifier() {
+ // Only wake up the queue if it is backing off.
+ // Otherwise there is no pending work for the queue to handle
+ // so waking the queue would be a waste of resources.
+ if oq.backingOff.Load() {
+ oq.wakeQueueAndNotify()
+ }
+}
+
+// wakeQueueAndNotify ensures the destination queue is running and notifies it
+// that there is pending work.
+func (oq *destinationQueue) wakeQueueAndNotify() {
+ // Wake up the queue if it's asleep.
+ oq.wakeQueueIfNeeded()
+
+ // Notify the queue that there are events ready to send.
+ select {
+ case oq.notify <- struct{}{}:
+ default:
}
}
@@ -161,10 +186,11 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
// then we will interrupt the backoff, causing any federation
// requests to retry.
func (oq *destinationQueue) wakeQueueIfNeeded() {
- // If we are backing off then interrupt the backoff.
+ // Clear the backingOff flag and update the backoff metrics if it was set.
if oq.backingOff.CompareAndSwap(true, false) {
- oq.interruptBackoff <- true
+ destinationQueueBackingOff.Dec()
}
+
// If we aren't running then wake up the queue.
if !oq.running.Load() {
// Start the queue.
@@ -196,38 +222,54 @@ func (oq *destinationQueue) getPendingFromDatabase() {
gotEDUs[edu.receipt.String()] = struct{}{}
}
+ overflowed := false
if pduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); pduCapacity > 0 {
// We have room in memory for some PDUs - let's request no more than that.
- if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, pduCapacity); err == nil {
+ if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, maxPDUsInMemory); err == nil {
+ if len(pdus) == maxPDUsInMemory {
+ overflowed = true
+ }
for receipt, pdu := range pdus {
if _, ok := gotPDUs[receipt.String()]; ok {
continue
}
oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{receipt, pdu})
retrieved = true
+ if len(oq.pendingPDUs) == maxPDUsInMemory {
+ break
+ }
}
} else {
logrus.WithError(err).Errorf("Failed to get pending PDUs for %q", oq.destination)
}
}
+
if eduCapacity := maxEDUsInMemory - len(oq.pendingEDUs); eduCapacity > 0 {
// We have room in memory for some EDUs - let's request no more than that.
- if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, eduCapacity); err == nil {
+ if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, maxEDUsInMemory); err == nil {
+ if len(edus) == maxEDUsInMemory {
+ overflowed = true
+ }
for receipt, edu := range edus {
if _, ok := gotEDUs[receipt.String()]; ok {
continue
}
oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{receipt, edu})
retrieved = true
+ if len(oq.pendingEDUs) == maxEDUsInMemory {
+ break
+ }
}
} else {
logrus.WithError(err).Errorf("Failed to get pending EDUs for %q", oq.destination)
}
}
+
// If we've retrieved all of the events from the database with room to spare
// in memory then we'll no longer consider this queue to be overflowed.
- if len(oq.pendingPDUs) < maxPDUsInMemory && len(oq.pendingEDUs) < maxEDUsInMemory {
+ if !overflowed {
oq.overflowed.Store(false)
+ } else {
}
// If we've retrieved some events then notify the destination queue goroutine.
if retrieved {
@@ -238,6 +280,24 @@ func (oq *destinationQueue) getPendingFromDatabase() {
}
}
+// checkNotificationsOnClose checks for any remaining notifications
+// and starts a new backgroundSend goroutine if any exist.
+func (oq *destinationQueue) checkNotificationsOnClose() {
+ // NOTE : If we are stopping the queue due to blacklist then it
+ // doesn't matter if we have been notified of new work since
+ // this queue instance will be deleted anyway.
+ if !oq.statistics.Blacklisted() {
+ select {
+ case <-oq.notify:
+ // We received a new notification in between the
+ // idle timeout firing and stopping the goroutine.
+ // Immediately restart the queue.
+ oq.wakeQueueAndNotify()
+ default:
+ }
+ }
+}
+
// backgroundSend is the worker goroutine for sending events.
func (oq *destinationQueue) backgroundSend() {
// Check if a worker is already running, and if it isn't, then
@@ -245,10 +305,17 @@ func (oq *destinationQueue) backgroundSend() {
if !oq.running.CompareAndSwap(false, true) {
return
}
+
+ // Register queue cleanup functions.
+ // NOTE : The ordering here is very intentional.
+ defer oq.checkNotificationsOnClose()
+ defer oq.running.Store(false)
+
destinationQueueRunning.Inc()
defer destinationQueueRunning.Dec()
- defer oq.queues.clearQueue(oq)
- defer oq.running.Store(false)
+
+ idleTimeout := time.NewTimer(queueIdleTimeout)
+ defer idleTimeout.Stop()
// Mark the queue as overflowed, so we will consult the database
// to see if there's anything new to send.
@@ -261,59 +328,33 @@ func (oq *destinationQueue) backgroundSend() {
oq.getPendingFromDatabase()
}
+ // Reset the queue idle timeout.
+ if !idleTimeout.Stop() {
+ select {
+ case <-idleTimeout.C:
+ default:
+ }
+ }
+ idleTimeout.Reset(queueIdleTimeout)
+
// If we have nothing to do then wait either for incoming events, or
// until we hit an idle timeout.
select {
case <-oq.notify:
// There's work to do, either because getPendingFromDatabase
- // told us there is, or because a new event has come in via
- // sendEvent/sendEDU.
- case <-time.After(queueIdleTimeout):
+ // told us there is, a new event has come in via sendEvent/sendEDU,
+ // or we are backing off and it is time to retry.
+ case <-idleTimeout.C:
// The worker is idle so stop the goroutine. It'll get
// restarted automatically the next time we have an event to
// send.
return
case <-oq.process.Context().Done():
// The parent process is shutting down, so stop.
+ oq.statistics.ClearBackoff()
return
}
- // If we are backing off this server then wait for the
- // backoff duration to complete first, or until explicitly
- // told to retry.
- until, blacklisted := oq.statistics.BackoffInfo()
- if blacklisted {
- // It's been suggested that we should give up because the backoff
- // has exceeded a maximum allowable value. Clean up the in-memory
- // buffers at this point. The PDU clean-up is already on a defer.
- logrus.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination)
- oq.pendingMutex.Lock()
- for i := range oq.pendingPDUs {
- oq.pendingPDUs[i] = nil
- }
- for i := range oq.pendingEDUs {
- oq.pendingEDUs[i] = nil
- }
- oq.pendingPDUs = nil
- oq.pendingEDUs = nil
- oq.pendingMutex.Unlock()
- return
- }
- if until != nil && until.After(time.Now()) {
- // We haven't backed off yet, so wait for the suggested amount of
- // time.
- duration := time.Until(*until)
- logrus.Debugf("Backing off %q for %s", oq.destination, duration)
- oq.backingOff.Store(true)
- destinationQueueBackingOff.Inc()
- select {
- case <-time.After(duration):
- case <-oq.interruptBackoff:
- }
- destinationQueueBackingOff.Dec()
- oq.backingOff.Store(false)
- }
-
// Work out which PDUs/EDUs to include in the next transaction.
oq.pendingMutex.RLock()
pduCount := len(oq.pendingPDUs)
@@ -328,38 +369,97 @@ func (oq *destinationQueue) backgroundSend() {
toSendEDUs := oq.pendingEDUs[:eduCount]
oq.pendingMutex.RUnlock()
+ // If we didn't get anything from the database and there are no
+ // pending EDUs then there's nothing to do - stop here.
+ if pduCount == 0 && eduCount == 0 {
+ continue
+ }
+
// If we have pending PDUs or EDUs then construct a transaction.
// Try sending the next transaction and see what happens.
- transaction, pc, ec, terr := oq.nextTransaction(toSendPDUs, toSendEDUs)
+ terr := oq.nextTransaction(toSendPDUs, toSendEDUs)
if terr != nil {
// We failed to send the transaction. Mark it as a failure.
- oq.statistics.Failure()
-
- } else if transaction {
- // If we successfully sent the transaction then clear out
- // the pending events and EDUs, and wipe our transaction ID.
- oq.statistics.Success()
- oq.pendingMutex.Lock()
- for i := range oq.pendingPDUs[:pc] {
- oq.pendingPDUs[i] = nil
+ _, blacklisted := oq.statistics.Failure()
+ if !blacklisted {
+ // Register the backoff state and exit the goroutine.
+ // It'll get restarted automatically when the backoff
+ // completes.
+ oq.backingOff.Store(true)
+ destinationQueueBackingOff.Inc()
+ return
+ } else {
+ // Immediately trigger the blacklist logic.
+ oq.blacklistDestination()
+ return
}
- for i := range oq.pendingEDUs[:ec] {
- oq.pendingEDUs[i] = nil
- }
- oq.pendingPDUs = oq.pendingPDUs[pc:]
- oq.pendingEDUs = oq.pendingEDUs[ec:]
- oq.pendingMutex.Unlock()
+ } else {
+ oq.handleTransactionSuccess(pduCount, eduCount)
}
}
}
// nextTransaction creates a new transaction from the pending event
-// queue and sends it. Returns true if a transaction was sent or
-// false otherwise.
+// queue and sends it.
+// Returns an error if the transaction wasn't sent.
func (oq *destinationQueue) nextTransaction(
pdus []*queuedPDU,
edus []*queuedEDU,
-) (bool, int, int, error) {
+) error {
+ // Create the transaction.
+ t, pduReceipts, eduReceipts := oq.createTransaction(pdus, edus)
+ logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
+
+ // Try to send the transaction to the destination server.
+ ctx, cancel := context.WithTimeout(oq.process.Context(), time.Minute*5)
+ defer cancel()
+ _, err := oq.client.SendTransaction(ctx, t)
+ switch errResponse := err.(type) {
+ case nil:
+ // Clean up the transaction in the database.
+ if pduReceipts != nil {
+ //logrus.Infof("Cleaning PDUs %q", pduReceipt.String())
+ if err = oq.db.CleanPDUs(oq.process.Context(), oq.destination, pduReceipts); err != nil {
+ logrus.WithError(err).Errorf("Failed to clean PDUs for server %q", t.Destination)
+ }
+ }
+ if eduReceipts != nil {
+ //logrus.Infof("Cleaning EDUs %q", eduReceipt.String())
+ if err = oq.db.CleanEDUs(oq.process.Context(), oq.destination, eduReceipts); err != nil {
+ logrus.WithError(err).Errorf("Failed to clean EDUs for server %q", t.Destination)
+ }
+ }
+ // Reset the transaction ID.
+ oq.transactionIDMutex.Lock()
+ oq.transactionID = ""
+ oq.transactionIDMutex.Unlock()
+ return nil
+ case gomatrix.HTTPError:
+ // Report that we failed to send the transaction and we
+ // will retry again, subject to backoff.
+
+ // TODO: we should check for 500-ish fails vs 400-ish here,
+ // since we shouldn't queue things indefinitely in response
+ // to a 400-ish error
+ code := errResponse.Code
+ logrus.Debug("Transaction failed with HTTP", code)
+ return err
+ default:
+ logrus.WithFields(logrus.Fields{
+ "destination": oq.destination,
+ logrus.ErrorKey: err,
+ }).Debugf("Failed to send transaction %q", t.TransactionID)
+ return err
+ }
+}
+
+// createTransaction generates a gomatrixserverlib.Transaction from the provided pdus and edus.
+// It also returns the associated event receipts so they can be cleaned from the database in
+// the case of a successful transaction.
+func (oq *destinationQueue) createTransaction(
+ pdus []*queuedPDU,
+ edus []*queuedEDU,
+) (gomatrixserverlib.Transaction, []*shared.Receipt, []*shared.Receipt) {
// If there's no projected transaction ID then generate one. If
// the transaction succeeds then we'll set it back to "" so that
// we generate a new one next time. If it fails, we'll preserve
@@ -371,7 +471,6 @@ func (oq *destinationQueue) nextTransaction(
}
oq.transactionIDMutex.Unlock()
- // Create the transaction.
t := gomatrixserverlib.Transaction{
PDUs: []json.RawMessage{},
EDUs: []gomatrixserverlib.EDU{},
@@ -381,18 +480,13 @@ func (oq *destinationQueue) nextTransaction(
t.OriginServerTS = gomatrixserverlib.AsTimestamp(time.Now())
t.TransactionID = oq.transactionID
- // If we didn't get anything from the database and there are no
- // pending EDUs then there's nothing to do - stop here.
- if len(pdus) == 0 && len(edus) == 0 {
- return false, 0, 0, nil
- }
-
var pduReceipts []*shared.Receipt
var eduReceipts []*shared.Receipt
// Go through PDUs that we retrieved from the database, if any,
// and add them into the transaction.
for _, pdu := range pdus {
+ // These should never be nil.
if pdu == nil || pdu.pdu == nil {
continue
}
@@ -404,6 +498,7 @@ func (oq *destinationQueue) nextTransaction(
// Do the same for pending EDUS in the queue.
for _, edu := range edus {
+ // These should never be nil.
if edu == nil || edu.edu == nil {
continue
}
@@ -411,44 +506,55 @@ func (oq *destinationQueue) nextTransaction(
eduReceipts = append(eduReceipts, edu.receipt)
}
- logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
+ return t, pduReceipts, eduReceipts
+}
- // Try to send the transaction to the destination server.
- // TODO: we should check for 500-ish fails vs 400-ish here,
- // since we shouldn't queue things indefinitely in response
- // to a 400-ish error
- ctx, cancel := context.WithTimeout(oq.process.Context(), time.Minute*5)
- defer cancel()
- _, err := oq.client.SendTransaction(ctx, t)
- switch err.(type) {
- case nil:
- // Clean up the transaction in the database.
- if pduReceipts != nil {
- //logrus.Infof("Cleaning PDUs %q", pduReceipt.String())
- if err = oq.db.CleanPDUs(oq.process.Context(), oq.destination, pduReceipts); err != nil {
- logrus.WithError(err).Errorf("Failed to clean PDUs for server %q", t.Destination)
- }
- }
- if eduReceipts != nil {
- //logrus.Infof("Cleaning EDUs %q", eduReceipt.String())
- if err = oq.db.CleanEDUs(oq.process.Context(), oq.destination, eduReceipts); err != nil {
- logrus.WithError(err).Errorf("Failed to clean EDUs for server %q", t.Destination)
- }
+// blacklistDestination removes all pending PDUs and EDUs that have been cached
+// and deletes this queue.
+func (oq *destinationQueue) blacklistDestination() {
+ // It's been suggested that we should give up because the backoff
+ // has exceeded a maximum allowable value. Clean up the in-memory
+ // buffers at this point. The PDU clean-up is already on a defer.
+ logrus.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination)
+
+ oq.pendingMutex.Lock()
+ for i := range oq.pendingPDUs {
+ oq.pendingPDUs[i] = nil
+ }
+ for i := range oq.pendingEDUs {
+ oq.pendingEDUs[i] = nil
+ }
+ oq.pendingPDUs = nil
+ oq.pendingEDUs = nil
+ oq.pendingMutex.Unlock()
+
+ // Delete this queue as no more messages will be sent to this
+ // destination until it is no longer blacklisted.
+ oq.statistics.AssignBackoffNotifier(nil)
+ oq.queues.clearQueue(oq)
+}
+
+// handleTransactionSuccess updates the cached event queues as well as the success and
+// backoff information for this server.
+func (oq *destinationQueue) handleTransactionSuccess(pduCount int, eduCount int) {
+ // If we successfully sent the transaction then clear out
+ // the pending events and EDUs, and wipe our transaction ID.
+ oq.statistics.Success()
+ oq.pendingMutex.Lock()
+ for i := range oq.pendingPDUs[:pduCount] {
+ oq.pendingPDUs[i] = nil
+ }
+ for i := range oq.pendingEDUs[:eduCount] {
+ oq.pendingEDUs[i] = nil
+ }
+ oq.pendingPDUs = oq.pendingPDUs[pduCount:]
+ oq.pendingEDUs = oq.pendingEDUs[eduCount:]
+ oq.pendingMutex.Unlock()
+
+ if len(oq.pendingPDUs) > 0 || len(oq.pendingEDUs) > 0 {
+ select {
+ case oq.notify <- struct{}{}:
+ default:
}
- // Reset the transaction ID.
- oq.transactionIDMutex.Lock()
- oq.transactionID = ""
- oq.transactionIDMutex.Unlock()
- return true, len(t.PDUs), len(t.EDUs), nil
- case gomatrix.HTTPError:
- // Report that we failed to send the transaction and we
- // will retry again, subject to backoff.
- return false, 0, 0, err
- default:
- logrus.WithFields(logrus.Fields{
- "destination": oq.destination,
- logrus.ErrorKey: err,
- }).Debugf("Failed to send transaction %q", t.TransactionID)
- return false, 0, 0, err
}
}