aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--federationsender/federationsender.go2
-rw-r--r--federationsender/storage/postgres/storage.go4
-rw-r--r--federationsender/storage/shared/storage.go2
-rw-r--r--federationsender/storage/shared/storage_edus.go12
-rw-r--r--federationsender/storage/shared/storage_pdus.go15
-rw-r--r--federationsender/storage/sqlite3/storage.go4
-rw-r--r--federationsender/storage/storage.go7
-rw-r--r--federationsender/storage/storage_wasm.go5
-rw-r--r--internal/caching/cache_federationevents.go67
-rw-r--r--internal/caching/caches.go1
-rw-r--r--internal/caching/impl_inmemorylru.go10
11 files changed, 118 insertions, 11 deletions
diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go
index fc0ba6d5..a24e0f48 100644
--- a/federationsender/federationsender.go
+++ b/federationsender/federationsender.go
@@ -46,7 +46,7 @@ func NewInternalAPI(
) api.FederationSenderInternalAPI {
cfg := &base.Cfg.FederationSender
- federationSenderDB, err := storage.NewDatabase(&cfg.Database)
+ federationSenderDB, err := storage.NewDatabase(&cfg.Database, base.Caches)
if err != nil {
logrus.WithError(err).Panic("failed to connect to federation sender db")
}
diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go
index f314f849..75b54bbc 100644
--- a/federationsender/storage/postgres/storage.go
+++ b/federationsender/storage/postgres/storage.go
@@ -19,6 +19,7 @@ import (
"database/sql"
"github.com/matrix-org/dendrite/federationsender/storage/shared"
+ "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
)
@@ -32,7 +33,7 @@ type Database struct {
}
// NewDatabase opens a new database
-func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
+func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (*Database, error) {
var d Database
var err error
if d.db, err = sqlutil.Open(dbProperties); err != nil {
@@ -65,6 +66,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
}
d.Database = shared.Database{
DB: d.db,
+ Cache: cache,
Writer: d.writer,
FederationSenderJoinedHosts: joinedHosts,
FederationSenderQueuePDUs: queuePDUs,
diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go
index d5731f31..af9d0d6a 100644
--- a/federationsender/storage/shared/storage.go
+++ b/federationsender/storage/shared/storage.go
@@ -22,12 +22,14 @@ import (
"github.com/matrix-org/dendrite/federationsender/storage/tables"
"github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
type Database struct {
DB *sql.DB
+ Cache caching.FederationSenderCache
Writer sqlutil.Writer
FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
diff --git a/federationsender/storage/shared/storage_edus.go b/federationsender/storage/shared/storage_edus.go
index 529b46aa..ae1d1511 100644
--- a/federationsender/storage/shared/storage_edus.go
+++ b/federationsender/storage/shared/storage_edus.go
@@ -69,7 +69,16 @@ func (d *Database) GetNextTransactionEDUs(
nids: nids,
}
- blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, nids)
+ retrieve := make([]int64, 0, len(nids))
+ for _, nid := range nids {
+ if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok {
+ edus = append(edus, edu)
+ } else {
+ retrieve = append(retrieve, nid)
+ }
+ }
+
+ blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
if err != nil {
return fmt.Errorf("SelectQueueJSON: %w", err)
}
@@ -111,6 +120,7 @@ func (d *Database) CleanEDUs(
}
if count == 0 {
deleteNIDs = append(deleteNIDs, nid)
+ d.Cache.EvictFederationSenderQueuedEDU(nid)
}
}
diff --git a/federationsender/storage/shared/storage_pdus.go b/federationsender/storage/shared/storage_pdus.go
index 9ab0b094..09235a5e 100644
--- a/federationsender/storage/shared/storage_pdus.go
+++ b/federationsender/storage/shared/storage_pdus.go
@@ -85,17 +85,27 @@ func (d *Database) GetNextTransactionPDUs(
nids: nids,
}
- blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, nids)
+ retrieve := make([]int64, 0, len(nids))
+ for _, nid := range nids {
+ if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok {
+ events = append(events, event)
+ } else {
+ retrieve = append(retrieve, nid)
+ }
+ }
+
+ blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
if err != nil {
return fmt.Errorf("SelectQueueJSON: %w", err)
}
- for _, blob := range blobs {
+ for nid, blob := range blobs {
var event gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(blob, &event); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err)
}
events = append(events, &event)
+ d.Cache.StoreFederationSenderQueuedPDU(nid, &event)
}
return nil
@@ -128,6 +138,7 @@ func (d *Database) CleanPDUs(
}
if count == 0 {
deleteNIDs = append(deleteNIDs, nid)
+ d.Cache.EvictFederationSenderQueuedPDU(nid)
}
}
diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go
index 4f663f64..e66d7690 100644
--- a/federationsender/storage/sqlite3/storage.go
+++ b/federationsender/storage/sqlite3/storage.go
@@ -21,6 +21,7 @@ import (
_ "github.com/mattn/go-sqlite3"
"github.com/matrix-org/dendrite/federationsender/storage/shared"
+ "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
)
@@ -34,7 +35,7 @@ type Database struct {
}
// NewDatabase opens a new database
-func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
+func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (*Database, error) {
var d Database
var err error
if d.db, err = sqlutil.Open(dbProperties); err != nil {
@@ -67,6 +68,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
}
d.Database = shared.Database{
DB: d.db,
+ Cache: cache,
Writer: d.writer,
FederationSenderJoinedHosts: joinedHosts,
FederationSenderQueuePDUs: queuePDUs,
diff --git a/federationsender/storage/storage.go b/federationsender/storage/storage.go
index f3613822..5462c352 100644
--- a/federationsender/storage/storage.go
+++ b/federationsender/storage/storage.go
@@ -21,16 +21,17 @@ import (
"github.com/matrix-org/dendrite/federationsender/storage/postgres"
"github.com/matrix-org/dendrite/federationsender/storage/sqlite3"
+ "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/setup/config"
)
// NewDatabase opens a new database
-func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) {
+func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (Database, error) {
switch {
case dbProperties.ConnectionString.IsSQLite():
- return sqlite3.NewDatabase(dbProperties)
+ return sqlite3.NewDatabase(dbProperties, cache)
case dbProperties.ConnectionString.IsPostgres():
- return postgres.NewDatabase(dbProperties)
+ return postgres.NewDatabase(dbProperties, cache)
default:
return nil, fmt.Errorf("unexpected database type")
}
diff --git a/federationsender/storage/storage_wasm.go b/federationsender/storage/storage_wasm.go
index c35b4828..bc52bd9b 100644
--- a/federationsender/storage/storage_wasm.go
+++ b/federationsender/storage/storage_wasm.go
@@ -18,14 +18,15 @@ import (
"fmt"
"github.com/matrix-org/dendrite/federationsender/storage/sqlite3"
+ "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/setup/config"
)
// NewDatabase opens a new database
-func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) {
+func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (Database, error) {
switch {
case dbProperties.ConnectionString.IsSQLite():
- return sqlite3.NewDatabase(dbProperties)
+ return sqlite3.NewDatabase(dbProperties, cache)
case dbProperties.ConnectionString.IsPostgres():
return nil, fmt.Errorf("can't use Postgres implementation")
default:
diff --git a/internal/caching/cache_federationevents.go b/internal/caching/cache_federationevents.go
new file mode 100644
index 00000000..a48c11fd
--- /dev/null
+++ b/internal/caching/cache_federationevents.go
@@ -0,0 +1,67 @@
+package caching
+
+import (
+ "fmt"
+
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const (
+ FederationEventCacheName = "federation_event"
+ FederationEventCacheMaxEntries = 256
+ FederationEventCacheMutable = true // to allow use of Unset only
+)
+
+// FederationSenderCache contains the subset of functions needed for
+// a federation event cache.
+type FederationSenderCache interface {
+ GetFederationSenderQueuedPDU(eventNID int64) (event *gomatrixserverlib.HeaderedEvent, ok bool)
+ StoreFederationSenderQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent)
+ EvictFederationSenderQueuedPDU(eventNID int64)
+
+ GetFederationSenderQueuedEDU(eventNID int64) (event *gomatrixserverlib.EDU, ok bool)
+ StoreFederationSenderQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU)
+ EvictFederationSenderQueuedEDU(eventNID int64)
+}
+
+func (c Caches) GetFederationSenderQueuedPDU(eventNID int64) (*gomatrixserverlib.HeaderedEvent, bool) {
+ key := fmt.Sprintf("%d", eventNID)
+ val, found := c.FederationEvents.Get(key)
+ if found && val != nil {
+ if event, ok := val.(*gomatrixserverlib.HeaderedEvent); ok {
+ return event, true
+ }
+ }
+ return nil, false
+}
+
+func (c Caches) StoreFederationSenderQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent) {
+ key := fmt.Sprintf("%d", eventNID)
+ c.FederationEvents.Set(key, event)
+}
+
+func (c Caches) EvictFederationSenderQueuedPDU(eventNID int64) {
+ key := fmt.Sprintf("%d", eventNID)
+ c.FederationEvents.Unset(key)
+}
+
+func (c Caches) GetFederationSenderQueuedEDU(eventNID int64) (*gomatrixserverlib.EDU, bool) {
+ key := fmt.Sprintf("%d", eventNID)
+ val, found := c.FederationEvents.Get(key)
+ if found && val != nil {
+ if event, ok := val.(*gomatrixserverlib.EDU); ok {
+ return event, true
+ }
+ }
+ return nil, false
+}
+
+func (c Caches) StoreFederationSenderQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU) {
+ key := fmt.Sprintf("%d", eventNID)
+ c.FederationEvents.Set(key, event)
+}
+
+func (c Caches) EvictFederationSenderQueuedEDU(eventNID int64) {
+ key := fmt.Sprintf("%d", eventNID)
+ c.FederationEvents.Unset(key)
+}
diff --git a/internal/caching/caches.go b/internal/caching/caches.go
index 655cc037..e7b7f550 100644
--- a/internal/caching/caches.go
+++ b/internal/caching/caches.go
@@ -10,6 +10,7 @@ type Caches struct {
RoomServerEventTypeNIDs Cache // RoomServerNIDsCache
RoomServerRoomNIDs Cache // RoomServerNIDsCache
RoomServerRoomIDs Cache // RoomServerNIDsCache
+ FederationEvents Cache // FederationEventsCache
}
// Cache is the interface that an implementation must satisfy.
diff --git a/internal/caching/impl_inmemorylru.go b/internal/caching/impl_inmemorylru.go
index e99c18d7..f05e8f3c 100644
--- a/internal/caching/impl_inmemorylru.go
+++ b/internal/caching/impl_inmemorylru.go
@@ -63,6 +63,15 @@ func NewInMemoryLRUCache(enablePrometheus bool) (*Caches, error) {
if err != nil {
return nil, err
}
+ federationEvents, err := NewInMemoryLRUCachePartition(
+ FederationEventCacheName,
+ FederationEventCacheMutable,
+ FederationEventCacheMaxEntries,
+ enablePrometheus,
+ )
+ if err != nil {
+ return nil, err
+ }
return &Caches{
RoomVersions: roomVersions,
ServerKeys: serverKeys,
@@ -70,6 +79,7 @@ func NewInMemoryLRUCache(enablePrometheus bool) (*Caches, error) {
RoomServerEventTypeNIDs: roomServerEventTypeNIDs,
RoomServerRoomNIDs: roomServerRoomNIDs,
RoomServerRoomIDs: roomServerRoomIDs,
+ FederationEvents: federationEvents,
}, nil
}