diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-07-02 17:43:07 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-02 17:43:07 +0100 |
commit | 38caf8e5b7623c090f8949076b57d769e42011ad (patch) | |
tree | 49d5434f09945bb7ea9313b153808f1a1abc6204 /federationsender | |
parent | 9c1f38621c4d787761092bc841e06ca424fbbf35 (diff) |
Yggdrasil+QUIC demo, federation sender tweaks (#1177)
* Initial QUIC work
* Update Yggdrasil demo
* Make sure that the federation sender knows how many pending events are in the database when the worker starts
* QUIC tunables
* pprof
* Don't spin
* Set build info for Yggdrasil
Diffstat (limited to 'federationsender')
-rw-r--r-- | federationsender/queue/destinationqueue.go | 115 |
-rw-r--r-- | federationsender/storage/interface.go | 1 |
-rw-r--r-- | federationsender/storage/postgres/queue_pdus_table.go | 23 |
-rw-r--r-- | federationsender/storage/postgres/storage.go | 9 |
-rw-r--r-- | federationsender/storage/sqlite3/queue_pdus_table.go | 23 |
-rw-r--r-- | federationsender/storage/sqlite3/storage.go | 9 |
6 files changed, 130 insertions, 50 deletions
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index a736b385..ce706768 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -52,7 +52,7 @@ type destinationQueue struct { transactionIDMutex sync.Mutex // protects transactionID transactionID gomatrixserverlib.TransactionID // last transaction ID transactionCount atomic.Int32 // how many events in this transaction so far - pendingPDUs atomic.Int32 // how many PDUs are waiting to be sent + pendingPDUs atomic.Int64 // how many PDUs are waiting to be sent pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend wakeServerCh chan bool // interrupts idle wait @@ -91,6 +91,7 @@ func (oq *destinationQueue) sendEvent(nid int64) { // If the destination is blacklisted then drop the event. return } + oq.wakeQueueIfNeeded() // Create a transaction ID. We'll either do this if we don't have // one made up yet, or if we've exceeded the number of maximum // events allowed in a single tranaction. We'll reset the counter @@ -117,10 +118,6 @@ func (oq *destinationQueue) sendEvent(nid int64) { // We've successfully added a PDU to the transaction so increase // the counter. oq.transactionCount.Add(1) - // If the queue isn't running at this point then start it. - if !oq.running.Load() { - go oq.backgroundSend() - } // Signal that we've sent a new PDU. This will cause the queue to // wake up if it's asleep. The return to the Add function will only // be 1 if the previous value was 0, e.g. nothing was waiting before. @@ -137,9 +134,7 @@ func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) { // If the destination is blacklisted then drop the event. 
return } - if !oq.running.Load() { - go oq.backgroundSend() - } + oq.wakeQueueIfNeeded() oq.incomingEDUs <- ev } @@ -151,10 +146,30 @@ func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) { // If the destination is blacklisted then drop the event. return } + oq.wakeQueueIfNeeded() + oq.incomingInvites <- ev +} + +func (oq *destinationQueue) wakeQueueIfNeeded() { if !oq.running.Load() { + // Look up how many events are pending in this queue. We need + // to do this so that the queue thinks it has work to do. + count, err := oq.db.GetPendingPDUCount( + context.TODO(), + oq.destination, + ) + if err == nil { + oq.pendingPDUs.Store(count) + log.Printf("Destination queue %q has %d pending PDUs", oq.destination, count) + } else { + log.WithError(err).Errorf("Can't get pending PDU count for %q destination queue", oq.destination) + } + if count > 0 { + oq.wakeServerCh <- true + } + // Then start the queue. go oq.backgroundSend() } - oq.incomingInvites <- ev } // backgroundSend is the worker goroutine for sending events. @@ -170,46 +185,44 @@ func (oq *destinationQueue) backgroundSend() { for { // If we have nothing to do then wait either for incoming events, or // until we hit an idle timeout. - if oq.pendingPDUs.Load() == 0 && len(oq.pendingEDUs) == 0 && len(oq.pendingInvites) == 0 { - select { - case <-oq.wakeServerCh: - // We were woken up because there are new PDUs waiting in the - // database. - case edu := <-oq.incomingEDUs: - // EDUs are handled in-memory for now. We will try to keep - // the ordering intact. - // TODO: Certain EDU types need persistence, e.g. send-to-device - oq.pendingEDUs = append(oq.pendingEDUs, edu) - // If there are any more things waiting in the channel queue - // then read them. This is safe because we guarantee only - // having one goroutine per destination queue, so the channel - // isn't being consumed anywhere else. 
- for len(oq.incomingEDUs) > 0 { - oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs) - } - case invite := <-oq.incomingInvites: - // There's no strict ordering requirement for invites like - // there is for transactions, so we put the invite onto the - // front of the queue. This means that if an invite that is - // stuck failing already, that it won't block our new invite - // from being sent. - oq.pendingInvites = append( - []*gomatrixserverlib.InviteV2Request{invite}, - oq.pendingInvites..., - ) - // If there are any more things waiting in the channel queue - // then read them. This is safe because we guarantee only - // having one goroutine per destination queue, so the channel - // isn't being consumed anywhere else. - for len(oq.incomingInvites) > 0 { - oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites) - } - case <-time.After(time.Second * 30): - // The worker is idle so stop the goroutine. It'll get - // restarted automatically the next time we have an event to - // send. - return + select { + case <-oq.wakeServerCh: + // We were woken up because there are new PDUs waiting in the + // database. + case edu := <-oq.incomingEDUs: + // EDUs are handled in-memory for now. We will try to keep + // the ordering intact. + // TODO: Certain EDU types need persistence, e.g. send-to-device + oq.pendingEDUs = append(oq.pendingEDUs, edu) + // If there are any more things waiting in the channel queue + // then read them. This is safe because we guarantee only + // having one goroutine per destination queue, so the channel + // isn't being consumed anywhere else. + for len(oq.incomingEDUs) > 0 { + oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs) + } + case invite := <-oq.incomingInvites: + // There's no strict ordering requirement for invites like + // there is for transactions, so we put the invite onto the + // front of the queue. 
This means that if an invite that is + // stuck failing already, that it won't block our new invite + // from being sent. + oq.pendingInvites = append( + []*gomatrixserverlib.InviteV2Request{invite}, + oq.pendingInvites..., + ) + // If there are any more things waiting in the channel queue + // then read them. This is safe because we guarantee only + // having one goroutine per destination queue, so the channel + // isn't being consumed anywhere else. + for len(oq.incomingInvites) > 0 { + oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites) } + case <-time.After(time.Second * 30): + // The worker is idle so stop the goroutine. It'll get + // restarted automatically the next time we have an event to + // send. + return } // If we are backing off this server then wait for the @@ -317,8 +330,10 @@ func (oq *destinationQueue) nextTransaction( // Ask the database for any pending PDUs from the next transaction. // maxPDUsPerTransaction is an upper limit but we probably won't // actually retrieve that many events. + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() txid, pdus, err := oq.db.GetNextTransactionPDUs( - context.TODO(), // context + ctx, // context oq.destination, // server name maxPDUsPerTransaction, // max events to retrieve ) @@ -366,7 +381,7 @@ func (oq *destinationQueue) nextTransaction( case nil: // No error was returned so the transaction looks to have // been successfully sent. - oq.pendingPDUs.Sub(int32(len(t.PDUs))) + oq.pendingPDUs.Sub(int64(len(t.PDUs))) // Clean up the transaction in the database. 
if err = oq.db.CleanTransactionPDUs( context.TODO(), diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go index f4df93fa..09d74ed7 100644 --- a/federationsender/storage/interface.go +++ b/federationsender/storage/interface.go @@ -30,4 +30,5 @@ type Database interface { AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nids []int64) error GetNextTransactionPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, error) CleanTransactionPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, transactionID gomatrixserverlib.TransactionID) error + GetPendingPDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (int64, error) } diff --git a/federationsender/storage/postgres/queue_pdus_table.go b/federationsender/storage/postgres/queue_pdus_table.go index ef7a9f41..bc22825d 100644 --- a/federationsender/storage/postgres/queue_pdus_table.go +++ b/federationsender/storage/postgres/queue_pdus_table.go @@ -59,12 +59,17 @@ const selectQueueReferenceJSONCountSQL = "" + "SELECT COUNT(*) FROM federationsender_queue_pdus" + " WHERE json_nid = $1" +const selectQueuePDUsCountSQL = "" + + "SELECT COUNT(*) FROM federationsender_queue_pdus" + + " WHERE server_name = $1" + type queuePDUsStatements struct { insertQueuePDUStmt *sql.Stmt deleteQueueTransactionPDUsStmt *sql.Stmt selectQueueNextTransactionIDStmt *sql.Stmt selectQueuePDUsByTransactionStmt *sql.Stmt selectQueueReferenceJSONCountStmt *sql.Stmt + selectQueuePDUsCountStmt *sql.Stmt } func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) { @@ -87,6 +92,9 @@ func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) { if s.selectQueueReferenceJSONCountStmt, err = db.Prepare(selectQueueReferenceJSONCountSQL); err != nil { return } + if s.selectQueuePDUsCountStmt, err = 
db.Prepare(selectQueuePDUsCountSQL); err != nil { + return + } return } @@ -144,6 +152,21 @@ func (s *queuePDUsStatements) selectQueueReferenceJSONCount( return count, err } +func (s *queuePDUsStatements) selectQueuePDUCount( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, +) (int64, error) { + var count int64 + stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsCountStmt) + err := stmt.QueryRowContext(ctx, serverName).Scan(&count) + if err == sql.ErrNoRows { + // It's acceptable for there to be no rows referencing a given + // JSON NID but it's not an error condition. Just return as if + // there's a zero count. + return 0, nil + } + return count, err +} + func (s *queuePDUsStatements) selectQueuePDUs( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go index 18d1532a..be28c15d 100644 --- a/federationsender/storage/postgres/storage.go +++ b/federationsender/storage/postgres/storage.go @@ -255,3 +255,12 @@ func (d *Database) CleanTransactionPDUs( return nil }) } + +// GetPendingPDUCount returns the number of PDUs waiting to be +// sent for a given servername. 
+func (d *Database) GetPendingPDUCount( + ctx context.Context, + serverName gomatrixserverlib.ServerName, +) (int64, error) { + return d.selectQueuePDUCount(ctx, nil, serverName) +} diff --git a/federationsender/storage/sqlite3/queue_pdus_table.go b/federationsender/storage/sqlite3/queue_pdus_table.go index dc08fd70..955ff507 100644 --- a/federationsender/storage/sqlite3/queue_pdus_table.go +++ b/federationsender/storage/sqlite3/queue_pdus_table.go @@ -60,12 +60,17 @@ const selectQueueReferenceJSONCountSQL = "" + "SELECT COUNT(*) FROM federationsender_queue_pdus" + " WHERE json_nid = $1" +const selectQueuePDUsCountSQL = "" + + "SELECT COUNT(*) FROM federationsender_queue_pdus" + + " WHERE server_name = $1" + type queuePDUsStatements struct { insertQueuePDUStmt *sql.Stmt deleteQueueTransactionPDUsStmt *sql.Stmt selectQueueNextTransactionIDStmt *sql.Stmt selectQueuePDUsByTransactionStmt *sql.Stmt selectQueueReferenceJSONCountStmt *sql.Stmt + selectQueuePDUsCountStmt *sql.Stmt } func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) { @@ -88,6 +93,9 @@ func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) { if s.selectQueueReferenceJSONCountStmt, err = db.Prepare(selectQueueReferenceJSONCountSQL); err != nil { return } + if s.selectQueuePDUsCountStmt, err = db.Prepare(selectQueuePDUsCountSQL); err != nil { + return + } return } @@ -142,6 +150,21 @@ func (s *queuePDUsStatements) selectQueueReferenceJSONCount( return count, err } +func (s *queuePDUsStatements) selectQueuePDUCount( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, +) (int64, error) { + var count int64 + stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsCountStmt) + err := stmt.QueryRowContext(ctx, serverName).Scan(&count) + if err == sql.ErrNoRows { + // It's acceptable for there to be no rows referencing a given + // JSON NID but it's not an error condition. Just return as if + // there's a zero count. 
+ return 0, nil + } + return count, err +} + func (s *queuePDUsStatements) selectQueuePDUs( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index 7629ecd2..30ac81bf 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -261,3 +261,12 @@ func (d *Database) CleanTransactionPDUs( return nil }) } + +// GetPendingPDUCount returns the number of PDUs waiting to be +// sent for a given servername. +func (d *Database) GetPendingPDUCount( + ctx context.Context, + serverName gomatrixserverlib.ServerName, +) (int64, error) { + return d.selectQueuePDUCount(ctx, nil, serverName) +} |