diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-07-20 16:55:20 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-20 16:55:20 +0100 |
commit | 11a39fe3b5afeadafd2c842951462d9332ebecca (patch) | |
tree | 88c3601666ecd6e891c64875f29e1f8ce22eac34 /federationsender/queue | |
parent | f3c482b078c2fbb6d6d7a41be44c2e841d70d8b8 (diff) |
Deduplicate FS database, EDU persistence table (#1207)
* Deduplicate FS database, add some EDU persistence groundwork
* Extend TransactionWriter to use optional existing transaction, use that for FS SQLite database writes
* Fix build due to bad keyserver import
* Working EDU persistence
* gocyclo, unsurprisingly
* Remove unused
* Update copyright notices
Diffstat (limited to 'federationsender/queue')
-rw-r--r-- | federationsender/queue/destinationqueue.go | 136 | ||||
-rw-r--r-- | federationsender/queue/queue.go | 33 |
2 files changed, 115 insertions, 54 deletions
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 57c8cff6..b7582bf9 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -22,6 +22,7 @@ import ( "time" "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/dendrite/federationsender/storage/shared" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrix" @@ -31,8 +32,11 @@ import ( "go.uber.org/atomic" ) -const maxPDUsPerTransaction = 50 -const queueIdleTimeout = time.Second * 30 +const ( + maxPDUsPerTransaction = 50 + maxEDUsPerTransaction = 50 + queueIdleTimeout = time.Second * 30 +) // destinationQueue is a queue of events for a single destination. // It is responsible for sending the events to the destination and @@ -49,20 +53,19 @@ type destinationQueue struct { backingOff atomic.Bool // true if we're backing off statistics *types.ServerStatistics // statistics about this remote server incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send - incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send transactionIDMutex sync.Mutex // protects transactionID transactionID gomatrixserverlib.TransactionID // last transaction ID transactionCount atomic.Int32 // how many events in this transaction so far - pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend notifyPDUs chan bool // interrupts idle wait for PDUs + notifyEDUs chan bool // interrupts idle wait for EDUs interruptBackoff chan bool // interrupts backoff } // Send event adds the event to the pending queue for the destination. // If the queue is empty then it starts a background goroutine to // start sending events to that destination. 
-func (oq *destinationQueue) sendEvent(nid int64) { +func (oq *destinationQueue) sendEvent(receipt *shared.Receipt) { if oq.statistics.Blacklisted() { // If the destination is blacklisted then drop the event. log.Infof("%s is blacklisted; dropping event", oq.destination) @@ -86,9 +89,9 @@ func (oq *destinationQueue) sendEvent(nid int64) { context.TODO(), oq.transactionID, // the current transaction ID oq.destination, // the destination server name - []int64{nid}, // NID from federationsender_queue_json table + receipt, // NIDs from federationsender_queue_json table ); err != nil { - log.WithError(err).Errorf("failed to associate PDU NID %d with destination %q", nid, oq.destination) + log.WithError(err).Errorf("failed to associate PDU receipt %q with destination %q", receipt.String(), oq.destination) return } // We've successfully added a PDU to the transaction so increase @@ -107,13 +110,34 @@ func (oq *destinationQueue) sendEvent(nid int64) { // sendEDU adds the EDU event to the pending queue for the destination. // If the queue is empty then it starts a background goroutine to // start sending events to that destination. -func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) { +func (oq *destinationQueue) sendEDU(receipt *shared.Receipt) { if oq.statistics.Blacklisted() { // If the destination is blacklisted then drop the event. + log.Infof("%s is blacklisted; dropping ephemeral event", oq.destination) return } + // Create a database entry that associates the given EDU receipt with + // this destination queue. We'll then be able to retrieve the EDU + // later. 
+ if err := oq.db.AssociateEDUWithDestination( + context.TODO(), + oq.destination, // the destination server name + receipt, // NIDs from federationsender_queue_json table + ); err != nil { + log.WithError(err).Errorf("failed to associate EDU receipt %q with destination %q", receipt.String(), oq.destination) + return + } + // We've successfully added an EDU to the transaction so increase + // the counter. + oq.transactionCount.Add(1) + // Wake up the queue if it's asleep. oq.wakeQueueIfNeeded() - oq.incomingEDUs <- ev + // If we're blocking on waiting EDUs then tell the queue that we + // have work to do. + select { + case oq.notifyEDUs <- true: + default: + } } // sendInvite adds the invite event to the pending queue for the @@ -166,6 +190,28 @@ func (oq *destinationQueue) waitForPDUs() chan bool { return oq.notifyPDUs } +// waitForEDUs returns a channel for pending EDUs, which will be +// used in backgroundSend select. It returns a closed channel if +// there is something pending right now, or an open channel if +// we're waiting for something. +func (oq *destinationQueue) waitForEDUs() chan bool { + pendingEDUs, err := oq.db.GetPendingEDUCount(context.TODO(), oq.destination) + if err != nil { + log.WithError(err).Errorf("Failed to get pending EDU count on queue %q", oq.destination) + } + // If there are EDUs pending right now then we'll return a closed + // channel. This will mean that the backgroundSend will not block. + if pendingEDUs > 0 { + ch := make(chan bool, 1) + close(ch) + return ch + } + // If there are no EDUs pending right now then instead we'll return + // the notify channel, so that backgroundSend can pick up normal + // notifications from sendEDU. + return oq.notifyEDUs +} + // backgroundSend is the worker goroutine for sending events. 
// nolint:gocyclo func (oq *destinationQueue) backgroundSend() { @@ -177,7 +223,7 @@ func (oq *destinationQueue) backgroundSend() { defer oq.running.Store(false) for { - pendingPDUs := false + pendingPDUs, pendingEDUs := false, false // If we have nothing to do then wait either for incoming events, or // until we hit an idle timeout. @@ -186,18 +232,10 @@ func (oq *destinationQueue) backgroundSend() { // We were woken up because there are new PDUs waiting in the // database. pendingPDUs = true - case edu := <-oq.incomingEDUs: - // EDUs are handled in-memory for now. We will try to keep - // the ordering intact. - // TODO: Certain EDU types need persistence, e.g. send-to-device - oq.pendingEDUs = append(oq.pendingEDUs, edu) - // If there are any more things waiting in the channel queue - // then read them. This is safe because we guarantee only - // having one goroutine per destination queue, so the channel - // isn't being consumed anywhere else. - for len(oq.incomingEDUs) > 0 { - oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs) - } + case <-oq.waitForEDUs(): + // We were woken up because there are new EDUs waiting in the + // database. + pendingEDUs = true case invite := <-oq.incomingInvites: // There's no strict ordering requirement for invites like // there is for transactions, so we put the invite onto the @@ -238,16 +276,15 @@ func (oq *destinationQueue) backgroundSend() { } // If we have pending PDUs or EDUs then construct a transaction. - if pendingPDUs || len(oq.pendingEDUs) > 0 { + if pendingPDUs || pendingEDUs { // Try sending the next transaction and see what happens. - transaction, terr := oq.nextTransaction(oq.pendingEDUs) + transaction, terr := oq.nextTransaction() if terr != nil { // We failed to send the transaction. if giveUp := oq.statistics.Failure(); giveUp { // It's been suggested that we should give up because the backoff // has exceeded a maximum allowable value. Clean up the in-memory // buffers at this point. 
The PDU clean-up is already on a defer. - oq.cleanPendingEDUs() oq.cleanPendingInvites() log.Infof("Blacklisting %q due to errors", oq.destination) return @@ -265,8 +302,6 @@ func (oq *destinationQueue) backgroundSend() { // If we successfully sent the transaction then clear out // the pending events and EDUs, and wipe our transaction ID. oq.statistics.Success() - // Clean up the in-memory buffers. - oq.cleanPendingEDUs() } } @@ -294,15 +329,6 @@ func (oq *destinationQueue) backgroundSend() { } } -// cleanPendingEDUs cleans out the pending EDU buffer, removing -// all references so that the underlying objects can be GC'd. -func (oq *destinationQueue) cleanPendingEDUs() { - for i := 0; i < len(oq.pendingEDUs); i++ { - oq.pendingEDUs[i] = nil - } - oq.pendingEDUs = []*gomatrixserverlib.EDU{} -} - // cleanPendingInvites cleans out the pending invite buffer, // removing all references so that the underlying objects can // be GC'd. @@ -316,9 +342,8 @@ func (oq *destinationQueue) cleanPendingInvites() { // nextTransaction creates a new transaction from the pending event // queue and sends it. Returns true if a transaction was sent or // false otherwise. -func (oq *destinationQueue) nextTransaction( - pendingEDUs []*gomatrixserverlib.EDU, -) (bool, error) { +// nolint:gocyclo +func (oq *destinationQueue) nextTransaction() (bool, error) { // Before we do anything, we need to roll over the transaction // ID that is being used to coalesce events into the next TX. // Otherwise it's possible that we'll pick up an incomplete @@ -343,7 +368,7 @@ func (oq *destinationQueue) nextTransaction( // actually retrieve that many events. 
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - txid, pdus, err := oq.db.GetNextTransactionPDUs( + txid, pdus, pduReceipt, err := oq.db.GetNextTransactionPDUs( ctx, // context oq.destination, // server name maxPDUsPerTransaction, // max events to retrieve @@ -353,9 +378,19 @@ func (oq *destinationQueue) nextTransaction( return false, fmt.Errorf("oq.db.GetNextTransactionPDUs: %w", err) } + edus, eduReceipt, err := oq.db.GetNextTransactionEDUs( + ctx, // context + oq.destination, // server name + maxEDUsPerTransaction, // max events to retrieve + ) + if err != nil { + log.WithError(err).Errorf("failed to get next transaction EDUs for server %q", oq.destination) + return false, fmt.Errorf("oq.db.GetNextTransactionEDUs: %w", err) + } + // If we didn't get anything from the database and there are no // pending EDUs then there's nothing to do - stop here. - if len(pdus) == 0 && len(pendingEDUs) == 0 { + if len(pdus) == 0 && len(edus) == 0 { return false, nil } @@ -377,7 +412,7 @@ func (oq *destinationQueue) nextTransaction( } // Do the same for pending EDUS in the queue. - for _, edu := range pendingEDUs { + for _, edu := range edus { t.EDUs = append(t.EDUs, *edu) } @@ -393,12 +428,17 @@ func (oq *destinationQueue) nextTransaction( switch err.(type) { case nil: // Clean up the transaction in the database. 
- if err = oq.db.CleanTransactionPDUs( - context.Background(), - t.Destination, - t.TransactionID, - ); err != nil { - log.WithError(err).Errorf("failed to clean transaction %q for server %q", t.TransactionID, t.Destination) + if pduReceipt != nil { + //logrus.Infof("Cleaning PDUs %q", pduReceipt.String()) + if err = oq.db.CleanPDUs(context.Background(), oq.destination, pduReceipt); err != nil { + log.WithError(err).Errorf("failed to clean PDUs %q for server %q", pduReceipt.String(), t.Destination) + } + } + if eduReceipt != nil { + //logrus.Infof("Cleaning EDUs %q", eduReceipt.String()) + if err = oq.db.CleanEDUs(context.Background(), oq.destination, eduReceipt); err != nil { + log.WithError(err).Errorf("failed to clean EDUs %q for server %q", eduReceipt.String(), t.Destination) + } } return true, nil case gomatrix.HTTPError: diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 46c9fddb..e488a34a 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -61,12 +61,23 @@ func NewOutgoingQueues( queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, } // Look up which servers we have pending items for and then rehydrate those queues. 
- if serverNames, err := db.GetPendingServerNames(context.Background()); err == nil { - for _, serverName := range serverNames { - queues.getQueue(serverName).wakeQueueIfNeeded() + serverNames := map[gomatrixserverlib.ServerName]struct{}{} + if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil { + for _, serverName := range names { + serverNames[serverName] = struct{}{} } } else { - log.WithError(err).Error("Failed to get server names for destination queue hydration") + log.WithError(err).Error("Failed to get PDU server names for destination queue hydration") + } + if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil { + for _, serverName := range names { + serverNames[serverName] = struct{}{} + } + } else { + log.WithError(err).Error("Failed to get EDU server names for destination queue hydration") + } + for serverName := range serverNames { + queues.getQueue(serverName).wakeQueueIfNeeded() } return queues } @@ -91,9 +102,9 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d destination: destination, client: oqs.client, statistics: oqs.statistics.ForServer(destination), - incomingEDUs: make(chan *gomatrixserverlib.EDU, 128), incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128), notifyPDUs: make(chan bool, 1), + notifyEDUs: make(chan bool, 1), interruptBackoff: make(chan bool), signing: oqs.signing, } @@ -196,8 +207,18 @@ func (oqs *OutgoingQueues) SendEDU( }).Info("Sending EDU event") } + ephemeralJSON, err := json.Marshal(e) + if err != nil { + return fmt.Errorf("json.Marshal: %w", err) + } + + nid, err := oqs.db.StoreJSON(context.TODO(), string(ephemeralJSON)) + if err != nil { + return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err) + } + for _, destination := range destinations { - oqs.getQueue(destination).sendEDU(e) + oqs.getQueue(destination).sendEDU(nid) } return nil |