aboutsummaryrefslogtreecommitdiff
path: root/federationsender/storage/shared/storage_pdus.go
diff options
context:
space:
mode:
Diffstat (limited to 'federationsender/storage/shared/storage_pdus.go')
-rw-r--r--federationsender/storage/shared/storage_pdus.go15
1 files changed, 13 insertions, 2 deletions
diff --git a/federationsender/storage/shared/storage_pdus.go b/federationsender/storage/shared/storage_pdus.go
index 9ab0b094..09235a5e 100644
--- a/federationsender/storage/shared/storage_pdus.go
+++ b/federationsender/storage/shared/storage_pdus.go
@@ -85,17 +85,27 @@ func (d *Database) GetNextTransactionPDUs(
nids: nids,
}
- blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, nids)
+ retrieve := make([]int64, 0, len(nids))
+ for _, nid := range nids {
+ if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok {
+ events = append(events, event)
+ } else {
+ retrieve = append(retrieve, nid)
+ }
+ }
+
+ blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
if err != nil {
return fmt.Errorf("SelectQueueJSON: %w", err)
}
- for _, blob := range blobs {
+ for nid, blob := range blobs {
var event gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(blob, &event); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err)
}
events = append(events, &event)
+ d.Cache.StoreFederationSenderQueuedPDU(nid, &event)
}
return nil
@@ -128,6 +138,7 @@ func (d *Database) CleanPDUs(
}
if count == 0 {
deleteNIDs = append(deleteNIDs, nid)
+ d.Cache.EvictFederationSenderQueuedPDU(nid)
}
}