aboutsummaryrefslogtreecommitdiff
path: root/federationsender/storage/shared
diff options
context:
space:
mode:
Diffstat (limited to 'federationsender/storage/shared')
-rw-r--r--federationsender/storage/shared/storage.go2
-rw-r--r--federationsender/storage/shared/storage_edus.go12
-rw-r--r--federationsender/storage/shared/storage_pdus.go15
3 files changed, 26 insertions, 3 deletions
diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go
index d5731f31..af9d0d6a 100644
--- a/federationsender/storage/shared/storage.go
+++ b/federationsender/storage/shared/storage.go
@@ -22,12 +22,14 @@ import (
"github.com/matrix-org/dendrite/federationsender/storage/tables"
"github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
type Database struct {
DB *sql.DB
+ Cache caching.FederationSenderCache
Writer sqlutil.Writer
FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
diff --git a/federationsender/storage/shared/storage_edus.go b/federationsender/storage/shared/storage_edus.go
index 529b46aa..ae1d1511 100644
--- a/federationsender/storage/shared/storage_edus.go
+++ b/federationsender/storage/shared/storage_edus.go
@@ -69,7 +69,16 @@ func (d *Database) GetNextTransactionEDUs(
nids: nids,
}
- blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, nids)
+ retrieve := make([]int64, 0, len(nids))
+ for _, nid := range nids {
+ if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok {
+ edus = append(edus, edu)
+ } else {
+ retrieve = append(retrieve, nid)
+ }
+ }
+
+ blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
if err != nil {
return fmt.Errorf("SelectQueueJSON: %w", err)
}
@@ -111,6 +120,7 @@ func (d *Database) CleanEDUs(
}
if count == 0 {
deleteNIDs = append(deleteNIDs, nid)
+ d.Cache.EvictFederationSenderQueuedEDU(nid)
}
}
diff --git a/federationsender/storage/shared/storage_pdus.go b/federationsender/storage/shared/storage_pdus.go
index 9ab0b094..09235a5e 100644
--- a/federationsender/storage/shared/storage_pdus.go
+++ b/federationsender/storage/shared/storage_pdus.go
@@ -85,17 +85,27 @@ func (d *Database) GetNextTransactionPDUs(
nids: nids,
}
- blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, nids)
+ retrieve := make([]int64, 0, len(nids))
+ for _, nid := range nids {
+ if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok {
+ events = append(events, event)
+ } else {
+ retrieve = append(retrieve, nid)
+ }
+ }
+
+ blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
if err != nil {
return fmt.Errorf("SelectQueueJSON: %w", err)
}
- for _, blob := range blobs {
+ for nid, blob := range blobs {
var event gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(blob, &event); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err)
}
events = append(events, &event)
+ d.Cache.StoreFederationSenderQueuedPDU(nid, &event)
}
return nil
@@ -128,6 +138,7 @@ func (d *Database) CleanPDUs(
}
if count == 0 {
deleteNIDs = append(deleteNIDs, nid)
+ d.Cache.EvictFederationSenderQueuedPDU(nid)
}
}