diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-08-21 10:42:08 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-21 10:42:08 +0100 |
commit | 9d53351dc20283103bf2eec6b92831033d06c5a8 (patch) | |
tree | 653cf0ddca3f777bcdba188187fb78fe39ae2b02 /federationsender/storage | |
parent | 5aaf32bbed4d704d5a22ad7dff79f7a68002a213 (diff) |
Component-wide TransactionWriters (#1290)
* Offset updates take place using TransactionWriter
* Refactor TransactionWriter in current state server
* Refactor TransactionWriter in federation sender
* Refactor TransactionWriter in key server
* Refactor TransactionWriter in media API
* Refactor TransactionWriter in server key API
* Refactor TransactionWriter in sync API
* Refactor TransactionWriter in user API
* Fix deadlocking Sync API tests
* Un-deadlock device database
* Fix appservice API
* Rename TransactionWriters to Writers
* Move writers up a layer in sync API
* Document sqlutil.Writer interface
* Add note to Writer documentation
Diffstat (limited to 'federationsender/storage')
-rw-r--r-- | federationsender/storage/postgres/storage.go | 7 | ||||
-rw-r--r-- | federationsender/storage/shared/storage.go | 18 | ||||
-rw-r--r-- | federationsender/storage/sqlite3/blacklist_table.go | 20 | ||||
-rw-r--r-- | federationsender/storage/sqlite3/joined_hosts_table.go | 26 | ||||
-rw-r--r-- | federationsender/storage/sqlite3/queue_edus_table.go | 30 | ||||
-rw-r--r-- | federationsender/storage/sqlite3/queue_json_table.go | 33 | ||||
-rw-r--r-- | federationsender/storage/sqlite3/queue_pdus_table.go | 30 | ||||
-rw-r--r-- | federationsender/storage/sqlite3/room_table.go | 18 | ||||
-rw-r--r-- | federationsender/storage/sqlite3/storage.go | 7 |
9 files changed, 84 insertions, 105 deletions
diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go index b65ff0b6..b3b4da39 100644 --- a/federationsender/storage/postgres/storage.go +++ b/federationsender/storage/postgres/storage.go @@ -27,7 +27,8 @@ import ( type Database struct { shared.Database sqlutil.PartitionOffsetStatements - db *sql.DB + db *sql.DB + writer sqlutil.Writer } // NewDatabase opens a new database @@ -37,6 +38,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } + d.writer = sqlutil.NewDummyWriter() joinedHosts, err := NewPostgresJoinedHostsTable(d.db) if err != nil { return nil, err @@ -63,6 +65,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { } d.Database = shared.Database{ DB: d.db, + Writer: d.writer, FederationSenderJoinedHosts: joinedHosts, FederationSenderQueuePDUs: queuePDUs, FederationSenderQueueEDUs: queueEDUs, @@ -70,7 +73,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { FederationSenderRooms: rooms, FederationSenderBlacklist: blacklist, } - if err = d.PartitionOffsetStatements.Prepare(d.db, "federationsender"); err != nil { + if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil { return nil, err } return &d, nil diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go index 4a681de6..4e347259 100644 --- a/federationsender/storage/shared/storage.go +++ b/federationsender/storage/shared/storage.go @@ -28,6 +28,7 @@ import ( type Database struct { DB *sql.DB + Writer sqlutil.Writer FederationSenderQueuePDUs tables.FederationSenderQueuePDUs FederationSenderQueueEDUs tables.FederationSenderQueueEDUs FederationSenderQueueJSON tables.FederationSenderQueueJSON @@ -64,7 +65,7 @@ func (d *Database) UpdateRoom( addHosts []types.JoinedHost, removeHosts []string, ) (joinedHosts []types.JoinedHost, err error) { - err = sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { + err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { err = d.FederationSenderRooms.InsertRoom(ctx, txn, roomID) if err != nil { return err @@ -133,7 +134,12 @@ func (d *Database) GetJoinedHostsForRooms(ctx context.Context, roomIDs []string) func (d *Database) StoreJSON( ctx context.Context, js string, ) (*Receipt, error) { - nid, err := d.FederationSenderQueueJSON.InsertQueueJSON(ctx, nil, js) + var nid int64 + var err error + _ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + nid, err = d.FederationSenderQueueJSON.InsertQueueJSON(ctx, txn, js) + return nil + }) if err != nil { return nil, fmt.Errorf("d.insertQueueJSON: %w", err) } @@ -143,11 +149,15 @@ func (d *Database) StoreJSON( } func (d *Database) AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error { - return d.FederationSenderBlacklist.InsertBlacklist(context.TODO(), nil, serverName) + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + return d.FederationSenderBlacklist.InsertBlacklist(context.TODO(), txn, serverName) + }) } func (d *Database) RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error { - return d.FederationSenderBlacklist.DeleteBlacklist(context.TODO(), nil, serverName) + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + return d.FederationSenderBlacklist.DeleteBlacklist(context.TODO(), txn, serverName) + }) } func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) { diff --git a/federationsender/storage/sqlite3/blacklist_table.go b/federationsender/storage/sqlite3/blacklist_table.go index b23bfcba..90b44ac9 100644 --- a/federationsender/storage/sqlite3/blacklist_table.go +++ b/federationsender/storage/sqlite3/blacklist_table.go @@ -42,7 +42,6 @@ const deleteBlacklistSQL = "" + type blacklistStatements struct { db *sql.DB - writer sqlutil.TransactionWriter insertBlacklistStmt *sql.Stmt selectBlacklistStmt *sql.Stmt deleteBlacklistStmt *sql.Stmt @@ -50,8 +49,7 @@ type blacklistStatements struct { func NewSQLiteBlacklistTable(db *sql.DB) (s *blacklistStatements, err error) { s = &blacklistStatements{ - db: db, - writer: sqlutil.NewTransactionWriter(), + db: db, } _, err = db.Exec(blacklistSchema) if err != nil { @@ -75,11 +73,9 @@ func NewSQLiteBlacklistTable(db *sql.DB) (s *blacklistStatements, err error) { func (s *blacklistStatements) InsertBlacklist( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, ) error { - return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { - stmt := sqlutil.TxStmt(txn, s.insertBlacklistStmt) - _, err := stmt.ExecContext(ctx, serverName) - return err - }) + stmt := sqlutil.TxStmt(txn, s.insertBlacklistStmt) + _, err := stmt.ExecContext(ctx, serverName) + return err } // selectRoomForUpdate locks the row for the room and returns the last_event_id. @@ -105,9 +101,7 @@ func (s *blacklistStatements) SelectBlacklist( func (s *blacklistStatements) DeleteBlacklist( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, ) error { - return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { - stmt := sqlutil.TxStmt(txn, s.deleteBlacklistStmt) - _, err := stmt.ExecContext(ctx, serverName) - return err - }) + stmt := sqlutil.TxStmt(txn, s.deleteBlacklistStmt) + _, err := stmt.ExecContext(ctx, serverName) + return err } diff --git a/federationsender/storage/sqlite3/joined_hosts_table.go b/federationsender/storage/sqlite3/joined_hosts_table.go index 5dc18f4e..3bc45e7d 100644 --- a/federationsender/storage/sqlite3/joined_hosts_table.go +++ b/federationsender/storage/sqlite3/joined_hosts_table.go @@ -65,7 +65,6 @@ const selectJoinedHostsForRoomsSQL = "" + type joinedHostsStatements struct { db *sql.DB - writer sqlutil.TransactionWriter insertJoinedHostsStmt *sql.Stmt deleteJoinedHostsStmt *sql.Stmt selectJoinedHostsStmt *sql.Stmt @@ -75,8 +74,7 @@ type joinedHostsStatements struct { func NewSQLiteJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) { s = &joinedHostsStatements{ - db: db, - writer: sqlutil.NewTransactionWriter(), + db: db, } _, err = db.Exec(joinedHostsSchema) if err != nil { @@ -103,25 +101,21 @@ func (s *joinedHostsStatements) InsertJoinedHosts( roomID, eventID string, serverName gomatrixserverlib.ServerName, ) error { - return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { - stmt := sqlutil.TxStmt(txn, s.insertJoinedHostsStmt) - _, err := stmt.ExecContext(ctx, roomID, eventID, serverName) - return err - }) + stmt := sqlutil.TxStmt(txn, s.insertJoinedHostsStmt) + _, err := stmt.ExecContext(ctx, roomID, eventID, serverName) + return err } func (s *joinedHostsStatements) DeleteJoinedHosts( ctx context.Context, txn *sql.Tx, eventIDs []string, ) error { - return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { - for _, eventID := range eventIDs { - stmt := sqlutil.TxStmt(txn, s.deleteJoinedHostsStmt) - if _, err := stmt.ExecContext(ctx, eventID); err != nil { - return err - } + for _, eventID := range eventIDs { + stmt := sqlutil.TxStmt(txn, s.deleteJoinedHostsStmt) + if _, err := stmt.ExecContext(ctx, eventID); err != nil { + return err } - return nil - }) + } + return nil } func (s *joinedHostsStatements) SelectJoinedHostsWithTx( diff --git a/federationsender/storage/sqlite3/queue_edus_table.go b/federationsender/storage/sqlite3/queue_edus_table.go index 2abcc105..a6d60950 100644 --- a/federationsender/storage/sqlite3/queue_edus_table.go +++ b/federationsender/storage/sqlite3/queue_edus_table.go @@ -64,7 +64,6 @@ const selectQueueServerNamesSQL = "" + type queueEDUsStatements struct { db *sql.DB - writer sqlutil.TransactionWriter insertQueueEDUStmt *sql.Stmt selectQueueEDUStmt *sql.Stmt selectQueueEDUReferenceJSONCountStmt *sql.Stmt @@ -74,8 +73,7 @@ type queueEDUsStatements struct { func NewSQLiteQueueEDUsTable(db *sql.DB) (s *queueEDUsStatements, err error) { s = &queueEDUsStatements{ - db: db, - writer: sqlutil.NewTransactionWriter(), + db: db, } _, err = db.Exec(queueEDUsSchema) if err != nil { @@ -106,16 +104,14 @@ func (s *queueEDUsStatements) InsertQueueEDU( serverName gomatrixserverlib.ServerName, nid int64, ) error { - return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { - stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt) - _, err := stmt.ExecContext( - ctx, - eduType, // the EDU type - serverName, // destination server name - nid, // JSON blob NID - ) - return err - }) + stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt) + _, err := stmt.ExecContext( + ctx, + eduType, // the EDU type + serverName, // destination server name + nid, // JSON blob NID + ) + return err } func (s *queueEDUsStatements) DeleteQueueEDUs( @@ -135,11 +131,9 @@ func (s *queueEDUsStatements) DeleteQueueEDUs( params[k+1] = v } - return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { - stmt := sqlutil.TxStmt(txn, deleteStmt) - _, err := stmt.ExecContext(ctx, params...) - return err - }) + stmt := sqlutil.TxStmt(txn, deleteStmt) + _, err = stmt.ExecContext(ctx, params...) + return err } func (s *queueEDUsStatements) SelectQueueEDUs( diff --git a/federationsender/storage/sqlite3/queue_json_table.go b/federationsender/storage/sqlite3/queue_json_table.go index 867ffd44..3e3f60f6 100644 --- a/federationsender/storage/sqlite3/queue_json_table.go +++ b/federationsender/storage/sqlite3/queue_json_table.go @@ -50,7 +50,6 @@ const selectJSONSQL = "" + type queueJSONStatements struct { db *sql.DB - writer sqlutil.TransactionWriter insertJSONStmt *sql.Stmt //deleteJSONStmt *sql.Stmt - prepared at runtime due to variadic //selectJSONStmt *sql.Stmt - prepared at runtime due to variadic @@ -58,8 +57,7 @@ type queueJSONStatements struct { func NewSQLiteQueueJSONTable(db *sql.DB) (s *queueJSONStatements, err error) { s = &queueJSONStatements{ - db: db, - writer: sqlutil.NewTransactionWriter(), + db: db, } _, err = db.Exec(queueJSONSchema) if err != nil { @@ -74,18 +72,15 @@ func NewSQLiteQueueJSONTable(db *sql.DB) (s *queueJSONStatements, err error) { func (s *queueJSONStatements) InsertQueueJSON( ctx context.Context, txn *sql.Tx, json string, ) (lastid int64, err error) { - err = s.writer.Do(s.db, txn, func(txn *sql.Tx) error { - stmt := sqlutil.TxStmt(txn, s.insertJSONStmt) - res, err := stmt.ExecContext(ctx, json) - if err != nil { - return fmt.Errorf("stmt.QueryContext: %w", err) - } - lastid, err = res.LastInsertId() - if err != nil { - return fmt.Errorf("res.LastInsertId: %w", err) - } - return nil - }) + stmt := sqlutil.TxStmt(txn, s.insertJSONStmt) + res, err := stmt.ExecContext(ctx, json) + if err != nil { + return 0, fmt.Errorf("stmt.QueryContext: %w", err) + } + lastid, err = res.LastInsertId() + if err != nil { + return 0, fmt.Errorf("res.LastInsertId: %w", err) + } return } @@ -103,11 +98,9 @@ func (s *queueJSONStatements) DeleteQueueJSON( iNIDs[k] = v } - return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { - stmt := sqlutil.TxStmt(txn, deleteStmt) - _, err = stmt.ExecContext(ctx, iNIDs...) - return err - }) + stmt := sqlutil.TxStmt(txn, deleteStmt) + _, err = stmt.ExecContext(ctx, iNIDs...) + return err } func (s *queueJSONStatements) SelectQueueJSON( diff --git a/federationsender/storage/sqlite3/queue_pdus_table.go b/federationsender/storage/sqlite3/queue_pdus_table.go index 538ba3db..70519c9e 100644 --- a/federationsender/storage/sqlite3/queue_pdus_table.go +++ b/federationsender/storage/sqlite3/queue_pdus_table.go @@ -71,7 +71,6 @@ const selectQueuePDUsServerNamesSQL = "" + type queuePDUsStatements struct { db *sql.DB - writer sqlutil.TransactionWriter insertQueuePDUStmt *sql.Stmt selectQueueNextTransactionIDStmt *sql.Stmt selectQueuePDUsByTransactionStmt *sql.Stmt @@ -83,8 +82,7 @@ type queuePDUsStatements struct { func NewSQLiteQueuePDUsTable(db *sql.DB) (s *queuePDUsStatements, err error) { s = &queuePDUsStatements{ - db: db, - writer: sqlutil.NewTransactionWriter(), + db: db, } _, err = db.Exec(queuePDUsSchema) if err != nil { @@ -121,16 +119,14 @@ func (s *queuePDUsStatements) InsertQueuePDU( serverName gomatrixserverlib.ServerName, nid int64, ) error { - return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { - stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt) - _, err := stmt.ExecContext( - ctx, - transactionID, // the transaction ID that we initially attempted - serverName, // destination server name - nid, // JSON blob NID - ) - return err - }) + stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt) + _, err := stmt.ExecContext( + ctx, + transactionID, // the transaction ID that we initially attempted + serverName, // destination server name + nid, // JSON blob NID + ) + return err } func (s *queuePDUsStatements) DeleteQueuePDUs( @@ -150,11 +146,9 @@ func (s *queuePDUsStatements) DeleteQueuePDUs( params[k+1] = v } - return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { - stmt := sqlutil.TxStmt(txn, deleteStmt) - _, err := stmt.ExecContext(ctx, params...) - return err - }) + stmt := sqlutil.TxStmt(txn, deleteStmt) + _, err = stmt.ExecContext(ctx, params...) + return err } func (s *queuePDUsStatements) SelectQueuePDUNextTransactionID( diff --git a/federationsender/storage/sqlite3/room_table.go b/federationsender/storage/sqlite3/room_table.go index 9a439fad..0710ccca 100644 --- a/federationsender/storage/sqlite3/room_table.go +++ b/federationsender/storage/sqlite3/room_table.go @@ -44,7 +44,6 @@ const updateRoomSQL = "" + type roomStatements struct { db *sql.DB - writer sqlutil.TransactionWriter insertRoomStmt *sql.Stmt selectRoomForUpdateStmt *sql.Stmt updateRoomStmt *sql.Stmt @@ -52,8 +51,7 @@ type roomStatements struct { func NewSQLiteRoomsTable(db *sql.DB) (s *roomStatements, err error) { s = &roomStatements{ - db: db, - writer: sqlutil.NewTransactionWriter(), + db: db, } _, err = db.Exec(roomSchema) if err != nil { @@ -77,10 +75,8 @@ func NewSQLiteRoomsTable(db *sql.DB) (s *roomStatements, err error) { func (s *roomStatements) InsertRoom( ctx context.Context, txn *sql.Tx, roomID string, ) error { - return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { - _, err := sqlutil.TxStmt(txn, s.insertRoomStmt).ExecContext(ctx, roomID) - return err - }) + _, err := sqlutil.TxStmt(txn, s.insertRoomStmt).ExecContext(ctx, roomID) + return err } // selectRoomForUpdate locks the row for the room and returns the last_event_id. @@ -103,9 +99,7 @@ func (s *roomStatements) SelectRoomForUpdate( func (s *roomStatements) UpdateRoom( ctx context.Context, txn *sql.Tx, roomID, lastEventID string, ) error { - return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { - stmt := sqlutil.TxStmt(txn, s.updateRoomStmt) - _, err := stmt.ExecContext(ctx, roomID, lastEventID) - return err - }) + stmt := sqlutil.TxStmt(txn, s.updateRoomStmt) + _, err := stmt.ExecContext(ctx, roomID, lastEventID) + return err } diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index 41b91871..ba467f02 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -29,7 +29,8 @@ import ( type Database struct { shared.Database sqlutil.PartitionOffsetStatements - db *sql.DB + db *sql.DB + writer sqlutil.Writer } // NewDatabase opens a new database @@ -39,6 +40,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } + d.writer = sqlutil.NewExclusiveWriter() joinedHosts, err := NewSQLiteJoinedHostsTable(d.db) if err != nil { return nil, err @@ -65,6 +67,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { } d.Database = shared.Database{ DB: d.db, + Writer: d.writer, FederationSenderJoinedHosts: joinedHosts, FederationSenderQueuePDUs: queuePDUs, FederationSenderQueueEDUs: queueEDUs, @@ -72,7 +75,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { FederationSenderRooms: rooms, FederationSenderBlacklist: blacklist, } - if err = d.PartitionOffsetStatements.Prepare(d.db, "federationsender"); err != nil { + if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil { return nil, err } return &d, nil |