aboutsummaryrefslogtreecommitdiff
path: root/federationsender/queue
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-07-01 11:46:38 +0100
committerGitHub <noreply@github.com>2020-07-01 11:46:38 +0100
commit42dd96242574866378bb95d92bc0c7fdf3dbabf6 (patch)
tree78f6afb8160006ec5f573f19e1f6653c5187373b /federationsender/queue
parent6f49758b90d655d9c2bb9170da2ea1d0a2bdd664 (diff)
Persistent federation sender queues (PDUs) (#1173)
* Initial work on persistent queues * Update index for event ID and server name * Put things into database (postgres for now) * Duplicate postgres code into sqlite for now just to stop build errors, will fix SQLite soon * Fix table name * Fix index * Fix table name * Use RETURNING because LastInsertID is not supported by postgres * Use functions * Marshal headered event * Don't error on now rows * Don't block if there are PDUs waiting * Try to tidy up JSON * Debug logging * Fix query, use transactions in postgres * Clean up * Rehydrate more opportunistically * Fix SQLite * remove unused types * Review comments * Shuffle things around a bit * Clean up transaction properly * Don't send empty transactions * Reduce unnecessary retries * Count PDUs to make more resilient * Don't stop when there is work to be done * Try to limit wakeups * well this is tedious * Fix race in incomplete transactions * Thread safety on transaction ID/count
Diffstat (limited to 'federationsender/queue')
-rw-r--r--federationsender/queue/destinationqueue.go267
-rw-r--r--federationsender/queue/queue.go21
2 files changed, 189 insertions, 99 deletions
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go
index 4449f9e6..a736b385 100644
--- a/federationsender/queue/destinationqueue.go
+++ b/federationsender/queue/destinationqueue.go
@@ -18,8 +18,10 @@ import (
"context"
"encoding/json"
"fmt"
+ "sync"
"time"
+ "github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix"
@@ -29,11 +31,14 @@ import (
"go.uber.org/atomic"
)
+const maxPDUsPerTransaction = 50
+
// destinationQueue is a queue of events for a single destination.
// It is responsible for sending the events to the destination and
// ensures that only one request is in flight to a given destination
// at a time.
type destinationQueue struct {
+ db storage.Database
signing *SigningInfo
rsAPI api.RoomserverInternalAPI
client *gomatrixserverlib.FederationClient // federation client
@@ -42,13 +47,15 @@ type destinationQueue struct {
running atomic.Bool // is the queue worker running?
backingOff atomic.Bool // true if we're backing off
statistics *types.ServerStatistics // statistics about this remote server
- incomingPDUs chan *gomatrixserverlib.HeaderedEvent // PDUs to send
- incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send
- lastTransactionIDs []gomatrixserverlib.TransactionID // last transaction ID
- pendingPDUs []*gomatrixserverlib.HeaderedEvent // owned by backgroundSend
+ incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
+ transactionIDMutex sync.Mutex // protects transactionID
+ transactionID gomatrixserverlib.TransactionID // last transaction ID
+ transactionCount atomic.Int32 // how many events in this transaction so far
+ pendingPDUs atomic.Int32 // how many PDUs are waiting to be sent
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
+ wakeServerCh chan bool // interrupts idle wait
retryServerCh chan bool // interrupts backoff
}
@@ -79,15 +86,47 @@ func (oq *destinationQueue) retry() {
// Send event adds the event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending events to that destination.
-func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) {
+func (oq *destinationQueue) sendEvent(nid int64) {
if oq.statistics.Blacklisted() {
// If the destination is blacklisted then drop the event.
return
}
+ // Create a transaction ID. We'll either do this if we don't have
+ // one made up yet, or if we've exceeded the number of maximum
+ // events allowed in a single tranaction. We'll reset the counter
+ // when we do.
+ oq.transactionIDMutex.Lock()
+ if oq.transactionID == "" || oq.transactionCount.Load() >= maxPDUsPerTransaction {
+ now := gomatrixserverlib.AsTimestamp(time.Now())
+ oq.transactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
+ oq.transactionCount.Store(0)
+ }
+ oq.transactionIDMutex.Unlock()
+ // Create a database entry that associates the given PDU NID with
+ // this destination queue. We'll then be able to retrieve the PDU
+ // later.
+ if err := oq.db.AssociatePDUWithDestination(
+ context.TODO(),
+ oq.transactionID, // the current transaction ID
+ oq.destination, // the destination server name
+ []int64{nid}, // NID from federationsender_queue_json table
+ ); err != nil {
+ log.WithError(err).Errorf("failed to associate PDU NID %d with destination %q", nid, oq.destination)
+ return
+ }
+ // We've successfully added a PDU to the transaction so increase
+ // the counter.
+ oq.transactionCount.Add(1)
+ // If the queue isn't running at this point then start it.
if !oq.running.Load() {
go oq.backgroundSend()
}
- oq.incomingPDUs <- ev
+ // Signal that we've sent a new PDU. This will cause the queue to
+ // wake up if it's asleep. The return to the Add function will only
+ // be 1 if the previous value was 0, e.g. nothing was waiting before.
+ if oq.pendingPDUs.Add(1) == 1 {
+ oq.wakeServerCh <- true
+ }
}
// sendEDU adds the EDU event to the pending queue for the destination.
@@ -129,56 +168,48 @@ func (oq *destinationQueue) backgroundSend() {
defer oq.running.Store(false)
for {
- // Wait either for incoming events, or until we hit an
- // idle timeout.
- select {
- case pdu := <-oq.incomingPDUs:
- // Ordering of PDUs is important so we add them to the end
- // of the queue and they will all be added to transactions
- // in order.
- oq.pendingPDUs = append(oq.pendingPDUs, pdu)
- // If there are any more things waiting in the channel queue
- // then read them. This is safe because we guarantee only
- // having one goroutine per destination queue, so the channel
- // isn't being consumed anywhere else.
- for len(oq.incomingPDUs) > 0 {
- oq.pendingPDUs = append(oq.pendingPDUs, <-oq.incomingPDUs)
- }
- case edu := <-oq.incomingEDUs:
- // Likewise for EDUs, although we should probably not try
- // too hard with some EDUs (like typing notifications) after
- // a certain amount of time has passed.
- // TODO: think about EDU expiry some more
- oq.pendingEDUs = append(oq.pendingEDUs, edu)
- // If there are any more things waiting in the channel queue
- // then read them. This is safe because we guarantee only
- // having one goroutine per destination queue, so the channel
- // isn't being consumed anywhere else.
- for len(oq.incomingEDUs) > 0 {
- oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
- }
- case invite := <-oq.incomingInvites:
- // There's no strict ordering requirement for invites like
- // there is for transactions, so we put the invite onto the
- // front of the queue. This means that if an invite that is
- // stuck failing already, that it won't block our new invite
- // from being sent.
- oq.pendingInvites = append(
- []*gomatrixserverlib.InviteV2Request{invite},
- oq.pendingInvites...,
- )
- // If there are any more things waiting in the channel queue
- // then read them. This is safe because we guarantee only
- // having one goroutine per destination queue, so the channel
- // isn't being consumed anywhere else.
- for len(oq.incomingInvites) > 0 {
- oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
+ // If we have nothing to do then wait either for incoming events, or
+ // until we hit an idle timeout.
+ if oq.pendingPDUs.Load() == 0 && len(oq.pendingEDUs) == 0 && len(oq.pendingInvites) == 0 {
+ select {
+ case <-oq.wakeServerCh:
+ // We were woken up because there are new PDUs waiting in the
+ // database.
+ case edu := <-oq.incomingEDUs:
+ // EDUs are handled in-memory for now. We will try to keep
+ // the ordering intact.
+ // TODO: Certain EDU types need persistence, e.g. send-to-device
+ oq.pendingEDUs = append(oq.pendingEDUs, edu)
+ // If there are any more things waiting in the channel queue
+ // then read them. This is safe because we guarantee only
+ // having one goroutine per destination queue, so the channel
+ // isn't being consumed anywhere else.
+ for len(oq.incomingEDUs) > 0 {
+ oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
+ }
+ case invite := <-oq.incomingInvites:
+ // There's no strict ordering requirement for invites like
+ // there is for transactions, so we put the invite onto the
+ // front of the queue. This means that if an invite that is
+ // stuck failing already, that it won't block our new invite
+ // from being sent.
+ oq.pendingInvites = append(
+ []*gomatrixserverlib.InviteV2Request{invite},
+ oq.pendingInvites...,
+ )
+ // If there are any more things waiting in the channel queue
+ // then read them. This is safe because we guarantee only
+ // having one goroutine per destination queue, so the channel
+ // isn't being consumed anywhere else.
+ for len(oq.incomingInvites) > 0 {
+ oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
+ }
+ case <-time.After(time.Second * 30):
+ // The worker is idle so stop the goroutine. It'll get
+ // restarted automatically the next time we have an event to
+ // send.
+ return
}
- case <-time.After(time.Second * 30):
- // The worker is idle so stop the goroutine. It'll
- // get restarted automatically the next time we
- // get an event.
- return
}
// If we are backing off this server then wait for the
@@ -193,47 +224,31 @@ func (oq *destinationQueue) backgroundSend() {
oq.backingOff.Store(false)
}
- // How many things do we have waiting?
- numPDUs := len(oq.pendingPDUs)
- numEDUs := len(oq.pendingEDUs)
- numInvites := len(oq.pendingInvites)
-
// If we have pending PDUs or EDUs then construct a transaction.
- if numPDUs > 0 || numEDUs > 0 {
+ if oq.pendingPDUs.Load() > 0 || len(oq.pendingEDUs) > 0 {
// Try sending the next transaction and see what happens.
- transaction, terr := oq.nextTransaction(oq.pendingPDUs, oq.pendingEDUs, oq.statistics.SuccessCount())
+ transaction, terr := oq.nextTransaction(oq.pendingEDUs)
if terr != nil {
// We failed to send the transaction.
if giveUp := oq.statistics.Failure(); giveUp {
- // It's been suggested that we should give up because
- // the backoff has exceeded a maximum allowable value.
+ // It's been suggested that we should give up because the backoff
+ // has exceeded a maximum allowable value. Clean up the in-memory
+ // buffers at this point. The PDU clean-up is already on a defer.
+ oq.cleanPendingEDUs()
+ oq.cleanPendingInvites()
return
}
} else if transaction {
// If we successfully sent the transaction then clear out
- // the pending events and EDUs.
+ // the pending events and EDUs, and wipe our transaction ID.
oq.statistics.Success()
- // Reallocate so that the underlying arrays can be GC'd, as
- // opposed to growing forever.
- for i := 0; i < numPDUs; i++ {
- oq.pendingPDUs[i] = nil
- }
- for i := 0; i < numEDUs; i++ {
- oq.pendingEDUs[i] = nil
- }
- oq.pendingPDUs = append(
- []*gomatrixserverlib.HeaderedEvent{},
- oq.pendingPDUs[numPDUs:]...,
- )
- oq.pendingEDUs = append(
- []*gomatrixserverlib.EDU{},
- oq.pendingEDUs[numEDUs:]...,
- )
+ // Clean up the in-memory buffers.
+ oq.cleanPendingEDUs()
}
}
// Try sending the next invite and see what happens.
- if numInvites > 0 {
+ if len(oq.pendingInvites) > 0 {
sent, ierr := oq.nextInvites(oq.pendingInvites)
if ierr != nil {
// We failed to send the transaction so increase the
@@ -249,59 +264,117 @@ func (oq *destinationQueue) backgroundSend() {
oq.statistics.Success()
// Reallocate so that the underlying array can be GC'd, as
// opposed to growing forever.
- oq.pendingInvites = append(
- []*gomatrixserverlib.InviteV2Request{},
- oq.pendingInvites[sent:]...,
- )
+ oq.cleanPendingInvites()
}
}
}
}
+// cleanPendingEDUs cleans out the pending EDU buffer, removing
+// all references so that the underlying objects can be GC'd.
+func (oq *destinationQueue) cleanPendingEDUs() {
+ for i := 0; i < len(oq.pendingEDUs); i++ {
+ oq.pendingEDUs[i] = nil
+ }
+ oq.pendingEDUs = []*gomatrixserverlib.EDU{}
+}
+
+// cleanPendingInvites cleans out the pending invite buffer,
+// removing all references so that the underlying objects can
+// be GC'd.
+func (oq *destinationQueue) cleanPendingInvites() {
+ for i := 0; i < len(oq.pendingInvites); i++ {
+ oq.pendingInvites[i] = nil
+ }
+ oq.pendingInvites = []*gomatrixserverlib.InviteV2Request{}
+}
+
// nextTransaction creates a new transaction from the pending event
// queue and sends it. Returns true if a transaction was sent or
// false otherwise.
func (oq *destinationQueue) nextTransaction(
- pendingPDUs []*gomatrixserverlib.HeaderedEvent,
pendingEDUs []*gomatrixserverlib.EDU,
- sentCounter uint32,
) (bool, error) {
+ // Before we do anything, we need to roll over the transaction
+ // ID that is being used to coalesce events into the next TX.
+ // Otherwise it's possible that we'll pick up an incomplete
+ // transaction and end up nuking the rest of the events at the
+ // cleanup stage.
+ oq.transactionIDMutex.Lock()
+ oq.transactionID = ""
+ oq.transactionIDMutex.Unlock()
+ oq.transactionCount.Store(0)
+
+ // Create the transaction.
t := gomatrixserverlib.Transaction{
PDUs: []json.RawMessage{},
EDUs: []gomatrixserverlib.EDU{},
}
- now := gomatrixserverlib.AsTimestamp(time.Now())
- t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, sentCounter))
t.Origin = oq.origin
t.Destination = oq.destination
- t.OriginServerTS = now
- t.PreviousIDs = oq.lastTransactionIDs
- if t.PreviousIDs == nil {
- t.PreviousIDs = []gomatrixserverlib.TransactionID{}
+ t.OriginServerTS = gomatrixserverlib.AsTimestamp(time.Now())
+
+ // Ask the database for any pending PDUs from the next transaction.
+ // maxPDUsPerTransaction is an upper limit but we probably won't
+ // actually retrieve that many events.
+ txid, pdus, err := oq.db.GetNextTransactionPDUs(
+ context.TODO(), // context
+ oq.destination, // server name
+ maxPDUsPerTransaction, // max events to retrieve
+ )
+ if err != nil {
+ log.WithError(err).Errorf("failed to get next transaction PDUs for server %q", oq.destination)
+ return false, fmt.Errorf("oq.db.GetNextTransactionPDUs: %w", err)
}
- oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID}
+ // If we didn't get anything from the database and there are no
+ // pending EDUs then there's nothing to do - stop here.
+ if len(pdus) == 0 && len(pendingEDUs) == 0 {
+ return false, nil
+ }
- for _, pdu := range pendingPDUs {
+ // Pick out the transaction ID from the database. If we didn't
+ // get a transaction ID (i.e. because there are no PDUs but only
+ // EDUs) then generate a transaction ID.
+ t.TransactionID = txid
+ if t.TransactionID == "" {
+ now := gomatrixserverlib.AsTimestamp(time.Now())
+ t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
+ }
+
+ // Go through PDUs that we retrieved from the database, if any,
+ // and add them into the transaction.
+ for _, pdu := range pdus {
// Append the JSON of the event, since this is a json.RawMessage type in the
// gomatrixserverlib.Transaction struct
t.PDUs = append(t.PDUs, (*pdu).JSON())
}
+ // Do the same for pending EDUS in the queue.
for _, edu := range pendingEDUs {
t.EDUs = append(t.EDUs, *edu)
}
logrus.WithField("server_name", oq.destination).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
+ // Try to send the transaction to the destination server.
// TODO: we should check for 500-ish fails vs 400-ish here,
// since we shouldn't queue things indefinitely in response
// to a 400-ish error
- _, err := oq.client.SendTransaction(context.TODO(), t)
+ _, err = oq.client.SendTransaction(context.TODO(), t)
switch e := err.(type) {
case nil:
// No error was returned so the transaction looks to have
// been successfully sent.
+ oq.pendingPDUs.Sub(int32(len(t.PDUs)))
+ // Clean up the transaction in the database.
+ if err = oq.db.CleanTransactionPDUs(
+ context.TODO(),
+ t.Destination,
+ t.TransactionID,
+ ); err != nil {
+ log.WithError(err).Errorf("failed to clean transaction %q for server %q", t.TransactionID, t.Destination)
+ }
return true, nil
case gomatrix.HTTPError:
// We received a HTTP error back. In this instance we only
diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go
index 24034355..492d5f55 100644
--- a/federationsender/queue/queue.go
+++ b/federationsender/queue/queue.go
@@ -15,10 +15,13 @@
package queue
import (
+ "context"
"crypto/ed25519"
+ "encoding/json"
"fmt"
"sync"
+ "github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
@@ -29,6 +32,7 @@ import (
// OutgoingQueues is a collection of queues for sending transactions to other
// matrix servers
type OutgoingQueues struct {
+ db storage.Database
rsAPI api.RoomserverInternalAPI
origin gomatrixserverlib.ServerName
client *gomatrixserverlib.FederationClient
@@ -40,6 +44,7 @@ type OutgoingQueues struct {
// NewOutgoingQueues makes a new OutgoingQueues
func NewOutgoingQueues(
+ db storage.Database,
origin gomatrixserverlib.ServerName,
client *gomatrixserverlib.FederationClient,
rsAPI api.RoomserverInternalAPI,
@@ -47,6 +52,7 @@ func NewOutgoingQueues(
signing *SigningInfo,
) *OutgoingQueues {
return &OutgoingQueues{
+ db: db,
rsAPI: rsAPI,
origin: origin,
client: client,
@@ -76,14 +82,15 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
oq := oqs.queues[destination]
if oq == nil {
oq = &destinationQueue{
+ db: oqs.db,
rsAPI: oqs.rsAPI,
origin: oqs.origin,
destination: destination,
client: oqs.client,
statistics: oqs.statistics.ForServer(destination),
- incomingPDUs: make(chan *gomatrixserverlib.HeaderedEvent, 128),
incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
+ wakeServerCh: make(chan bool, 128),
retryServerCh: make(chan bool),
signing: oqs.signing,
}
@@ -115,8 +122,18 @@ func (oqs *OutgoingQueues) SendEvent(
"destinations": destinations, "event": ev.EventID(),
}).Info("Sending event")
+ headeredJSON, err := json.Marshal(ev)
+ if err != nil {
+ return fmt.Errorf("json.Marshal: %w", err)
+ }
+
+ nid, err := oqs.db.StoreJSON(context.TODO(), string(headeredJSON))
+ if err != nil {
+ return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
+ }
+
for _, destination := range destinations {
- oqs.getQueue(destination).sendEvent(ev)
+ oqs.getQueue(destination).sendEvent(nid)
}
return nil