aboutsummaryrefslogtreecommitdiff
path: root/roomserver
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-07-11 14:31:31 +0100
committerGitHub <noreply@github.com>2022-07-11 14:31:31 +0100
commit3ea21273bcc151b36eec412d0ec550642fe9b04f (patch)
tree84074f881f9875c89d417f028caeacd1eaeca3ce /roomserver
parenteb8dc50a970cc1bfd82dc4bace76aba00181df6e (diff)
Ristretto cache (#2563)
* Try Ristretto cache * Tweak * It's beautiful * Update GMSL * More strict keyable interface * Fix that some more * Make less panicky * Don't enforce mutability checks for now * Determine mutability using deep equality * Tweaks * Namespace keys * Make federation caches mutable * Update cost estimation, add metric * Update GMSL * Estimate cost for metrics better * Reduce counters a bit * Try caching events * Some guards * Try again * Try this * Use separate caches for hopefully better hash distribution * Fix bug with admitting events into cache * Try to fix bugs * Check nil * Try that again * Preserve order jeezo this is messy * thanks VS Code for doing exactly the wrong thing * Try this again * Be more specific * aaaaargh * One more time * That might be better * Stronger sorting * Cache expiries, async publishing of EDUs * Put it back * Use a shared cache again * Cost estimation fixes * Update ristretto * Reduce counters a bit * Clean up a bit * Update GMSL * 1GB * Configurable cache sizees * Tweaks * Add `config.DataUnit` for specifying friendly cache sizes * Various tweaks * Update GMSL * Add back some lazy loading caching * Include key in cost * Include key in cost * Tweak max age handling, config key name * Only register prometheus metrics if requested * Review comments @S7evinK * Don't return errors when creating caches (it is better just to crash since otherwise we'll `nil`-pointer exception everywhere) * Review comments * Update sample configs * Update GHA Workflow * Update Complement images to Go 1.18 * Remove the cache test from the federation API as we no longer guarantee immediate cache admission * Don't check the caches in the renewal test * Possibly fix the upgrade tests * Update to matrix-org/gomatrixserverlib#322 * Update documentation to refer to Go 1.18
Diffstat (limited to 'roomserver')
-rw-r--r--roomserver/internal/input/input_events.go4
-rw-r--r--roomserver/internal/input/input_test.go6
-rw-r--r--roomserver/internal/perform/perform_backfill.go5
-rw-r--r--roomserver/state/state.go2
-rw-r--r--roomserver/storage/shared/storage.go39
5 files changed, 35 insertions, 21 deletions
diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go
index 743b1efe..866670d7 100644
--- a/roomserver/internal/input/input_events.go
+++ b/roomserver/internal/input/input_events.go
@@ -319,11 +319,9 @@ func (r *Inputer) processRoomEvent(
// if storing this event results in it being redacted then do so.
if !isRejected && redactedEventID == event.EventID() {
- r, rerr := eventutil.RedactEvent(redactionEvent, event)
- if rerr != nil {
+ if err = eventutil.RedactEvent(redactionEvent, event); err != nil {
return fmt.Errorf("eventutil.RedactEvent: %w", rerr)
}
- event = r
}
// For outliers we can stop after we've stored the event itself as it
diff --git a/roomserver/internal/input/input_test.go b/roomserver/internal/input/input_test.go
index 7c65f9ea..4708560a 100644
--- a/roomserver/internal/input/input_test.go
+++ b/roomserver/internal/input/input_test.go
@@ -48,10 +48,6 @@ func TestSingleTransactionOnInput(t *testing.T) {
Kind: api.KindOutlier, // don't panic if we generate an output event
Event: event.Headered(gomatrixserverlib.RoomVersionV6),
}
- cache, err := caching.NewInMemoryLRUCache(false)
- if err != nil {
- t.Fatal(err)
- }
db, err := storage.Open(
nil,
&config.DatabaseOptions{
@@ -59,7 +55,7 @@ func TestSingleTransactionOnInput(t *testing.T) {
MaxOpenConnections: 1,
MaxIdleConnections: 1,
},
- cache,
+ caching.NewRistrettoCache(8*1024*1024, time.Hour, false),
)
if err != nil {
t.Logf("PostgreSQL not available (%s), skipping", err)
diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go
index 9eddca73..3f98fbc2 100644
--- a/roomserver/internal/perform/perform_backfill.go
+++ b/roomserver/internal/perform/perform_backfill.go
@@ -593,12 +593,11 @@ func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixs
// redacted, which we don't care about since we aren't returning it in this backfill.
if redactedEventID == ev.EventID() {
eventToRedact := ev.Unwrap()
- redactedEvent, err := eventutil.RedactEvent(redactionEvent, eventToRedact)
- if err != nil {
+ if err := eventutil.RedactEvent(redactionEvent, eventToRedact); err != nil {
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to redact event")
continue
}
- ev = redactedEvent.Headered(ev.RoomVersion)
+ ev = eventToRedact.Headered(ev.RoomVersion)
events[j] = ev
}
backfilledEventMap[ev.EventID()] = types.Event{
diff --git a/roomserver/state/state.go b/roomserver/state/state.go
index 91f27165..d1d24b09 100644
--- a/roomserver/state/state.go
+++ b/roomserver/state/state.go
@@ -1027,7 +1027,7 @@ func (v *StateResolution) loadStateEvents(
result := make([]*gomatrixserverlib.Event, 0, len(entries))
eventEntries := make([]types.StateEntry, 0, len(entries))
- eventNIDs := make([]types.EventNID, 0, len(entries))
+ eventNIDs := make(types.EventNIDs, 0, len(entries))
for _, entry := range entries {
if e, ok := v.events[entry.EventNID]; ok {
result = append(result, e)
diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go
index ba937ba3..692af1f6 100644
--- a/roomserver/storage/shared/storage.go
+++ b/roomserver/storage/shared/storage.go
@@ -439,8 +439,18 @@ func (d *Database) Events(
}
func (d *Database) events(
- ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID,
+ ctx context.Context, txn *sql.Tx, inputEventNIDs types.EventNIDs,
) ([]types.Event, error) {
+ sort.Sort(inputEventNIDs)
+ events := make(map[types.EventNID]*gomatrixserverlib.Event, len(inputEventNIDs))
+ eventNIDs := make([]types.EventNID, 0, len(inputEventNIDs))
+ for _, nid := range inputEventNIDs {
+ if event, ok := d.Cache.GetRoomServerEvent(nid); ok && event != nil {
+ events[nid] = event
+ } else {
+ eventNIDs = append(eventNIDs, nid)
+ }
+ }
eventJSONs, err := d.EventJSONTable.BulkSelectEventJSON(ctx, txn, eventNIDs)
if err != nil {
return nil, err
@@ -476,18 +486,29 @@ func (d *Database) events(
for n, v := range dbRoomVersions {
roomVersions[n] = v
}
- results := make([]types.Event, len(eventJSONs))
- for i, eventJSON := range eventJSONs {
- result := &results[i]
- result.EventNID = eventJSON.EventNID
- roomNID := roomNIDs[result.EventNID]
+ for _, eventJSON := range eventJSONs {
+ roomNID := roomNIDs[eventJSON.EventNID]
roomVersion := roomVersions[roomNID]
- result.Event, err = gomatrixserverlib.NewEventFromTrustedJSONWithEventID(
+ events[eventJSON.EventNID], err = gomatrixserverlib.NewEventFromTrustedJSONWithEventID(
eventIDs[eventJSON.EventNID], eventJSON.EventJSON, false, roomVersion,
)
if err != nil {
return nil, err
}
+ if event := events[eventJSON.EventNID]; event != nil {
+ d.Cache.StoreRoomServerEvent(eventJSON.EventNID, event)
+ }
+ }
+ results := make([]types.Event, 0, len(inputEventNIDs))
+ for _, nid := range inputEventNIDs {
+ event, ok := events[nid]
+ if !ok || event == nil {
+ return nil, fmt.Errorf("event %d missing", nid)
+ }
+ results = append(results, types.Event{
+ EventNID: nid,
+ Event: event,
+ })
}
if !redactionsArePermanent {
d.applyRedactions(results)
@@ -854,7 +875,7 @@ func (d *Database) handleRedactions(
// mark the event as redacted
if redactionsArePermanent {
- redactedEvent.Event = redactedEvent.Redact()
+ redactedEvent.Redact()
}
err = redactedEvent.SetUnsignedField("redacted_because", redactionEvent)
@@ -926,7 +947,7 @@ func (d *Database) loadRedactionPair(
func (d *Database) applyRedactions(events []types.Event) {
for i := range events {
if result := gjson.GetBytes(events[i].Unsigned(), "redacted_because"); result.Exists() {
- events[i].Event = events[i].Redact()
+ events[i].Redact()
}
}
}