aboutsummaryrefslogtreecommitdiff
path: root/federationsender
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-08-27 11:05:41 +0100
committerGitHub <noreply@github.com>2020-08-27 11:05:41 +0100
commit7466e6b7186610ee8696c2d4db7aa1138c24adbe (patch)
tree37d4ee37c064134c54b1a21bc777de7b20a3ff70 /federationsender
parent421b6b2313594a1810f414c21b43706b2eaaba2b (diff)
Fix lock errors in federation sender (#1347)
* Fix lock errors in federation sender * Additional fix to writers
Diffstat (limited to 'federationsender')
-rw-r--r--federationsender/storage/shared/storage_edus.go7
-rw-r--r--federationsender/storage/shared/storage_pdus.go4
2 files changed, 5 insertions, 6 deletions
diff --git a/federationsender/storage/shared/storage_edus.go b/federationsender/storage/shared/storage_edus.go
index 75a6dd51..529b46aa 100644
--- a/federationsender/storage/shared/storage_edus.go
+++ b/federationsender/storage/shared/storage_edus.go
@@ -21,7 +21,6 @@ import (
"errors"
"fmt"
- "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -33,7 +32,7 @@ func (d *Database) AssociateEDUWithDestination(
serverName gomatrixserverlib.ServerName,
receipt *Receipt,
) error {
- return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
for _, nid := range receipt.nids {
if err := d.FederationSenderQueueEDUs.InsertQueueEDU(
ctx, // context
@@ -60,7 +59,7 @@ func (d *Database) GetNextTransactionEDUs(
receipt *Receipt,
err error,
) {
- err = sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error {
+ err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit)
if err != nil {
return fmt.Errorf("SelectQueueEDUs: %w", err)
@@ -99,7 +98,7 @@ func (d *Database) CleanEDUs(
return errors.New("expected receipt")
}
- return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
if err := d.FederationSenderQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, receipt.nids); err != nil {
return err
}
diff --git a/federationsender/storage/shared/storage_pdus.go b/federationsender/storage/shared/storage_pdus.go
index 00588956..4b51146d 100644
--- a/federationsender/storage/shared/storage_pdus.go
+++ b/federationsender/storage/shared/storage_pdus.go
@@ -34,7 +34,7 @@ func (d *Database) AssociatePDUWithDestination(
serverName gomatrixserverlib.ServerName,
receipt *Receipt,
) error {
- return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
for _, nid := range receipt.nids {
if err := d.FederationSenderQueuePDUs.InsertQueuePDU(
ctx, // context
@@ -111,7 +111,7 @@ func (d *Database) CleanPDUs(
return errors.New("expected receipt")
}
- return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, receipt.nids); err != nil {
return err
}