aboutsummaryrefslogtreecommitdiff
path: root/federationsender
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-07-01 11:46:38 +0100
committerGitHub <noreply@github.com>2020-07-01 11:46:38 +0100
commit42dd96242574866378bb95d92bc0c7fdf3dbabf6 (patch)
tree78f6afb8160006ec5f573f19e1f6653c5187373b /federationsender
parent6f49758b90d655d9c2bb9170da2ea1d0a2bdd664 (diff)
Persistent federation sender queues (PDUs) (#1173)
* Initial work on persistent queues * Update index for event ID and server name * Put things into database (postgres for now) * Duplicate postgres code into sqlite for now just to stop build errors, will fix SQLite soon * Fix table name * Fix index * Fix table name * Use RETURNING because LastInsertID is not supported by postgres * Use functions * Marshal headered event * Don't error on now rows * Don't block if there are PDUs waiting * Try to tidy up JSON * Debug logging * Fix query, use transactions in postgres * Clean up * Rehydrate more opportunistically * Fix SQLite * remove unused types * Review comments * Shuffle things around a bit * Clean up transaction properly * Don't send empty transactions * Reduce unnecessary retries * Count PDUs to make more resilient * Don't stop when there is work to be done * Try to limit wakeups * well this is tedious * Fix race in incomplete transactions * Thread safety on transaction ID/count
Diffstat (limited to 'federationsender')
-rw-r--r--federationsender/federationsender.go3
-rw-r--r--federationsender/queue/destinationqueue.go267
-rw-r--r--federationsender/queue/queue.go21
-rw-r--r--federationsender/storage/interface.go5
-rw-r--r--federationsender/storage/postgres/queue_json_table.go111
-rw-r--r--federationsender/storage/postgres/queue_pdus_table.go169
-rw-r--r--federationsender/storage/postgres/storage.go135
-rw-r--r--federationsender/storage/sqlite3/queue_json_table.go132
-rw-r--r--federationsender/storage/sqlite3/queue_pdus_table.go167
-rw-r--r--federationsender/storage/sqlite3/storage.go135
10 files changed, 1045 insertions, 100 deletions
diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go
index 10ac51c8..acf52414 100644
--- a/federationsender/federationsender.go
+++ b/federationsender/federationsender.go
@@ -50,7 +50,8 @@ func NewInternalAPI(
statistics := &types.Statistics{}
queues := queue.NewOutgoingQueues(
- base.Cfg.Matrix.ServerName, federation, rsAPI, statistics, &queue.SigningInfo{
+ federationSenderDB, base.Cfg.Matrix.ServerName, federation, rsAPI, statistics,
+ &queue.SigningInfo{
KeyID: base.Cfg.Matrix.KeyID,
PrivateKey: base.Cfg.Matrix.PrivateKey,
ServerName: base.Cfg.Matrix.ServerName,
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go
index 4449f9e6..a736b385 100644
--- a/federationsender/queue/destinationqueue.go
+++ b/federationsender/queue/destinationqueue.go
@@ -18,8 +18,10 @@ import (
"context"
"encoding/json"
"fmt"
+ "sync"
"time"
+ "github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix"
@@ -29,11 +31,14 @@ import (
"go.uber.org/atomic"
)
+const maxPDUsPerTransaction = 50
+
// destinationQueue is a queue of events for a single destination.
// It is responsible for sending the events to the destination and
// ensures that only one request is in flight to a given destination
// at a time.
type destinationQueue struct {
+ db storage.Database
signing *SigningInfo
rsAPI api.RoomserverInternalAPI
client *gomatrixserverlib.FederationClient // federation client
@@ -42,13 +47,15 @@ type destinationQueue struct {
running atomic.Bool // is the queue worker running?
backingOff atomic.Bool // true if we're backing off
statistics *types.ServerStatistics // statistics about this remote server
- incomingPDUs chan *gomatrixserverlib.HeaderedEvent // PDUs to send
- incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send
- lastTransactionIDs []gomatrixserverlib.TransactionID // last transaction ID
- pendingPDUs []*gomatrixserverlib.HeaderedEvent // owned by backgroundSend
+ incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
+ transactionIDMutex sync.Mutex // protects transactionID
+ transactionID gomatrixserverlib.TransactionID // last transaction ID
+ transactionCount atomic.Int32 // how many events in this transaction so far
+ pendingPDUs atomic.Int32 // how many PDUs are waiting to be sent
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
+ wakeServerCh chan bool // interrupts idle wait
retryServerCh chan bool // interrupts backoff
}
@@ -79,15 +86,47 @@ func (oq *destinationQueue) retry() {
// Send event adds the event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending events to that destination.
-func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) {
+func (oq *destinationQueue) sendEvent(nid int64) {
if oq.statistics.Blacklisted() {
// If the destination is blacklisted then drop the event.
return
}
+ // Create a transaction ID. We'll either do this if we don't have
+ // one made up yet, or if we've exceeded the number of maximum
+ // events allowed in a single tranaction. We'll reset the counter
+ // when we do.
+ oq.transactionIDMutex.Lock()
+ if oq.transactionID == "" || oq.transactionCount.Load() >= maxPDUsPerTransaction {
+ now := gomatrixserverlib.AsTimestamp(time.Now())
+ oq.transactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
+ oq.transactionCount.Store(0)
+ }
+ oq.transactionIDMutex.Unlock()
+ // Create a database entry that associates the given PDU NID with
+ // this destination queue. We'll then be able to retrieve the PDU
+ // later.
+ if err := oq.db.AssociatePDUWithDestination(
+ context.TODO(),
+ oq.transactionID, // the current transaction ID
+ oq.destination, // the destination server name
+ []int64{nid}, // NID from federationsender_queue_json table
+ ); err != nil {
+ log.WithError(err).Errorf("failed to associate PDU NID %d with destination %q", nid, oq.destination)
+ return
+ }
+ // We've successfully added a PDU to the transaction so increase
+ // the counter.
+ oq.transactionCount.Add(1)
+ // If the queue isn't running at this point then start it.
if !oq.running.Load() {
go oq.backgroundSend()
}
- oq.incomingPDUs <- ev
+ // Signal that we've sent a new PDU. This will cause the queue to
+ // wake up if it's asleep. The return to the Add function will only
+ // be 1 if the previous value was 0, e.g. nothing was waiting before.
+ if oq.pendingPDUs.Add(1) == 1 {
+ oq.wakeServerCh <- true
+ }
}
// sendEDU adds the EDU event to the pending queue for the destination.
@@ -129,56 +168,48 @@ func (oq *destinationQueue) backgroundSend() {
defer oq.running.Store(false)
for {
- // Wait either for incoming events, or until we hit an
- // idle timeout.
- select {
- case pdu := <-oq.incomingPDUs:
- // Ordering of PDUs is important so we add them to the end
- // of the queue and they will all be added to transactions
- // in order.
- oq.pendingPDUs = append(oq.pendingPDUs, pdu)
- // If there are any more things waiting in the channel queue
- // then read them. This is safe because we guarantee only
- // having one goroutine per destination queue, so the channel
- // isn't being consumed anywhere else.
- for len(oq.incomingPDUs) > 0 {
- oq.pendingPDUs = append(oq.pendingPDUs, <-oq.incomingPDUs)
- }
- case edu := <-oq.incomingEDUs:
- // Likewise for EDUs, although we should probably not try
- // too hard with some EDUs (like typing notifications) after
- // a certain amount of time has passed.
- // TODO: think about EDU expiry some more
- oq.pendingEDUs = append(oq.pendingEDUs, edu)
- // If there are any more things waiting in the channel queue
- // then read them. This is safe because we guarantee only
- // having one goroutine per destination queue, so the channel
- // isn't being consumed anywhere else.
- for len(oq.incomingEDUs) > 0 {
- oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
- }
- case invite := <-oq.incomingInvites:
- // There's no strict ordering requirement for invites like
- // there is for transactions, so we put the invite onto the
- // front of the queue. This means that if an invite that is
- // stuck failing already, that it won't block our new invite
- // from being sent.
- oq.pendingInvites = append(
- []*gomatrixserverlib.InviteV2Request{invite},
- oq.pendingInvites...,
- )
- // If there are any more things waiting in the channel queue
- // then read them. This is safe because we guarantee only
- // having one goroutine per destination queue, so the channel
- // isn't being consumed anywhere else.
- for len(oq.incomingInvites) > 0 {
- oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
+ // If we have nothing to do then wait either for incoming events, or
+ // until we hit an idle timeout.
+ if oq.pendingPDUs.Load() == 0 && len(oq.pendingEDUs) == 0 && len(oq.pendingInvites) == 0 {
+ select {
+ case <-oq.wakeServerCh:
+ // We were woken up because there are new PDUs waiting in the
+ // database.
+ case edu := <-oq.incomingEDUs:
+ // EDUs are handled in-memory for now. We will try to keep
+ // the ordering intact.
+ // TODO: Certain EDU types need persistence, e.g. send-to-device
+ oq.pendingEDUs = append(oq.pendingEDUs, edu)
+ // If there are any more things waiting in the channel queue
+ // then read them. This is safe because we guarantee only
+ // having one goroutine per destination queue, so the channel
+ // isn't being consumed anywhere else.
+ for len(oq.incomingEDUs) > 0 {
+ oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
+ }
+ case invite := <-oq.incomingInvites:
+ // There's no strict ordering requirement for invites like
+ // there is for transactions, so we put the invite onto the
+ // front of the queue. This means that if an invite that is
+ // stuck failing already, that it won't block our new invite
+ // from being sent.
+ oq.pendingInvites = append(
+ []*gomatrixserverlib.InviteV2Request{invite},
+ oq.pendingInvites...,
+ )
+ // If there are any more things waiting in the channel queue
+ // then read them. This is safe because we guarantee only
+ // having one goroutine per destination queue, so the channel
+ // isn't being consumed anywhere else.
+ for len(oq.incomingInvites) > 0 {
+ oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
+ }
+ case <-time.After(time.Second * 30):
+ // The worker is idle so stop the goroutine. It'll get
+ // restarted automatically the next time we have an event to
+ // send.
+ return
}
- case <-time.After(time.Second * 30):
- // The worker is idle so stop the goroutine. It'll
- // get restarted automatically the next time we
- // get an event.
- return
}
// If we are backing off this server then wait for the
@@ -193,47 +224,31 @@ func (oq *destinationQueue) backgroundSend() {
oq.backingOff.Store(false)
}
- // How many things do we have waiting?
- numPDUs := len(oq.pendingPDUs)
- numEDUs := len(oq.pendingEDUs)
- numInvites := len(oq.pendingInvites)
-
// If we have pending PDUs or EDUs then construct a transaction.
- if numPDUs > 0 || numEDUs > 0 {
+ if oq.pendingPDUs.Load() > 0 || len(oq.pendingEDUs) > 0 {
// Try sending the next transaction and see what happens.
- transaction, terr := oq.nextTransaction(oq.pendingPDUs, oq.pendingEDUs, oq.statistics.SuccessCount())
+ transaction, terr := oq.nextTransaction(oq.pendingEDUs)
if terr != nil {
// We failed to send the transaction.
if giveUp := oq.statistics.Failure(); giveUp {
- // It's been suggested that we should give up because
- // the backoff has exceeded a maximum allowable value.
+ // It's been suggested that we should give up because the backoff
+ // has exceeded a maximum allowable value. Clean up the in-memory
+ // buffers at this point. The PDU clean-up is already on a defer.
+ oq.cleanPendingEDUs()
+ oq.cleanPendingInvites()
return
}
} else if transaction {
// If we successfully sent the transaction then clear out
- // the pending events and EDUs.
+ // the pending events and EDUs, and wipe our transaction ID.
oq.statistics.Success()
- // Reallocate so that the underlying arrays can be GC'd, as
- // opposed to growing forever.
- for i := 0; i < numPDUs; i++ {
- oq.pendingPDUs[i] = nil
- }
- for i := 0; i < numEDUs; i++ {
- oq.pendingEDUs[i] = nil
- }
- oq.pendingPDUs = append(
- []*gomatrixserverlib.HeaderedEvent{},
- oq.pendingPDUs[numPDUs:]...,
- )
- oq.pendingEDUs = append(
- []*gomatrixserverlib.EDU{},
- oq.pendingEDUs[numEDUs:]...,
- )
+ // Clean up the in-memory buffers.
+ oq.cleanPendingEDUs()
}
}
// Try sending the next invite and see what happens.
- if numInvites > 0 {
+ if len(oq.pendingInvites) > 0 {
sent, ierr := oq.nextInvites(oq.pendingInvites)
if ierr != nil {
// We failed to send the transaction so increase the
@@ -249,59 +264,117 @@ func (oq *destinationQueue) backgroundSend() {
oq.statistics.Success()
// Reallocate so that the underlying array can be GC'd, as
// opposed to growing forever.
- oq.pendingInvites = append(
- []*gomatrixserverlib.InviteV2Request{},
- oq.pendingInvites[sent:]...,
- )
+ oq.cleanPendingInvites()
}
}
}
}
+// cleanPendingEDUs cleans out the pending EDU buffer, removing
+// all references so that the underlying objects can be GC'd.
+func (oq *destinationQueue) cleanPendingEDUs() {
+ for i := 0; i < len(oq.pendingEDUs); i++ {
+ oq.pendingEDUs[i] = nil
+ }
+ oq.pendingEDUs = []*gomatrixserverlib.EDU{}
+}
+
+// cleanPendingInvites cleans out the pending invite buffer,
+// removing all references so that the underlying objects can
+// be GC'd.
+func (oq *destinationQueue) cleanPendingInvites() {
+ for i := 0; i < len(oq.pendingInvites); i++ {
+ oq.pendingInvites[i] = nil
+ }
+ oq.pendingInvites = []*gomatrixserverlib.InviteV2Request{}
+}
+
// nextTransaction creates a new transaction from the pending event
// queue and sends it. Returns true if a transaction was sent or
// false otherwise.
func (oq *destinationQueue) nextTransaction(
- pendingPDUs []*gomatrixserverlib.HeaderedEvent,
pendingEDUs []*gomatrixserverlib.EDU,
- sentCounter uint32,
) (bool, error) {
+ // Before we do anything, we need to roll over the transaction
+ // ID that is being used to coalesce events into the next TX.
+ // Otherwise it's possible that we'll pick up an incomplete
+ // transaction and end up nuking the rest of the events at the
+ // cleanup stage.
+ oq.transactionIDMutex.Lock()
+ oq.transactionID = ""
+ oq.transactionIDMutex.Unlock()
+ oq.transactionCount.Store(0)
+
+ // Create the transaction.
t := gomatrixserverlib.Transaction{
PDUs: []json.RawMessage{},
EDUs: []gomatrixserverlib.EDU{},
}
- now := gomatrixserverlib.AsTimestamp(time.Now())
- t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, sentCounter))
t.Origin = oq.origin
t.Destination = oq.destination
- t.OriginServerTS = now
- t.PreviousIDs = oq.lastTransactionIDs
- if t.PreviousIDs == nil {
- t.PreviousIDs = []gomatrixserverlib.TransactionID{}
+ t.OriginServerTS = gomatrixserverlib.AsTimestamp(time.Now())
+
+ // Ask the database for any pending PDUs from the next transaction.
+ // maxPDUsPerTransaction is an upper limit but we probably won't
+ // actually retrieve that many events.
+ txid, pdus, err := oq.db.GetNextTransactionPDUs(
+ context.TODO(), // context
+ oq.destination, // server name
+ maxPDUsPerTransaction, // max events to retrieve
+ )
+ if err != nil {
+ log.WithError(err).Errorf("failed to get next transaction PDUs for server %q", oq.destination)
+ return false, fmt.Errorf("oq.db.GetNextTransactionPDUs: %w", err)
}
- oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID}
+ // If we didn't get anything from the database and there are no
+ // pending EDUs then there's nothing to do - stop here.
+ if len(pdus) == 0 && len(pendingEDUs) == 0 {
+ return false, nil
+ }
- for _, pdu := range pendingPDUs {
+ // Pick out the transaction ID from the database. If we didn't
+ // get a transaction ID (i.e. because there are no PDUs but only
+ // EDUs) then generate a transaction ID.
+ t.TransactionID = txid
+ if t.TransactionID == "" {
+ now := gomatrixserverlib.AsTimestamp(time.Now())
+ t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
+ }
+
+ // Go through PDUs that we retrieved from the database, if any,
+ // and add them into the transaction.
+ for _, pdu := range pdus {
// Append the JSON of the event, since this is a json.RawMessage type in the
// gomatrixserverlib.Transaction struct
t.PDUs = append(t.PDUs, (*pdu).JSON())
}
+ // Do the same for pending EDUS in the queue.
for _, edu := range pendingEDUs {
t.EDUs = append(t.EDUs, *edu)
}
logrus.WithField("server_name", oq.destination).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
+ // Try to send the transaction to the destination server.
// TODO: we should check for 500-ish fails vs 400-ish here,
// since we shouldn't queue things indefinitely in response
// to a 400-ish error
- _, err := oq.client.SendTransaction(context.TODO(), t)
+ _, err = oq.client.SendTransaction(context.TODO(), t)
switch e := err.(type) {
case nil:
// No error was returned so the transaction looks to have
// been successfully sent.
+ oq.pendingPDUs.Sub(int32(len(t.PDUs)))
+ // Clean up the transaction in the database.
+ if err = oq.db.CleanTransactionPDUs(
+ context.TODO(),
+ t.Destination,
+ t.TransactionID,
+ ); err != nil {
+ log.WithError(err).Errorf("failed to clean transaction %q for server %q", t.TransactionID, t.Destination)
+ }
return true, nil
case gomatrix.HTTPError:
// We received a HTTP error back. In this instance we only
diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go
index 24034355..492d5f55 100644
--- a/federationsender/queue/queue.go
+++ b/federationsender/queue/queue.go
@@ -15,10 +15,13 @@
package queue
import (
+ "context"
"crypto/ed25519"
+ "encoding/json"
"fmt"
"sync"
+ "github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
@@ -29,6 +32,7 @@ import (
// OutgoingQueues is a collection of queues for sending transactions to other
// matrix servers
type OutgoingQueues struct {
+ db storage.Database
rsAPI api.RoomserverInternalAPI
origin gomatrixserverlib.ServerName
client *gomatrixserverlib.FederationClient
@@ -40,6 +44,7 @@ type OutgoingQueues struct {
// NewOutgoingQueues makes a new OutgoingQueues
func NewOutgoingQueues(
+ db storage.Database,
origin gomatrixserverlib.ServerName,
client *gomatrixserverlib.FederationClient,
rsAPI api.RoomserverInternalAPI,
@@ -47,6 +52,7 @@ func NewOutgoingQueues(
signing *SigningInfo,
) *OutgoingQueues {
return &OutgoingQueues{
+ db: db,
rsAPI: rsAPI,
origin: origin,
client: client,
@@ -76,14 +82,15 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
oq := oqs.queues[destination]
if oq == nil {
oq = &destinationQueue{
+ db: oqs.db,
rsAPI: oqs.rsAPI,
origin: oqs.origin,
destination: destination,
client: oqs.client,
statistics: oqs.statistics.ForServer(destination),
- incomingPDUs: make(chan *gomatrixserverlib.HeaderedEvent, 128),
incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
+ wakeServerCh: make(chan bool, 128),
retryServerCh: make(chan bool),
signing: oqs.signing,
}
@@ -115,8 +122,18 @@ func (oqs *OutgoingQueues) SendEvent(
"destinations": destinations, "event": ev.EventID(),
}).Info("Sending event")
+ headeredJSON, err := json.Marshal(ev)
+ if err != nil {
+ return fmt.Errorf("json.Marshal: %w", err)
+ }
+
+ nid, err := oqs.db.StoreJSON(context.TODO(), string(headeredJSON))
+ if err != nil {
+ return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
+ }
+
for _, destination := range destinations {
- oqs.getQueue(destination).sendEvent(ev)
+ oqs.getQueue(destination).sendEvent(nid)
}
return nil
diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go
index be195382..f4df93fa 100644
--- a/federationsender/storage/interface.go
+++ b/federationsender/storage/interface.go
@@ -19,10 +19,15 @@ import (
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/gomatrixserverlib"
)
type Database interface {
internal.PartitionStorer
UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error)
GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
+ StoreJSON(ctx context.Context, js string) (int64, error)
+ AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nids []int64) error
+ GetNextTransactionPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, error)
+ CleanTransactionPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, transactionID gomatrixserverlib.TransactionID) error
}
diff --git a/federationsender/storage/postgres/queue_json_table.go b/federationsender/storage/postgres/queue_json_table.go
new file mode 100644
index 00000000..eac2ea98
--- /dev/null
+++ b/federationsender/storage/postgres/queue_json_table.go
@@ -0,0 +1,111 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+)
+
+const queueJSONSchema = `
+-- The federationsender_queue_json table contains event contents that
+-- we failed to send.
+CREATE TABLE IF NOT EXISTS federationsender_queue_json (
+ -- The JSON NID. This allows the federationsender_queue_retry table to
+ -- cross-reference to find the JSON blob.
+ json_nid BIGSERIAL,
+ -- The JSON body. Text so that we preserve UTF-8.
+ json_body TEXT NOT NULL
+);
+`
+
+const insertJSONSQL = "" +
+ "INSERT INTO federationsender_queue_json (json_body)" +
+ " VALUES ($1)" +
+ " RETURNING json_nid"
+
+const deleteJSONSQL = "" +
+ "DELETE FROM federationsender_queue_json WHERE json_nid = ANY($1)"
+
+const selectJSONSQL = "" +
+ "SELECT json_nid, json_body FROM federationsender_queue_json" +
+ " WHERE json_nid = ANY($1)"
+
+type queueJSONStatements struct {
+ insertJSONStmt *sql.Stmt
+ deleteJSONStmt *sql.Stmt
+ selectJSONStmt *sql.Stmt
+}
+
+func (s *queueJSONStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(queueJSONSchema)
+ if err != nil {
+ return
+ }
+ if s.insertJSONStmt, err = db.Prepare(insertJSONSQL); err != nil {
+ return
+ }
+ if s.deleteJSONStmt, err = db.Prepare(deleteJSONSQL); err != nil {
+ return
+ }
+ if s.selectJSONStmt, err = db.Prepare(selectJSONSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *queueJSONStatements) insertQueueJSON(
+ ctx context.Context, txn *sql.Tx, json string,
+) (int64, error) {
+ stmt := sqlutil.TxStmt(txn, s.insertJSONStmt)
+ var lastid int64
+ if err := stmt.QueryRowContext(ctx, json).Scan(&lastid); err != nil {
+ return 0, err
+ }
+ return lastid, nil
+}
+
+func (s *queueJSONStatements) deleteQueueJSON(
+ ctx context.Context, txn *sql.Tx, nids []int64,
+) error {
+ stmt := sqlutil.TxStmt(txn, s.deleteJSONStmt)
+ _, err := stmt.ExecContext(ctx, pq.Int64Array(nids))
+ return err
+}
+
+func (s *queueJSONStatements) selectQueueJSON(
+ ctx context.Context, txn *sql.Tx, jsonNIDs []int64,
+) (map[int64][]byte, error) {
+ blobs := map[int64][]byte{}
+ stmt := sqlutil.TxStmt(txn, s.selectJSONStmt)
+ rows, err := stmt.QueryContext(ctx, pq.Int64Array(jsonNIDs))
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "selectJSON: rows.close() failed")
+ for rows.Next() {
+ var nid int64
+ var blob []byte
+ if err = rows.Scan(&nid, &blob); err != nil {
+ return nil, err
+ }
+ blobs[nid] = blob
+ }
+ return blobs, err
+}
diff --git a/federationsender/storage/postgres/queue_pdus_table.go b/federationsender/storage/postgres/queue_pdus_table.go
new file mode 100644
index 00000000..ef7a9f41
--- /dev/null
+++ b/federationsender/storage/postgres/queue_pdus_table.go
@@ -0,0 +1,169 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const queuePDUsSchema = `
+CREATE TABLE IF NOT EXISTS federationsender_queue_pdus (
+ -- The transaction ID that was generated before persisting the event.
+ transaction_id TEXT NOT NULL,
+ -- The destination server that we will send the event to.
+ server_name TEXT NOT NULL,
+ -- The JSON NID from the federationsender_queue_pdus_json table.
+ json_nid BIGINT NOT NULL
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_pdus_pdus_json_nid_idx
+ ON federationsender_queue_pdus (json_nid, server_name);
+`
+
+const insertQueuePDUSQL = "" +
+ "INSERT INTO federationsender_queue_pdus (transaction_id, server_name, json_nid)" +
+ " VALUES ($1, $2, $3)"
+
+const deleteQueueTransactionPDUsSQL = "" +
+ "DELETE FROM federationsender_queue_pdus WHERE server_name = $1 AND transaction_id = $2"
+
+const selectQueueNextTransactionIDSQL = "" +
+ "SELECT transaction_id FROM federationsender_queue_pdus" +
+ " WHERE server_name = $1" +
+ " ORDER BY transaction_id ASC" +
+ " LIMIT 1"
+
+const selectQueuePDUsByTransactionSQL = "" +
+ "SELECT json_nid FROM federationsender_queue_pdus" +
+ " WHERE server_name = $1 AND transaction_id = $2" +
+ " LIMIT $3"
+
+const selectQueueReferenceJSONCountSQL = "" +
+ "SELECT COUNT(*) FROM federationsender_queue_pdus" +
+ " WHERE json_nid = $1"
+
+type queuePDUsStatements struct {
+ insertQueuePDUStmt *sql.Stmt
+ deleteQueueTransactionPDUsStmt *sql.Stmt
+ selectQueueNextTransactionIDStmt *sql.Stmt
+ selectQueuePDUsByTransactionStmt *sql.Stmt
+ selectQueueReferenceJSONCountStmt *sql.Stmt
+}
+
+func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(queuePDUsSchema)
+ if err != nil {
+ return
+ }
+ if s.insertQueuePDUStmt, err = db.Prepare(insertQueuePDUSQL); err != nil {
+ return
+ }
+ if s.deleteQueueTransactionPDUsStmt, err = db.Prepare(deleteQueueTransactionPDUsSQL); err != nil {
+ return
+ }
+ if s.selectQueueNextTransactionIDStmt, err = db.Prepare(selectQueueNextTransactionIDSQL); err != nil {
+ return
+ }
+ if s.selectQueuePDUsByTransactionStmt, err = db.Prepare(selectQueuePDUsByTransactionSQL); err != nil {
+ return
+ }
+ if s.selectQueueReferenceJSONCountStmt, err = db.Prepare(selectQueueReferenceJSONCountSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *queuePDUsStatements) insertQueuePDU(
+ ctx context.Context,
+ txn *sql.Tx,
+ transactionID gomatrixserverlib.TransactionID,
+ serverName gomatrixserverlib.ServerName,
+ nid int64,
+) error {
+ stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt)
+ _, err := stmt.ExecContext(
+ ctx,
+ transactionID, // the transaction ID that we initially attempted
+ serverName, // destination server name
+ nid, // JSON blob NID
+ )
+ return err
+}
+
+func (s *queuePDUsStatements) deleteQueueTransaction(
+ ctx context.Context, txn *sql.Tx,
+ serverName gomatrixserverlib.ServerName,
+ transactionID gomatrixserverlib.TransactionID,
+) error {
+ stmt := sqlutil.TxStmt(txn, s.deleteQueueTransactionPDUsStmt)
+ _, err := stmt.ExecContext(ctx, serverName, transactionID)
+ return err
+}
+
+func (s *queuePDUsStatements) selectQueueNextTransactionID(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
+) (gomatrixserverlib.TransactionID, error) {
+ var transactionID gomatrixserverlib.TransactionID
+ stmt := sqlutil.TxStmt(txn, s.selectQueueNextTransactionIDStmt)
+ err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID)
+ if err == sql.ErrNoRows {
+ return "", nil
+ }
+ return transactionID, err
+}
+
+func (s *queuePDUsStatements) selectQueueReferenceJSONCount(
+ ctx context.Context, txn *sql.Tx, jsonNID int64,
+) (int64, error) {
+ var count int64
+ stmt := sqlutil.TxStmt(txn, s.selectQueueReferenceJSONCountStmt)
+ err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count)
+ if err == sql.ErrNoRows {
+ // It's acceptable for there to be no rows referencing a given
+ // JSON NID but it's not an error condition. Just return as if
+ // there's a zero count.
+ return 0, nil
+ }
+ return count, err
+}
+
+func (s *queuePDUsStatements) selectQueuePDUs(
+ ctx context.Context, txn *sql.Tx,
+ serverName gomatrixserverlib.ServerName,
+ transactionID gomatrixserverlib.TransactionID,
+ limit int,
+) ([]int64, error) {
+ stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt)
+ rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed")
+ var result []int64
+ for rows.Next() {
+ var nid int64
+ if err = rows.Scan(&nid); err != nil {
+ return nil, err
+ }
+ result = append(result, nid)
+ }
+
+ return result, rows.Err()
+}
diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go
index 8fd4c11a..18d1532a 100644
--- a/federationsender/storage/postgres/storage.go
+++ b/federationsender/storage/postgres/storage.go
@@ -18,15 +18,20 @@ package postgres
import (
"context"
"database/sql"
+ "encoding/json"
+ "fmt"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
)
// Database stores information needed by the federation sender
type Database struct {
joinedHostsStatements
roomStatements
+ queuePDUsStatements
+ queueJSONStatements
sqlutil.PartitionOffsetStatements
db *sql.DB
}
@@ -55,6 +60,14 @@ func (d *Database) prepare() error {
return err
}
+ if err = d.queuePDUsStatements.prepare(d.db); err != nil {
+ return err
+ }
+
+ if err = d.queueJSONStatements.prepare(d.db); err != nil {
+ return err
+ }
+
return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
}
@@ -120,3 +133,125 @@ func (d *Database) GetJoinedHosts(
) ([]types.JoinedHost, error) {
return d.selectJoinedHosts(ctx, roomID)
}
+
+// StoreJSON adds a JSON blob into the queue JSON table and returns
+// a NID. The NID will then be used when inserting the per-destination
+// metadata entries.
+func (d *Database) StoreJSON(
+ ctx context.Context, js string,
+) (int64, error) {
+ nid, err := d.insertQueueJSON(ctx, nil, js)
+ if err != nil {
+ return 0, fmt.Errorf("d.insertQueueJSON: %w", err)
+ }
+ return nid, nil
+}
+
+// AssociatePDUWithDestination creates an association that the
+// destination queues will use to determine which JSON blobs to send
+// to which servers.
+func (d *Database) AssociatePDUWithDestination(
+ ctx context.Context,
+ transactionID gomatrixserverlib.TransactionID,
+ serverName gomatrixserverlib.ServerName,
+ nids []int64,
+) error {
+ return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
+ for _, nid := range nids {
+ if err := d.insertQueuePDU(
+ ctx, // context
+ txn, // SQL transaction
+ transactionID, // transaction ID
+ serverName, // destination server name
+ nid, // NID from the federationsender_queue_json table
+ ); err != nil {
+ return fmt.Errorf("d.insertQueueRetryStmt.ExecContext: %w", err)
+ }
+ }
+ return nil
+ })
+}
+
+// GetNextTransactionPDUs retrieves events from the database for
+// the next pending transaction, up to the limit specified.
+func (d *Database) GetNextTransactionPDUs(
+ ctx context.Context,
+ serverName gomatrixserverlib.ServerName,
+ limit int,
+) (
+ transactionID gomatrixserverlib.TransactionID,
+ events []*gomatrixserverlib.HeaderedEvent,
+ err error,
+) {
+ err = sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
+ transactionID, err = d.selectQueueNextTransactionID(ctx, txn, serverName)
+ if err != nil {
+ return fmt.Errorf("d.selectQueueNextTransactionID: %w", err)
+ }
+
+ if transactionID == "" {
+ return nil
+ }
+
+ nids, err := d.selectQueuePDUs(ctx, txn, serverName, transactionID, limit)
+ if err != nil {
+ return fmt.Errorf("d.selectQueuePDUs: %w", err)
+ }
+
+ blobs, err := d.selectQueueJSON(ctx, txn, nids)
+ if err != nil {
+ return fmt.Errorf("d.selectJSON: %w", err)
+ }
+
+ for _, blob := range blobs {
+ var event gomatrixserverlib.HeaderedEvent
+ if err := json.Unmarshal(blob, &event); err != nil {
+ return fmt.Errorf("json.Unmarshal: %w", err)
+ }
+ events = append(events, &event)
+ }
+
+ return nil
+ })
+ return
+}
+
+// CleanTransactionPDUs cleans up all associated events for a
+// given transaction. This is done when the transaction was sent
+// successfully.
+func (d *Database) CleanTransactionPDUs(
+ ctx context.Context,
+ serverName gomatrixserverlib.ServerName,
+ transactionID gomatrixserverlib.TransactionID,
+) error {
+ return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
+ nids, err := d.selectQueuePDUs(ctx, txn, serverName, transactionID, 50)
+ if err != nil {
+ return fmt.Errorf("d.selectQueuePDUs: %w", err)
+ }
+
+ if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil {
+ return fmt.Errorf("d.deleteQueueTransaction: %w", err)
+ }
+
+ var count int64
+ var deleteNIDs []int64
+ for _, nid := range nids {
+ count, err = d.selectQueueReferenceJSONCount(ctx, txn, nid)
+ if err != nil {
+ return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err)
+ }
+ if count == 0 {
+ deleteNIDs = append(deleteNIDs, nid)
+ }
+ }
+
+ if len(deleteNIDs) > 0 {
+ if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil {
+ return fmt.Errorf("d.deleteQueueJSON: %w", err)
+ }
+ }
+
+ return nil
+ })
+}
diff --git a/federationsender/storage/sqlite3/queue_json_table.go b/federationsender/storage/sqlite3/queue_json_table.go
new file mode 100644
index 00000000..01b7160d
--- /dev/null
+++ b/federationsender/storage/sqlite3/queue_json_table.go
@@ -0,0 +1,132 @@
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "strings"
+
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+)
+
+const queueJSONSchema = `
+-- The queue_retry_json table contains event contents that
+-- we failed to send.
+CREATE TABLE IF NOT EXISTS federationsender_queue_json (
+ -- The JSON NID. This allows the federationsender_queue_retry table to
+ -- cross-reference to find the JSON blob.
+ json_nid INTEGER PRIMARY KEY AUTOINCREMENT,
+ -- The JSON body. Text so that we preserve UTF-8.
+ json_body TEXT NOT NULL
+);
+`
+
+const insertJSONSQL = "" +
+ "INSERT INTO federationsender_queue_json (json_body)" +
+ " VALUES ($1)"
+
+const deleteJSONSQL = "" +
+ "DELETE FROM federationsender_queue_json WHERE json_nid IN ($1)"
+
+const selectJSONSQL = "" +
+ "SELECT json_nid, json_body FROM federationsender_queue_json" +
+ " WHERE json_nid IN ($1)"
+
+type queueJSONStatements struct {
+ insertJSONStmt *sql.Stmt
+ //deleteJSONStmt *sql.Stmt - prepared at runtime due to variadic
+ //selectJSONStmt *sql.Stmt - prepared at runtime due to variadic
+}
+
+func (s *queueJSONStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(queueJSONSchema)
+ if err != nil {
+ return
+ }
+ if s.insertJSONStmt, err = db.Prepare(insertJSONSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *queueJSONStatements) insertQueueJSON(
+ ctx context.Context, txn *sql.Tx, json string,
+) (int64, error) {
+ stmt := sqlutil.TxStmt(txn, s.insertJSONStmt)
+ res, err := stmt.ExecContext(ctx, json)
+ if err != nil {
+ return 0, fmt.Errorf("stmt.QueryContext: %w", err)
+ }
+ lastid, err := res.LastInsertId()
+ if err != nil {
+ return 0, fmt.Errorf("res.LastInsertId: %w", err)
+ }
+ return lastid, nil
+}
+
+func (s *queueJSONStatements) deleteQueueJSON(
+ ctx context.Context, txn *sql.Tx, nids []int64,
+) error {
+ deleteSQL := strings.Replace(deleteJSONSQL, "($1)", sqlutil.QueryVariadic(len(nids)), 1)
+ deleteStmt, err := txn.Prepare(deleteSQL)
+ if err != nil {
+ return fmt.Errorf("s.deleteQueueJSON s.db.Prepare: %w", err)
+ }
+
+ iNIDs := make([]interface{}, len(nids))
+ for k, v := range nids {
+ iNIDs[k] = v
+ }
+
+ stmt := sqlutil.TxStmt(txn, deleteStmt)
+ _, err = stmt.ExecContext(ctx, iNIDs...)
+ return err
+}
+
+func (s *queueJSONStatements) selectQueueJSON(
+ ctx context.Context, txn *sql.Tx, jsonNIDs []int64,
+) (map[int64][]byte, error) {
+ selectSQL := strings.Replace(selectJSONSQL, "($1)", sqlutil.QueryVariadic(len(jsonNIDs)), 1)
+ selectStmt, err := txn.Prepare(selectSQL)
+ if err != nil {
+ return nil, fmt.Errorf("s.selectQueueJSON s.db.Prepare: %w", err)
+ }
+
+ iNIDs := make([]interface{}, len(jsonNIDs))
+ for k, v := range jsonNIDs {
+ iNIDs[k] = v
+ }
+
+ blobs := map[int64][]byte{}
+ stmt := sqlutil.TxStmt(txn, selectStmt)
+ rows, err := stmt.QueryContext(ctx, iNIDs...)
+ if err != nil {
+ return nil, fmt.Errorf("s.selectQueueJSON stmt.QueryContext: %w", err)
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "selectJSON: rows.close() failed")
+ for rows.Next() {
+ var nid int64
+ var blob []byte
+ if err = rows.Scan(&nid, &blob); err != nil {
+ return nil, fmt.Errorf("s.selectQueueJSON rows.Scan: %w", err)
+ }
+ blobs[nid] = blob
+ }
+ return blobs, err
+}
diff --git a/federationsender/storage/sqlite3/queue_pdus_table.go b/federationsender/storage/sqlite3/queue_pdus_table.go
new file mode 100644
index 00000000..dc08fd70
--- /dev/null
+++ b/federationsender/storage/sqlite3/queue_pdus_table.go
@@ -0,0 +1,167 @@
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const queuePDUsSchema = `
+CREATE TABLE IF NOT EXISTS federationsender_queue_pdus (
+ -- The transaction ID that was generated before persisting the event.
+ transaction_id TEXT NOT NULL,
+ -- The domain part of the user ID the m.room.member event is for.
+ server_name TEXT NOT NULL,
+ -- The JSON NID from the federationsender_queue_pdus_json table.
+ json_nid BIGINT NOT NULL
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_pdus_pdus_json_nid_idx
+ ON federationsender_queue_pdus (json_nid, server_name);
+`
+
+const insertQueuePDUSQL = "" +
+ "INSERT INTO federationsender_queue_pdus (transaction_id, server_name, json_nid)" +
+ " VALUES ($1, $2, $3)"
+
+const deleteQueueTransactionPDUsSQL = "" +
+ "DELETE FROM federationsender_queue_pdus WHERE server_name = $1 AND transaction_id = $2"
+
+const selectQueueNextTransactionIDSQL = "" +
+ "SELECT transaction_id FROM federationsender_queue_pdus" +
+ " WHERE server_name = $1" +
+ " ORDER BY transaction_id ASC" +
+ " LIMIT 1"
+
+const selectQueuePDUsByTransactionSQL = "" +
+ "SELECT json_nid FROM federationsender_queue_pdus" +
+ " WHERE server_name = $1 AND transaction_id = $2" +
+ " LIMIT $3"
+
+const selectQueueReferenceJSONCountSQL = "" +
+ "SELECT COUNT(*) FROM federationsender_queue_pdus" +
+ " WHERE json_nid = $1"
+
+type queuePDUsStatements struct {
+ insertQueuePDUStmt *sql.Stmt
+ deleteQueueTransactionPDUsStmt *sql.Stmt
+ selectQueueNextTransactionIDStmt *sql.Stmt
+ selectQueuePDUsByTransactionStmt *sql.Stmt
+ selectQueueReferenceJSONCountStmt *sql.Stmt
+}
+
+func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(queuePDUsSchema)
+ if err != nil {
+ return
+ }
+ if s.insertQueuePDUStmt, err = db.Prepare(insertQueuePDUSQL); err != nil {
+ return
+ }
+ if s.deleteQueueTransactionPDUsStmt, err = db.Prepare(deleteQueueTransactionPDUsSQL); err != nil {
+ return
+ }
+ if s.selectQueueNextTransactionIDStmt, err = db.Prepare(selectQueueNextTransactionIDSQL); err != nil {
+ return
+ }
+ if s.selectQueuePDUsByTransactionStmt, err = db.Prepare(selectQueuePDUsByTransactionSQL); err != nil {
+ return
+ }
+ if s.selectQueueReferenceJSONCountStmt, err = db.Prepare(selectQueueReferenceJSONCountSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *queuePDUsStatements) insertQueuePDU(
+ ctx context.Context,
+ txn *sql.Tx,
+ transactionID gomatrixserverlib.TransactionID,
+ serverName gomatrixserverlib.ServerName,
+ nid int64,
+) error {
+ stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt)
+ _, err := stmt.ExecContext(
+ ctx,
+ transactionID, // the transaction ID that we initially attempted
+ serverName, // destination server name
+ nid, // JSON blob NID
+ )
+ return err
+}
+
+func (s *queuePDUsStatements) deleteQueueTransaction(
+ ctx context.Context, txn *sql.Tx,
+ serverName gomatrixserverlib.ServerName,
+ transactionID gomatrixserverlib.TransactionID,
+) error {
+ stmt := sqlutil.TxStmt(txn, s.deleteQueueTransactionPDUsStmt)
+ _, err := stmt.ExecContext(ctx, serverName, transactionID)
+ return err
+}
+
+func (s *queuePDUsStatements) selectQueueNextTransactionID(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
+) (gomatrixserverlib.TransactionID, error) {
+ var transactionID gomatrixserverlib.TransactionID
+ stmt := sqlutil.TxStmt(txn, s.selectQueueNextTransactionIDStmt)
+ err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID)
+ if err == sql.ErrNoRows {
+ return "", nil
+ }
+ return transactionID, err
+}
+
+func (s *queuePDUsStatements) selectQueueReferenceJSONCount(
+ ctx context.Context, txn *sql.Tx, jsonNID int64,
+) (int64, error) {
+ var count int64
+ stmt := sqlutil.TxStmt(txn, s.selectQueueReferenceJSONCountStmt)
+ err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count)
+ if err == sql.ErrNoRows {
+ return -1, nil
+ }
+ return count, err
+}
+
+func (s *queuePDUsStatements) selectQueuePDUs(
+ ctx context.Context, txn *sql.Tx,
+ serverName gomatrixserverlib.ServerName,
+ transactionID gomatrixserverlib.TransactionID,
+ limit int,
+) ([]int64, error) {
+ stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt)
+ rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed")
+ var result []int64
+ for rows.Next() {
+ var nid int64
+ if err = rows.Scan(&nid); err != nil {
+ return nil, err
+ }
+ result = append(result, nid)
+ }
+
+ return result, rows.Err()
+}
diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go
index ac303f64..7629ecd2 100644
--- a/federationsender/storage/sqlite3/storage.go
+++ b/federationsender/storage/sqlite3/storage.go
@@ -18,17 +18,22 @@ package sqlite3
import (
"context"
"database/sql"
+ "encoding/json"
+ "fmt"
_ "github.com/mattn/go-sqlite3"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
)
// Database stores information needed by the federation sender
type Database struct {
joinedHostsStatements
roomStatements
+ queuePDUsStatements
+ queueJSONStatements
sqlutil.PartitionOffsetStatements
db *sql.DB
}
@@ -61,6 +66,14 @@ func (d *Database) prepare() error {
return err
}
+ if err = d.queuePDUsStatements.prepare(d.db); err != nil {
+ return err
+ }
+
+ if err = d.queueJSONStatements.prepare(d.db); err != nil {
+ return err
+ }
+
return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
}
@@ -126,3 +139,125 @@ func (d *Database) GetJoinedHosts(
) ([]types.JoinedHost, error) {
return d.selectJoinedHosts(ctx, roomID)
}
+
+// StoreJSON adds a JSON blob into the queue JSON table and returns
+// a NID. The NID will then be used when inserting the per-destination
+// metadata entries.
+func (d *Database) StoreJSON(
+ ctx context.Context, js string,
+) (int64, error) {
+ nid, err := d.insertQueueJSON(ctx, nil, js)
+ if err != nil {
+ return 0, fmt.Errorf("d.insertQueueJSON: %w", err)
+ }
+ return nid, nil
+}
+
+// AssociatePDUWithDestination creates an association that the
+// destination queues will use to determine which JSON blobs to send
+// to which servers.
+func (d *Database) AssociatePDUWithDestination(
+ ctx context.Context,
+ transactionID gomatrixserverlib.TransactionID,
+ serverName gomatrixserverlib.ServerName,
+ nids []int64,
+) error {
+ return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
+ for _, nid := range nids {
+ if err := d.insertQueuePDU(
+ ctx, // context
+ txn, // SQL transaction
+ transactionID, // transaction ID
+ serverName, // destination server name
+ nid, // NID from the federationsender_queue_json table
+ ); err != nil {
+ return fmt.Errorf("d.insertQueueRetryStmt.ExecContext: %w", err)
+ }
+ }
+ return nil
+ })
+}
+
+// GetNextTransactionPDUs retrieves events from the database for
+// the next pending transaction, up to the limit specified.
+func (d *Database) GetNextTransactionPDUs(
+ ctx context.Context,
+ serverName gomatrixserverlib.ServerName,
+ limit int,
+) (
+ transactionID gomatrixserverlib.TransactionID,
+ events []*gomatrixserverlib.HeaderedEvent,
+ err error,
+) {
+ err = sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
+ transactionID, err = d.selectQueueNextTransactionID(ctx, txn, serverName)
+ if err != nil {
+ return fmt.Errorf("d.selectQueueNextTransactionID: %w", err)
+ }
+
+ if transactionID == "" {
+ return nil
+ }
+
+ nids, err := d.selectQueuePDUs(ctx, txn, serverName, transactionID, limit)
+ if err != nil {
+ return fmt.Errorf("d.selectQueuePDUs: %w", err)
+ }
+
+ blobs, err := d.selectQueueJSON(ctx, txn, nids)
+ if err != nil {
+ return fmt.Errorf("d.selectJSON: %w", err)
+ }
+
+ for _, blob := range blobs {
+ var event gomatrixserverlib.HeaderedEvent
+ if err := json.Unmarshal(blob, &event); err != nil {
+ return fmt.Errorf("json.Unmarshal: %w", err)
+ }
+ events = append(events, &event)
+ }
+
+ return nil
+ })
+ return
+}
+
+// CleanTransactionPDUs cleans up all associated events for a
+// given transaction. This is done when the transaction was sent
+// successfully.
+func (d *Database) CleanTransactionPDUs(
+ ctx context.Context,
+ serverName gomatrixserverlib.ServerName,
+ transactionID gomatrixserverlib.TransactionID,
+) error {
+ return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
+ nids, err := d.selectQueuePDUs(ctx, txn, serverName, transactionID, 50)
+ if err != nil {
+ return fmt.Errorf("d.selectQueuePDUs: %w", err)
+ }
+
+ if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil {
+ return fmt.Errorf("d.deleteQueueTransaction: %w", err)
+ }
+
+ var count int64
+ var deleteNIDs []int64
+ for _, nid := range nids {
+ count, err = d.selectQueueReferenceJSONCount(ctx, txn, nid)
+ if err != nil {
+ return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err)
+ }
+ if count == 0 {
+ deleteNIDs = append(deleteNIDs, nid)
+ }
+ }
+
+ if len(deleteNIDs) > 0 {
+ if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil {
+ return fmt.Errorf("d.deleteQueueJSON: %w", err)
+ }
+ }
+
+ return nil
+ })
+}