diff options
Diffstat (limited to 'internal')
-rw-r--r-- | internal/config/config.go | 2 | ||||
-rw-r--r-- | internal/sql.go | 62 |
2 files changed, 64 insertions, 0 deletions
diff --git a/internal/config/config.go b/internal/config/config.go index 2a95069a..a20cc0ea 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -152,6 +152,8 @@ type Dendrite struct { OutputClientData Topic `yaml:"output_client_data"` // Topic for eduserver/api.OutputTypingEvent events. OutputTypingEvent Topic `yaml:"output_typing_event"` + // Topic for eduserver/api.OutputSendToDeviceEvent events. + OutputSendToDeviceEvent Topic `yaml:"output_send_to_device_event"` // Topic for user updates (profile, presence) UserUpdates Topic `yaml:"user_updates"` } diff --git a/internal/sql.go b/internal/sql.go index d6a5a308..546954bd 100644 --- a/internal/sql.go +++ b/internal/sql.go @@ -1,4 +1,6 @@ // Copyright 2017 Vector Creations Ltd +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,9 +18,12 @@ package internal import ( "database/sql" + "errors" "fmt" "runtime" "time" + + "go.uber.org/atomic" ) // A Transaction is something that can be committed or rolledback. @@ -107,3 +112,60 @@ type DbProperties interface { MaxOpenConns() int ConnMaxLifetime() time.Duration } + +// TransactionWriter allows queuing database writes so that you don't +// contend on database locks in, e.g. SQLite. Only one task will run +// at a time on a given TransactionWriter. +type TransactionWriter struct { + running atomic.Bool + todo chan transactionWriterTask +} + +func NewTransactionWriter() *TransactionWriter { + return &TransactionWriter{ + todo: make(chan transactionWriterTask), + } +} + +// transactionWriterTask represents a specific task. +type transactionWriterTask struct { + db *sql.DB + f func(txn *sql.Tx) error + wait chan error +} + +// Do queues a task to be run by a TransactionWriter. The function +// provided will be ran within a transaction as supplied by the +// database parameter. This will block until the task is finished. +func (w *TransactionWriter) Do(db *sql.DB, f func(txn *sql.Tx) error) error { + if w.todo == nil { + return errors.New("not initialised") + } + if !w.running.Load() { + go w.run() + } + task := transactionWriterTask{ + db: db, + f: f, + wait: make(chan error, 1), + } + w.todo <- task + return <-task.wait +} + +// run processes the tasks for a given transaction writer. Only one +// of these goroutines will run at a time. A transaction will be +// opened using the database object from the task and then this will +// be passed as a parameter to the task function. +func (w *TransactionWriter) run() { + if !w.running.CAS(false, true) { + return + } + defer w.running.Store(false) + for task := range w.todo { + task.wait <- WithTransaction(task.db, func(txn *sql.Tx) error { + return task.f(txn) + }) + close(task.wait) + } +} |