diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-08-19 15:38:27 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-19 15:38:27 +0100 |
commit | b24747b305a0770fdd746655e702aa1c1c049765 (patch) | |
tree | 88d94b762fafb4852421eb243313edbfc96ccfa9 /internal/sqlutil | |
parent | 775b04d776ddc06fdee5ece6a407008f00edb7f2 (diff) |
Transaction writer changes, move roomserver writers (#1285)
* Updated TransactionWriters, moved locks in roomserver, various other tweaks
* Fix redaction deadlocks
* Fix lint issue
* Rename SQLiteTransactionWriter to ExclusiveTransactionWriter
* Fix us not sending transactions through in latest events updater
Diffstat (limited to 'internal/sqlutil')
-rw-r--r-- | internal/sqlutil/sql.go | 71 | ||||
-rw-r--r-- | internal/sqlutil/writer_dummy.go | 22 | ||||
-rw-r--r-- | internal/sqlutil/writer_exclusive.go | 75 |
3 files changed, 100 insertions, 68 deletions
diff --git a/internal/sqlutil/sql.go b/internal/sqlutil/sql.go index 95467c63..002d7718 100644 --- a/internal/sqlutil/sql.go +++ b/internal/sqlutil/sql.go @@ -19,8 +19,6 @@ import ( "errors" "fmt" "runtime" - - "go.uber.org/atomic" ) // ErrUserExists is returned if a username already exists in the database. @@ -52,7 +50,7 @@ func EndTransaction(txn Transaction, succeeded *bool) error { func WithTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { txn, err := db.Begin() if err != nil { - return + return fmt.Errorf("sqlutil.WithTransaction.Begin: %w", err) } succeeded := false defer func() { @@ -106,69 +104,6 @@ func SQLiteDriverName() string { return "sqlite3" } -// TransactionWriter allows queuing database writes so that you don't -// contend on database locks in, e.g. SQLite. Only one task will run -// at a time on a given TransactionWriter. -type TransactionWriter struct { - running atomic.Bool - todo chan transactionWriterTask -} - -func NewTransactionWriter() *TransactionWriter { - return &TransactionWriter{ - todo: make(chan transactionWriterTask), - } -} - -// transactionWriterTask represents a specific task. -type transactionWriterTask struct { - db *sql.DB - txn *sql.Tx - f func(txn *sql.Tx) error - wait chan error -} - -// Do queues a task to be run by a TransactionWriter. The function -// provided will be ran within a transaction as supplied by the -// txn parameter if one is supplied, and if not, will take out a -// new transaction from the database supplied in the database -// parameter. Either way, this will block until the task is done. -func (w *TransactionWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error { - if w.todo == nil { - return errors.New("not initialised") - } - if !w.running.Load() { - go w.run() - } - task := transactionWriterTask{ - db: db, - txn: txn, - f: f, - wait: make(chan error, 1), - } - w.todo <- task - return <-task.wait -} - -// run processes the tasks for a given transaction writer. Only one -// of these goroutines will run at a time. A transaction will be -// opened using the database object from the task and then this will -// be passed as a parameter to the task function. -func (w *TransactionWriter) run() { - if !w.running.CAS(false, true) { - return - } - defer w.running.Store(false) - for task := range w.todo { - if task.txn != nil { - task.wait <- task.f(task.txn) - } else if task.db != nil { - task.wait <- WithTransaction(task.db, func(txn *sql.Tx) error { - return task.f(txn) - }) - } else { - panic("expected database or transaction but got neither") - } - close(task.wait) - } +type TransactionWriter interface { + Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error } diff --git a/internal/sqlutil/writer_dummy.go b/internal/sqlutil/writer_dummy.go new file mode 100644 index 00000000..e6ab81f6 --- /dev/null +++ b/internal/sqlutil/writer_dummy.go @@ -0,0 +1,22 @@ +package sqlutil + +import ( + "database/sql" +) + +type DummyTransactionWriter struct { +} + +func NewDummyTransactionWriter() TransactionWriter { + return &DummyTransactionWriter{} +} + +func (w *DummyTransactionWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error { + if txn == nil { + return WithTransaction(db, func(txn *sql.Tx) error { + return f(txn) + }) + } else { + return f(txn) + } +} diff --git a/internal/sqlutil/writer_exclusive.go b/internal/sqlutil/writer_exclusive.go new file mode 100644 index 00000000..2e3666ae --- /dev/null +++ b/internal/sqlutil/writer_exclusive.go @@ -0,0 +1,75 @@ +package sqlutil + +import ( + "database/sql" + "errors" + + "go.uber.org/atomic" +) + +// ExclusiveTransactionWriter allows queuing database writes so that you don't +// contend on database locks in, e.g. SQLite. Only one task will run +// at a time on a given ExclusiveTransactionWriter. +type ExclusiveTransactionWriter struct { + running atomic.Bool + todo chan transactionWriterTask +} + +func NewTransactionWriter() TransactionWriter { + return &ExclusiveTransactionWriter{ + todo: make(chan transactionWriterTask), + } +} + +// transactionWriterTask represents a specific task. +type transactionWriterTask struct { + db *sql.DB + txn *sql.Tx + f func(txn *sql.Tx) error + wait chan error +} + +// Do queues a task to be run by a TransactionWriter. The function +// provided will be ran within a transaction as supplied by the +// txn parameter if one is supplied, and if not, will take out a +// new transaction from the database supplied in the database +// parameter. Either way, this will block until the task is done. +func (w *ExclusiveTransactionWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error { + if w.todo == nil { + return errors.New("not initialised") + } + if !w.running.Load() { + go w.run() + } + task := transactionWriterTask{ + db: db, + txn: txn, + f: f, + wait: make(chan error, 1), + } + w.todo <- task + return <-task.wait +} + +// run processes the tasks for a given transaction writer. Only one +// of these goroutines will run at a time. A transaction will be +// opened using the database object from the task and then this will +// be passed as a parameter to the task function. +func (w *ExclusiveTransactionWriter) run() { + if !w.running.CAS(false, true) { + return + } + defer w.running.Store(false) + for task := range w.todo { + if task.txn != nil { + task.wait <- task.f(task.txn) + } else if task.db != nil { + task.wait <- WithTransaction(task.db, func(txn *sql.Tx) error { + return task.f(txn) + }) + } else { + panic("expected database or transaction but got neither") + } + close(task.wait) + } +} |