aboutsummaryrefslogtreecommitdiff
path: root/federationsender/queue
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-07-20 16:55:20 +0100
committerGitHub <noreply@github.com>2020-07-20 16:55:20 +0100
commit11a39fe3b5afeadafd2c842951462d9332ebecca (patch)
tree88c3601666ecd6e891c64875f29e1f8ce22eac34 /federationsender/queue
parentf3c482b078c2fbb6d6d7a41be44c2e841d70d8b8 (diff)
Deduplicate FS database, EDU persistence table (#1207)
* Deduplicate FS database, add some EDU persistence groundwork * Extend TransactionWriter to use optional existing transaction, use that for FS SQLite database writes * Fix build due to bad keyserver import * Working EDU persistence * gocyclo, unsurprisingly * Remove unused * Update copyright notices
Diffstat (limited to 'federationsender/queue')
-rw-r--r--federationsender/queue/destinationqueue.go136
-rw-r--r--federationsender/queue/queue.go33
2 files changed, 115 insertions, 54 deletions
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go
index 57c8cff6..b7582bf9 100644
--- a/federationsender/queue/destinationqueue.go
+++ b/federationsender/queue/destinationqueue.go
@@ -22,6 +22,7 @@ import (
"time"
"github.com/matrix-org/dendrite/federationsender/storage"
+ "github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix"
@@ -31,8 +32,11 @@ import (
"go.uber.org/atomic"
)
-const maxPDUsPerTransaction = 50
-const queueIdleTimeout = time.Second * 30
+const (
+ maxPDUsPerTransaction = 50
+ maxEDUsPerTransaction = 50
+ queueIdleTimeout = time.Second * 30
+)
// destinationQueue is a queue of events for a single destination.
// It is responsible for sending the events to the destination and
@@ -49,20 +53,19 @@ type destinationQueue struct {
backingOff atomic.Bool // true if we're backing off
statistics *types.ServerStatistics // statistics about this remote server
incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send
- incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
transactionIDMutex sync.Mutex // protects transactionID
transactionID gomatrixserverlib.TransactionID // last transaction ID
transactionCount atomic.Int32 // how many events in this transaction so far
- pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
notifyPDUs chan bool // interrupts idle wait for PDUs
+ notifyEDUs chan bool // interrupts idle wait for EDUs
interruptBackoff chan bool // interrupts backoff
}
// Send event adds the event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending events to that destination.
-func (oq *destinationQueue) sendEvent(nid int64) {
+func (oq *destinationQueue) sendEvent(receipt *shared.Receipt) {
if oq.statistics.Blacklisted() {
// If the destination is blacklisted then drop the event.
log.Infof("%s is blacklisted; dropping event", oq.destination)
@@ -86,9 +89,9 @@ func (oq *destinationQueue) sendEvent(nid int64) {
context.TODO(),
oq.transactionID, // the current transaction ID
oq.destination, // the destination server name
- []int64{nid}, // NID from federationsender_queue_json table
+ receipt, // NIDs from federationsender_queue_json table
); err != nil {
- log.WithError(err).Errorf("failed to associate PDU NID %d with destination %q", nid, oq.destination)
+ log.WithError(err).Errorf("failed to associate PDU receipt %q with destination %q", receipt.String(), oq.destination)
return
}
// We've successfully added a PDU to the transaction so increase
@@ -107,13 +110,34 @@ func (oq *destinationQueue) sendEvent(nid int64) {
// sendEDU adds the EDU event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending events to that destination.
-func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) {
+func (oq *destinationQueue) sendEDU(receipt *shared.Receipt) {
if oq.statistics.Blacklisted() {
// If the destination is blacklisted then drop the event.
+ log.Infof("%s is blacklisted; dropping ephemeral event", oq.destination)
return
}
+ // Create a database entry that associates the given PDU NID with
+ // this destination queue. We'll then be able to retrieve the PDU
+ // later.
+ if err := oq.db.AssociateEDUWithDestination(
+ context.TODO(),
+ oq.destination, // the destination server name
+ receipt, // NIDs from federationsender_queue_json table
+ ); err != nil {
+ log.WithError(err).Errorf("failed to associate EDU receipt %q with destination %q", receipt.String(), oq.destination)
+ return
+ }
+ // We've successfully added an EDU to the transaction so increase
+ // the counter.
+ oq.transactionCount.Add(1)
+ // Wake up the queue if it's asleep.
oq.wakeQueueIfNeeded()
- oq.incomingEDUs <- ev
+ // If we're blocking on waiting PDUs then tell the queue that we
+ // have work to do.
+ select {
+ case oq.notifyEDUs <- true:
+ default:
+ }
}
// sendInvite adds the invite event to the pending queue for the
@@ -166,6 +190,28 @@ func (oq *destinationQueue) waitForPDUs() chan bool {
return oq.notifyPDUs
}
+// waitForEDUs returns a channel for pending EDUs, which will be
+// used in backgroundSend select. It returns a closed channel if
+// there is something pending right now, or an open channel if
+// we're waiting for something.
+func (oq *destinationQueue) waitForEDUs() chan bool {
+ pendingEDUs, err := oq.db.GetPendingEDUCount(context.TODO(), oq.destination)
+ if err != nil {
+ log.WithError(err).Errorf("Failed to get pending EDU count on queue %q", oq.destination)
+ }
+ // If there are EDUs pending right now then we'll return a closed
+ // channel. This will mean that the backgroundSend will not block.
+ if pendingEDUs > 0 {
+ ch := make(chan bool, 1)
+ close(ch)
+ return ch
+ }
+ // If there are no EDUs pending right now then instead we'll return
+ // the notify channel, so that backgroundSend can pick up normal
+ // notifications from sendEvent.
+ return oq.notifyEDUs
+}
+
// backgroundSend is the worker goroutine for sending events.
// nolint:gocyclo
func (oq *destinationQueue) backgroundSend() {
@@ -177,7 +223,7 @@ func (oq *destinationQueue) backgroundSend() {
defer oq.running.Store(false)
for {
- pendingPDUs := false
+ pendingPDUs, pendingEDUs := false, false
// If we have nothing to do then wait either for incoming events, or
// until we hit an idle timeout.
@@ -186,18 +232,10 @@ func (oq *destinationQueue) backgroundSend() {
// We were woken up because there are new PDUs waiting in the
// database.
pendingPDUs = true
- case edu := <-oq.incomingEDUs:
- // EDUs are handled in-memory for now. We will try to keep
- // the ordering intact.
- // TODO: Certain EDU types need persistence, e.g. send-to-device
- oq.pendingEDUs = append(oq.pendingEDUs, edu)
- // If there are any more things waiting in the channel queue
- // then read them. This is safe because we guarantee only
- // having one goroutine per destination queue, so the channel
- // isn't being consumed anywhere else.
- for len(oq.incomingEDUs) > 0 {
- oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
- }
+ case <-oq.waitForEDUs():
+ // We were woken up because there are new PDUs waiting in the
+ // database.
+ pendingEDUs = true
case invite := <-oq.incomingInvites:
// There's no strict ordering requirement for invites like
// there is for transactions, so we put the invite onto the
@@ -238,16 +276,15 @@ func (oq *destinationQueue) backgroundSend() {
}
// If we have pending PDUs or EDUs then construct a transaction.
- if pendingPDUs || len(oq.pendingEDUs) > 0 {
+ if pendingPDUs || pendingEDUs {
// Try sending the next transaction and see what happens.
- transaction, terr := oq.nextTransaction(oq.pendingEDUs)
+ transaction, terr := oq.nextTransaction()
if terr != nil {
// We failed to send the transaction.
if giveUp := oq.statistics.Failure(); giveUp {
// It's been suggested that we should give up because the backoff
// has exceeded a maximum allowable value. Clean up the in-memory
// buffers at this point. The PDU clean-up is already on a defer.
- oq.cleanPendingEDUs()
oq.cleanPendingInvites()
log.Infof("Blacklisting %q due to errors", oq.destination)
return
@@ -265,8 +302,6 @@ func (oq *destinationQueue) backgroundSend() {
// If we successfully sent the transaction then clear out
// the pending events and EDUs, and wipe our transaction ID.
oq.statistics.Success()
- // Clean up the in-memory buffers.
- oq.cleanPendingEDUs()
}
}
@@ -294,15 +329,6 @@ func (oq *destinationQueue) backgroundSend() {
}
}
-// cleanPendingEDUs cleans out the pending EDU buffer, removing
-// all references so that the underlying objects can be GC'd.
-func (oq *destinationQueue) cleanPendingEDUs() {
- for i := 0; i < len(oq.pendingEDUs); i++ {
- oq.pendingEDUs[i] = nil
- }
- oq.pendingEDUs = []*gomatrixserverlib.EDU{}
-}
-
// cleanPendingInvites cleans out the pending invite buffer,
// removing all references so that the underlying objects can
// be GC'd.
@@ -316,9 +342,8 @@ func (oq *destinationQueue) cleanPendingInvites() {
// nextTransaction creates a new transaction from the pending event
// queue and sends it. Returns true if a transaction was sent or
// false otherwise.
-func (oq *destinationQueue) nextTransaction(
- pendingEDUs []*gomatrixserverlib.EDU,
-) (bool, error) {
+// nolint:gocyclo
+func (oq *destinationQueue) nextTransaction() (bool, error) {
// Before we do anything, we need to roll over the transaction
// ID that is being used to coalesce events into the next TX.
// Otherwise it's possible that we'll pick up an incomplete
@@ -343,7 +368,7 @@ func (oq *destinationQueue) nextTransaction(
// actually retrieve that many events.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
- txid, pdus, err := oq.db.GetNextTransactionPDUs(
+ txid, pdus, pduReceipt, err := oq.db.GetNextTransactionPDUs(
ctx, // context
oq.destination, // server name
maxPDUsPerTransaction, // max events to retrieve
@@ -353,9 +378,19 @@ func (oq *destinationQueue) nextTransaction(
return false, fmt.Errorf("oq.db.GetNextTransactionPDUs: %w", err)
}
+ edus, eduReceipt, err := oq.db.GetNextTransactionEDUs(
+ ctx, // context
+ oq.destination, // server name
+ maxEDUsPerTransaction, // max events to retrieve
+ )
+ if err != nil {
+ log.WithError(err).Errorf("failed to get next transaction EDUs for server %q", oq.destination)
+ return false, fmt.Errorf("oq.db.GetNextTransactionEDUs: %w", err)
+ }
+
// If we didn't get anything from the database and there are no
// pending EDUs then there's nothing to do - stop here.
- if len(pdus) == 0 && len(pendingEDUs) == 0 {
+ if len(pdus) == 0 && len(edus) == 0 {
return false, nil
}
@@ -377,7 +412,7 @@ func (oq *destinationQueue) nextTransaction(
}
// Do the same for pending EDUS in the queue.
- for _, edu := range pendingEDUs {
+ for _, edu := range edus {
t.EDUs = append(t.EDUs, *edu)
}
@@ -393,12 +428,17 @@ func (oq *destinationQueue) nextTransaction(
switch err.(type) {
case nil:
// Clean up the transaction in the database.
- if err = oq.db.CleanTransactionPDUs(
- context.Background(),
- t.Destination,
- t.TransactionID,
- ); err != nil {
- log.WithError(err).Errorf("failed to clean transaction %q for server %q", t.TransactionID, t.Destination)
+ if pduReceipt != nil {
+ //logrus.Infof("Cleaning PDUs %q", pduReceipt.String())
+ if err = oq.db.CleanPDUs(context.Background(), oq.destination, pduReceipt); err != nil {
+ log.WithError(err).Errorf("failed to clean PDUs %q for server %q", pduReceipt.String(), t.Destination)
+ }
+ }
+ if eduReceipt != nil {
+ //logrus.Infof("Cleaning EDUs %q", eduReceipt.String())
+ if err = oq.db.CleanEDUs(context.Background(), oq.destination, eduReceipt); err != nil {
+ log.WithError(err).Errorf("failed to clean EDUs %q for server %q", eduReceipt.String(), t.Destination)
+ }
}
return true, nil
case gomatrix.HTTPError:
diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go
index 46c9fddb..e488a34a 100644
--- a/federationsender/queue/queue.go
+++ b/federationsender/queue/queue.go
@@ -61,12 +61,23 @@ func NewOutgoingQueues(
queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
}
// Look up which servers we have pending items for and then rehydrate those queues.
- if serverNames, err := db.GetPendingServerNames(context.Background()); err == nil {
- for _, serverName := range serverNames {
- queues.getQueue(serverName).wakeQueueIfNeeded()
+ serverNames := map[gomatrixserverlib.ServerName]struct{}{}
+ if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil {
+ for _, serverName := range names {
+ serverNames[serverName] = struct{}{}
}
} else {
- log.WithError(err).Error("Failed to get server names for destination queue hydration")
+ log.WithError(err).Error("Failed to get PDU server names for destination queue hydration")
+ }
+ if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil {
+ for _, serverName := range names {
+ serverNames[serverName] = struct{}{}
+ }
+ } else {
+ log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
+ }
+ for serverName := range serverNames {
+ queues.getQueue(serverName).wakeQueueIfNeeded()
}
return queues
}
@@ -91,9 +102,9 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
destination: destination,
client: oqs.client,
statistics: oqs.statistics.ForServer(destination),
- incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
notifyPDUs: make(chan bool, 1),
+ notifyEDUs: make(chan bool, 1),
interruptBackoff: make(chan bool),
signing: oqs.signing,
}
@@ -196,8 +207,18 @@ func (oqs *OutgoingQueues) SendEDU(
}).Info("Sending EDU event")
}
+ ephemeralJSON, err := json.Marshal(e)
+ if err != nil {
+ return fmt.Errorf("json.Marshal: %w", err)
+ }
+
+ nid, err := oqs.db.StoreJSON(context.TODO(), string(ephemeralJSON))
+ if err != nil {
+ return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
+ }
+
for _, destination := range destinations {
- oqs.getQueue(destination).sendEDU(e)
+ oqs.getQueue(destination).sendEDU(nid)
}
return nil