diff options
Diffstat (limited to 'federationsender/storage/shared')
-rw-r--r-- | federationsender/storage/shared/storage.go | 2 | ||||
-rw-r--r-- | federationsender/storage/shared/storage_edus.go | 12 | ||||
-rw-r--r-- | federationsender/storage/shared/storage_pdus.go | 15 |
3 files changed, 26 insertions, 3 deletions
diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go index d5731f31..af9d0d6a 100644 --- a/federationsender/storage/shared/storage.go +++ b/federationsender/storage/shared/storage.go @@ -22,12 +22,14 @@ import ( "github.com/matrix-org/dendrite/federationsender/storage/tables" "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" ) type Database struct { DB *sql.DB + Cache caching.FederationSenderCache Writer sqlutil.Writer FederationSenderQueuePDUs tables.FederationSenderQueuePDUs FederationSenderQueueEDUs tables.FederationSenderQueueEDUs diff --git a/federationsender/storage/shared/storage_edus.go b/federationsender/storage/shared/storage_edus.go index 529b46aa..ae1d1511 100644 --- a/federationsender/storage/shared/storage_edus.go +++ b/federationsender/storage/shared/storage_edus.go @@ -69,7 +69,16 @@ func (d *Database) GetNextTransactionEDUs( nids: nids, } - blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, nids) + retrieve := make([]int64, 0, len(nids)) + for _, nid := range nids { + if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok { + edus = append(edus, edu) + } else { + retrieve = append(retrieve, nid) + } + } + + blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve) if err != nil { return fmt.Errorf("SelectQueueJSON: %w", err) } @@ -111,6 +120,7 @@ func (d *Database) CleanEDUs( } if count == 0 { deleteNIDs = append(deleteNIDs, nid) + d.Cache.EvictFederationSenderQueuedEDU(nid) } } diff --git a/federationsender/storage/shared/storage_pdus.go b/federationsender/storage/shared/storage_pdus.go index 9ab0b094..09235a5e 100644 --- a/federationsender/storage/shared/storage_pdus.go +++ b/federationsender/storage/shared/storage_pdus.go @@ -85,17 +85,27 @@ func (d *Database) GetNextTransactionPDUs( nids: nids, } - blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, nids) + retrieve := make([]int64, 0, len(nids)) + for _, nid := range nids { + if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok { + events = append(events, event) + } else { + retrieve = append(retrieve, nid) + } + } + + blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve) if err != nil { return fmt.Errorf("SelectQueueJSON: %w", err) } - for _, blob := range blobs { + for nid, blob := range blobs { var event gomatrixserverlib.HeaderedEvent if err := json.Unmarshal(blob, &event); err != nil { return fmt.Errorf("json.Unmarshal: %w", err) } events = append(events, &event) + d.Cache.StoreFederationSenderQueuedPDU(nid, &event) } return nil @@ -128,6 +138,7 @@ func (d *Database) CleanPDUs( } if count == 0 { deleteNIDs = append(deleteNIDs, nid) + d.Cache.EvictFederationSenderQueuedPDU(nid) } } |