From 08e9d996b6e9f726425d815820e3857c12ebc0b3 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 10 Jul 2020 16:28:18 +0100 Subject: Yggdrasil demo updates Squashed commit of the following: commit 6c2c48f862c1b6f8e741c57804282eceffe02487 Author: Neil Alexander Date: Fri Jul 10 16:28:09 2020 +0100 Add README.md commit 5eeefdadf8e3881dd7a32559a92be49bd7ddaf47 Author: Neil Alexander Date: Fri Jul 10 10:18:50 2020 +0100 Fix wedge in federation sender commit e2ebffbfba25cf82378393940a613ec32bfb909f Merge: 0883ef88 abf26c12 Author: Neil Alexander Date: Fri Jul 10 09:51:23 2020 +0100 Merge branch 'master' into neilalexander/yggdrasil commit 0883ef8870e340f2ae9a0c37ed939dc2ab9911f6 Author: Neil Alexander Date: Fri Jul 10 09:51:06 2020 +0100 Adjust timeouts commit ba2d53199910f13b60cc892debe96a962e8c9acb Author: Neil Alexander Date: Thu Jul 9 16:34:40 2020 +0100 Try to wake up from peers/sessions properly commit 73f42eb494741ba5b0e0cef43654708e3c8eb399 Author: Neil Alexander Date: Thu Jul 9 15:43:38 2020 +0100 Use TransactionWriter to reduce database lock issues on SQLite commit 08bfe63241a18c58c539c91b9f52edccda63a611 Author: Neil Alexander Date: Thu Jul 9 12:38:02 2020 +0100 Un-wedge federation Squashed commit of the following: commit aee933f8785e7a7998105f6090f514d18051a1bd Author: Neil Alexander Date: Thu Jul 9 12:22:41 2020 +0100 Un-goroutine the goroutines commit 478374e5d18a3056cac6682ef9095d41352d1295 Author: Neil Alexander Date: Thu Jul 9 12:09:31 2020 +0100 Reduce federation sender wedges commit 40cc62c54d9e3a863868214c48b7c18e522a4772 Author: Neil Alexander Date: Thu Jul 9 10:02:52 2020 +0100 Handle switching in/out background more reliably --- federationsender/queue/destinationqueue.go | 7 +++-- federationsender/storage/sqlite3/storage.go | 40 ++++++++++++++--------------- 2 files changed, 24 insertions(+), 23 deletions(-) (limited to 'federationsender') diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 82cb343f..57c8cff6 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -256,7 +256,10 @@ func (oq *destinationQueue) backgroundSend() { // PDUs waiting to be sent. By sending a message into the wake chan, // the next loop iteration will try processing these PDUs again, // subject to the backoff. - oq.notifyPDUs <- true + select { + case oq.notifyPDUs <- true: + default: + } } } else if transaction { // If we successfully sent the transaction then clear out @@ -384,7 +387,7 @@ func (oq *destinationQueue) nextTransaction( // TODO: we should check for 500-ish fails vs 400-ish here, // since we shouldn't queue things indefinitely in response // to a 400-ish error - ctx, cancel = context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) defer cancel() _, err = oq.client.SendTransaction(ctx, t) switch err.(type) { diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index 7fe6b65b..1a4715bf 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -239,39 +239,37 @@ func (d *Database) CleanTransactionPDUs( serverName gomatrixserverlib.ServerName, transactionID gomatrixserverlib.TransactionID, ) error { - var err error - var nids []int64 var deleteNIDs []int64 + nids, err := d.selectQueuePDUs(ctx, nil, serverName, transactionID, 50) + if err != nil { + return fmt.Errorf("d.selectQueuePDUs: %w", err) + } if err = d.queuePDUsWriter.Do(d.db, func(txn *sql.Tx) error { - nids, err = d.selectQueuePDUs(ctx, txn, serverName, transactionID, 50) - if err != nil { - return fmt.Errorf("d.selectQueuePDUs: %w", err) - } if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil { return fmt.Errorf("d.deleteQueueTransaction: %w", err) } - var count int64 - for _, nid := range nids { - count, err = d.selectQueueReferenceJSONCount(ctx, txn, nid) - if err != nil { - return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err) - } - if count == 0 { - deleteNIDs = append(deleteNIDs, nid) - } - } return nil }); err != nil { return err } - err = d.queueJSONWriter.Do(d.db, func(txn *sql.Tx) error { - if len(deleteNIDs) > 0 { + var count int64 + for _, nid := range nids { + count, err = d.selectQueueReferenceJSONCount(ctx, nil, nid) + if err != nil { + return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err) + } + if count == 0 { + deleteNIDs = append(deleteNIDs, nid) + } + } + if len(deleteNIDs) > 0 { + err = d.queueJSONWriter.Do(d.db, func(txn *sql.Tx) error { if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil { return fmt.Errorf("d.deleteQueueJSON: %w", err) } - } - return nil - }) + return nil + }) + } return err } -- cgit v1.2.3