aboutsummaryrefslogtreecommitdiff
path: root/federationsender
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-08-21 10:42:08 +0100
committerGitHub <noreply@github.com>2020-08-21 10:42:08 +0100
commit9d53351dc20283103bf2eec6b92831033d06c5a8 (patch)
tree653cf0ddca3f777bcdba188187fb78fe39ae2b02 /federationsender
parent5aaf32bbed4d704d5a22ad7dff79f7a68002a213 (diff)
Component-wide TransactionWriters (#1290)
* Offset updates take place using TransactionWriter * Refactor TransactionWriter in current state server * Refactor TransactionWriter in federation sender * Refactor TransactionWriter in key server * Refactor TransactionWriter in media API * Refactor TransactionWriter in server key API * Refactor TransactionWriter in sync API * Refactor TransactionWriter in user API * Fix deadlocking Sync API tests * Un-deadlock device database * Fix appservice API * Rename TransactionWriters to Writers * Move writers up a layer in sync API * Document sqlutil.Writer interface * Add note to Writer documentation
Diffstat (limited to 'federationsender')
-rw-r--r--federationsender/storage/postgres/storage.go7
-rw-r--r--federationsender/storage/shared/storage.go18
-rw-r--r--federationsender/storage/sqlite3/blacklist_table.go20
-rw-r--r--federationsender/storage/sqlite3/joined_hosts_table.go26
-rw-r--r--federationsender/storage/sqlite3/queue_edus_table.go30
-rw-r--r--federationsender/storage/sqlite3/queue_json_table.go33
-rw-r--r--federationsender/storage/sqlite3/queue_pdus_table.go30
-rw-r--r--federationsender/storage/sqlite3/room_table.go18
-rw-r--r--federationsender/storage/sqlite3/storage.go7
9 files changed, 84 insertions, 105 deletions
diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go
index b65ff0b6..b3b4da39 100644
--- a/federationsender/storage/postgres/storage.go
+++ b/federationsender/storage/postgres/storage.go
@@ -27,7 +27,8 @@ import (
type Database struct {
shared.Database
sqlutil.PartitionOffsetStatements
- db *sql.DB
+ db *sql.DB
+ writer sqlutil.Writer
}
// NewDatabase opens a new database
@@ -37,6 +38,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
if d.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err
}
+ d.writer = sqlutil.NewDummyWriter()
joinedHosts, err := NewPostgresJoinedHostsTable(d.db)
if err != nil {
return nil, err
@@ -63,6 +65,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
}
d.Database = shared.Database{
DB: d.db,
+ Writer: d.writer,
FederationSenderJoinedHosts: joinedHosts,
FederationSenderQueuePDUs: queuePDUs,
FederationSenderQueueEDUs: queueEDUs,
@@ -70,7 +73,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
FederationSenderRooms: rooms,
FederationSenderBlacklist: blacklist,
}
- if err = d.PartitionOffsetStatements.Prepare(d.db, "federationsender"); err != nil {
+ if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil {
return nil, err
}
return &d, nil
diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go
index 4a681de6..4e347259 100644
--- a/federationsender/storage/shared/storage.go
+++ b/federationsender/storage/shared/storage.go
@@ -28,6 +28,7 @@ import (
type Database struct {
DB *sql.DB
+ Writer sqlutil.Writer
FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
FederationSenderQueueJSON tables.FederationSenderQueueJSON
@@ -64,7 +65,7 @@ func (d *Database) UpdateRoom(
addHosts []types.JoinedHost,
removeHosts []string,
) (joinedHosts []types.JoinedHost, err error) {
- err = sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error {
+ err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
err = d.FederationSenderRooms.InsertRoom(ctx, txn, roomID)
if err != nil {
return err
@@ -133,7 +134,12 @@ func (d *Database) GetJoinedHostsForRooms(ctx context.Context, roomIDs []string)
func (d *Database) StoreJSON(
ctx context.Context, js string,
) (*Receipt, error) {
- nid, err := d.FederationSenderQueueJSON.InsertQueueJSON(ctx, nil, js)
+ var nid int64
+ var err error
+ _ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ nid, err = d.FederationSenderQueueJSON.InsertQueueJSON(ctx, txn, js)
+ return nil
+ })
if err != nil {
return nil, fmt.Errorf("d.insertQueueJSON: %w", err)
}
@@ -143,11 +149,15 @@ func (d *Database) StoreJSON(
}
func (d *Database) AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error {
- return d.FederationSenderBlacklist.InsertBlacklist(context.TODO(), nil, serverName)
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ return d.FederationSenderBlacklist.InsertBlacklist(context.TODO(), txn, serverName)
+ })
}
func (d *Database) RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error {
- return d.FederationSenderBlacklist.DeleteBlacklist(context.TODO(), nil, serverName)
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ return d.FederationSenderBlacklist.DeleteBlacklist(context.TODO(), txn, serverName)
+ })
}
func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) {
diff --git a/federationsender/storage/sqlite3/blacklist_table.go b/federationsender/storage/sqlite3/blacklist_table.go
index b23bfcba..90b44ac9 100644
--- a/federationsender/storage/sqlite3/blacklist_table.go
+++ b/federationsender/storage/sqlite3/blacklist_table.go
@@ -42,7 +42,6 @@ const deleteBlacklistSQL = "" +
type blacklistStatements struct {
db *sql.DB
- writer sqlutil.TransactionWriter
insertBlacklistStmt *sql.Stmt
selectBlacklistStmt *sql.Stmt
deleteBlacklistStmt *sql.Stmt
@@ -50,8 +49,7 @@ type blacklistStatements struct {
func NewSQLiteBlacklistTable(db *sql.DB) (s *blacklistStatements, err error) {
s = &blacklistStatements{
- db: db,
- writer: sqlutil.NewTransactionWriter(),
+ db: db,
}
_, err = db.Exec(blacklistSchema)
if err != nil {
@@ -75,11 +73,9 @@ func NewSQLiteBlacklistTable(db *sql.DB) (s *blacklistStatements, err error) {
func (s *blacklistStatements) InsertBlacklist(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
) error {
- return s.writer.Do(s.db, txn, func(txn *sql.Tx) error {
- stmt := sqlutil.TxStmt(txn, s.insertBlacklistStmt)
- _, err := stmt.ExecContext(ctx, serverName)
- return err
- })
+ stmt := sqlutil.TxStmt(txn, s.insertBlacklistStmt)
+ _, err := stmt.ExecContext(ctx, serverName)
+ return err
}
// selectRoomForUpdate locks the row for the room and returns the last_event_id.
@@ -105,9 +101,7 @@ func (s *blacklistStatements) SelectBlacklist(
func (s *blacklistStatements) DeleteBlacklist(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
) error {
- return s.writer.Do(s.db, txn, func(txn *sql.Tx) error {
- stmt := sqlutil.TxStmt(txn, s.deleteBlacklistStmt)
- _, err := stmt.ExecContext(ctx, serverName)
- return err
- })
+ stmt := sqlutil.TxStmt(txn, s.deleteBlacklistStmt)
+ _, err := stmt.ExecContext(ctx, serverName)
+ return err
}
diff --git a/federationsender/storage/sqlite3/joined_hosts_table.go b/federationsender/storage/sqlite3/joined_hosts_table.go
index 5dc18f4e..3bc45e7d 100644
--- a/federationsender/storage/sqlite3/joined_hosts_table.go
+++ b/federationsender/storage/sqlite3/joined_hosts_table.go
@@ -65,7 +65,6 @@ const selectJoinedHostsForRoomsSQL = "" +
type joinedHostsStatements struct {
db *sql.DB
- writer sqlutil.TransactionWriter
insertJoinedHostsStmt *sql.Stmt
deleteJoinedHostsStmt *sql.Stmt
selectJoinedHostsStmt *sql.Stmt
@@ -75,8 +74,7 @@ type joinedHostsStatements struct {
func NewSQLiteJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) {
s = &joinedHostsStatements{
- db: db,
- writer: sqlutil.NewTransactionWriter(),
+ db: db,
}
_, err = db.Exec(joinedHostsSchema)
if err != nil {
@@ -103,25 +101,21 @@ func (s *joinedHostsStatements) InsertJoinedHosts(
roomID, eventID string,
serverName gomatrixserverlib.ServerName,
) error {
- return s.writer.Do(s.db, txn, func(txn *sql.Tx) error {
- stmt := sqlutil.TxStmt(txn, s.insertJoinedHostsStmt)
- _, err := stmt.ExecContext(ctx, roomID, eventID, serverName)
- return err
- })
+ stmt := sqlutil.TxStmt(txn, s.insertJoinedHostsStmt)
+ _, err := stmt.ExecContext(ctx, roomID, eventID, serverName)
+ return err
}
func (s *joinedHostsStatements) DeleteJoinedHosts(
ctx context.Context, txn *sql.Tx, eventIDs []string,
) error {
- return s.writer.Do(s.db, txn, func(txn *sql.Tx) error {
- for _, eventID := range eventIDs {
- stmt := sqlutil.TxStmt(txn, s.deleteJoinedHostsStmt)
- if _, err := stmt.ExecContext(ctx, eventID); err != nil {
- return err
- }
+ for _, eventID := range eventIDs {
+ stmt := sqlutil.TxStmt(txn, s.deleteJoinedHostsStmt)
+ if _, err := stmt.ExecContext(ctx, eventID); err != nil {
+ return err
}
- return nil
- })
+ }
+ return nil
}
func (s *joinedHostsStatements) SelectJoinedHostsWithTx(
diff --git a/federationsender/storage/sqlite3/queue_edus_table.go b/federationsender/storage/sqlite3/queue_edus_table.go
index 2abcc105..a6d60950 100644
--- a/federationsender/storage/sqlite3/queue_edus_table.go
+++ b/federationsender/storage/sqlite3/queue_edus_table.go
@@ -64,7 +64,6 @@ const selectQueueServerNamesSQL = "" +
type queueEDUsStatements struct {
db *sql.DB
- writer sqlutil.TransactionWriter
insertQueueEDUStmt *sql.Stmt
selectQueueEDUStmt *sql.Stmt
selectQueueEDUReferenceJSONCountStmt *sql.Stmt
@@ -74,8 +73,7 @@ type queueEDUsStatements struct {
func NewSQLiteQueueEDUsTable(db *sql.DB) (s *queueEDUsStatements, err error) {
s = &queueEDUsStatements{
- db: db,
- writer: sqlutil.NewTransactionWriter(),
+ db: db,
}
_, err = db.Exec(queueEDUsSchema)
if err != nil {
@@ -106,16 +104,14 @@ func (s *queueEDUsStatements) InsertQueueEDU(
serverName gomatrixserverlib.ServerName,
nid int64,
) error {
- return s.writer.Do(s.db, txn, func(txn *sql.Tx) error {
- stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt)
- _, err := stmt.ExecContext(
- ctx,
- eduType, // the EDU type
- serverName, // destination server name
- nid, // JSON blob NID
- )
- return err
- })
+ stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt)
+ _, err := stmt.ExecContext(
+ ctx,
+ eduType, // the EDU type
+ serverName, // destination server name
+ nid, // JSON blob NID
+ )
+ return err
}
func (s *queueEDUsStatements) DeleteQueueEDUs(
@@ -135,11 +131,9 @@ func (s *queueEDUsStatements) DeleteQueueEDUs(
params[k+1] = v
}
- return s.writer.Do(s.db, txn, func(txn *sql.Tx) error {
- stmt := sqlutil.TxStmt(txn, deleteStmt)
- _, err := stmt.ExecContext(ctx, params...)
- return err
- })
+ stmt := sqlutil.TxStmt(txn, deleteStmt)
+ _, err = stmt.ExecContext(ctx, params...)
+ return err
}
func (s *queueEDUsStatements) SelectQueueEDUs(
diff --git a/federationsender/storage/sqlite3/queue_json_table.go b/federationsender/storage/sqlite3/queue_json_table.go
index 867ffd44..3e3f60f6 100644
--- a/federationsender/storage/sqlite3/queue_json_table.go
+++ b/federationsender/storage/sqlite3/queue_json_table.go
@@ -50,7 +50,6 @@ const selectJSONSQL = "" +
type queueJSONStatements struct {
db *sql.DB
- writer sqlutil.TransactionWriter
insertJSONStmt *sql.Stmt
//deleteJSONStmt *sql.Stmt - prepared at runtime due to variadic
//selectJSONStmt *sql.Stmt - prepared at runtime due to variadic
@@ -58,8 +57,7 @@ type queueJSONStatements struct {
func NewSQLiteQueueJSONTable(db *sql.DB) (s *queueJSONStatements, err error) {
s = &queueJSONStatements{
- db: db,
- writer: sqlutil.NewTransactionWriter(),
+ db: db,
}
_, err = db.Exec(queueJSONSchema)
if err != nil {
@@ -74,18 +72,15 @@ func NewSQLiteQueueJSONTable(db *sql.DB) (s *queueJSONStatements, err error) {
func (s *queueJSONStatements) InsertQueueJSON(
ctx context.Context, txn *sql.Tx, json string,
) (lastid int64, err error) {
- err = s.writer.Do(s.db, txn, func(txn *sql.Tx) error {
- stmt := sqlutil.TxStmt(txn, s.insertJSONStmt)
- res, err := stmt.ExecContext(ctx, json)
- if err != nil {
- return fmt.Errorf("stmt.QueryContext: %w", err)
- }
- lastid, err = res.LastInsertId()
- if err != nil {
- return fmt.Errorf("res.LastInsertId: %w", err)
- }
- return nil
- })
+ stmt := sqlutil.TxStmt(txn, s.insertJSONStmt)
+ res, err := stmt.ExecContext(ctx, json)
+ if err != nil {
+ return 0, fmt.Errorf("stmt.QueryContext: %w", err)
+ }
+ lastid, err = res.LastInsertId()
+ if err != nil {
+ return 0, fmt.Errorf("res.LastInsertId: %w", err)
+ }
return
}
@@ -103,11 +98,9 @@ func (s *queueJSONStatements) DeleteQueueJSON(
iNIDs[k] = v
}
- return s.writer.Do(s.db, txn, func(txn *sql.Tx) error {
- stmt := sqlutil.TxStmt(txn, deleteStmt)
- _, err = stmt.ExecContext(ctx, iNIDs...)
- return err
- })
+ stmt := sqlutil.TxStmt(txn, deleteStmt)
+ _, err = stmt.ExecContext(ctx, iNIDs...)
+ return err
}
func (s *queueJSONStatements) SelectQueueJSON(
diff --git a/federationsender/storage/sqlite3/queue_pdus_table.go b/federationsender/storage/sqlite3/queue_pdus_table.go
index 538ba3db..70519c9e 100644
--- a/federationsender/storage/sqlite3/queue_pdus_table.go
+++ b/federationsender/storage/sqlite3/queue_pdus_table.go
@@ -71,7 +71,6 @@ const selectQueuePDUsServerNamesSQL = "" +
type queuePDUsStatements struct {
db *sql.DB
- writer sqlutil.TransactionWriter
insertQueuePDUStmt *sql.Stmt
selectQueueNextTransactionIDStmt *sql.Stmt
selectQueuePDUsByTransactionStmt *sql.Stmt
@@ -83,8 +82,7 @@ type queuePDUsStatements struct {
func NewSQLiteQueuePDUsTable(db *sql.DB) (s *queuePDUsStatements, err error) {
s = &queuePDUsStatements{
- db: db,
- writer: sqlutil.NewTransactionWriter(),
+ db: db,
}
_, err = db.Exec(queuePDUsSchema)
if err != nil {
@@ -121,16 +119,14 @@ func (s *queuePDUsStatements) InsertQueuePDU(
serverName gomatrixserverlib.ServerName,
nid int64,
) error {
- return s.writer.Do(s.db, txn, func(txn *sql.Tx) error {
- stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt)
- _, err := stmt.ExecContext(
- ctx,
- transactionID, // the transaction ID that we initially attempted
- serverName, // destination server name
- nid, // JSON blob NID
- )
- return err
- })
+ stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt)
+ _, err := stmt.ExecContext(
+ ctx,
+ transactionID, // the transaction ID that we initially attempted
+ serverName, // destination server name
+ nid, // JSON blob NID
+ )
+ return err
}
func (s *queuePDUsStatements) DeleteQueuePDUs(
@@ -150,11 +146,9 @@ func (s *queuePDUsStatements) DeleteQueuePDUs(
params[k+1] = v
}
- return s.writer.Do(s.db, txn, func(txn *sql.Tx) error {
- stmt := sqlutil.TxStmt(txn, deleteStmt)
- _, err := stmt.ExecContext(ctx, params...)
- return err
- })
+ stmt := sqlutil.TxStmt(txn, deleteStmt)
+ _, err = stmt.ExecContext(ctx, params...)
+ return err
}
func (s *queuePDUsStatements) SelectQueuePDUNextTransactionID(
diff --git a/federationsender/storage/sqlite3/room_table.go b/federationsender/storage/sqlite3/room_table.go
index 9a439fad..0710ccca 100644
--- a/federationsender/storage/sqlite3/room_table.go
+++ b/federationsender/storage/sqlite3/room_table.go
@@ -44,7 +44,6 @@ const updateRoomSQL = "" +
type roomStatements struct {
db *sql.DB
- writer sqlutil.TransactionWriter
insertRoomStmt *sql.Stmt
selectRoomForUpdateStmt *sql.Stmt
updateRoomStmt *sql.Stmt
@@ -52,8 +51,7 @@ type roomStatements struct {
func NewSQLiteRoomsTable(db *sql.DB) (s *roomStatements, err error) {
s = &roomStatements{
- db: db,
- writer: sqlutil.NewTransactionWriter(),
+ db: db,
}
_, err = db.Exec(roomSchema)
if err != nil {
@@ -77,10 +75,8 @@ func NewSQLiteRoomsTable(db *sql.DB) (s *roomStatements, err error) {
func (s *roomStatements) InsertRoom(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
- return s.writer.Do(s.db, txn, func(txn *sql.Tx) error {
- _, err := sqlutil.TxStmt(txn, s.insertRoomStmt).ExecContext(ctx, roomID)
- return err
- })
+ _, err := sqlutil.TxStmt(txn, s.insertRoomStmt).ExecContext(ctx, roomID)
+ return err
}
// selectRoomForUpdate locks the row for the room and returns the last_event_id.
@@ -103,9 +99,7 @@ func (s *roomStatements) SelectRoomForUpdate(
func (s *roomStatements) UpdateRoom(
ctx context.Context, txn *sql.Tx, roomID, lastEventID string,
) error {
- return s.writer.Do(s.db, txn, func(txn *sql.Tx) error {
- stmt := sqlutil.TxStmt(txn, s.updateRoomStmt)
- _, err := stmt.ExecContext(ctx, roomID, lastEventID)
- return err
- })
+ stmt := sqlutil.TxStmt(txn, s.updateRoomStmt)
+ _, err := stmt.ExecContext(ctx, roomID, lastEventID)
+ return err
}
diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go
index 41b91871..ba467f02 100644
--- a/federationsender/storage/sqlite3/storage.go
+++ b/federationsender/storage/sqlite3/storage.go
@@ -29,7 +29,8 @@ import (
type Database struct {
shared.Database
sqlutil.PartitionOffsetStatements
- db *sql.DB
+ db *sql.DB
+ writer sqlutil.Writer
}
// NewDatabase opens a new database
@@ -39,6 +40,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
if d.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err
}
+ d.writer = sqlutil.NewExclusiveWriter()
joinedHosts, err := NewSQLiteJoinedHostsTable(d.db)
if err != nil {
return nil, err
@@ -65,6 +67,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
}
d.Database = shared.Database{
DB: d.db,
+ Writer: d.writer,
FederationSenderJoinedHosts: joinedHosts,
FederationSenderQueuePDUs: queuePDUs,
FederationSenderQueueEDUs: queueEDUs,
@@ -72,7 +75,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
FederationSenderRooms: rooms,
FederationSenderBlacklist: blacklist,
}
- if err = d.PartitionOffsetStatements.Prepare(d.db, "federationsender"); err != nil {
+ if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil {
return nil, err
}
return &d, nil