aboutsummaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-08-21 10:42:08 +0100
committerGitHub <noreply@github.com>2020-08-21 10:42:08 +0100
commit9d53351dc20283103bf2eec6b92831033d06c5a8 (patch)
tree653cf0ddca3f777bcdba188187fb78fe39ae2b02 /internal
parent5aaf32bbed4d704d5a22ad7dff79f7a68002a213 (diff)
Component-wide TransactionWriters (#1290)
* Offset updates take place using TransactionWriter * Refactor TransactionWriter in current state server * Refactor TransactionWriter in federation sender * Refactor TransactionWriter in key server * Refactor TransactionWriter in media API * Refactor TransactionWriter in server key API * Refactor TransactionWriter in sync API * Refactor TransactionWriter in user API * Fix deadlocking Sync API tests * Un-deadlock device database * Fix appservice API * Rename TransactionWriters to Writers * Move writers up a layer in sync API * Document sqlutil.Writer interface * Add note to Writer documentation
Diffstat (limited to 'internal')
-rw-r--r--internal/sqlutil/partition_offset_table.go13
-rw-r--r--internal/sqlutil/sql.go4
-rw-r--r--internal/sqlutil/writer.go46
-rw-r--r--internal/sqlutil/writer_dummy.go16
-rw-r--r--internal/sqlutil/writer_exclusive.go21
5 files changed, 78 insertions, 22 deletions
diff --git a/internal/sqlutil/partition_offset_table.go b/internal/sqlutil/partition_offset_table.go
index 34882902..be079442 100644
--- a/internal/sqlutil/partition_offset_table.go
+++ b/internal/sqlutil/partition_offset_table.go
@@ -53,6 +53,8 @@ const upsertPartitionOffsetsSQL = "" +
// PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table.
type PartitionOffsetStatements struct {
+ db *sql.DB
+ writer Writer
selectPartitionOffsetsStmt *sql.Stmt
upsertPartitionOffsetStmt *sql.Stmt
}
@@ -60,7 +62,9 @@ type PartitionOffsetStatements struct {
// Prepare converts the raw SQL statements into prepared statements.
// Takes a prefix to prepend to the table name used to store the partition offsets.
// This allows multiple components to share the same database schema.
-func (s *PartitionOffsetStatements) Prepare(db *sql.DB, prefix string) (err error) {
+func (s *PartitionOffsetStatements) Prepare(db *sql.DB, writer Writer, prefix string) (err error) {
+ s.db = db
+ s.writer = writer
_, err = db.Exec(strings.Replace(partitionOffsetsSchema, "${prefix}", prefix, -1))
if err != nil {
return
@@ -121,6 +125,9 @@ func (s *PartitionOffsetStatements) selectPartitionOffsets(
func (s *PartitionOffsetStatements) upsertPartitionOffset(
ctx context.Context, topic string, partition int32, offset int64,
) error {
- _, err := s.upsertPartitionOffsetStmt.ExecContext(ctx, topic, partition, offset)
- return err
+ return s.writer.Do(s.db, nil, func(txn *sql.Tx) error {
+ stmt := TxStmt(txn, s.upsertPartitionOffsetStmt)
+ _, err := stmt.ExecContext(ctx, topic, partition, offset)
+ return err
+ })
}
diff --git a/internal/sqlutil/sql.go b/internal/sqlutil/sql.go
index 002d7718..d296c418 100644
--- a/internal/sqlutil/sql.go
+++ b/internal/sqlutil/sql.go
@@ -103,7 +103,3 @@ func SQLiteDriverName() string {
}
return "sqlite3"
}
-
-type TransactionWriter interface {
- Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error
-}
diff --git a/internal/sqlutil/writer.go b/internal/sqlutil/writer.go
new file mode 100644
index 00000000..5d93fef4
--- /dev/null
+++ b/internal/sqlutil/writer.go
@@ -0,0 +1,46 @@
+package sqlutil
+
+import "database/sql"
+
+// The Writer interface is designed to solve the problem of how
+// to handle database writes for database engines that don't allow
+// concurrent writes, e.g. SQLite.
+//
+// The interface has a single Do function which takes an optional
+// database parameter, an optional transaction parameter and a
+// required function parameter. The Writer will call the function
+// provided when it is safe to do so, optionally providing a
+// transaction to use.
+//
+// Depending on the combination of parameters provided, the Writer
+// will behave in one of three ways:
+//
+// 1. `db` provided, `txn` provided:
+//
+// The Writer will call f() when it is safe to do so. The supplied
+// "txn" will ALWAYS be passed through to f(). Use this when you
+// already have a transaction open.
+//
+// 2. `db` provided, `txn` not provided (nil):
+//
+// The Writer will open a new transaction on the provided database
+// and then will call f() when it is safe to do so. The new
+// transaction will ALWAYS be passed through to f(). Use this if
+// you plan to perform more than one SQL query within f().
+//
+// 3. `db` not provided (nil), `txn` not provided (nil):
+//
+// The Writer will call f() when it is safe to do so, but will
+// not make any attempt to open a new database transaction or to
+// pass through an existing one. The "txn" parameter within f()
+// will ALWAYS be nil in this mode. This is useful if you just
+// want to perform a single query on an already-prepared statement
+// without the overhead of opening a new transaction to do it in.
+//
+// You MUST take particular care not to call Do() from within f()
+// on the same Writer, or it will likely result in a deadlock.
+type Writer interface {
+ // Queue up one or more database write operations within the
+ // provided function to be executed when it is safe to do so.
+ Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error
+}
diff --git a/internal/sqlutil/writer_dummy.go b/internal/sqlutil/writer_dummy.go
index e6ab81f6..f426c2bc 100644
--- a/internal/sqlutil/writer_dummy.go
+++ b/internal/sqlutil/writer_dummy.go
@@ -4,15 +4,21 @@ import (
"database/sql"
)
-type DummyTransactionWriter struct {
+// DummyWriter implements sqlutil.Writer.
+// The DummyWriter is designed to allow reuse of the sqlutil.Writer
+// interface but, unlike ExclusiveWriter, it will not guarantee
+// writer exclusivity. This is fine in PostgreSQL where overlapping
+// transactions and writes are acceptable.
+type DummyWriter struct {
}
-func NewDummyTransactionWriter() TransactionWriter {
- return &DummyTransactionWriter{}
+// NewDummyWriter returns a new dummy writer.
+func NewDummyWriter() Writer {
+ return &DummyWriter{}
}
-func (w *DummyTransactionWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error {
- if txn == nil {
+func (w *DummyWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error {
+ if db != nil && txn == nil {
return WithTransaction(db, func(txn *sql.Tx) error {
return f(txn)
})
diff --git a/internal/sqlutil/writer_exclusive.go b/internal/sqlutil/writer_exclusive.go
index 2e3666ae..002bc32c 100644
--- a/internal/sqlutil/writer_exclusive.go
+++ b/internal/sqlutil/writer_exclusive.go
@@ -7,16 +7,17 @@ import (
"go.uber.org/atomic"
)
-// ExclusiveTransactionWriter allows queuing database writes so that you don't
+// ExclusiveWriter implements sqlutil.Writer.
+// ExclusiveWriter allows queuing database writes so that you don't
// contend on database locks in, e.g. SQLite. Only one task will run
-// at a time on a given ExclusiveTransactionWriter.
-type ExclusiveTransactionWriter struct {
+// at a time on a given ExclusiveWriter.
+type ExclusiveWriter struct {
running atomic.Bool
todo chan transactionWriterTask
}
-func NewTransactionWriter() TransactionWriter {
- return &ExclusiveTransactionWriter{
+func NewExclusiveWriter() Writer {
+ return &ExclusiveWriter{
todo: make(chan transactionWriterTask),
}
}
@@ -34,7 +35,7 @@ type transactionWriterTask struct {
// txn parameter if one is supplied, and if not, will take out a
// new transaction from the database supplied in the database
// parameter. Either way, this will block until the task is done.
-func (w *ExclusiveTransactionWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error {
+func (w *ExclusiveWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error {
if w.todo == nil {
return errors.New("not initialised")
}
@@ -55,20 +56,20 @@ func (w *ExclusiveTransactionWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql
// of these goroutines will run at a time. A transaction will be
// opened using the database object from the task and then this will
// be passed as a parameter to the task function.
-func (w *ExclusiveTransactionWriter) run() {
+func (w *ExclusiveWriter) run() {
if !w.running.CAS(false, true) {
return
}
defer w.running.Store(false)
for task := range w.todo {
- if task.txn != nil {
+ if task.db != nil && task.txn != nil {
task.wait <- task.f(task.txn)
- } else if task.db != nil {
+ } else if task.db != nil && task.txn == nil {
task.wait <- WithTransaction(task.db, func(txn *sql.Tx) error {
return task.f(txn)
})
} else {
- panic("expected database or transaction but got neither")
+ task.wait <- task.f(nil)
}
close(task.wait)
}