diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-07-11 14:31:31 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-11 14:31:31 +0100 |
commit | 3ea21273bcc151b36eec412d0ec550642fe9b04f (patch) | |
tree | 84074f881f9875c89d417f028caeacd1eaeca3ce /internal | |
parent | eb8dc50a970cc1bfd82dc4bace76aba00181df6e (diff) |
Ristretto cache (#2563)
* Try Ristretto cache
* Tweak
* It's beautiful
* Update GMSL
* More strict keyable interface
* Fix that some more
* Make less panicky
* Don't enforce mutability checks for now
* Determine mutability using deep equality
* Tweaks
* Namespace keys
* Make federation caches mutable
* Update cost estimation, add metric
* Update GMSL
* Estimate cost for metrics better
* Reduce counters a bit
* Try caching events
* Some guards
* Try again
* Try this
* Use separate caches for hopefully better hash distribution
* Fix bug with admitting events into cache
* Try to fix bugs
* Check nil
* Try that again
* Preserve order jeezo this is messy
* thanks VS Code for doing exactly the wrong thing
* Try this again
* Be more specific
* aaaaargh
* One more time
* That might be better
* Stronger sorting
* Cache expiries, async publishing of EDUs
* Put it back
* Use a shared cache again
* Cost estimation fixes
* Update ristretto
* Reduce counters a bit
* Clean up a bit
* Update GMSL
* 1GB
* Configurable cache sizees
* Tweaks
* Add `config.DataUnit` for specifying friendly cache sizes
* Various tweaks
* Update GMSL
* Add back some lazy loading caching
* Include key in cost
* Include key in cost
* Tweak max age handling, config key name
* Only register prometheus metrics if requested
* Review comments @S7evinK
* Don't return errors when creating caches (it is better just to crash since otherwise we'll `nil`-pointer exception everywhere)
* Review comments
* Update sample configs
* Update GHA Workflow
* Update Complement images to Go 1.18
* Remove the cache test from the federation API as we no longer guarantee immediate cache admission
* Don't check the caches in the renewal test
* Possibly fix the upgrade tests
* Update to matrix-org/gomatrixserverlib#322
* Update documentation to refer to Go 1.18
Diffstat (limited to 'internal')
-rw-r--r-- | internal/caching/cache_federationevents.go | 39 | ||||
-rw-r--r-- | internal/caching/cache_lazy_load_members.go | 68 | ||||
-rw-r--r-- | internal/caching/cache_roomevents.go | 21 | ||||
-rw-r--r-- | internal/caching/cache_roominfo.go | 17 | ||||
-rw-r--r-- | internal/caching/cache_roomservernids.go | 20 | ||||
-rw-r--r-- | internal/caching/cache_roomversions.go | 15 | ||||
-rw-r--r-- | internal/caching/cache_serverkeys.go | 24 | ||||
-rw-r--r-- | internal/caching/cache_space_rooms.go | 17 | ||||
-rw-r--r-- | internal/caching/caches.go | 52 | ||||
-rw-r--r-- | internal/caching/impl_inmemorylru.go | 189 | ||||
-rw-r--r-- | internal/caching/impl_ristretto.go | 200 | ||||
-rw-r--r-- | internal/eventutil/events.go | 18 |
12 files changed, 303 insertions, 377 deletions
diff --git a/internal/caching/cache_federationevents.go b/internal/caching/cache_federationevents.go index b79cc809..24af51bd 100644 --- a/internal/caching/cache_federationevents.go +++ b/internal/caching/cache_federationevents.go @@ -1,18 +1,9 @@ package caching import ( - "fmt" - "github.com/matrix-org/gomatrixserverlib" ) -const ( - FederationEventCacheName = "federation_event" - FederationEventCacheMaxEntries = 256 - FederationEventCacheMutable = true // to allow use of Unset only - FederationEventCacheMaxAge = CacheNoMaxAge -) - // FederationCache contains the subset of functions needed for // a federation event cache. type FederationCache interface { @@ -26,43 +17,25 @@ type FederationCache interface { } func (c Caches) GetFederationQueuedPDU(eventNID int64) (*gomatrixserverlib.HeaderedEvent, bool) { - key := fmt.Sprintf("%d", eventNID) - val, found := c.FederationEvents.Get(key) - if found && val != nil { - if event, ok := val.(*gomatrixserverlib.HeaderedEvent); ok { - return event, true - } - } - return nil, false + return c.FederationPDUs.Get(eventNID) } func (c Caches) StoreFederationQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent) { - key := fmt.Sprintf("%d", eventNID) - c.FederationEvents.Set(key, event) + c.FederationPDUs.Set(eventNID, event) } func (c Caches) EvictFederationQueuedPDU(eventNID int64) { - key := fmt.Sprintf("%d", eventNID) - c.FederationEvents.Unset(key) + c.FederationPDUs.Unset(eventNID) } func (c Caches) GetFederationQueuedEDU(eventNID int64) (*gomatrixserverlib.EDU, bool) { - key := fmt.Sprintf("%d", eventNID) - val, found := c.FederationEvents.Get(key) - if found && val != nil { - if event, ok := val.(*gomatrixserverlib.EDU); ok { - return event, true - } - } - return nil, false + return c.FederationEDUs.Get(eventNID) } func (c Caches) StoreFederationQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU) { - key := fmt.Sprintf("%d", eventNID) - c.FederationEvents.Set(key, event) + c.FederationEDUs.Set(eventNID, event) } func (c Caches) EvictFederationQueuedEDU(eventNID int64) { - key := fmt.Sprintf("%d", eventNID) - c.FederationEvents.Unset(key) + c.FederationEDUs.Unset(eventNID) } diff --git a/internal/caching/cache_lazy_load_members.go b/internal/caching/cache_lazy_load_members.go index f0d49506..0d7009c9 100644 --- a/internal/caching/cache_lazy_load_members.go +++ b/internal/caching/cache_lazy_load_members.go @@ -1,67 +1,35 @@ package caching import ( - "fmt" - "time" - userapi "github.com/matrix-org/dendrite/userapi/api" ) -const ( - LazyLoadCacheName = "lazy_load_members" - LazyLoadCacheMaxEntries = 128 - LazyLoadCacheMaxUserEntries = 128 - LazyLoadCacheMutable = true - LazyLoadCacheMaxAge = time.Minute * 30 -) +type lazyLoadingCacheKey struct { + UserID string // the user we're querying on behalf of + DeviceID string // the user we're querying on behalf of + RoomID string // the room in question + TargetUserID string // the user whose membership we're asking about +} type LazyLoadCache interface { StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) IsLazyLoadedUserCached(device *userapi.Device, roomID, userID string) (string, bool) } -func (c Caches) lazyLoadCacheForUser(device *userapi.Device) (*InMemoryLRUCachePartition, error) { - cacheName := fmt.Sprintf("%s/%s", device.UserID, device.ID) - userCache, ok := c.LazyLoading.Get(cacheName) - if ok && userCache != nil { - if cache, ok := userCache.(*InMemoryLRUCachePartition); ok { - return cache, nil - } - } - cache, err := NewInMemoryLRUCachePartition( - LazyLoadCacheName, - LazyLoadCacheMutable, - LazyLoadCacheMaxUserEntries, - LazyLoadCacheMaxAge, - false, - ) - if err != nil { - return nil, err - } - c.LazyLoading.Set(cacheName, cache) - go cacheCleaner(cache) - return cache, nil -} - func (c Caches) StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) { - cache, err := c.lazyLoadCacheForUser(device) - if err != nil { - return - } - cacheKey := fmt.Sprintf("%s/%s/%s/%s", device.UserID, device.ID, roomID, userID) - cache.Set(cacheKey, eventID) + c.LazyLoading.Set(lazyLoadingCacheKey{ + UserID: device.UserID, + DeviceID: device.ID, + RoomID: roomID, + TargetUserID: userID, + }, eventID) } func (c Caches) IsLazyLoadedUserCached(device *userapi.Device, roomID, userID string) (string, bool) { - cache, err := c.lazyLoadCacheForUser(device) - if err != nil { - return "", false - } - - cacheKey := fmt.Sprintf("%s/%s/%s/%s", device.UserID, device.ID, roomID, userID) - val, ok := cache.Get(cacheKey) - if !ok { - return "", ok - } - return val.(string), ok + return c.LazyLoading.Get(lazyLoadingCacheKey{ + UserID: device.UserID, + DeviceID: device.ID, + RoomID: roomID, + TargetUserID: userID, + }) } diff --git a/internal/caching/cache_roomevents.go b/internal/caching/cache_roomevents.go new file mode 100644 index 00000000..9d5d3b91 --- /dev/null +++ b/internal/caching/cache_roomevents.go @@ -0,0 +1,21 @@ +package caching + +import ( + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/gomatrixserverlib" +) + +// RoomServerEventsCache contains the subset of functions needed for +// a roomserver event cache. +type RoomServerEventsCache interface { + GetRoomServerEvent(eventNID types.EventNID) (*gomatrixserverlib.Event, bool) + StoreRoomServerEvent(eventNID types.EventNID, event *gomatrixserverlib.Event) +} + +func (c Caches) GetRoomServerEvent(eventNID types.EventNID) (*gomatrixserverlib.Event, bool) { + return c.RoomServerEvents.Get(int64(eventNID)) +} + +func (c Caches) StoreRoomServerEvent(eventNID types.EventNID, event *gomatrixserverlib.Event) { + c.RoomServerEvents.Set(int64(eventNID), event) +} diff --git a/internal/caching/cache_roominfo.go b/internal/caching/cache_roominfo.go index 60d22128..d03a6107 100644 --- a/internal/caching/cache_roominfo.go +++ b/internal/caching/cache_roominfo.go @@ -1,8 +1,6 @@ package caching import ( - "time" - "github.com/matrix-org/dendrite/roomserver/types" ) @@ -14,13 +12,6 @@ import ( // used from other components as we currently have no way to invalidate // the cache in downstream components. -const ( - RoomInfoCacheName = "roominfo" - RoomInfoCacheMaxEntries = 1024 - RoomInfoCacheMutable = true - RoomInfoCacheMaxAge = time.Minute * 5 -) - // RoomInfosCache contains the subset of functions needed for // a room Info cache. It must only be used from the roomserver only // It is not safe for use from other components. @@ -32,13 +23,7 @@ type RoomInfoCache interface { // GetRoomInfo must only be called from the roomserver only. It is not // safe for use from other components. func (c Caches) GetRoomInfo(roomID string) (types.RoomInfo, bool) { - val, found := c.RoomInfos.Get(roomID) - if found && val != nil { - if roomInfo, ok := val.(types.RoomInfo); ok { - return roomInfo, true - } - } - return types.RoomInfo{}, false + return c.RoomInfos.Get(roomID) } // StoreRoomInfo must only be called from the roomserver only. It is not diff --git a/internal/caching/cache_roomservernids.go b/internal/caching/cache_roomservernids.go index 1918a2f1..b409aeef 100644 --- a/internal/caching/cache_roomservernids.go +++ b/internal/caching/cache_roomservernids.go @@ -1,22 +1,14 @@ package caching import ( - "strconv" - "github.com/matrix-org/dendrite/roomserver/types" ) -const ( - RoomServerRoomIDsCacheName = "roomserver_room_ids" - RoomServerRoomIDsCacheMaxEntries = 1024 - RoomServerRoomIDsCacheMutable = false - RoomServerRoomIDsCacheMaxAge = CacheNoMaxAge -) - type RoomServerCaches interface { RoomServerNIDsCache RoomVersionCache RoomInfoCache + RoomServerEventsCache } // RoomServerNIDsCache contains the subset of functions needed for @@ -27,15 +19,9 @@ type RoomServerNIDsCache interface { } func (c Caches) GetRoomServerRoomID(roomNID types.RoomNID) (string, bool) { - val, found := c.RoomServerRoomIDs.Get(strconv.Itoa(int(roomNID))) - if found && val != nil { - if roomID, ok := val.(string); ok { - return roomID, true - } - } - return "", false + return c.RoomServerRoomIDs.Get(int64(roomNID)) } func (c Caches) StoreRoomServerRoomID(roomNID types.RoomNID, roomID string) { - c.RoomServerRoomIDs.Set(strconv.Itoa(int(roomNID)), roomID) + c.RoomServerRoomIDs.Set(int64(roomNID), roomID) } diff --git a/internal/caching/cache_roomversions.go b/internal/caching/cache_roomversions.go index 92d2eab0..afc3d36d 100644 --- a/internal/caching/cache_roomversions.go +++ b/internal/caching/cache_roomversions.go @@ -2,13 +2,6 @@ package caching import "github.com/matrix-org/gomatrixserverlib" -const ( - RoomVersionCacheName = "room_versions" - RoomVersionCacheMaxEntries = 1024 - RoomVersionCacheMutable = false - RoomVersionCacheMaxAge = CacheNoMaxAge -) - // RoomVersionsCache contains the subset of functions needed for // a room version cache. type RoomVersionCache interface { @@ -17,13 +10,7 @@ type RoomVersionCache interface { } func (c Caches) GetRoomVersion(roomID string) (gomatrixserverlib.RoomVersion, bool) { - val, found := c.RoomVersions.Get(roomID) - if found && val != nil { - if roomVersion, ok := val.(gomatrixserverlib.RoomVersion); ok { - return roomVersion, true - } - } - return "", false + return c.RoomVersions.Get(roomID) } func (c Caches) StoreRoomVersion(roomID string, roomVersion gomatrixserverlib.RoomVersion) { diff --git a/internal/caching/cache_serverkeys.go b/internal/caching/cache_serverkeys.go index 4eb10fe6..cffa101d 100644 --- a/internal/caching/cache_serverkeys.go +++ b/internal/caching/cache_serverkeys.go @@ -6,13 +6,6 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) -const ( - ServerKeyCacheName = "server_key" - ServerKeyCacheMaxEntries = 4096 - ServerKeyCacheMutable = true - ServerKeyCacheMaxAge = CacheNoMaxAge -) - // ServerKeyCache contains the subset of functions needed for // a server key cache. type ServerKeyCache interface { @@ -34,18 +27,13 @@ func (c Caches) GetServerKey( ) (gomatrixserverlib.PublicKeyLookupResult, bool) { key := fmt.Sprintf("%s/%s", request.ServerName, request.KeyID) val, found := c.ServerKeys.Get(key) - if found && val != nil { - if keyLookupResult, ok := val.(gomatrixserverlib.PublicKeyLookupResult); ok { - if !keyLookupResult.WasValidAt(timestamp, true) { - // The key wasn't valid at the requested timestamp so don't - // return it. The caller will have to work out what to do. - c.ServerKeys.Unset(key) - return gomatrixserverlib.PublicKeyLookupResult{}, false - } - return keyLookupResult, true - } + if found && !val.WasValidAt(timestamp, true) { + // The key wasn't valid at the requested timestamp so don't + // return it. The caller will have to work out what to do. + c.ServerKeys.Unset(key) + return gomatrixserverlib.PublicKeyLookupResult{}, false } - return gomatrixserverlib.PublicKeyLookupResult{}, false + return val, found } func (c Caches) StoreServerKey( diff --git a/internal/caching/cache_space_rooms.go b/internal/caching/cache_space_rooms.go index 6d56cce5..697f9926 100644 --- a/internal/caching/cache_space_rooms.go +++ b/internal/caching/cache_space_rooms.go @@ -1,31 +1,16 @@ package caching import ( - "time" - "github.com/matrix-org/gomatrixserverlib" ) -const ( - SpaceSummaryRoomsCacheName = "space_summary_rooms" - SpaceSummaryRoomsCacheMaxEntries = 100 - SpaceSummaryRoomsCacheMutable = true - SpaceSummaryRoomsCacheMaxAge = time.Minute * 5 -) - type SpaceSummaryRoomsCache interface { GetSpaceSummary(roomID string) (r gomatrixserverlib.MSC2946SpacesResponse, ok bool) StoreSpaceSummary(roomID string, r gomatrixserverlib.MSC2946SpacesResponse) } func (c Caches) GetSpaceSummary(roomID string) (r gomatrixserverlib.MSC2946SpacesResponse, ok bool) { - val, found := c.SpaceSummaryRooms.Get(roomID) - if found && val != nil { - if resp, ok := val.(gomatrixserverlib.MSC2946SpacesResponse); ok { - return resp, true - } - } - return r, false + return c.SpaceSummaryRooms.Get(roomID) } func (c Caches) StoreSpaceSummary(roomID string, r gomatrixserverlib.MSC2946SpacesResponse) { diff --git a/internal/caching/caches.go b/internal/caching/caches.go index 173e47e5..14b232dd 100644 --- a/internal/caching/caches.go +++ b/internal/caching/caches.go @@ -1,28 +1,52 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package caching import ( - "time" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/gomatrixserverlib" ) // Caches contains a set of references to caches. They may be // different implementations as long as they satisfy the Cache // interface. type Caches struct { - RoomVersions Cache // RoomVersionCache - ServerKeys Cache // ServerKeyCache - RoomServerRoomNIDs Cache // RoomServerNIDsCache - RoomServerRoomIDs Cache // RoomServerNIDsCache - RoomInfos Cache // RoomInfoCache - FederationEvents Cache // FederationEventsCache - SpaceSummaryRooms Cache // SpaceSummaryRoomsCache - LazyLoading Cache // LazyLoadCache + RoomVersions Cache[string, gomatrixserverlib.RoomVersion] // room ID -> room version + ServerKeys Cache[string, gomatrixserverlib.PublicKeyLookupResult] // server name -> server keys + RoomServerRoomNIDs Cache[string, types.RoomNID] // room ID -> room NID + RoomServerRoomIDs Cache[int64, string] // room NID -> room ID + RoomServerEvents Cache[int64, *gomatrixserverlib.Event] // event NID -> event + RoomInfos Cache[string, types.RoomInfo] // room ID -> room info + FederationPDUs Cache[int64, *gomatrixserverlib.HeaderedEvent] // queue NID -> PDU + FederationEDUs Cache[int64, *gomatrixserverlib.EDU] // queue NID -> EDU + SpaceSummaryRooms Cache[string, gomatrixserverlib.MSC2946SpacesResponse] // room ID -> space response + LazyLoading Cache[lazyLoadingCacheKey, string] // composite key -> event ID } // Cache is the interface that an implementation must satisfy. -type Cache interface { - Get(key string) (value interface{}, ok bool) - Set(key string, value interface{}) - Unset(key string) +type Cache[K keyable, T any] interface { + Get(key K) (value T, ok bool) + Set(key K, value T) + Unset(key K) +} + +type keyable interface { + // from https://github.com/dgraph-io/ristretto/blob/8e850b710d6df0383c375ec6a7beae4ce48fc8d5/z/z.go#L34 + uint64 | string | []byte | byte | int | int32 | uint32 | int64 | lazyLoadingCacheKey } -const CacheNoMaxAge = time.Duration(0) +type costable interface { + CacheCost() int +} diff --git a/internal/caching/impl_inmemorylru.go b/internal/caching/impl_inmemorylru.go deleted file mode 100644 index 59476089..00000000 --- a/internal/caching/impl_inmemorylru.go +++ /dev/null @@ -1,189 +0,0 @@ -package caching - -import ( - "fmt" - "time" - - lru "github.com/hashicorp/golang-lru" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -func NewInMemoryLRUCache(enablePrometheus bool) (*Caches, error) { - roomVersions, err := NewInMemoryLRUCachePartition( - RoomVersionCacheName, - RoomVersionCacheMutable, - RoomVersionCacheMaxEntries, - RoomVersionCacheMaxAge, - enablePrometheus, - ) - if err != nil { - return nil, err - } - serverKeys, err := NewInMemoryLRUCachePartition( - ServerKeyCacheName, - ServerKeyCacheMutable, - ServerKeyCacheMaxEntries, - ServerKeyCacheMaxAge, - enablePrometheus, - ) - if err != nil { - return nil, err - } - roomServerRoomIDs, err := NewInMemoryLRUCachePartition( - RoomServerRoomIDsCacheName, - RoomServerRoomIDsCacheMutable, - RoomServerRoomIDsCacheMaxEntries, - RoomServerRoomIDsCacheMaxAge, - enablePrometheus, - ) - if err != nil { - return nil, err - } - roomInfos, err := NewInMemoryLRUCachePartition( - RoomInfoCacheName, - RoomInfoCacheMutable, - RoomInfoCacheMaxEntries, - RoomInfoCacheMaxAge, - enablePrometheus, - ) - if err != nil { - return nil, err - } - federationEvents, err := NewInMemoryLRUCachePartition( - FederationEventCacheName, - FederationEventCacheMutable, - FederationEventCacheMaxEntries, - FederationEventCacheMaxAge, - enablePrometheus, - ) - if err != nil { - return nil, err - } - spaceRooms, err := NewInMemoryLRUCachePartition( - SpaceSummaryRoomsCacheName, - SpaceSummaryRoomsCacheMutable, - SpaceSummaryRoomsCacheMaxEntries, - SpaceSummaryRoomsCacheMaxAge, - enablePrometheus, - ) - if err != nil { - return nil, err - } - - lazyLoadCache, err := NewInMemoryLRUCachePartition( - LazyLoadCacheName, - LazyLoadCacheMutable, - LazyLoadCacheMaxEntries, - LazyLoadCacheMaxAge, - enablePrometheus, - ) - if err != nil { - return nil, err - } - - go cacheCleaner( - roomVersions, serverKeys, roomServerRoomIDs, - roomInfos, federationEvents, spaceRooms, lazyLoadCache, - ) - return &Caches{ - RoomVersions: roomVersions, - ServerKeys: serverKeys, - RoomServerRoomIDs: roomServerRoomIDs, - RoomInfos: roomInfos, - FederationEvents: federationEvents, - SpaceSummaryRooms: spaceRooms, - LazyLoading: lazyLoadCache, - }, nil -} - -func cacheCleaner(caches ...*InMemoryLRUCachePartition) { - for { - time.Sleep(time.Minute) - for _, cache := range caches { - // Hold onto the last 10% of the cache entries, since - // otherwise a quiet period might cause us to evict all - // cache entries entirely. - if cache.lru.Len() > cache.maxEntries/10 { - cache.lru.RemoveOldest() - } - } - } -} - -type InMemoryLRUCachePartition struct { - name string - mutable bool - maxEntries int - maxAge time.Duration - lru *lru.Cache -} - -type inMemoryLRUCacheEntry struct { - value interface{} - created time.Time -} - -func NewInMemoryLRUCachePartition(name string, mutable bool, maxEntries int, maxAge time.Duration, enablePrometheus bool) (*InMemoryLRUCachePartition, error) { - var err error - cache := InMemoryLRUCachePartition{ - name: name, - mutable: mutable, - maxEntries: maxEntries, - maxAge: maxAge, - } - cache.lru, err = lru.New(maxEntries) - if err != nil { - return nil, err - } - if enablePrometheus { - promauto.NewGaugeFunc(prometheus.GaugeOpts{ - Namespace: "dendrite", - Subsystem: "caching_in_memory_lru", - Name: name, - }, func() float64 { - return float64(cache.lru.Len()) - }) - } - return &cache, nil -} - -func (c *InMemoryLRUCachePartition) Set(key string, value interface{}) { - if !c.mutable { - if peek, ok := c.lru.Peek(key); ok { - if entry, ok := peek.(*inMemoryLRUCacheEntry); ok && entry.value != value { - panic(fmt.Sprintf("invalid use of immutable cache tries to mutate existing value of %q", key)) - } - } - } - c.lru.Add(key, &inMemoryLRUCacheEntry{ - value: value, - created: time.Now(), - }) -} - -func (c *InMemoryLRUCachePartition) Unset(key string) { - if !c.mutable { - panic(fmt.Sprintf("invalid use of immutable cache tries to unset value of %q", key)) - } - c.lru.Remove(key) -} - -func (c *InMemoryLRUCachePartition) Get(key string) (value interface{}, ok bool) { - v, ok := c.lru.Get(key) - if !ok { - return nil, false - } - entry, ok := v.(*inMemoryLRUCacheEntry) - switch { - case ok && c.maxAge == CacheNoMaxAge: - return entry.value, ok // There's no maximum age policy - case ok && time.Since(entry.created) < c.maxAge: - return entry.value, ok // The value for the key isn't stale - default: - // Either the key was found and it was stale, or the key - // wasn't found at all - c.lru.Remove(key) - return nil, false - } -} diff --git a/internal/caching/impl_ristretto.go b/internal/caching/impl_ristretto.go new file mode 100644 index 00000000..6d625b55 --- /dev/null +++ b/internal/caching/impl_ristretto.go @@ -0,0 +1,200 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package caching + +import ( + "fmt" + "reflect" + "time" + "unsafe" + + "github.com/dgraph-io/ristretto" + "github.com/dgraph-io/ristretto/z" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/gomatrixserverlib" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + roomVersionsCache byte = iota + 1 + serverKeysCache + roomNIDsCache + roomIDsCache + roomEventsCache + roomInfosCache + federationPDUsCache + federationEDUsCache + spaceSummaryRoomsCache + lazyLoadingCache +) + +func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enablePrometheus bool) *Caches { + cache, err := ristretto.NewCache(&ristretto.Config{ + NumCounters: 1e5, // 10x number of expected cache items, affects bloom filter size, gives us room for 10,000 currently + BufferItems: 64, // recommended by the ristretto godocs as a sane buffer size value + MaxCost: int64(maxCost), + Metrics: true, + KeyToHash: func(key interface{}) (uint64, uint64) { + return z.KeyToHash(key) + }, + }) + if err != nil { + panic(err) + } + if enablePrometheus { + promauto.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "dendrite", + Subsystem: "caching_ristretto", + Name: "ratio", + }, func() float64 { + return float64(cache.Metrics.Ratio()) + }) + promauto.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "dendrite", + Subsystem: "caching_ristretto", + Name: "cost", + }, func() float64 { + return float64(cache.Metrics.CostAdded() - cache.Metrics.CostEvicted()) + }) + } + return &Caches{ + RoomVersions: &RistrettoCachePartition[string, gomatrixserverlib.RoomVersion]{ // room ID -> room version + cache: cache, + Prefix: roomVersionsCache, + MaxAge: maxAge, + }, + ServerKeys: &RistrettoCachePartition[string, gomatrixserverlib.PublicKeyLookupResult]{ // server name -> server keys + cache: cache, + Prefix: serverKeysCache, + Mutable: true, + MaxAge: maxAge, + }, + RoomServerRoomNIDs: &RistrettoCachePartition[string, types.RoomNID]{ // room ID -> room NID + cache: cache, + Prefix: roomNIDsCache, + MaxAge: maxAge, + }, + RoomServerRoomIDs: &RistrettoCachePartition[int64, string]{ // room NID -> room ID + cache: cache, + Prefix: roomIDsCache, + MaxAge: maxAge, + }, + RoomServerEvents: &RistrettoCostedCachePartition[int64, *gomatrixserverlib.Event]{ // event NID -> event + &RistrettoCachePartition[int64, *gomatrixserverlib.Event]{ + cache: cache, + Prefix: roomEventsCache, + MaxAge: maxAge, + }, + }, + RoomInfos: &RistrettoCachePartition[string, types.RoomInfo]{ // room ID -> room info + cache: cache, + Prefix: roomInfosCache, + Mutable: true, + MaxAge: maxAge, + }, + FederationPDUs: &RistrettoCostedCachePartition[int64, *gomatrixserverlib.HeaderedEvent]{ // queue NID -> PDU + &RistrettoCachePartition[int64, *gomatrixserverlib.HeaderedEvent]{ + cache: cache, + Prefix: federationPDUsCache, + Mutable: true, + MaxAge: lesserOf(time.Hour/2, maxAge), + }, + }, + FederationEDUs: &RistrettoCostedCachePartition[int64, *gomatrixserverlib.EDU]{ // queue NID -> EDU + &RistrettoCachePartition[int64, *gomatrixserverlib.EDU]{ + cache: cache, + Prefix: federationEDUsCache, + Mutable: true, + MaxAge: lesserOf(time.Hour/2, maxAge), + }, + }, + SpaceSummaryRooms: &RistrettoCachePartition[string, gomatrixserverlib.MSC2946SpacesResponse]{ // room ID -> space response + cache: cache, + Prefix: spaceSummaryRoomsCache, + Mutable: true, + MaxAge: maxAge, + }, + LazyLoading: &RistrettoCachePartition[lazyLoadingCacheKey, string]{ // composite key -> event ID + cache: cache, + Prefix: lazyLoadingCache, + Mutable: true, + MaxAge: maxAge, + }, + } +} + +type RistrettoCostedCachePartition[k keyable, v costable] struct { + *RistrettoCachePartition[k, v] +} + +func (c *RistrettoCostedCachePartition[K, V]) Set(key K, value V) { + cost := value.CacheCost() + c.setWithCost(key, value, int64(cost)) +} + +type RistrettoCachePartition[K keyable, V any] struct { + cache *ristretto.Cache + Prefix byte + Mutable bool + MaxAge time.Duration +} + +func (c *RistrettoCachePartition[K, V]) setWithCost(key K, value V, cost int64) { + bkey := fmt.Sprintf("%c%v", c.Prefix, key) + if !c.Mutable { + if v, ok := c.cache.Get(bkey); ok && v != nil && !reflect.DeepEqual(v, value) { + panic(fmt.Sprintf("invalid use of immutable cache tries to change value of %v from %v to %v", key, v, value)) + } + } + c.cache.SetWithTTL(bkey, value, int64(len(bkey))+cost, c.MaxAge) +} + +func (c *RistrettoCachePartition[K, V]) Set(key K, value V) { + var cost int64 + if cv, ok := any(value).(string); ok { + cost = int64(len(cv)) + } else { + cost = int64(unsafe.Sizeof(value)) + } + c.setWithCost(key, value, cost) +} + +func (c *RistrettoCachePartition[K, V]) Unset(key K) { + bkey := fmt.Sprintf("%c%v", c.Prefix, key) + if !c.Mutable { + panic(fmt.Sprintf("invalid use of immutable cache tries to unset value of %v", key)) + } + c.cache.Del(bkey) +} + +func (c *RistrettoCachePartition[K, V]) Get(key K) (value V, ok bool) { + bkey := fmt.Sprintf("%c%v", c.Prefix, key) + v, ok := c.cache.Get(bkey) + if !ok || v == nil { + var empty V + return empty, false + } + value, ok = v.(V) + return +} + +func lesserOf(a, b time.Duration) time.Duration { + if a < b { + return a + } + return b +} diff --git a/internal/eventutil/events.go b/internal/eventutil/events.go index ee67a6da..d9623196 100644 --- a/internal/eventutil/events.go +++ b/internal/eventutil/events.go @@ -170,20 +170,18 @@ func truncateAuthAndPrevEvents(auth, prev []gomatrixserverlib.EventReference) ( // RedactEvent redacts the given event and sets the unsigned field appropriately. This should be used by // downstream components to the roomserver when an OutputTypeRedactedEvent occurs. -func RedactEvent(redactionEvent, redactedEvent *gomatrixserverlib.Event) (*gomatrixserverlib.Event, error) { +func RedactEvent(redactionEvent, redactedEvent *gomatrixserverlib.Event) error { // sanity check if redactionEvent.Type() != gomatrixserverlib.MRoomRedaction { - return nil, fmt.Errorf("RedactEvent: redactionEvent isn't a redaction event, is '%s'", redactionEvent.Type()) + return fmt.Errorf("RedactEvent: redactionEvent isn't a redaction event, is '%s'", redactionEvent.Type()) } - r := redactedEvent.Redact() - err := r.SetUnsignedField("redacted_because", redactionEvent) - if err != nil { - return nil, err + redactedEvent.Redact() + if err := redactedEvent.SetUnsignedField("redacted_because", redactionEvent); err != nil { + return err } // NOTSPEC: sytest relies on this unspecced field existing :( - err = r.SetUnsignedField("redacted_by", redactionEvent.EventID()) - if err != nil { - return nil, err + if err := redactedEvent.SetUnsignedField("redacted_by", redactionEvent.EventID()); err != nil { + return err } - return r, nil + return nil } |