diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-08-27 11:05:41 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-27 11:05:41 +0100 |
commit | 7466e6b7186610ee8696c2d4db7aa1138c24adbe (patch) | |
tree | 37d4ee37c064134c54b1a21bc777de7b20a3ff70 /federationsender/storage/shared | |
parent | 421b6b2313594a1810f414c21b43706b2eaaba2b (diff) |
Fix lock errors in federation sender (#1347)
* Fix lock errors in federation sender
* Additional fix to writers
Diffstat (limited to 'federationsender/storage/shared')
-rw-r--r-- | federationsender/storage/shared/storage_edus.go | 7 | ||||
-rw-r--r-- | federationsender/storage/shared/storage_pdus.go | 4 |
2 files changed, 5 insertions, 6 deletions
diff --git a/federationsender/storage/shared/storage_edus.go b/federationsender/storage/shared/storage_edus.go index 75a6dd51..529b46aa 100644 --- a/federationsender/storage/shared/storage_edus.go +++ b/federationsender/storage/shared/storage_edus.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" - "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" ) @@ -33,7 +32,7 @@ func (d *Database) AssociateEDUWithDestination( serverName gomatrixserverlib.ServerName, receipt *Receipt, ) error { - return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { for _, nid := range receipt.nids { if err := d.FederationSenderQueueEDUs.InsertQueueEDU( ctx, // context @@ -60,7 +59,7 @@ func (d *Database) GetNextTransactionEDUs( receipt *Receipt, err error, ) { - err = sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { + err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit) if err != nil { return fmt.Errorf("SelectQueueEDUs: %w", err) @@ -99,7 +98,7 @@ func (d *Database) CleanEDUs( return errors.New("expected receipt") } - return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { if err := d.FederationSenderQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, receipt.nids); err != nil { return err } diff --git a/federationsender/storage/shared/storage_pdus.go b/federationsender/storage/shared/storage_pdus.go index 00588956..4b51146d 100644 --- a/federationsender/storage/shared/storage_pdus.go +++ b/federationsender/storage/shared/storage_pdus.go @@ -34,7 +34,7 @@ func (d *Database) AssociatePDUWithDestination( serverName gomatrixserverlib.ServerName, receipt *Receipt, ) error { - return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { for _, nid := range receipt.nids { if err := d.FederationSenderQueuePDUs.InsertQueuePDU( ctx, // context @@ -111,7 +111,7 @@ func (d *Database) CleanPDUs( return errors.New("expected receipt") } - return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, receipt.nids); err != nil { return err } |