diff options
Diffstat (limited to 'federationsender/storage')
-rw-r--r-- | federationsender/storage/interface.go | 10 | ||||
-rw-r--r-- | federationsender/storage/postgres/queue_pdus_table.go | 37 | ||||
-rw-r--r-- | federationsender/storage/shared/storage.go | 14 | ||||
-rw-r--r-- | federationsender/storage/shared/storage_edus.go | 47 | ||||
-rw-r--r-- | federationsender/storage/shared/storage_pdus.go | 57 | ||||
-rw-r--r-- | federationsender/storage/sqlite3/queue_pdus_table.go | 15 | ||||
-rw-r--r-- | federationsender/storage/tables/interface.go | 3 |
7 files changed, 70 insertions, 113 deletions
diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go index a3f5073f..03d616f1 100644 --- a/federationsender/storage/interface.go +++ b/federationsender/storage/interface.go @@ -36,14 +36,14 @@ type Database interface { StoreJSON(ctx context.Context, js string) (*shared.Receipt, error) + GetPendingPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (pdus map[*shared.Receipt]*gomatrixserverlib.HeaderedEvent, err error) + GetPendingEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (edus map[*shared.Receipt]*gomatrixserverlib.EDU, err error) + AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error AssociateEDUWithDestination(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error - GetNextTransactionPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, *shared.Receipt, error) - GetNextTransactionEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) ([]*gomatrixserverlib.EDU, *shared.Receipt, error) - - CleanPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error - CleanEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error + CleanPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error + CleanEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error GetPendingPDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (int64, error) GetPendingEDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (int64, error) diff --git a/federationsender/storage/postgres/queue_pdus_table.go b/federationsender/storage/postgres/queue_pdus_table.go index 95a3b9ee..f9a47748 100644 --- a/federationsender/storage/postgres/queue_pdus_table.go +++ b/federationsender/storage/postgres/queue_pdus_table.go @@ -45,16 +45,10 @@ const insertQueuePDUSQL = "" + const deleteQueuePDUSQL = "" + "DELETE FROM federationsender_queue_pdus WHERE server_name = $1 AND json_nid = ANY($2)" -const selectQueuePDUNextTransactionIDSQL = "" + - "SELECT transaction_id FROM federationsender_queue_pdus" + - " WHERE server_name = $1" + - " ORDER BY transaction_id ASC" + - " LIMIT 1" - -const selectQueuePDUsByTransactionSQL = "" + +const selectQueuePDUsSQL = "" + "SELECT json_nid FROM federationsender_queue_pdus" + - " WHERE server_name = $1 AND transaction_id = $2" + - " LIMIT $3" + " WHERE server_name = $1" + + " LIMIT $2" const selectQueuePDUReferenceJSONCountSQL = "" + "SELECT COUNT(*) FROM federationsender_queue_pdus" + @@ -71,8 +65,7 @@ type queuePDUsStatements struct { db *sql.DB insertQueuePDUStmt *sql.Stmt deleteQueuePDUsStmt *sql.Stmt - selectQueuePDUNextTransactionIDStmt *sql.Stmt - selectQueuePDUsByTransactionStmt *sql.Stmt + selectQueuePDUsStmt *sql.Stmt selectQueuePDUReferenceJSONCountStmt *sql.Stmt selectQueuePDUsCountStmt *sql.Stmt selectQueuePDUServerNamesStmt *sql.Stmt @@ -92,10 +85,7 @@ func NewPostgresQueuePDUsTable(db *sql.DB) (s *queuePDUsStatements, err error) { if s.deleteQueuePDUsStmt, err = s.db.Prepare(deleteQueuePDUSQL); err != nil { return } - if s.selectQueuePDUNextTransactionIDStmt, err = s.db.Prepare(selectQueuePDUNextTransactionIDSQL); err != nil { - return - } - if s.selectQueuePDUsByTransactionStmt, err = s.db.Prepare(selectQueuePDUsByTransactionSQL); err != nil { + if s.selectQueuePDUsStmt, err = s.db.Prepare(selectQueuePDUsSQL); err != nil { return } if s.selectQueuePDUReferenceJSONCountStmt, err = s.db.Prepare(selectQueuePDUReferenceJSONCountSQL); err != nil { @@ -137,18 +127,6 @@ func (s *queuePDUsStatements) DeleteQueuePDUs( return err } -func (s *queuePDUsStatements) SelectQueuePDUNextTransactionID( - ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, -) (gomatrixserverlib.TransactionID, error) { - var transactionID gomatrixserverlib.TransactionID - stmt := sqlutil.TxStmt(txn, s.selectQueuePDUNextTransactionIDStmt) - err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID) - if err == sql.ErrNoRows { - return "", nil - } - return transactionID, err -} - func (s *queuePDUsStatements) SelectQueuePDUReferenceJSONCount( ctx context.Context, txn *sql.Tx, jsonNID int64, ) (int64, error) { @@ -182,11 +160,10 @@ func (s *queuePDUsStatements) SelectQueuePDUCount( func (s *queuePDUsStatements) SelectQueuePDUs( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, - transactionID gomatrixserverlib.TransactionID, limit int, ) ([]int64, error) { - stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt) - rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit) + stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsStmt) + rows, err := stmt.QueryContext(ctx, serverName, limit) if err != nil { return nil, err } diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go index af9d0d6a..fbf84c70 100644 --- a/federationsender/storage/shared/storage.go +++ b/federationsender/storage/shared/storage.go @@ -17,7 +17,6 @@ package shared import ( "context" "database/sql" - "encoding/json" "fmt" "github.com/matrix-org/dendrite/federationsender/storage/tables" @@ -44,16 +43,11 @@ type Database struct { // to pass them back so that we can clean up if the transaction sends // successfully. type Receipt struct { - nids []int64 + nid int64 } -func (e *Receipt) Empty() bool { - return len(e.nids) == 0 -} - -func (e *Receipt) String() string { - j, _ := json.Marshal(e.nids) - return string(j) +func (r *Receipt) String() string { + return fmt.Sprintf("%d", r.nid) } // UpdateRoom updates the joined hosts for a room and returns what the joined @@ -146,7 +140,7 @@ func (d *Database) StoreJSON( return nil, fmt.Errorf("d.insertQueueJSON: %w", err) } return &Receipt{ - nids: []int64{nid}, + nid: nid, }, nil } diff --git a/federationsender/storage/shared/storage_edus.go b/federationsender/storage/shared/storage_edus.go index ae1d1511..86fee1a3 100644 --- a/federationsender/storage/shared/storage_edus.go +++ b/federationsender/storage/shared/storage_edus.go @@ -33,16 +33,14 @@ func (d *Database) AssociateEDUWithDestination( receipt *Receipt, ) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - for _, nid := range receipt.nids { - if err := d.FederationSenderQueueEDUs.InsertQueueEDU( - ctx, // context - txn, // SQL transaction - "", // TODO: EDU type for coalescing - serverName, // destination server name - nid, // NID from the federationsender_queue_json table - ); err != nil { - return fmt.Errorf("InsertQueueEDU: %w", err) - } + if err := d.FederationSenderQueueEDUs.InsertQueueEDU( + ctx, // context + txn, // SQL transaction + "", // TODO: EDU type for coalescing + serverName, // destination server name + receipt.nid, // NID from the federationsender_queue_json table + ); err != nil { + return fmt.Errorf("InsertQueueEDU: %w", err) } return nil }) @@ -50,29 +48,25 @@ func (d *Database) AssociateEDUWithDestination( // GetNextTransactionEDUs retrieves events from the database for // the next pending transaction, up to the limit specified. -func (d *Database) GetNextTransactionEDUs( +func (d *Database) GetPendingEDUs( ctx context.Context, serverName gomatrixserverlib.ServerName, limit int, ) ( - edus []*gomatrixserverlib.EDU, - receipt *Receipt, + edus map[*Receipt]*gomatrixserverlib.EDU, err error, ) { + edus = make(map[*Receipt]*gomatrixserverlib.EDU) err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit) if err != nil { return fmt.Errorf("SelectQueueEDUs: %w", err) } - receipt = &Receipt{ - nids: nids, - } - retrieve := make([]int64, 0, len(nids)) for _, nid := range nids { if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok { - edus = append(edus, edu) + edus[&Receipt{nid}] = edu } else { retrieve = append(retrieve, nid) } @@ -83,12 +77,12 @@ func (d *Database) GetNextTransactionEDUs( return fmt.Errorf("SelectQueueJSON: %w", err) } - for _, blob := range blobs { + for nid, blob := range blobs { var event gomatrixserverlib.EDU if err := json.Unmarshal(blob, &event); err != nil { return fmt.Errorf("json.Unmarshal: %w", err) } - edus = append(edus, &event) + edus[&Receipt{nid}] = &event } return nil @@ -101,19 +95,24 @@ func (d *Database) GetNextTransactionEDUs( func (d *Database) CleanEDUs( ctx context.Context, serverName gomatrixserverlib.ServerName, - receipt *Receipt, + receipts []*Receipt, ) error { - if receipt == nil { + if len(receipts) == 0 { return errors.New("expected receipt") } + nids := make([]int64, len(receipts)) + for i := range receipts { + nids[i] = receipts[i].nid + } + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - if err := d.FederationSenderQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, receipt.nids); err != nil { + if err := d.FederationSenderQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, nids); err != nil { return err } var deleteNIDs []int64 - for _, nid := range receipt.nids { + for _, nid := range nids { count, err := d.FederationSenderQueueEDUs.SelectQueueEDUReferenceJSONCount(ctx, txn, nid) if err != nil { return fmt.Errorf("SelectQueueEDUReferenceJSONCount: %w", err) diff --git a/federationsender/storage/shared/storage_pdus.go b/federationsender/storage/shared/storage_pdus.go index 09235a5e..bc298a90 100644 --- a/federationsender/storage/shared/storage_pdus.go +++ b/federationsender/storage/shared/storage_pdus.go @@ -34,16 +34,14 @@ func (d *Database) AssociatePDUWithDestination( receipt *Receipt, ) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - for _, nid := range receipt.nids { - if err := d.FederationSenderQueuePDUs.InsertQueuePDU( - ctx, // context - txn, // SQL transaction - transactionID, // transaction ID - serverName, // destination server name - nid, // NID from the federationsender_queue_json table - ); err != nil { - return fmt.Errorf("InsertQueuePDU: %w", err) - } + if err := d.FederationSenderQueuePDUs.InsertQueuePDU( + ctx, // context + txn, // SQL transaction + transactionID, // transaction ID + serverName, // destination server name + receipt.nid, // NID from the federationsender_queue_json table + ); err != nil { + return fmt.Errorf("InsertQueuePDU: %w", err) } return nil }) @@ -51,14 +49,12 @@ func (d *Database) AssociatePDUWithDestination( // GetNextTransactionPDUs retrieves events from the database for // the next pending transaction, up to the limit specified. -func (d *Database) GetNextTransactionPDUs( +func (d *Database) GetPendingPDUs( ctx context.Context, serverName gomatrixserverlib.ServerName, limit int, ) ( - transactionID gomatrixserverlib.TransactionID, - events []*gomatrixserverlib.HeaderedEvent, - receipt *Receipt, + events map[*Receipt]*gomatrixserverlib.HeaderedEvent, err error, ) { // Strictly speaking this doesn't need to be using the writer @@ -66,29 +62,17 @@ func (d *Database) GetNextTransactionPDUs( // a guarantee of transactional isolation, it's actually useful // to know in SQLite mode that nothing else is trying to modify // the database. + events = make(map[*Receipt]*gomatrixserverlib.HeaderedEvent) err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - transactionID, err = d.FederationSenderQueuePDUs.SelectQueuePDUNextTransactionID(ctx, txn, serverName) - if err != nil { - return fmt.Errorf("SelectQueuePDUNextTransactionID: %w", err) - } - - if transactionID == "" { - return nil - } - - nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, transactionID, limit) + nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, limit) if err != nil { return fmt.Errorf("SelectQueuePDUs: %w", err) } - receipt = &Receipt{ - nids: nids, - } - retrieve := make([]int64, 0, len(nids)) for _, nid := range nids { if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok { - events = append(events, event) + events[&Receipt{nid}] = event } else { retrieve = append(retrieve, nid) } @@ -104,7 +88,7 @@ func (d *Database) GetNextTransactionPDUs( if err := json.Unmarshal(blob, &event); err != nil { return fmt.Errorf("json.Unmarshal: %w", err) } - events = append(events, &event) + events[&Receipt{nid}] = &event d.Cache.StoreFederationSenderQueuedPDU(nid, &event) } @@ -119,19 +103,24 @@ func (d *Database) GetNextTransactionPDUs( func (d *Database) CleanPDUs( ctx context.Context, serverName gomatrixserverlib.ServerName, - receipt *Receipt, + receipts []*Receipt, ) error { - if receipt == nil { + if len(receipts) == 0 { return errors.New("expected receipt") } + nids := make([]int64, len(receipts)) + for i := range receipts { + nids[i] = receipts[i].nid + } + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, receipt.nids); err != nil { + if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, nids); err != nil { return err } var deleteNIDs []int64 - for _, nid := range receipt.nids { + for _, nid := range nids { count, err := d.FederationSenderQueuePDUs.SelectQueuePDUReferenceJSONCount(ctx, txn, nid) if err != nil { return fmt.Errorf("SelectQueuePDUReferenceJSONCount: %w", err) diff --git a/federationsender/storage/sqlite3/queue_pdus_table.go b/federationsender/storage/sqlite3/queue_pdus_table.go index 70519c9e..e0fdbda5 100644 --- a/federationsender/storage/sqlite3/queue_pdus_table.go +++ b/federationsender/storage/sqlite3/queue_pdus_table.go @@ -53,10 +53,10 @@ const selectQueueNextTransactionIDSQL = "" + " ORDER BY transaction_id ASC" + " LIMIT 1" -const selectQueuePDUsByTransactionSQL = "" + +const selectQueuePDUsSQL = "" + "SELECT json_nid FROM federationsender_queue_pdus" + - " WHERE server_name = $1 AND transaction_id = $2" + - " LIMIT $3" + " WHERE server_name = $1" + + " LIMIT $2" const selectQueuePDUsReferenceJSONCountSQL = "" + "SELECT COUNT(*) FROM federationsender_queue_pdus" + @@ -73,7 +73,7 @@ type queuePDUsStatements struct { db *sql.DB insertQueuePDUStmt *sql.Stmt selectQueueNextTransactionIDStmt *sql.Stmt - selectQueuePDUsByTransactionStmt *sql.Stmt + selectQueuePDUsStmt *sql.Stmt selectQueueReferenceJSONCountStmt *sql.Stmt selectQueuePDUsCountStmt *sql.Stmt selectQueueServerNamesStmt *sql.Stmt @@ -97,7 +97,7 @@ func NewSQLiteQueuePDUsTable(db *sql.DB) (s *queuePDUsStatements, err error) { if s.selectQueueNextTransactionIDStmt, err = db.Prepare(selectQueueNextTransactionIDSQL); err != nil { return } - if s.selectQueuePDUsByTransactionStmt, err = db.Prepare(selectQueuePDUsByTransactionSQL); err != nil { + if s.selectQueuePDUsStmt, err = db.Prepare(selectQueuePDUsSQL); err != nil { return } if s.selectQueueReferenceJSONCountStmt, err = db.Prepare(selectQueuePDUsReferenceJSONCountSQL); err != nil { @@ -193,11 +193,10 @@ func (s *queuePDUsStatements) SelectQueuePDUCount( func (s *queuePDUsStatements) SelectQueuePDUs( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, - transactionID gomatrixserverlib.TransactionID, limit int, ) ([]int64, error) { - stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt) - rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit) + stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsStmt) + rows, err := stmt.QueryContext(ctx, serverName, limit) if err != nil { return nil, err } diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go index 1167a212..69e952de 100644 --- a/federationsender/storage/tables/interface.go +++ b/federationsender/storage/tables/interface.go @@ -25,10 +25,9 @@ import ( type FederationSenderQueuePDUs interface { InsertQueuePDU(ctx context.Context, txn *sql.Tx, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nid int64) error DeleteQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error - SelectQueuePDUNextTransactionID(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (gomatrixserverlib.TransactionID, error) SelectQueuePDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID int64) (int64, error) SelectQueuePDUCount(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (int64, error) - SelectQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, transactionID gomatrixserverlib.TransactionID, limit int) ([]int64, error) + SelectQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]int64, error) SelectQueuePDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error) } |