aboutsummaryrefslogtreecommitdiff
path: root/internal/sqlutil/sql.go
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-08-19 15:38:27 +0100
committerGitHub <noreply@github.com>2020-08-19 15:38:27 +0100
commitb24747b305a0770fdd746655e702aa1c1c049765 (patch)
tree88d94b762fafb4852421eb243313edbfc96ccfa9 /internal/sqlutil/sql.go
parent775b04d776ddc06fdee5ece6a407008f00edb7f2 (diff)
Transaction writer changes, move roomserver writers (#1285)
* Updated TransactionWriters, moved locks in roomserver, various other tweaks * Fix redaction deadlocks * Fix lint issue * Rename SQLiteTransactionWriter to ExclusiveTransactionWriter * Fix us not sending transactions through in latest events updater
Diffstat (limited to 'internal/sqlutil/sql.go')
-rw-r--r--internal/sqlutil/sql.go71
1 files changed, 3 insertions, 68 deletions
diff --git a/internal/sqlutil/sql.go b/internal/sqlutil/sql.go
index 95467c63..002d7718 100644
--- a/internal/sqlutil/sql.go
+++ b/internal/sqlutil/sql.go
@@ -19,8 +19,6 @@ import (
"errors"
"fmt"
"runtime"
-
- "go.uber.org/atomic"
)
// ErrUserExists is returned if a username already exists in the database.
@@ -52,7 +50,7 @@ func EndTransaction(txn Transaction, succeeded *bool) error {
func WithTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
txn, err := db.Begin()
if err != nil {
- return
+ return fmt.Errorf("sqlutil.WithTransaction.Begin: %w", err)
}
succeeded := false
defer func() {
@@ -106,69 +104,6 @@ func SQLiteDriverName() string {
return "sqlite3"
}
-// TransactionWriter allows queuing database writes so that you don't
-// contend on database locks in, e.g. SQLite. Only one task will run
-// at a time on a given TransactionWriter.
-type TransactionWriter struct {
- running atomic.Bool
- todo chan transactionWriterTask
-}
-
-func NewTransactionWriter() *TransactionWriter {
- return &TransactionWriter{
- todo: make(chan transactionWriterTask),
- }
-}
-
-// transactionWriterTask represents a specific task.
-type transactionWriterTask struct {
- db *sql.DB
- txn *sql.Tx
- f func(txn *sql.Tx) error
- wait chan error
-}
-
-// Do queues a task to be run by a TransactionWriter. The function
-// provided will be ran within a transaction as supplied by the
-// txn parameter if one is supplied, and if not, will take out a
-// new transaction from the database supplied in the database
-// parameter. Either way, this will block until the task is done.
-func (w *TransactionWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error {
- if w.todo == nil {
- return errors.New("not initialised")
- }
- if !w.running.Load() {
- go w.run()
- }
- task := transactionWriterTask{
- db: db,
- txn: txn,
- f: f,
- wait: make(chan error, 1),
- }
- w.todo <- task
- return <-task.wait
-}
-
-// run processes the tasks for a given transaction writer. Only one
-// of these goroutines will run at a time. A transaction will be
-// opened using the database object from the task and then this will
-// be passed as a parameter to the task function.
-func (w *TransactionWriter) run() {
- if !w.running.CAS(false, true) {
- return
- }
- defer w.running.Store(false)
- for task := range w.todo {
- if task.txn != nil {
- task.wait <- task.f(task.txn)
- } else if task.db != nil {
- task.wait <- WithTransaction(task.db, func(txn *sql.Tx) error {
- return task.f(txn)
- })
- } else {
- panic("expected database or transaction but got neither")
- }
- close(task.wait)
- }
+type TransactionWriter interface {
+ Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error
}