aboutsummaryrefslogtreecommitdiff
path: root/federationsender
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-07-10 16:28:18 +0100
committerNeil Alexander <neilalexander@users.noreply.github.com>2020-07-10 16:28:18 +0100
commit08e9d996b6e9f726425d815820e3857c12ebc0b3 (patch)
tree6202a6b0c68d0ba2b36688732ff899b91f157f15 /federationsender
parentabf26c12f1a97fd2894a0509de9cf4a91c79d3ab (diff)
Yggdrasil demo updates
Squashed commit of the following: commit 6c2c48f862c1b6f8e741c57804282eceffe02487 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Fri Jul 10 16:28:09 2020 +0100 Add README.md commit 5eeefdadf8e3881dd7a32559a92be49bd7ddaf47 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Fri Jul 10 10:18:50 2020 +0100 Fix wedge in federation sender commit e2ebffbfba25cf82378393940a613ec32bfb909f Merge: 0883ef88 abf26c12 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Fri Jul 10 09:51:23 2020 +0100 Merge branch 'master' into neilalexander/yggdrasil commit 0883ef8870e340f2ae9a0c37ed939dc2ab9911f6 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Fri Jul 10 09:51:06 2020 +0100 Adjust timeouts commit ba2d53199910f13b60cc892debe96a962e8c9acb Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Thu Jul 9 16:34:40 2020 +0100 Try to wake up from peers/sessions properly commit 73f42eb494741ba5b0e0cef43654708e3c8eb399 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Thu Jul 9 15:43:38 2020 +0100 Use TransactionWriter to reduce database lock issues on SQLite commit 08bfe63241a18c58c539c91b9f52edccda63a611 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Thu Jul 9 12:38:02 2020 +0100 Un-wedge federation Squashed commit of the following: commit aee933f8785e7a7998105f6090f514d18051a1bd Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Thu Jul 9 12:22:41 2020 +0100 Un-goroutine the goroutines commit 478374e5d18a3056cac6682ef9095d41352d1295 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Thu Jul 9 12:09:31 2020 +0100 Reduce federation sender wedges commit 40cc62c54d9e3a863868214c48b7c18e522a4772 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Thu Jul 9 10:02:52 2020 +0100 Handle switching in/out background more reliably
Diffstat (limited to 'federationsender')
-rw-r--r--federationsender/queue/destinationqueue.go7
-rw-r--r--federationsender/storage/sqlite3/storage.go40
2 files changed, 24 insertions, 23 deletions
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go
index 82cb343f..57c8cff6 100644
--- a/federationsender/queue/destinationqueue.go
+++ b/federationsender/queue/destinationqueue.go
@@ -256,7 +256,10 @@ func (oq *destinationQueue) backgroundSend() {
// PDUs waiting to be sent. By sending a message into the wake chan,
// the next loop iteration will try processing these PDUs again,
// subject to the backoff.
- oq.notifyPDUs <- true
+ select {
+ case oq.notifyPDUs <- true:
+ default:
+ }
}
} else if transaction {
// If we successfully sent the transaction then clear out
@@ -384,7 +387,7 @@ func (oq *destinationQueue) nextTransaction(
// TODO: we should check for 500-ish fails vs 400-ish here,
// since we shouldn't queue things indefinitely in response
// to a 400-ish error
- ctx, cancel = context.WithTimeout(context.Background(), time.Second*15)
+ ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
_, err = oq.client.SendTransaction(ctx, t)
switch err.(type) {
diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go
index 7fe6b65b..1a4715bf 100644
--- a/federationsender/storage/sqlite3/storage.go
+++ b/federationsender/storage/sqlite3/storage.go
@@ -239,39 +239,37 @@ func (d *Database) CleanTransactionPDUs(
serverName gomatrixserverlib.ServerName,
transactionID gomatrixserverlib.TransactionID,
) error {
- var err error
- var nids []int64
var deleteNIDs []int64
+ nids, err := d.selectQueuePDUs(ctx, nil, serverName, transactionID, 50)
+ if err != nil {
+ return fmt.Errorf("d.selectQueuePDUs: %w", err)
+ }
if err = d.queuePDUsWriter.Do(d.db, func(txn *sql.Tx) error {
- nids, err = d.selectQueuePDUs(ctx, txn, serverName, transactionID, 50)
- if err != nil {
- return fmt.Errorf("d.selectQueuePDUs: %w", err)
- }
if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil {
return fmt.Errorf("d.deleteQueueTransaction: %w", err)
}
- var count int64
- for _, nid := range nids {
- count, err = d.selectQueueReferenceJSONCount(ctx, txn, nid)
- if err != nil {
- return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err)
- }
- if count == 0 {
- deleteNIDs = append(deleteNIDs, nid)
- }
- }
return nil
}); err != nil {
return err
}
- err = d.queueJSONWriter.Do(d.db, func(txn *sql.Tx) error {
- if len(deleteNIDs) > 0 {
+ var count int64
+ for _, nid := range nids {
+ count, err = d.selectQueueReferenceJSONCount(ctx, nil, nid)
+ if err != nil {
+ return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err)
+ }
+ if count == 0 {
+ deleteNIDs = append(deleteNIDs, nid)
+ }
+ }
+ if len(deleteNIDs) > 0 {
+ err = d.queueJSONWriter.Do(d.db, func(txn *sql.Tx) error {
if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil {
return fmt.Errorf("d.deleteQueueJSON: %w", err)
}
- }
- return nil
- })
+ return nil
+ })
+ }
return err
}