diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-08-21 10:42:08 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-21 10:42:08 +0100 |
commit | 9d53351dc20283103bf2eec6b92831033d06c5a8 (patch) | |
tree | 653cf0ddca3f777bcdba188187fb78fe39ae2b02 /internal | |
parent | 5aaf32bbed4d704d5a22ad7dff79f7a68002a213 (diff) |
Component-wide TransactionWriters (#1290)
* Offset updates take place using TransactionWriter
* Refactor TransactionWriter in current state server
* Refactor TransactionWriter in federation sender
* Refactor TransactionWriter in key server
* Refactor TransactionWriter in media API
* Refactor TransactionWriter in server key API
* Refactor TransactionWriter in sync API
* Refactor TransactionWriter in user API
* Fix deadlocking Sync API tests
* Un-deadlock device database
* Fix appservice API
* Rename TransactionWriters to Writers
* Move writers up a layer in sync API
* Document sqlutil.Writer interface
* Add note to Writer documentation
Diffstat (limited to 'internal')
-rw-r--r-- | internal/sqlutil/partition_offset_table.go | 13 | ||||
-rw-r--r-- | internal/sqlutil/sql.go | 4 | ||||
-rw-r--r-- | internal/sqlutil/writer.go | 46 | ||||
-rw-r--r-- | internal/sqlutil/writer_dummy.go | 16 | ||||
-rw-r--r-- | internal/sqlutil/writer_exclusive.go | 21 |
5 files changed, 78 insertions, 22 deletions
diff --git a/internal/sqlutil/partition_offset_table.go b/internal/sqlutil/partition_offset_table.go index 34882902..be079442 100644 --- a/internal/sqlutil/partition_offset_table.go +++ b/internal/sqlutil/partition_offset_table.go @@ -53,6 +53,8 @@ const upsertPartitionOffsetsSQL = "" + // PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table. type PartitionOffsetStatements struct { + db *sql.DB + writer Writer selectPartitionOffsetsStmt *sql.Stmt upsertPartitionOffsetStmt *sql.Stmt } @@ -60,7 +62,9 @@ type PartitionOffsetStatements struct { // Prepare converts the raw SQL statements into prepared statements. // Takes a prefix to prepend to the table name used to store the partition offsets. // This allows multiple components to share the same database schema. -func (s *PartitionOffsetStatements) Prepare(db *sql.DB, prefix string) (err error) { +func (s *PartitionOffsetStatements) Prepare(db *sql.DB, writer Writer, prefix string) (err error) { + s.db = db + s.writer = writer _, err = db.Exec(strings.Replace(partitionOffsetsSchema, "${prefix}", prefix, -1)) if err != nil { return @@ -121,6 +125,9 @@ func (s *PartitionOffsetStatements) selectPartitionOffsets( func (s *PartitionOffsetStatements) upsertPartitionOffset( ctx context.Context, topic string, partition int32, offset int64, ) error { - _, err := s.upsertPartitionOffsetStmt.ExecContext(ctx, topic, partition, offset) - return err + return s.writer.Do(s.db, nil, func(txn *sql.Tx) error { + stmt := TxStmt(txn, s.upsertPartitionOffsetStmt) + _, err := stmt.ExecContext(ctx, topic, partition, offset) + return err + }) } diff --git a/internal/sqlutil/sql.go b/internal/sqlutil/sql.go index 002d7718..d296c418 100644 --- a/internal/sqlutil/sql.go +++ b/internal/sqlutil/sql.go @@ -103,7 +103,3 @@ func SQLiteDriverName() string { } return "sqlite3" } - -type TransactionWriter interface { - Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error -} diff --git a/internal/sqlutil/writer.go b/internal/sqlutil/writer.go new file mode 100644 index 00000000..5d93fef4 --- /dev/null +++ b/internal/sqlutil/writer.go @@ -0,0 +1,46 @@ +package sqlutil + +import "database/sql" + +// The Writer interface is designed to solve the problem of how +// to handle database writes for database engines that don't allow +// concurrent writes, e.g. SQLite. +// +// The interface has a single Do function which takes an optional +// database parameter, an optional transaction parameter and a +// required function parameter. The Writer will call the function +// provided when it is safe to do so, optionally providing a +// transaction to use. +// +// Depending on the combination of parameters provided, the Writer +// will behave in one of three ways: +// +// 1. `db` provided, `txn` provided: +// +// The Writer will call f() when it is safe to do so. The supplied +// "txn" will ALWAYS be passed through to f(). Use this when you +// already have a transaction open. +// +// 2. `db` provided, `txn` not provided (nil): +// +// The Writer will open a new transaction on the provided database +// and then will call f() when it is safe to do so. The new +// transaction will ALWAYS be passed through to f(). Use this if +// you plan to perform more than one SQL query within f(). +// +// 3. `db` not provided (nil), `txn` not provided (nil): +// +// The Writer will call f() when it is safe to do so, but will +// not make any attempt to open a new database transaction or to +// pass through an existing one. The "txn" parameter within f() +// will ALWAYS be nil in this mode. This is useful if you just +// want to perform a single query on an already-prepared statement +// without the overhead of opening a new transaction to do it in. +// +// You MUST take particular care not to call Do() from within f() +// on the same Writer, or it will likely result in a deadlock. +type Writer interface { + // Queue up one or more database write operations within the + // provided function to be executed when it is safe to do so. + Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error +} diff --git a/internal/sqlutil/writer_dummy.go b/internal/sqlutil/writer_dummy.go index e6ab81f6..f426c2bc 100644 --- a/internal/sqlutil/writer_dummy.go +++ b/internal/sqlutil/writer_dummy.go @@ -4,15 +4,21 @@ import ( "database/sql" ) -type DummyTransactionWriter struct { +// DummyWriter implements sqlutil.Writer. +// The DummyWriter is designed to allow reuse of the sqlutil.Writer +// interface but, unlike ExclusiveWriter, it will not guarantee +// writer exclusivity. This is fine in PostgreSQL where overlapping +// transactions and writes are acceptable. +type DummyWriter struct { } -func NewDummyTransactionWriter() TransactionWriter { - return &DummyTransactionWriter{} +// NewDummyWriter returns a new dummy writer. +func NewDummyWriter() Writer { + return &DummyWriter{} } -func (w *DummyTransactionWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error { - if txn == nil { +func (w *DummyWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error { + if db != nil && txn == nil { return WithTransaction(db, func(txn *sql.Tx) error { return f(txn) }) diff --git a/internal/sqlutil/writer_exclusive.go b/internal/sqlutil/writer_exclusive.go index 2e3666ae..002bc32c 100644 --- a/internal/sqlutil/writer_exclusive.go +++ b/internal/sqlutil/writer_exclusive.go @@ -7,16 +7,17 @@ import ( "go.uber.org/atomic" ) -// ExclusiveTransactionWriter allows queuing database writes so that you don't +// ExclusiveWriter implements sqlutil.Writer. +// ExclusiveWriter allows queuing database writes so that you don't // contend on database locks in, e.g. SQLite. Only one task will run -// at a time on a given ExclusiveTransactionWriter. -type ExclusiveTransactionWriter struct { +// at a time on a given ExclusiveWriter. +type ExclusiveWriter struct { running atomic.Bool todo chan transactionWriterTask } -func NewTransactionWriter() TransactionWriter { - return &ExclusiveTransactionWriter{ +func NewExclusiveWriter() Writer { + return &ExclusiveWriter{ todo: make(chan transactionWriterTask), } } @@ -34,7 +35,7 @@ type transactionWriterTask struct { // txn parameter if one is supplied, and if not, will take out a // new transaction from the database supplied in the database // parameter. Either way, this will block until the task is done. -func (w *ExclusiveTransactionWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error { +func (w *ExclusiveWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error { if w.todo == nil { return errors.New("not initialised") } @@ -55,20 +56,20 @@ func (w *ExclusiveTransactionWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql // of these goroutines will run at a time. A transaction will be // opened using the database object from the task and then this will // be passed as a parameter to the task function. -func (w *ExclusiveTransactionWriter) run() { +func (w *ExclusiveWriter) run() { if !w.running.CAS(false, true) { return } defer w.running.Store(false) for task := range w.todo { - if task.txn != nil { + if task.db != nil && task.txn != nil { task.wait <- task.f(task.txn) - } else if task.db != nil { + } else if task.db != nil && task.txn == nil { task.wait <- WithTransaction(task.db, func(txn *sql.Tx) error { return task.f(txn) }) } else { - panic("expected database or transaction but got neither") + task.wait <- task.f(nil) } close(task.wait) } |