aboutsummaryrefslogtreecommitdiff
path: root/federationsender/storage/shared/storage_pdus.go
diff options
context:
space:
mode:
Diffstat (limited to 'federationsender/storage/shared/storage_pdus.go')
-rw-r--r--federationsender/storage/shared/storage_pdus.go57
1 files changed, 23 insertions, 34 deletions
diff --git a/federationsender/storage/shared/storage_pdus.go b/federationsender/storage/shared/storage_pdus.go
index 09235a5e..bc298a90 100644
--- a/federationsender/storage/shared/storage_pdus.go
+++ b/federationsender/storage/shared/storage_pdus.go
@@ -34,16 +34,14 @@ func (d *Database) AssociatePDUWithDestination(
receipt *Receipt,
) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
- for _, nid := range receipt.nids {
- if err := d.FederationSenderQueuePDUs.InsertQueuePDU(
- ctx, // context
- txn, // SQL transaction
- transactionID, // transaction ID
- serverName, // destination server name
- nid, // NID from the federationsender_queue_json table
- ); err != nil {
- return fmt.Errorf("InsertQueuePDU: %w", err)
- }
+ if err := d.FederationSenderQueuePDUs.InsertQueuePDU(
+ ctx, // context
+ txn, // SQL transaction
+ transactionID, // transaction ID
+ serverName, // destination server name
+ receipt.nid, // NID from the federationsender_queue_json table
+ ); err != nil {
+ return fmt.Errorf("InsertQueuePDU: %w", err)
}
return nil
})
@@ -51,14 +49,12 @@ func (d *Database) AssociatePDUWithDestination(
// GetNextTransactionPDUs retrieves events from the database for
// the next pending transaction, up to the limit specified.
-func (d *Database) GetNextTransactionPDUs(
+func (d *Database) GetPendingPDUs(
ctx context.Context,
serverName gomatrixserverlib.ServerName,
limit int,
) (
- transactionID gomatrixserverlib.TransactionID,
- events []*gomatrixserverlib.HeaderedEvent,
- receipt *Receipt,
+ events map[*Receipt]*gomatrixserverlib.HeaderedEvent,
err error,
) {
// Strictly speaking this doesn't need to be using the writer
@@ -66,29 +62,17 @@ func (d *Database) GetNextTransactionPDUs(
// a guarantee of transactional isolation, it's actually useful
// to know in SQLite mode that nothing else is trying to modify
// the database.
+ events = make(map[*Receipt]*gomatrixserverlib.HeaderedEvent)
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
- transactionID, err = d.FederationSenderQueuePDUs.SelectQueuePDUNextTransactionID(ctx, txn, serverName)
- if err != nil {
- return fmt.Errorf("SelectQueuePDUNextTransactionID: %w", err)
- }
-
- if transactionID == "" {
- return nil
- }
-
- nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, transactionID, limit)
+ nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, limit)
if err != nil {
return fmt.Errorf("SelectQueuePDUs: %w", err)
}
- receipt = &Receipt{
- nids: nids,
- }
-
retrieve := make([]int64, 0, len(nids))
for _, nid := range nids {
if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok {
- events = append(events, event)
+ events[&Receipt{nid}] = event
} else {
retrieve = append(retrieve, nid)
}
@@ -104,7 +88,7 @@ func (d *Database) GetNextTransactionPDUs(
if err := json.Unmarshal(blob, &event); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err)
}
- events = append(events, &event)
+ events[&Receipt{nid}] = &event
d.Cache.StoreFederationSenderQueuedPDU(nid, &event)
}
@@ -119,19 +103,24 @@ func (d *Database) GetNextTransactionPDUs(
func (d *Database) CleanPDUs(
ctx context.Context,
serverName gomatrixserverlib.ServerName,
- receipt *Receipt,
+ receipts []*Receipt,
) error {
- if receipt == nil {
+ if len(receipts) == 0 {
return errors.New("expected receipt")
}
+ nids := make([]int64, len(receipts))
+ for i := range receipts {
+ nids[i] = receipts[i].nid
+ }
+
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
- if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, receipt.nids); err != nil {
+ if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, nids); err != nil {
return err
}
var deleteNIDs []int64
- for _, nid := range receipt.nids {
+ for _, nid := range nids {
count, err := d.FederationSenderQueuePDUs.SelectQueuePDUReferenceJSONCount(ctx, txn, nid)
if err != nil {
return fmt.Errorf("SelectQueuePDUReferenceJSONCount: %w", err)