aboutsummaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-06-01 17:50:19 +0100
committerGitHub <noreply@github.com>2020-06-01 17:50:19 +0100
commita5d822004dd93d6f6a7ed73371aeb4bfb163b5ba (patch)
tree76b10a79094415e45cfe5f2db9dfdf58cf0d2837 /internal
parent1f43c24f8602dfbc95620e9d34fac78a7b449c11 (diff)
Send-to-device support (#1072)
* Groundwork for send-to-device messaging * Update sample config * Add unstable routing for now * Send to device consumer in sync API * Start the send-to-device consumer * fix indentation in dendrite-config.yaml * Create send-to-device database tables, other tweaks * Add some logic for send-to-device messages, add them into sync stream * Handle incoming send-to-device messages, count them with EDU stream pos * Undo changes to test * pq.Array * Fix sync * Logging * Fix a couple of transaction things, fix client API * Add send-to-device test, hopefully fix bugs * Comments * Refactor a bit * Fix schema * Fix queries * Debug logging * Fix storing and retrieving of send-to-device messages * Try to avoid database locks * Update sync position * Use latest sync position * Jiggle about sync a bit * Fix tests * Break out the retrieval from the update/delete behaviour * Comments * nolint on getResponseWithPDUsForCompleteSync * Try to line up sync tokens again * Implement wildcard * Add all send-to-device tests to whitelist, what could possibly go wrong? * Only care about wildcard when targeted locally * Deduplicate transactions * Handle tokens properly, return immediately if waiting send-to-device messages * Fix sync * Update sytest-whitelist * Fix copyright notice (need to do more of this) * Comments, copyrights * Return errors from Do, fix dendritejs * Review comments * Comments * Constructor for TransactionWriter * defletions * Update gomatrixserverlib, sytest-blacklist
Diffstat (limited to 'internal')
-rw-r--r--internal/config/config.go2
-rw-r--r--internal/sql.go62
2 files changed, 64 insertions, 0 deletions
diff --git a/internal/config/config.go b/internal/config/config.go
index 2a95069a..a20cc0ea 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -152,6 +152,8 @@ type Dendrite struct {
OutputClientData Topic `yaml:"output_client_data"`
// Topic for eduserver/api.OutputTypingEvent events.
OutputTypingEvent Topic `yaml:"output_typing_event"`
+ // Topic for eduserver/api.OutputSendToDeviceEvent events.
+ OutputSendToDeviceEvent Topic `yaml:"output_send_to_device_event"`
// Topic for user updates (profile, presence)
UserUpdates Topic `yaml:"user_updates"`
}
diff --git a/internal/sql.go b/internal/sql.go
index d6a5a308..546954bd 100644
--- a/internal/sql.go
+++ b/internal/sql.go
@@ -1,4 +1,6 @@
// Copyright 2017 Vector Creations Ltd
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -16,9 +18,12 @@ package internal
import (
"database/sql"
+ "errors"
"fmt"
"runtime"
"time"
+
+ "go.uber.org/atomic"
)
// A Transaction is something that can be committed or rolledback.
@@ -107,3 +112,60 @@ type DbProperties interface {
MaxOpenConns() int
ConnMaxLifetime() time.Duration
}
+
+// TransactionWriter allows queuing database writes so that you don't
+// contend on database locks in, e.g. SQLite. Only one task will run
+// at a time on a given TransactionWriter.
+type TransactionWriter struct {
+ running atomic.Bool
+ todo chan transactionWriterTask
+}
+
+func NewTransactionWriter() *TransactionWriter {
+ return &TransactionWriter{
+ todo: make(chan transactionWriterTask),
+ }
+}
+
+// transactionWriterTask represents a specific task.
+type transactionWriterTask struct {
+ db *sql.DB
+ f func(txn *sql.Tx) error
+ wait chan error
+}
+
+// Do queues a task to be run by a TransactionWriter. The function
+// provided will be ran within a transaction as supplied by the
+// database parameter. This will block until the task is finished.
+func (w *TransactionWriter) Do(db *sql.DB, f func(txn *sql.Tx) error) error {
+ if w.todo == nil {
+ return errors.New("not initialised")
+ }
+ if !w.running.Load() {
+ go w.run()
+ }
+ task := transactionWriterTask{
+ db: db,
+ f: f,
+ wait: make(chan error, 1),
+ }
+ w.todo <- task
+ return <-task.wait
+}
+
+// run processes the tasks for a given transaction writer. Only one
+// of these goroutines will run at a time. A transaction will be
+// opened using the database object from the task and then this will
+// be passed as a parameter to the task function.
+func (w *TransactionWriter) run() {
+ if !w.running.CAS(false, true) {
+ return
+ }
+ defer w.running.Store(false)
+ for task := range w.todo {
+ task.wait <- WithTransaction(task.db, func(txn *sql.Tx) error {
+ return task.f(txn)
+ })
+ close(task.wait)
+ }
+}