aboutsummaryrefslogtreecommitdiff
path: root/internal/sql.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/sql.go')
-rw-r--r--internal/sql.go62
1 files changed, 62 insertions, 0 deletions
diff --git a/internal/sql.go b/internal/sql.go
index d6a5a308..546954bd 100644
--- a/internal/sql.go
+++ b/internal/sql.go
@@ -1,4 +1,6 @@
// Copyright 2017 Vector Creations Ltd
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -16,9 +18,12 @@ package internal
import (
"database/sql"
+ "errors"
"fmt"
"runtime"
"time"
+
+ "go.uber.org/atomic"
)
// A Transaction is something that can be committed or rolledback.
@@ -107,3 +112,60 @@ type DbProperties interface {
MaxOpenConns() int
ConnMaxLifetime() time.Duration
}
+
+// TransactionWriter allows queuing database writes so that you don't
+// contend on database locks in, e.g. SQLite. Only one task will run
+// at a time on a given TransactionWriter.
+type TransactionWriter struct {
+ running atomic.Bool
+ todo chan transactionWriterTask
+}
+
+func NewTransactionWriter() *TransactionWriter {
+ return &TransactionWriter{
+ todo: make(chan transactionWriterTask),
+ }
+}
+
+// transactionWriterTask represents a specific task.
+type transactionWriterTask struct {
+ db *sql.DB
+ f func(txn *sql.Tx) error
+ wait chan error
+}
+
+// Do queues a task to be run by a TransactionWriter. The function
+// provided will be ran within a transaction as supplied by the
+// database parameter. This will block until the task is finished.
+func (w *TransactionWriter) Do(db *sql.DB, f func(txn *sql.Tx) error) error {
+ if w.todo == nil {
+ return errors.New("not initialised")
+ }
+ if !w.running.Load() {
+ go w.run()
+ }
+ task := transactionWriterTask{
+ db: db,
+ f: f,
+ wait: make(chan error, 1),
+ }
+ w.todo <- task
+ return <-task.wait
+}
+
+// run processes the tasks for a given transaction writer. Only one
+// of these goroutines will run at a time. A transaction will be
+// opened using the database object from the task and then this will
+// be passed as a parameter to the task function.
+func (w *TransactionWriter) run() {
+ if !w.running.CAS(false, true) {
+ return
+ }
+ defer w.running.Store(false)
+ for task := range w.todo {
+ task.wait <- WithTransaction(task.db, func(txn *sql.Tx) error {
+ return task.f(txn)
+ })
+ close(task.wait)
+ }
+}