aboutsummaryrefslogtreecommitdiff
path: root/federationsender/storage/shared/storage_pdus.go
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-12-04 14:52:10 +0000
committerGitHub <noreply@github.com>2020-12-04 14:52:10 +0000
commit5d65a879a55e216e047574f54f69e55724546ec4 (patch)
tree45200f92c121bae8db464ff31473f4f6a8e2e2bd /federationsender/storage/shared/storage_pdus.go
parentb507312d4cf9d35b5d4eaaa01a7f74d095b825f8 (diff)
Federation sender event cache (#1614)
* Cache federation sender events * Store in the correct cache * Update federation event cache * Fix Unset * Give EDUs same caching treatment as PDUs * Make federationsender_cache_size configurable * Default caches configuration * Fix unit tests * Revert "Fix unit tests" This reverts commit 24eb5d22524f20e1024b1475debe61ae20538a5a. * Revert "Default caches configuration" This reverts commit 464ecd1e64b9d2983f6fd5430e9607519d543cb3. * Revert "Make federationsender_cache_size configurable" This reverts commit 4631f5324151e006a15d6f19008f06361b994607.
Diffstat (limited to 'federationsender/storage/shared/storage_pdus.go')
-rw-r--r--federationsender/storage/shared/storage_pdus.go15
1 files changed, 13 insertions, 2 deletions
diff --git a/federationsender/storage/shared/storage_pdus.go b/federationsender/storage/shared/storage_pdus.go
index 9ab0b094..09235a5e 100644
--- a/federationsender/storage/shared/storage_pdus.go
+++ b/federationsender/storage/shared/storage_pdus.go
@@ -85,17 +85,27 @@ func (d *Database) GetNextTransactionPDUs(
nids: nids,
}
- blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, nids)
+ retrieve := make([]int64, 0, len(nids))
+ for _, nid := range nids {
+ if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok {
+ events = append(events, event)
+ } else {
+ retrieve = append(retrieve, nid)
+ }
+ }
+
+ blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
if err != nil {
return fmt.Errorf("SelectQueueJSON: %w", err)
}
- for _, blob := range blobs {
+ for nid, blob := range blobs {
var event gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(blob, &event); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err)
}
events = append(events, &event)
+ d.Cache.StoreFederationSenderQueuedPDU(nid, &event)
}
return nil
@@ -128,6 +138,7 @@ func (d *Database) CleanPDUs(
}
if count == 0 {
deleteNIDs = append(deleteNIDs, nid)
+ d.Cache.EvictFederationSenderQueuedPDU(nid)
}
}