aboutsummaryrefslogtreecommitdiff
path: root/federationsender
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-12-09 10:03:22 +0000
committerGitHub <noreply@github.com>2020-12-09 10:03:22 +0000
commitf64c8822bc5b930d65ef32792df4d7b279143bce (patch)
treec958145e8fb45bb03738272de8c30668e1b49fe0 /federationsender
parente1d32e2ff1bef346836f6b9d795d33ec49dd3864 (diff)
Federation sender refactor (#1621)
* Refactor federation sender, again * Clean up better * Missing operators * Try to get overflowed events from database * Fix queries * Log less * Comments * nil PDUs/EDUs shouldn't happen but guard against them for safety * Tweak logging * Fix transaction coalescing * Update comments * Check nils more * Remove channels as they add extra complexity and possibly will deadlock * Don't hold lock while sending transaction * Less spam about sleeping queues * Comments * Bug-fixing * Don't try to rehydrate twice * Don't queue in memory for blacklisted destinations * Don't queue in memory for blacklisted destinations * Fix a couple of bugs * Check for duplicates when pulling things out of the database * Durable transactions, some more refactoring * Revert "Durable transactions, some more refactoring" This reverts commit 5daf924eaaefec5e4f7c12c16ca24e898de4adbb. * Fix deadlock
Diffstat (limited to 'federationsender')
-rw-r--r--federationsender/queue/destinationqueue.go352
-rw-r--r--federationsender/queue/queue.go22
-rw-r--r--federationsender/storage/interface.go10
-rw-r--r--federationsender/storage/postgres/queue_pdus_table.go37
-rw-r--r--federationsender/storage/shared/storage.go14
-rw-r--r--federationsender/storage/shared/storage_edus.go47
-rw-r--r--federationsender/storage/shared/storage_pdus.go57
-rw-r--r--federationsender/storage/sqlite3/queue_pdus_table.go15
-rw-r--r--federationsender/storage/tables/interface.go3
9 files changed, 292 insertions, 265 deletions
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go
index 29fef705..31eeaebc 100644
--- a/federationsender/queue/destinationqueue.go
+++ b/federationsender/queue/destinationqueue.go
@@ -35,6 +35,8 @@ import (
const (
maxPDUsPerTransaction = 50
maxEDUsPerTransaction = 50
+ maxPDUsInMemory = 128
+ maxEDUsInMemory = 128
queueIdleTimeout = time.Second * 30
)
@@ -51,54 +53,56 @@ type destinationQueue struct {
destination gomatrixserverlib.ServerName // destination of requests
running atomic.Bool // is the queue worker running?
backingOff atomic.Bool // true if we're backing off
+ overflowed atomic.Bool // the queues exceed maxPDUsInMemory/maxEDUsInMemory, so we should consult the database for more
statistics *statistics.ServerStatistics // statistics about this remote server
transactionIDMutex sync.Mutex // protects transactionID
- transactionID gomatrixserverlib.TransactionID // last transaction ID
- transactionCount atomic.Int32 // how many events in this transaction so far
- notifyPDUs chan bool // interrupts idle wait for PDUs
- notifyEDUs chan bool // interrupts idle wait for EDUs
+ transactionID gomatrixserverlib.TransactionID // last transaction ID if retrying, or "" if last txn was successful
+ notify chan struct{} // interrupts idle wait pending PDUs/EDUs
+ pendingPDUs []*queuedPDU // PDUs waiting to be sent
+ pendingEDUs []*queuedEDU // EDUs waiting to be sent
+ pendingMutex sync.RWMutex // protects pendingPDUs and pendingEDUs
interruptBackoff chan bool // interrupts backoff
}
// Send event adds the event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending events to that destination.
-func (oq *destinationQueue) sendEvent(receipt *shared.Receipt) {
- // Create a transaction ID. We'll either do this if we don't have
- // one made up yet, or if we've exceeded the number of maximum
- // events allowed in a single tranaction. We'll reset the counter
- // when we do.
- oq.transactionIDMutex.Lock()
- if oq.transactionID == "" || oq.transactionCount.Load() >= maxPDUsPerTransaction {
- now := gomatrixserverlib.AsTimestamp(time.Now())
- oq.transactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
- oq.transactionCount.Store(0)
+func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, receipt *shared.Receipt) {
+ if event == nil {
+ log.Errorf("attempt to send nil PDU with destination %q", oq.destination)
+ return
}
- oq.transactionIDMutex.Unlock()
// Create a database entry that associates the given PDU NID with
// this destination queue. We'll then be able to retrieve the PDU
// later.
if err := oq.db.AssociatePDUWithDestination(
context.TODO(),
- oq.transactionID, // the current transaction ID
- oq.destination, // the destination server name
- receipt, // NIDs from federationsender_queue_json table
+ "", // TODO: remove this, as we don't need to persist the transaction ID
+ oq.destination, // the destination server name
+ receipt, // NIDs from federationsender_queue_json table
); err != nil {
- log.WithError(err).Errorf("failed to associate PDU receipt %q with destination %q", receipt.String(), oq.destination)
+ log.WithError(err).Errorf("failed to associate PDU %q with destination %q", event.EventID(), oq.destination)
return
}
- // We've successfully added a PDU to the transaction so increase
- // the counter.
- oq.transactionCount.Add(1)
// Check if the destination is blacklisted. If it isn't then wake
// up the queue.
if !oq.statistics.Blacklisted() {
+ // If there's room in memory to hold the event then add it to the
+ // list.
+ oq.pendingMutex.Lock()
+ if len(oq.pendingPDUs) < maxPDUsInMemory {
+ oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{
+ pdu: event,
+ receipt: receipt,
+ })
+ } else {
+ oq.overflowed.Store(true)
+ }
+ oq.pendingMutex.Unlock()
// Wake up the queue if it's asleep.
oq.wakeQueueIfNeeded()
- // If we're blocking on waiting PDUs then tell the queue that we
- // have work to do.
select {
- case oq.notifyPDUs <- true:
+ case oq.notify <- struct{}{}:
default:
}
}
@@ -107,7 +111,11 @@ func (oq *destinationQueue) sendEvent(receipt *shared.Receipt) {
// sendEDU adds the EDU event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending events to that destination.
-func (oq *destinationQueue) sendEDU(receipt *shared.Receipt) {
+func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *shared.Receipt) {
+ if event == nil {
+ log.Errorf("attempt to send nil EDU with destination %q", oq.destination)
+ return
+ }
// Create a database entry that associates the given PDU NID with
// this destination queue. We'll then be able to retrieve the PDU
// later.
@@ -116,21 +124,28 @@ func (oq *destinationQueue) sendEDU(receipt *shared.Receipt) {
oq.destination, // the destination server name
receipt, // NIDs from federationsender_queue_json table
); err != nil {
- log.WithError(err).Errorf("failed to associate EDU receipt %q with destination %q", receipt.String(), oq.destination)
+ log.WithError(err).Errorf("failed to associate EDU with destination %q", oq.destination)
return
}
- // We've successfully added an EDU to the transaction so increase
- // the counter.
- oq.transactionCount.Add(1)
// Check if the destination is blacklisted. If it isn't then wake
// up the queue.
if !oq.statistics.Blacklisted() {
+ // If there's room in memory to hold the event then add it to the
+ // list.
+ oq.pendingMutex.Lock()
+ if len(oq.pendingEDUs) < maxEDUsInMemory {
+ oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{
+ edu: event,
+ receipt: receipt,
+ })
+ } else {
+ oq.overflowed.Store(true)
+ }
+ oq.pendingMutex.Unlock()
// Wake up the queue if it's asleep.
oq.wakeQueueIfNeeded()
- // If we're blocking on waiting EDUs then tell the queue that we
- // have work to do.
select {
- case oq.notifyEDUs <- true:
+ case oq.notify <- struct{}{}:
default:
}
}
@@ -152,48 +167,71 @@ func (oq *destinationQueue) wakeQueueIfNeeded() {
}
}
-// waitForPDUs returns a channel for pending PDUs, which will be
-// used in backgroundSend select. It returns a closed channel if
-// there is something pending right now, or an open channel if
-// we're waiting for something.
-func (oq *destinationQueue) waitForPDUs() chan bool {
- pendingPDUs, err := oq.db.GetPendingPDUCount(context.TODO(), oq.destination)
- if err != nil {
- log.WithError(err).Errorf("Failed to get pending PDU count on queue %q", oq.destination)
+// getPendingFromDatabase will look at the database and see if
+// there are any persisted events that haven't been sent to this
+// destination yet. If so, they will be queued up.
+// nolint:gocyclo
+func (oq *destinationQueue) getPendingFromDatabase() {
+ // Check to see if there's anything to do for this server
+ // in the database.
+ retrieved := false
+ ctx := context.Background()
+ oq.pendingMutex.Lock()
+ defer oq.pendingMutex.Unlock()
+
+ // Take a note of all of the PDUs and EDUs that we already
+ // have cached. We will index them based on the receipt,
+ // which ultimately just contains the index of the PDU/EDU
+ // in the database.
+ gotPDUs := map[string]struct{}{}
+ gotEDUs := map[string]struct{}{}
+ for _, pdu := range oq.pendingPDUs {
+ gotPDUs[pdu.receipt.String()] = struct{}{}
}
- // If there are PDUs pending right now then we'll return a closed
- // channel. This will mean that the backgroundSend will not block.
- if pendingPDUs > 0 {
- ch := make(chan bool, 1)
- close(ch)
- return ch
+ for _, edu := range oq.pendingEDUs {
+ gotEDUs[edu.receipt.String()] = struct{}{}
}
- // If there are no PDUs pending right now then instead we'll return
- // the notify channel, so that backgroundSend can pick up normal
- // notifications from sendEvent.
- return oq.notifyPDUs
-}
-// waitForEDUs returns a channel for pending EDUs, which will be
-// used in backgroundSend select. It returns a closed channel if
-// there is something pending right now, or an open channel if
-// we're waiting for something.
-func (oq *destinationQueue) waitForEDUs() chan bool {
- pendingEDUs, err := oq.db.GetPendingEDUCount(context.TODO(), oq.destination)
- if err != nil {
- log.WithError(err).Errorf("Failed to get pending EDU count on queue %q", oq.destination)
+ if pduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); pduCapacity > 0 {
+ // We have room in memory for some PDUs - let's request no more than that.
+ if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, pduCapacity); err == nil {
+ for receipt, pdu := range pdus {
+ if _, ok := gotPDUs[receipt.String()]; ok {
+ continue
+ }
+ oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{receipt, pdu})
+ retrieved = true
+ }
+ } else {
+ logrus.WithError(err).Errorf("Failed to get pending PDUs for %q", oq.destination)
+ }
}
- // If there are EDUs pending right now then we'll return a closed
- // channel. This will mean that the backgroundSend will not block.
- if pendingEDUs > 0 {
- ch := make(chan bool, 1)
- close(ch)
- return ch
+ if eduCapacity := maxEDUsInMemory - len(oq.pendingEDUs); eduCapacity > 0 {
+ // We have room in memory for some EDUs - let's request no more than that.
+ if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, eduCapacity); err == nil {
+ for receipt, edu := range edus {
+ if _, ok := gotEDUs[receipt.String()]; ok {
+ continue
+ }
+ oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{receipt, edu})
+ retrieved = true
+ }
+ } else {
+ logrus.WithError(err).Errorf("Failed to get pending EDUs for %q", oq.destination)
+ }
+ }
+ // If we've retrieved all of the events from the database with room to spare
+ // in memory then we'll no longer consider this queue to be overflowed.
+ if len(oq.pendingPDUs) < maxPDUsInMemory && len(oq.pendingEDUs) < maxEDUsInMemory {
+ oq.overflowed.Store(false)
+ }
+ // If we've retrieved some events then notify the destination queue goroutine.
+ if retrieved {
+ select {
+ case oq.notify <- struct{}{}:
+ default:
+ }
}
- // If there are no EDUs pending right now then instead we'll return
- // the notify channel, so that backgroundSend can pick up normal
- // notifications from sendEvent.
- return oq.notifyEDUs
}
// backgroundSend is the worker goroutine for sending events.
@@ -206,25 +244,28 @@ func (oq *destinationQueue) backgroundSend() {
}
defer oq.running.Store(false)
+ // Mark the queue as overflowed, so we will consult the database
+ // to see if there's anything new to send.
+ oq.overflowed.Store(true)
+
for {
- pendingPDUs, pendingEDUs := false, false
+ // If we are overflowing memory and have sent things out to the
+ // database then we can look up what those things are.
+ if oq.overflowed.Load() {
+ oq.getPendingFromDatabase()
+ }
// If we have nothing to do then wait either for incoming events, or
// until we hit an idle timeout.
select {
- case <-oq.waitForPDUs():
- // We were woken up because there are new PDUs waiting in the
- // database.
- pendingPDUs = true
- case <-oq.waitForEDUs():
- // We were woken up because there are new PDUs waiting in the
- // database.
- pendingEDUs = true
+ case <-oq.notify:
+ // There's work to do, either because getPendingFromDatabase
+ // told us there is, or because a new event has come in via
+ // sendEvent/sendEDU.
case <-time.After(queueIdleTimeout):
// The worker is idle so stop the goroutine. It'll get
// restarted automatically the next time we have an event to
// send.
- log.Tracef("Queue %q has been idle for %s, going to sleep", oq.destination, queueIdleTimeout)
return
}
@@ -237,6 +278,16 @@ func (oq *destinationQueue) backgroundSend() {
// has exceeded a maximum allowable value. Clean up the in-memory
// buffers at this point. The PDU clean-up is already on a defer.
log.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination)
+ oq.pendingMutex.Lock()
+ for i := range oq.pendingPDUs {
+ oq.pendingPDUs[i] = nil
+ }
+ for i := range oq.pendingEDUs {
+ oq.pendingEDUs[i] = nil
+ }
+ oq.pendingPDUs = nil
+ oq.pendingEDUs = nil
+ oq.pendingMutex.Unlock()
return
}
if until != nil && until.After(time.Now()) {
@@ -250,18 +301,41 @@ func (oq *destinationQueue) backgroundSend() {
}
}
+ // Work out which PDUs/EDUs to include in the next transaction.
+ oq.pendingMutex.RLock()
+ pduCount := len(oq.pendingPDUs)
+ eduCount := len(oq.pendingEDUs)
+ if pduCount > maxPDUsPerTransaction {
+ pduCount = maxPDUsPerTransaction
+ }
+ if eduCount > maxEDUsPerTransaction {
+ eduCount = maxEDUsPerTransaction
+ }
+ toSendPDUs := oq.pendingPDUs[:pduCount]
+ toSendEDUs := oq.pendingEDUs[:eduCount]
+ oq.pendingMutex.RUnlock()
+
// If we have pending PDUs or EDUs then construct a transaction.
- if pendingPDUs || pendingEDUs {
- // Try sending the next transaction and see what happens.
- transaction, terr := oq.nextTransaction()
- if terr != nil {
- // We failed to send the transaction. Mark it as a failure.
- oq.statistics.Failure()
- } else if transaction {
- // If we successfully sent the transaction then clear out
- // the pending events and EDUs, and wipe our transaction ID.
- oq.statistics.Success()
+ // Try sending the next transaction and see what happens.
+ transaction, pc, ec, terr := oq.nextTransaction(toSendPDUs, toSendEDUs)
+ if terr != nil {
+ // We failed to send the transaction. Mark it as a failure.
+ oq.statistics.Failure()
+
+ } else if transaction {
+ // If we successfully sent the transaction then clear out
+ // the pending events and EDUs, and wipe our transaction ID.
+ oq.statistics.Success()
+ oq.pendingMutex.Lock()
+ for i := range oq.pendingPDUs[:pc] {
+ oq.pendingPDUs[i] = nil
}
+ for i := range oq.pendingEDUs[:ec] {
+ oq.pendingEDUs[i] = nil
+ }
+ oq.pendingPDUs = oq.pendingPDUs[pc:]
+ oq.pendingEDUs = oq.pendingEDUs[ec:]
+ oq.pendingMutex.Unlock()
}
}
}
@@ -270,16 +344,20 @@ func (oq *destinationQueue) backgroundSend() {
// queue and sends it. Returns true if a transaction was sent or
// false otherwise.
// nolint:gocyclo
-func (oq *destinationQueue) nextTransaction() (bool, error) {
- // Before we do anything, we need to roll over the transaction
- // ID that is being used to coalesce events into the next TX.
- // Otherwise it's possible that we'll pick up an incomplete
- // transaction and end up nuking the rest of the events at the
- // cleanup stage.
+func (oq *destinationQueue) nextTransaction(
+ pdus []*queuedPDU,
+ edus []*queuedEDU,
+) (bool, int, int, error) {
+ // If there's no projected transaction ID then generate one. If
+ // the transaction succeeds then we'll set it back to "" so that
+ // we generate a new one next time. If it fails, we'll preserve
+ // it so that we retry with the same transaction ID.
oq.transactionIDMutex.Lock()
- oq.transactionID = ""
+ if oq.transactionID == "" {
+ now := gomatrixserverlib.AsTimestamp(time.Now())
+ oq.transactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
+ }
oq.transactionIDMutex.Unlock()
- oq.transactionCount.Store(0)
// Create the transaction.
t := gomatrixserverlib.Transaction{
@@ -289,58 +367,36 @@ func (oq *destinationQueue) nextTransaction() (bool, error) {
t.Origin = oq.origin
t.Destination = oq.destination
t.OriginServerTS = gomatrixserverlib.AsTimestamp(time.Now())
-
- // Ask the database for any pending PDUs from the next transaction.
- // maxPDUsPerTransaction is an upper limit but we probably won't
- // actually retrieve that many events.
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
- defer cancel()
- txid, pdus, pduReceipt, err := oq.db.GetNextTransactionPDUs(
- ctx, // context
- oq.destination, // server name
- maxPDUsPerTransaction, // max events to retrieve
- )
- if err != nil {
- log.WithError(err).Errorf("failed to get next transaction PDUs for server %q", oq.destination)
- return false, fmt.Errorf("oq.db.GetNextTransactionPDUs: %w", err)
- }
-
- edus, eduReceipt, err := oq.db.GetNextTransactionEDUs(
- ctx, // context
- oq.destination, // server name
- maxEDUsPerTransaction, // max events to retrieve
- )
- if err != nil {
- log.WithError(err).Errorf("failed to get next transaction EDUs for server %q", oq.destination)
- return false, fmt.Errorf("oq.db.GetNextTransactionEDUs: %w", err)
- }
+ t.TransactionID = oq.transactionID
// If we didn't get anything from the database and there are no
// pending EDUs then there's nothing to do - stop here.
if len(pdus) == 0 && len(edus) == 0 {
- return false, nil
+ return false, 0, 0, nil
}
- // Pick out the transaction ID from the database. If we didn't
- // get a transaction ID (i.e. because there are no PDUs but only
- // EDUs) then generate a transaction ID.
- t.TransactionID = txid
- if t.TransactionID == "" {
- now := gomatrixserverlib.AsTimestamp(time.Now())
- t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
- }
+ var pduReceipts []*shared.Receipt
+ var eduReceipts []*shared.Receipt
// Go through PDUs that we retrieved from the database, if any,
// and add them into the transaction.
for _, pdu := range pdus {
+ if pdu == nil || pdu.pdu == nil {
+ continue
+ }
// Append the JSON of the event, since this is a json.RawMessage type in the
// gomatrixserverlib.Transaction struct
- t.PDUs = append(t.PDUs, (*pdu).JSON())
+ t.PDUs = append(t.PDUs, pdu.pdu.JSON())
+ pduReceipts = append(pduReceipts, pdu.receipt)
}
// Do the same for pending EDUS in the queue.
for _, edu := range edus {
- t.EDUs = append(t.EDUs, *edu)
+ if edu == nil || edu.edu == nil {
+ continue
+ }
+ t.EDUs = append(t.EDUs, *edu.edu)
+ eduReceipts = append(eduReceipts, edu.receipt)
}
logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
@@ -349,34 +405,38 @@ func (oq *destinationQueue) nextTransaction() (bool, error) {
// TODO: we should check for 500-ish fails vs 400-ish here,
// since we shouldn't queue things indefinitely in response
// to a 400-ish error
- ctx, cancel = context.WithTimeout(context.Background(), time.Minute*5)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
- _, err = oq.client.SendTransaction(ctx, t)
+ _, err := oq.client.SendTransaction(ctx, t)
switch err.(type) {
case nil:
// Clean up the transaction in the database.
- if pduReceipt != nil {
+ if pduReceipts != nil {
//logrus.Infof("Cleaning PDUs %q", pduReceipt.String())
- if err = oq.db.CleanPDUs(context.Background(), oq.destination, pduReceipt); err != nil {
- log.WithError(err).Errorf("failed to clean PDUs %q for server %q", pduReceipt.String(), t.Destination)
+ if err = oq.db.CleanPDUs(context.Background(), oq.destination, pduReceipts); err != nil {
+ log.WithError(err).Errorf("Failed to clean PDUs for server %q", t.Destination)
}
}
- if eduReceipt != nil {
+ if eduReceipts != nil {
//logrus.Infof("Cleaning EDUs %q", eduReceipt.String())
- if err = oq.db.CleanEDUs(context.Background(), oq.destination, eduReceipt); err != nil {
- log.WithError(err).Errorf("failed to clean EDUs %q for server %q", eduReceipt.String(), t.Destination)
+ if err = oq.db.CleanEDUs(context.Background(), oq.destination, eduReceipts); err != nil {
+ log.WithError(err).Errorf("Failed to clean EDUs for server %q", t.Destination)
}
}
- return true, nil
+ // Reset the transaction ID.
+ oq.transactionIDMutex.Lock()
+ oq.transactionID = ""
+ oq.transactionIDMutex.Unlock()
+ return true, len(t.PDUs), len(t.EDUs), nil
case gomatrix.HTTPError:
// Report that we failed to send the transaction and we
// will retry again, subject to backoff.
- return false, err
+ return false, 0, 0, err
default:
log.WithFields(log.Fields{
"destination": oq.destination,
log.ErrorKey: err,
- }).Info("problem sending transaction")
- return false, err
+ }).Infof("Failed to send transaction %q", t.TransactionID)
+ return false, 0, 0, err
}
}
diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go
index d45af868..da30e4de 100644
--- a/federationsender/queue/queue.go
+++ b/federationsender/queue/queue.go
@@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/statistics"
"github.com/matrix-org/dendrite/federationsender/storage"
+ "github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
@@ -83,8 +84,8 @@ func NewOutgoingQueues(
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
}
for serverName := range serverNames {
- if !queues.getQueue(serverName).statistics.Blacklisted() {
- queues.getQueue(serverName).wakeQueueIfNeeded()
+ if queue := queues.getQueue(serverName); !queue.statistics.Blacklisted() {
+ queue.wakeQueueIfNeeded()
}
}
})
@@ -100,6 +101,16 @@ type SigningInfo struct {
PrivateKey ed25519.PrivateKey
}
+type queuedPDU struct {
+ receipt *shared.Receipt
+ pdu *gomatrixserverlib.HeaderedEvent
+}
+
+type queuedEDU struct {
+ receipt *shared.Receipt
+ edu *gomatrixserverlib.EDU
+}
+
func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue {
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
@@ -112,8 +123,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
destination: destination,
client: oqs.client,
statistics: oqs.statistics.ForServer(destination),
- notifyPDUs: make(chan bool, 1),
- notifyEDUs: make(chan bool, 1),
+ notify: make(chan struct{}, 1),
interruptBackoff: make(chan bool),
signing: oqs.signing,
}
@@ -188,7 +198,7 @@ func (oqs *OutgoingQueues) SendEvent(
}
for destination := range destmap {
- oqs.getQueue(destination).sendEvent(nid)
+ oqs.getQueue(destination).sendEvent(ev, nid)
}
return nil
@@ -258,7 +268,7 @@ func (oqs *OutgoingQueues) SendEDU(
}
for destination := range destmap {
- oqs.getQueue(destination).sendEDU(nid)
+ oqs.getQueue(destination).sendEDU(e, nid)
}
return nil
diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go
index a3f5073f..03d616f1 100644
--- a/federationsender/storage/interface.go
+++ b/federationsender/storage/interface.go
@@ -36,14 +36,14 @@ type Database interface {
StoreJSON(ctx context.Context, js string) (*shared.Receipt, error)
+ GetPendingPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (pdus map[*shared.Receipt]*gomatrixserverlib.HeaderedEvent, err error)
+ GetPendingEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (edus map[*shared.Receipt]*gomatrixserverlib.EDU, err error)
+
AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error
AssociateEDUWithDestination(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error
- GetNextTransactionPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, *shared.Receipt, error)
- GetNextTransactionEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) ([]*gomatrixserverlib.EDU, *shared.Receipt, error)
-
- CleanPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error
- CleanEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error
+ CleanPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error
+ CleanEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error
GetPendingPDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (int64, error)
GetPendingEDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (int64, error)
diff --git a/federationsender/storage/postgres/queue_pdus_table.go b/federationsender/storage/postgres/queue_pdus_table.go
index 95a3b9ee..f9a47748 100644
--- a/federationsender/storage/postgres/queue_pdus_table.go
+++ b/federationsender/storage/postgres/queue_pdus_table.go
@@ -45,16 +45,10 @@ const insertQueuePDUSQL = "" +
const deleteQueuePDUSQL = "" +
"DELETE FROM federationsender_queue_pdus WHERE server_name = $1 AND json_nid = ANY($2)"
-const selectQueuePDUNextTransactionIDSQL = "" +
- "SELECT transaction_id FROM federationsender_queue_pdus" +
- " WHERE server_name = $1" +
- " ORDER BY transaction_id ASC" +
- " LIMIT 1"
-
-const selectQueuePDUsByTransactionSQL = "" +
+const selectQueuePDUsSQL = "" +
"SELECT json_nid FROM federationsender_queue_pdus" +
- " WHERE server_name = $1 AND transaction_id = $2" +
- " LIMIT $3"
+ " WHERE server_name = $1" +
+ " LIMIT $2"
const selectQueuePDUReferenceJSONCountSQL = "" +
"SELECT COUNT(*) FROM federationsender_queue_pdus" +
@@ -71,8 +65,7 @@ type queuePDUsStatements struct {
db *sql.DB
insertQueuePDUStmt *sql.Stmt
deleteQueuePDUsStmt *sql.Stmt
- selectQueuePDUNextTransactionIDStmt *sql.Stmt
- selectQueuePDUsByTransactionStmt *sql.Stmt
+ selectQueuePDUsStmt *sql.Stmt
selectQueuePDUReferenceJSONCountStmt *sql.Stmt
selectQueuePDUsCountStmt *sql.Stmt
selectQueuePDUServerNamesStmt *sql.Stmt
@@ -92,10 +85,7 @@ func NewPostgresQueuePDUsTable(db *sql.DB) (s *queuePDUsStatements, err error) {
if s.deleteQueuePDUsStmt, err = s.db.Prepare(deleteQueuePDUSQL); err != nil {
return
}
- if s.selectQueuePDUNextTransactionIDStmt, err = s.db.Prepare(selectQueuePDUNextTransactionIDSQL); err != nil {
- return
- }
- if s.selectQueuePDUsByTransactionStmt, err = s.db.Prepare(selectQueuePDUsByTransactionSQL); err != nil {
+ if s.selectQueuePDUsStmt, err = s.db.Prepare(selectQueuePDUsSQL); err != nil {
return
}
if s.selectQueuePDUReferenceJSONCountStmt, err = s.db.Prepare(selectQueuePDUReferenceJSONCountSQL); err != nil {
@@ -137,18 +127,6 @@ func (s *queuePDUsStatements) DeleteQueuePDUs(
return err
}
-func (s *queuePDUsStatements) SelectQueuePDUNextTransactionID(
- ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
-) (gomatrixserverlib.TransactionID, error) {
- var transactionID gomatrixserverlib.TransactionID
- stmt := sqlutil.TxStmt(txn, s.selectQueuePDUNextTransactionIDStmt)
- err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID)
- if err == sql.ErrNoRows {
- return "", nil
- }
- return transactionID, err
-}
-
func (s *queuePDUsStatements) SelectQueuePDUReferenceJSONCount(
ctx context.Context, txn *sql.Tx, jsonNID int64,
) (int64, error) {
@@ -182,11 +160,10 @@ func (s *queuePDUsStatements) SelectQueuePDUCount(
func (s *queuePDUsStatements) SelectQueuePDUs(
ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName,
- transactionID gomatrixserverlib.TransactionID,
limit int,
) ([]int64, error) {
- stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt)
- rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit)
+ stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsStmt)
+ rows, err := stmt.QueryContext(ctx, serverName, limit)
if err != nil {
return nil, err
}
diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go
index af9d0d6a..fbf84c70 100644
--- a/federationsender/storage/shared/storage.go
+++ b/federationsender/storage/shared/storage.go
@@ -17,7 +17,6 @@ package shared
import (
"context"
"database/sql"
- "encoding/json"
"fmt"
"github.com/matrix-org/dendrite/federationsender/storage/tables"
@@ -44,16 +43,11 @@ type Database struct {
// to pass them back so that we can clean up if the transaction sends
// successfully.
type Receipt struct {
- nids []int64
+ nid int64
}
-func (e *Receipt) Empty() bool {
- return len(e.nids) == 0
-}
-
-func (e *Receipt) String() string {
- j, _ := json.Marshal(e.nids)
- return string(j)
+func (r *Receipt) String() string {
+ return fmt.Sprintf("%d", r.nid)
}
// UpdateRoom updates the joined hosts for a room and returns what the joined
@@ -146,7 +140,7 @@ func (d *Database) StoreJSON(
return nil, fmt.Errorf("d.insertQueueJSON: %w", err)
}
return &Receipt{
- nids: []int64{nid},
+ nid: nid,
}, nil
}
diff --git a/federationsender/storage/shared/storage_edus.go b/federationsender/storage/shared/storage_edus.go
index ae1d1511..86fee1a3 100644
--- a/federationsender/storage/shared/storage_edus.go
+++ b/federationsender/storage/shared/storage_edus.go
@@ -33,16 +33,14 @@ func (d *Database) AssociateEDUWithDestination(
receipt *Receipt,
) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
- for _, nid := range receipt.nids {
- if err := d.FederationSenderQueueEDUs.InsertQueueEDU(
- ctx, // context
- txn, // SQL transaction
- "", // TODO: EDU type for coalescing
- serverName, // destination server name
- nid, // NID from the federationsender_queue_json table
- ); err != nil {
- return fmt.Errorf("InsertQueueEDU: %w", err)
- }
+ if err := d.FederationSenderQueueEDUs.InsertQueueEDU(
+ ctx, // context
+ txn, // SQL transaction
+ "", // TODO: EDU type for coalescing
+ serverName, // destination server name
+ receipt.nid, // NID from the federationsender_queue_json table
+ ); err != nil {
+ return fmt.Errorf("InsertQueueEDU: %w", err)
}
return nil
})
@@ -50,29 +48,25 @@ func (d *Database) AssociateEDUWithDestination(
// GetNextTransactionEDUs retrieves events from the database for
// the next pending transaction, up to the limit specified.
-func (d *Database) GetNextTransactionEDUs(
+func (d *Database) GetPendingEDUs(
ctx context.Context,
serverName gomatrixserverlib.ServerName,
limit int,
) (
- edus []*gomatrixserverlib.EDU,
- receipt *Receipt,
+ edus map[*Receipt]*gomatrixserverlib.EDU,
err error,
) {
+ edus = make(map[*Receipt]*gomatrixserverlib.EDU)
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit)
if err != nil {
return fmt.Errorf("SelectQueueEDUs: %w", err)
}
- receipt = &Receipt{
- nids: nids,
- }
-
retrieve := make([]int64, 0, len(nids))
for _, nid := range nids {
if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok {
- edus = append(edus, edu)
+ edus[&Receipt{nid}] = edu
} else {
retrieve = append(retrieve, nid)
}
@@ -83,12 +77,12 @@ func (d *Database) GetNextTransactionEDUs(
return fmt.Errorf("SelectQueueJSON: %w", err)
}
- for _, blob := range blobs {
+ for nid, blob := range blobs {
var event gomatrixserverlib.EDU
if err := json.Unmarshal(blob, &event); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err)
}
- edus = append(edus, &event)
+ edus[&Receipt{nid}] = &event
}
return nil
@@ -101,19 +95,24 @@ func (d *Database) GetNextTransactionEDUs(
func (d *Database) CleanEDUs(
ctx context.Context,
serverName gomatrixserverlib.ServerName,
- receipt *Receipt,
+ receipts []*Receipt,
) error {
- if receipt == nil {
+ if len(receipts) == 0 {
return errors.New("expected receipt")
}
+ nids := make([]int64, len(receipts))
+ for i := range receipts {
+ nids[i] = receipts[i].nid
+ }
+
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
- if err := d.FederationSenderQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, receipt.nids); err != nil {
+ if err := d.FederationSenderQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, nids); err != nil {
return err
}
var deleteNIDs []int64
- for _, nid := range receipt.nids {
+ for _, nid := range nids {
count, err := d.FederationSenderQueueEDUs.SelectQueueEDUReferenceJSONCount(ctx, txn, nid)
if err != nil {
return fmt.Errorf("SelectQueueEDUReferenceJSONCount: %w", err)
diff --git a/federationsender/storage/shared/storage_pdus.go b/federationsender/storage/shared/storage_pdus.go
index 09235a5e..bc298a90 100644
--- a/federationsender/storage/shared/storage_pdus.go
+++ b/federationsender/storage/shared/storage_pdus.go
@@ -34,16 +34,14 @@ func (d *Database) AssociatePDUWithDestination(
receipt *Receipt,
) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
- for _, nid := range receipt.nids {
- if err := d.FederationSenderQueuePDUs.InsertQueuePDU(
- ctx, // context
- txn, // SQL transaction
- transactionID, // transaction ID
- serverName, // destination server name
- nid, // NID from the federationsender_queue_json table
- ); err != nil {
- return fmt.Errorf("InsertQueuePDU: %w", err)
- }
+ if err := d.FederationSenderQueuePDUs.InsertQueuePDU(
+ ctx, // context
+ txn, // SQL transaction
+ transactionID, // transaction ID
+ serverName, // destination server name
+ receipt.nid, // NID from the federationsender_queue_json table
+ ); err != nil {
+ return fmt.Errorf("InsertQueuePDU: %w", err)
}
return nil
})
@@ -51,14 +49,12 @@ func (d *Database) AssociatePDUWithDestination(
// GetNextTransactionPDUs retrieves events from the database for
// the next pending transaction, up to the limit specified.
-func (d *Database) GetNextTransactionPDUs(
+func (d *Database) GetPendingPDUs(
ctx context.Context,
serverName gomatrixserverlib.ServerName,
limit int,
) (
- transactionID gomatrixserverlib.TransactionID,
- events []*gomatrixserverlib.HeaderedEvent,
- receipt *Receipt,
+ events map[*Receipt]*gomatrixserverlib.HeaderedEvent,
err error,
) {
// Strictly speaking this doesn't need to be using the writer
@@ -66,29 +62,17 @@ func (d *Database) GetNextTransactionPDUs(
// a guarantee of transactional isolation, it's actually useful
// to know in SQLite mode that nothing else is trying to modify
// the database.
+ events = make(map[*Receipt]*gomatrixserverlib.HeaderedEvent)
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
- transactionID, err = d.FederationSenderQueuePDUs.SelectQueuePDUNextTransactionID(ctx, txn, serverName)
- if err != nil {
- return fmt.Errorf("SelectQueuePDUNextTransactionID: %w", err)
- }
-
- if transactionID == "" {
- return nil
- }
-
- nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, transactionID, limit)
+ nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, limit)
if err != nil {
return fmt.Errorf("SelectQueuePDUs: %w", err)
}
- receipt = &Receipt{
- nids: nids,
- }
-
retrieve := make([]int64, 0, len(nids))
for _, nid := range nids {
if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok {
- events = append(events, event)
+ events[&Receipt{nid}] = event
} else {
retrieve = append(retrieve, nid)
}
@@ -104,7 +88,7 @@ func (d *Database) GetNextTransactionPDUs(
if err := json.Unmarshal(blob, &event); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err)
}
- events = append(events, &event)
+ events[&Receipt{nid}] = &event
d.Cache.StoreFederationSenderQueuedPDU(nid, &event)
}
@@ -119,19 +103,24 @@ func (d *Database) GetNextTransactionPDUs(
func (d *Database) CleanPDUs(
ctx context.Context,
serverName gomatrixserverlib.ServerName,
- receipt *Receipt,
+ receipts []*Receipt,
) error {
- if receipt == nil {
+ if len(receipts) == 0 {
return errors.New("expected receipt")
}
+ nids := make([]int64, len(receipts))
+ for i := range receipts {
+ nids[i] = receipts[i].nid
+ }
+
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
- if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, receipt.nids); err != nil {
+ if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, nids); err != nil {
return err
}
var deleteNIDs []int64
- for _, nid := range receipt.nids {
+ for _, nid := range nids {
count, err := d.FederationSenderQueuePDUs.SelectQueuePDUReferenceJSONCount(ctx, txn, nid)
if err != nil {
return fmt.Errorf("SelectQueuePDUReferenceJSONCount: %w", err)
diff --git a/federationsender/storage/sqlite3/queue_pdus_table.go b/federationsender/storage/sqlite3/queue_pdus_table.go
index 70519c9e..e0fdbda5 100644
--- a/federationsender/storage/sqlite3/queue_pdus_table.go
+++ b/federationsender/storage/sqlite3/queue_pdus_table.go
@@ -53,10 +53,10 @@ const selectQueueNextTransactionIDSQL = "" +
" ORDER BY transaction_id ASC" +
" LIMIT 1"
-const selectQueuePDUsByTransactionSQL = "" +
+const selectQueuePDUsSQL = "" +
"SELECT json_nid FROM federationsender_queue_pdus" +
- " WHERE server_name = $1 AND transaction_id = $2" +
- " LIMIT $3"
+ " WHERE server_name = $1" +
+ " LIMIT $2"
const selectQueuePDUsReferenceJSONCountSQL = "" +
"SELECT COUNT(*) FROM federationsender_queue_pdus" +
@@ -73,7 +73,7 @@ type queuePDUsStatements struct {
db *sql.DB
insertQueuePDUStmt *sql.Stmt
selectQueueNextTransactionIDStmt *sql.Stmt
- selectQueuePDUsByTransactionStmt *sql.Stmt
+ selectQueuePDUsStmt *sql.Stmt
selectQueueReferenceJSONCountStmt *sql.Stmt
selectQueuePDUsCountStmt *sql.Stmt
selectQueueServerNamesStmt *sql.Stmt
@@ -97,7 +97,7 @@ func NewSQLiteQueuePDUsTable(db *sql.DB) (s *queuePDUsStatements, err error) {
if s.selectQueueNextTransactionIDStmt, err = db.Prepare(selectQueueNextTransactionIDSQL); err != nil {
return
}
- if s.selectQueuePDUsByTransactionStmt, err = db.Prepare(selectQueuePDUsByTransactionSQL); err != nil {
+ if s.selectQueuePDUsStmt, err = db.Prepare(selectQueuePDUsSQL); err != nil {
return
}
if s.selectQueueReferenceJSONCountStmt, err = db.Prepare(selectQueuePDUsReferenceJSONCountSQL); err != nil {
@@ -193,11 +193,10 @@ func (s *queuePDUsStatements) SelectQueuePDUCount(
func (s *queuePDUsStatements) SelectQueuePDUs(
ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName,
- transactionID gomatrixserverlib.TransactionID,
limit int,
) ([]int64, error) {
- stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt)
- rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit)
+ stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsStmt)
+ rows, err := stmt.QueryContext(ctx, serverName, limit)
if err != nil {
return nil, err
}
diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go
index 1167a212..69e952de 100644
--- a/federationsender/storage/tables/interface.go
+++ b/federationsender/storage/tables/interface.go
@@ -25,10 +25,9 @@ import (
type FederationSenderQueuePDUs interface {
InsertQueuePDU(ctx context.Context, txn *sql.Tx, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nid int64) error
DeleteQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error
- SelectQueuePDUNextTransactionID(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (gomatrixserverlib.TransactionID, error)
SelectQueuePDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID int64) (int64, error)
SelectQueuePDUCount(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (int64, error)
- SelectQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, transactionID gomatrixserverlib.TransactionID, limit int) ([]int64, error)
+ SelectQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]int64, error)
SelectQueuePDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error)
}