aboutsummaryrefslogtreecommitdiff
path: root/federationsender/storage
diff options
context:
space:
mode:
Diffstat (limited to 'federationsender/storage')
-rw-r--r--federationsender/storage/interface.go10
-rw-r--r--federationsender/storage/postgres/queue_pdus_table.go37
-rw-r--r--federationsender/storage/shared/storage.go14
-rw-r--r--federationsender/storage/shared/storage_edus.go47
-rw-r--r--federationsender/storage/shared/storage_pdus.go57
-rw-r--r--federationsender/storage/sqlite3/queue_pdus_table.go15
-rw-r--r--federationsender/storage/tables/interface.go3
7 files changed, 70 insertions, 113 deletions
diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go
index a3f5073f..03d616f1 100644
--- a/federationsender/storage/interface.go
+++ b/federationsender/storage/interface.go
@@ -36,14 +36,14 @@ type Database interface {
StoreJSON(ctx context.Context, js string) (*shared.Receipt, error)
+ GetPendingPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (pdus map[*shared.Receipt]*gomatrixserverlib.HeaderedEvent, err error)
+ GetPendingEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (edus map[*shared.Receipt]*gomatrixserverlib.EDU, err error)
+
AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error
AssociateEDUWithDestination(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error
- GetNextTransactionPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, *shared.Receipt, error)
- GetNextTransactionEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) ([]*gomatrixserverlib.EDU, *shared.Receipt, error)
-
- CleanPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error
- CleanEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error
+ CleanPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error
+ CleanEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error
GetPendingPDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (int64, error)
GetPendingEDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (int64, error)
diff --git a/federationsender/storage/postgres/queue_pdus_table.go b/federationsender/storage/postgres/queue_pdus_table.go
index 95a3b9ee..f9a47748 100644
--- a/federationsender/storage/postgres/queue_pdus_table.go
+++ b/federationsender/storage/postgres/queue_pdus_table.go
@@ -45,16 +45,10 @@ const insertQueuePDUSQL = "" +
const deleteQueuePDUSQL = "" +
"DELETE FROM federationsender_queue_pdus WHERE server_name = $1 AND json_nid = ANY($2)"
-const selectQueuePDUNextTransactionIDSQL = "" +
- "SELECT transaction_id FROM federationsender_queue_pdus" +
- " WHERE server_name = $1" +
- " ORDER BY transaction_id ASC" +
- " LIMIT 1"
-
-const selectQueuePDUsByTransactionSQL = "" +
+const selectQueuePDUsSQL = "" +
"SELECT json_nid FROM federationsender_queue_pdus" +
- " WHERE server_name = $1 AND transaction_id = $2" +
- " LIMIT $3"
+ " WHERE server_name = $1" +
+ " LIMIT $2"
const selectQueuePDUReferenceJSONCountSQL = "" +
"SELECT COUNT(*) FROM federationsender_queue_pdus" +
@@ -71,8 +65,7 @@ type queuePDUsStatements struct {
db *sql.DB
insertQueuePDUStmt *sql.Stmt
deleteQueuePDUsStmt *sql.Stmt
- selectQueuePDUNextTransactionIDStmt *sql.Stmt
- selectQueuePDUsByTransactionStmt *sql.Stmt
+ selectQueuePDUsStmt *sql.Stmt
selectQueuePDUReferenceJSONCountStmt *sql.Stmt
selectQueuePDUsCountStmt *sql.Stmt
selectQueuePDUServerNamesStmt *sql.Stmt
@@ -92,10 +85,7 @@ func NewPostgresQueuePDUsTable(db *sql.DB) (s *queuePDUsStatements, err error) {
if s.deleteQueuePDUsStmt, err = s.db.Prepare(deleteQueuePDUSQL); err != nil {
return
}
- if s.selectQueuePDUNextTransactionIDStmt, err = s.db.Prepare(selectQueuePDUNextTransactionIDSQL); err != nil {
- return
- }
- if s.selectQueuePDUsByTransactionStmt, err = s.db.Prepare(selectQueuePDUsByTransactionSQL); err != nil {
+ if s.selectQueuePDUsStmt, err = s.db.Prepare(selectQueuePDUsSQL); err != nil {
return
}
if s.selectQueuePDUReferenceJSONCountStmt, err = s.db.Prepare(selectQueuePDUReferenceJSONCountSQL); err != nil {
@@ -137,18 +127,6 @@ func (s *queuePDUsStatements) DeleteQueuePDUs(
return err
}
-func (s *queuePDUsStatements) SelectQueuePDUNextTransactionID(
- ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
-) (gomatrixserverlib.TransactionID, error) {
- var transactionID gomatrixserverlib.TransactionID
- stmt := sqlutil.TxStmt(txn, s.selectQueuePDUNextTransactionIDStmt)
- err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID)
- if err == sql.ErrNoRows {
- return "", nil
- }
- return transactionID, err
-}
-
func (s *queuePDUsStatements) SelectQueuePDUReferenceJSONCount(
ctx context.Context, txn *sql.Tx, jsonNID int64,
) (int64, error) {
@@ -182,11 +160,10 @@ func (s *queuePDUsStatements) SelectQueuePDUCount(
func (s *queuePDUsStatements) SelectQueuePDUs(
ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName,
- transactionID gomatrixserverlib.TransactionID,
limit int,
) ([]int64, error) {
- stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt)
- rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit)
+ stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsStmt)
+ rows, err := stmt.QueryContext(ctx, serverName, limit)
if err != nil {
return nil, err
}
diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go
index af9d0d6a..fbf84c70 100644
--- a/federationsender/storage/shared/storage.go
+++ b/federationsender/storage/shared/storage.go
@@ -17,7 +17,6 @@ package shared
import (
"context"
"database/sql"
- "encoding/json"
"fmt"
"github.com/matrix-org/dendrite/federationsender/storage/tables"
@@ -44,16 +43,11 @@ type Database struct {
// to pass them back so that we can clean up if the transaction sends
// successfully.
type Receipt struct {
- nids []int64
+ nid int64
}
-func (e *Receipt) Empty() bool {
- return len(e.nids) == 0
-}
-
-func (e *Receipt) String() string {
- j, _ := json.Marshal(e.nids)
- return string(j)
+func (r *Receipt) String() string {
+ return fmt.Sprintf("%d", r.nid)
}
// UpdateRoom updates the joined hosts for a room and returns what the joined
@@ -146,7 +140,7 @@ func (d *Database) StoreJSON(
return nil, fmt.Errorf("d.insertQueueJSON: %w", err)
}
return &Receipt{
- nids: []int64{nid},
+ nid: nid,
}, nil
}
diff --git a/federationsender/storage/shared/storage_edus.go b/federationsender/storage/shared/storage_edus.go
index ae1d1511..86fee1a3 100644
--- a/federationsender/storage/shared/storage_edus.go
+++ b/federationsender/storage/shared/storage_edus.go
@@ -33,16 +33,14 @@ func (d *Database) AssociateEDUWithDestination(
receipt *Receipt,
) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
- for _, nid := range receipt.nids {
- if err := d.FederationSenderQueueEDUs.InsertQueueEDU(
- ctx, // context
- txn, // SQL transaction
- "", // TODO: EDU type for coalescing
- serverName, // destination server name
- nid, // NID from the federationsender_queue_json table
- ); err != nil {
- return fmt.Errorf("InsertQueueEDU: %w", err)
- }
+ if err := d.FederationSenderQueueEDUs.InsertQueueEDU(
+ ctx, // context
+ txn, // SQL transaction
+ "", // TODO: EDU type for coalescing
+ serverName, // destination server name
+ receipt.nid, // NID from the federationsender_queue_json table
+ ); err != nil {
+ return fmt.Errorf("InsertQueueEDU: %w", err)
}
return nil
})
@@ -50,29 +48,25 @@ func (d *Database) AssociateEDUWithDestination(
// GetNextTransactionEDUs retrieves events from the database for
// the next pending transaction, up to the limit specified.
-func (d *Database) GetNextTransactionEDUs(
+func (d *Database) GetPendingEDUs(
ctx context.Context,
serverName gomatrixserverlib.ServerName,
limit int,
) (
- edus []*gomatrixserverlib.EDU,
- receipt *Receipt,
+ edus map[*Receipt]*gomatrixserverlib.EDU,
err error,
) {
+ edus = make(map[*Receipt]*gomatrixserverlib.EDU)
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit)
if err != nil {
return fmt.Errorf("SelectQueueEDUs: %w", err)
}
- receipt = &Receipt{
- nids: nids,
- }
-
retrieve := make([]int64, 0, len(nids))
for _, nid := range nids {
if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok {
- edus = append(edus, edu)
+ edus[&Receipt{nid}] = edu
} else {
retrieve = append(retrieve, nid)
}
@@ -83,12 +77,12 @@ func (d *Database) GetNextTransactionEDUs(
return fmt.Errorf("SelectQueueJSON: %w", err)
}
- for _, blob := range blobs {
+ for nid, blob := range blobs {
var event gomatrixserverlib.EDU
if err := json.Unmarshal(blob, &event); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err)
}
- edus = append(edus, &event)
+ edus[&Receipt{nid}] = &event
}
return nil
@@ -101,19 +95,24 @@ func (d *Database) GetNextTransactionEDUs(
func (d *Database) CleanEDUs(
ctx context.Context,
serverName gomatrixserverlib.ServerName,
- receipt *Receipt,
+ receipts []*Receipt,
) error {
- if receipt == nil {
+ if len(receipts) == 0 {
return errors.New("expected receipt")
}
+ nids := make([]int64, len(receipts))
+ for i := range receipts {
+ nids[i] = receipts[i].nid
+ }
+
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
- if err := d.FederationSenderQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, receipt.nids); err != nil {
+ if err := d.FederationSenderQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, nids); err != nil {
return err
}
var deleteNIDs []int64
- for _, nid := range receipt.nids {
+ for _, nid := range nids {
count, err := d.FederationSenderQueueEDUs.SelectQueueEDUReferenceJSONCount(ctx, txn, nid)
if err != nil {
return fmt.Errorf("SelectQueueEDUReferenceJSONCount: %w", err)
diff --git a/federationsender/storage/shared/storage_pdus.go b/federationsender/storage/shared/storage_pdus.go
index 09235a5e..bc298a90 100644
--- a/federationsender/storage/shared/storage_pdus.go
+++ b/federationsender/storage/shared/storage_pdus.go
@@ -34,16 +34,14 @@ func (d *Database) AssociatePDUWithDestination(
receipt *Receipt,
) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
- for _, nid := range receipt.nids {
- if err := d.FederationSenderQueuePDUs.InsertQueuePDU(
- ctx, // context
- txn, // SQL transaction
- transactionID, // transaction ID
- serverName, // destination server name
- nid, // NID from the federationsender_queue_json table
- ); err != nil {
- return fmt.Errorf("InsertQueuePDU: %w", err)
- }
+ if err := d.FederationSenderQueuePDUs.InsertQueuePDU(
+ ctx, // context
+ txn, // SQL transaction
+ transactionID, // transaction ID
+ serverName, // destination server name
+ receipt.nid, // NID from the federationsender_queue_json table
+ ); err != nil {
+ return fmt.Errorf("InsertQueuePDU: %w", err)
}
return nil
})
@@ -51,14 +49,12 @@ func (d *Database) AssociatePDUWithDestination(
// GetNextTransactionPDUs retrieves events from the database for
// the next pending transaction, up to the limit specified.
-func (d *Database) GetNextTransactionPDUs(
+func (d *Database) GetPendingPDUs(
ctx context.Context,
serverName gomatrixserverlib.ServerName,
limit int,
) (
- transactionID gomatrixserverlib.TransactionID,
- events []*gomatrixserverlib.HeaderedEvent,
- receipt *Receipt,
+ events map[*Receipt]*gomatrixserverlib.HeaderedEvent,
err error,
) {
// Strictly speaking this doesn't need to be using the writer
@@ -66,29 +62,17 @@ func (d *Database) GetNextTransactionPDUs(
// a guarantee of transactional isolation, it's actually useful
// to know in SQLite mode that nothing else is trying to modify
// the database.
+ events = make(map[*Receipt]*gomatrixserverlib.HeaderedEvent)
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
- transactionID, err = d.FederationSenderQueuePDUs.SelectQueuePDUNextTransactionID(ctx, txn, serverName)
- if err != nil {
- return fmt.Errorf("SelectQueuePDUNextTransactionID: %w", err)
- }
-
- if transactionID == "" {
- return nil
- }
-
- nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, transactionID, limit)
+ nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, limit)
if err != nil {
return fmt.Errorf("SelectQueuePDUs: %w", err)
}
- receipt = &Receipt{
- nids: nids,
- }
-
retrieve := make([]int64, 0, len(nids))
for _, nid := range nids {
if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok {
- events = append(events, event)
+ events[&Receipt{nid}] = event
} else {
retrieve = append(retrieve, nid)
}
@@ -104,7 +88,7 @@ func (d *Database) GetNextTransactionPDUs(
if err := json.Unmarshal(blob, &event); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err)
}
- events = append(events, &event)
+ events[&Receipt{nid}] = &event
d.Cache.StoreFederationSenderQueuedPDU(nid, &event)
}
@@ -119,19 +103,24 @@ func (d *Database) GetNextTransactionPDUs(
func (d *Database) CleanPDUs(
ctx context.Context,
serverName gomatrixserverlib.ServerName,
- receipt *Receipt,
+ receipts []*Receipt,
) error {
- if receipt == nil {
+ if len(receipts) == 0 {
return errors.New("expected receipt")
}
+ nids := make([]int64, len(receipts))
+ for i := range receipts {
+ nids[i] = receipts[i].nid
+ }
+
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
- if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, receipt.nids); err != nil {
+ if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, nids); err != nil {
return err
}
var deleteNIDs []int64
- for _, nid := range receipt.nids {
+ for _, nid := range nids {
count, err := d.FederationSenderQueuePDUs.SelectQueuePDUReferenceJSONCount(ctx, txn, nid)
if err != nil {
return fmt.Errorf("SelectQueuePDUReferenceJSONCount: %w", err)
diff --git a/federationsender/storage/sqlite3/queue_pdus_table.go b/federationsender/storage/sqlite3/queue_pdus_table.go
index 70519c9e..e0fdbda5 100644
--- a/federationsender/storage/sqlite3/queue_pdus_table.go
+++ b/federationsender/storage/sqlite3/queue_pdus_table.go
@@ -53,10 +53,10 @@ const selectQueueNextTransactionIDSQL = "" +
" ORDER BY transaction_id ASC" +
" LIMIT 1"
-const selectQueuePDUsByTransactionSQL = "" +
+const selectQueuePDUsSQL = "" +
"SELECT json_nid FROM federationsender_queue_pdus" +
- " WHERE server_name = $1 AND transaction_id = $2" +
- " LIMIT $3"
+ " WHERE server_name = $1" +
+ " LIMIT $2"
const selectQueuePDUsReferenceJSONCountSQL = "" +
"SELECT COUNT(*) FROM federationsender_queue_pdus" +
@@ -73,7 +73,7 @@ type queuePDUsStatements struct {
db *sql.DB
insertQueuePDUStmt *sql.Stmt
selectQueueNextTransactionIDStmt *sql.Stmt
- selectQueuePDUsByTransactionStmt *sql.Stmt
+ selectQueuePDUsStmt *sql.Stmt
selectQueueReferenceJSONCountStmt *sql.Stmt
selectQueuePDUsCountStmt *sql.Stmt
selectQueueServerNamesStmt *sql.Stmt
@@ -97,7 +97,7 @@ func NewSQLiteQueuePDUsTable(db *sql.DB) (s *queuePDUsStatements, err error) {
if s.selectQueueNextTransactionIDStmt, err = db.Prepare(selectQueueNextTransactionIDSQL); err != nil {
return
}
- if s.selectQueuePDUsByTransactionStmt, err = db.Prepare(selectQueuePDUsByTransactionSQL); err != nil {
+ if s.selectQueuePDUsStmt, err = db.Prepare(selectQueuePDUsSQL); err != nil {
return
}
if s.selectQueueReferenceJSONCountStmt, err = db.Prepare(selectQueuePDUsReferenceJSONCountSQL); err != nil {
@@ -193,11 +193,10 @@ func (s *queuePDUsStatements) SelectQueuePDUCount(
func (s *queuePDUsStatements) SelectQueuePDUs(
ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName,
- transactionID gomatrixserverlib.TransactionID,
limit int,
) ([]int64, error) {
- stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt)
- rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit)
+ stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsStmt)
+ rows, err := stmt.QueryContext(ctx, serverName, limit)
if err != nil {
return nil, err
}
diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go
index 1167a212..69e952de 100644
--- a/federationsender/storage/tables/interface.go
+++ b/federationsender/storage/tables/interface.go
@@ -25,10 +25,9 @@ import (
type FederationSenderQueuePDUs interface {
InsertQueuePDU(ctx context.Context, txn *sql.Tx, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nid int64) error
DeleteQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error
- SelectQueuePDUNextTransactionID(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (gomatrixserverlib.TransactionID, error)
SelectQueuePDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID int64) (int64, error)
SelectQueuePDUCount(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (int64, error)
- SelectQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, transactionID gomatrixserverlib.TransactionID, limit int) ([]int64, error)
+ SelectQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]int64, error)
SelectQueuePDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error)
}