diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-07-10 16:28:18 +0100 |
---|---|---|
committer | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-07-10 16:28:18 +0100 |
commit | 08e9d996b6e9f726425d815820e3857c12ebc0b3 (patch) | |
tree | 6202a6b0c68d0ba2b36688732ff899b91f157f15 /federationsender | |
parent | abf26c12f1a97fd2894a0509de9cf4a91c79d3ab (diff) |
Yggdrasil demo updates
Squashed commit of the following:
commit 6c2c48f862c1b6f8e741c57804282eceffe02487
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Fri Jul 10 16:28:09 2020 +0100
Add README.md
commit 5eeefdadf8e3881dd7a32559a92be49bd7ddaf47
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Fri Jul 10 10:18:50 2020 +0100
Fix wedge in federation sender
commit e2ebffbfba25cf82378393940a613ec32bfb909f
Merge: 0883ef88 abf26c12
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Fri Jul 10 09:51:23 2020 +0100
Merge branch 'master' into neilalexander/yggdrasil
commit 0883ef8870e340f2ae9a0c37ed939dc2ab9911f6
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Fri Jul 10 09:51:06 2020 +0100
Adjust timeouts
commit ba2d53199910f13b60cc892debe96a962e8c9acb
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Thu Jul 9 16:34:40 2020 +0100
Try to wake up from peers/sessions properly
commit 73f42eb494741ba5b0e0cef43654708e3c8eb399
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Thu Jul 9 15:43:38 2020 +0100
Use TransactionWriter to reduce database lock issues on SQLite
commit 08bfe63241a18c58c539c91b9f52edccda63a611
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Thu Jul 9 12:38:02 2020 +0100
Un-wedge federation
Squashed commit of the following:
commit aee933f8785e7a7998105f6090f514d18051a1bd
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Thu Jul 9 12:22:41 2020 +0100
Un-goroutine the goroutines
commit 478374e5d18a3056cac6682ef9095d41352d1295
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Thu Jul 9 12:09:31 2020 +0100
Reduce federation sender wedges
commit 40cc62c54d9e3a863868214c48b7c18e522a4772
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Thu Jul 9 10:02:52 2020 +0100
Handle switching in/out background more reliably
Diffstat (limited to 'federationsender')
-rw-r--r-- | federationsender/queue/destinationqueue.go | 7 | ||||
-rw-r--r-- | federationsender/storage/sqlite3/storage.go | 40 |
2 files changed, 24 insertions, 23 deletions
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 82cb343f..57c8cff6 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -256,7 +256,10 @@ func (oq *destinationQueue) backgroundSend() { // PDUs waiting to be sent. By sending a message into the wake chan, // the next loop iteration will try processing these PDUs again, // subject to the backoff. - oq.notifyPDUs <- true + select { + case oq.notifyPDUs <- true: + default: + } } } else if transaction { // If we successfully sent the transaction then clear out @@ -384,7 +387,7 @@ func (oq *destinationQueue) nextTransaction( // TODO: we should check for 500-ish fails vs 400-ish here, // since we shouldn't queue things indefinitely in response // to a 400-ish error - ctx, cancel = context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) defer cancel() _, err = oq.client.SendTransaction(ctx, t) switch err.(type) { diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index 7fe6b65b..1a4715bf 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -239,39 +239,37 @@ func (d *Database) CleanTransactionPDUs( serverName gomatrixserverlib.ServerName, transactionID gomatrixserverlib.TransactionID, ) error { - var err error - var nids []int64 var deleteNIDs []int64 + nids, err := d.selectQueuePDUs(ctx, nil, serverName, transactionID, 50) + if err != nil { + return fmt.Errorf("d.selectQueuePDUs: %w", err) + } if err = d.queuePDUsWriter.Do(d.db, func(txn *sql.Tx) error { - nids, err = d.selectQueuePDUs(ctx, txn, serverName, transactionID, 50) - if err != nil { - return fmt.Errorf("d.selectQueuePDUs: %w", err) - } if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil { return fmt.Errorf("d.deleteQueueTransaction: %w", err) } - var count int64 - for _, nid := range nids { - count, err = d.selectQueueReferenceJSONCount(ctx, txn, nid) - if err != nil { - return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err) - } - if count == 0 { - deleteNIDs = append(deleteNIDs, nid) - } - } return nil }); err != nil { return err } - err = d.queueJSONWriter.Do(d.db, func(txn *sql.Tx) error { - if len(deleteNIDs) > 0 { + var count int64 + for _, nid := range nids { + count, err = d.selectQueueReferenceJSONCount(ctx, nil, nid) + if err != nil { + return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err) + } + if count == 0 { + deleteNIDs = append(deleteNIDs, nid) + } + } + if len(deleteNIDs) > 0 { + err = d.queueJSONWriter.Do(d.db, func(txn *sql.Tx) error { if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil { return fmt.Errorf("d.deleteQueueJSON: %w", err) } - } - return nil - }) + return nil + }) + } return err } |