aboutsummaryrefslogtreecommitdiff
path: root/internal/sqlutil/writer_exclusive.go
blob: 91dd77e4de391bfcd7121b75e0c2887610e014b0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package sqlutil

import (
	"database/sql"
	"errors"

	"go.uber.org/atomic"
)

// ExclusiveWriter implements sqlutil.Writer.
// ExclusiveWriter allows queuing database writes so that you don't
// contend on database locks in, e.g. SQLite. Only one task will run
// at a time on a given ExclusiveWriter.
type ExclusiveWriter struct {
	running atomic.Bool
	todo    chan transactionWriterTask
}

func NewExclusiveWriter() Writer {
	return &ExclusiveWriter{
		todo: make(chan transactionWriterTask),
	}
}

// transactionWriterTask represents a specific task.
type transactionWriterTask struct {
	db   *sql.DB
	txn  *sql.Tx
	f    func(txn *sql.Tx) error
	wait chan error
}

// Do queues a task to be run by a TransactionWriter. The function
// provided will be ran within a transaction as supplied by the
// txn parameter if one is supplied, and if not, will take out a
// new transaction from the database supplied in the database
// parameter. Either way, this will block until the task is done.
func (w *ExclusiveWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error {
	if w.todo == nil {
		return errors.New("not initialised")
	}
	if !w.running.Load() {
		go w.run()
	}
	task := transactionWriterTask{
		db:   db,
		txn:  txn,
		f:    f,
		wait: make(chan error, 1),
	}
	w.todo <- task
	return <-task.wait
}

// run processes the tasks for a given transaction writer. Only one
// of these goroutines will run at a time. A transaction will be
// opened using the database object from the task and then this will
// be passed as a parameter to the task function.
func (w *ExclusiveWriter) run() {
	if !w.running.CAS(false, true) {
		return
	}
	if tracingEnabled {
		gid := goid()
		goidToWriter.Store(gid, w)
		defer goidToWriter.Delete(gid)
	}

	defer w.running.Store(false)
	for task := range w.todo {
		if task.db != nil && task.txn != nil {
			task.wait <- task.f(task.txn)
		} else if task.db != nil && task.txn == nil {
			task.wait <- WithTransaction(task.db, func(txn *sql.Tx) error {
				return task.f(txn)
			})
		} else {
			task.wait <- task.f(nil)
		}
		close(task.wait)
	}
}