diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-06-05 16:42:01 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-06-05 16:42:01 +0100 |
commit | e7b19d2c70be49f3c995a9bfd9dd93ce767d960f (patch) | |
tree | 0ff8734c896530843537c4c58421964cf89f020d | |
parent | 76ff47c0522e03eabd72140bf62e1d0d1d1029e0 (diff) |
More flexible caching (#1101)
-rw-r--r-- | cmd/roomserver-integration-tests/main.go | 2 | ||||
-rw-r--r-- | internal/basecomponent/base.go | 10 | ||||
-rw-r--r-- | internal/caching/cache_roomversions.go | 30 | ||||
-rw-r--r-- | internal/caching/cache_serverkeys.go | 41 | ||||
-rw-r--r-- | internal/caching/caches.go | 15 | ||||
-rw-r--r-- | internal/caching/immutablecache.go | 17 | ||||
-rw-r--r-- | internal/caching/immutableinmemorylru.go | 95 | ||||
-rw-r--r-- | internal/caching/impl_inmemorylru.go | 73 | ||||
-rw-r--r-- | roomserver/internal/api.go | 2 | ||||
-rw-r--r-- | roomserver/internal/query.go | 4 | ||||
-rw-r--r-- | roomserver/inthttp/client.go | 18 | ||||
-rw-r--r-- | roomserver/roomserver.go | 2 | ||||
-rw-r--r-- | serverkeyapi/inthttp/client.go | 12 | ||||
-rw-r--r-- | serverkeyapi/inthttp/server.go | 2 | ||||
-rw-r--r-- | serverkeyapi/serverkeyapi.go | 4 | ||||
-rw-r--r-- | serverkeyapi/storage/cache/keydb.go | 4 |
16 files changed, 189 insertions, 142 deletions
diff --git a/cmd/roomserver-integration-tests/main.go b/cmd/roomserver-integration-tests/main.go index 2433f9ff..43aca078 100644 --- a/cmd/roomserver-integration-tests/main.go +++ b/cmd/roomserver-integration-tests/main.go @@ -255,7 +255,7 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R panic(err) } - cache, err := caching.NewImmutableInMemoryLRUCache() + cache, err := caching.NewInMemoryLRUCache() if err != nil { panic(err) } diff --git a/internal/basecomponent/base.go b/internal/basecomponent/base.go index 620b12d6..3ad1e4af 100644 --- a/internal/basecomponent/base.go +++ b/internal/basecomponent/base.go @@ -66,7 +66,7 @@ type BaseDendrite struct { UseHTTPAPIs bool httpClient *http.Client Cfg *config.Dendrite - ImmutableCache caching.ImmutableCache + Caches *caching.Caches KafkaConsumer sarama.Consumer KafkaProducer sarama.SyncProducer } @@ -95,7 +95,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo kafkaConsumer, kafkaProducer = setupKafka(cfg) } - cache, err := caching.NewImmutableInMemoryLRUCache() + cache, err := caching.NewInMemoryLRUCache() if err != nil { logrus.WithError(err).Warnf("Failed to create cache") } @@ -126,7 +126,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo UseHTTPAPIs: useHTTPAPIs, tracerCloser: closer, Cfg: cfg, - ImmutableCache: cache, + Caches: cache, PublicAPIMux: httpmux.PathPrefix(httpapis.PublicPathPrefix).Subrouter().UseEncodedPath(), InternalAPIMux: httpmux.PathPrefix(httpapis.InternalPathPrefix).Subrouter().UseEncodedPath(), httpClient: &client, @@ -151,7 +151,7 @@ func (b *BaseDendrite) AppserviceHTTPClient() appserviceAPI.AppServiceQueryAPI { // RoomserverHTTPClient returns RoomserverInternalAPI for hitting the roomserver over HTTP. func (b *BaseDendrite) RoomserverHTTPClient() roomserverAPI.RoomserverInternalAPI { - rsAPI, err := rsinthttp.NewRoomserverClient(b.Cfg.RoomServerURL(), b.httpClient, b.ImmutableCache) + rsAPI, err := rsinthttp.NewRoomserverClient(b.Cfg.RoomServerURL(), b.httpClient, b.Caches) if err != nil { logrus.WithError(err).Panic("RoomserverHTTPClient failed", b.httpClient) } @@ -182,7 +182,7 @@ func (b *BaseDendrite) ServerKeyAPIClient() serverKeyAPI.ServerKeyInternalAPI { f, err := skinthttp.NewServerKeyClient( b.Cfg.ServerKeyAPIURL(), b.httpClient, - b.ImmutableCache, + b.Caches, ) if err != nil { logrus.WithError(err).Panic("NewServerKeyInternalAPIHTTP failed", b.httpClient) diff --git a/internal/caching/cache_roomversions.go b/internal/caching/cache_roomversions.go new file mode 100644 index 00000000..0b46d3d4 --- /dev/null +++ b/internal/caching/cache_roomversions.go @@ -0,0 +1,30 @@ +package caching + +import "github.com/matrix-org/gomatrixserverlib" + +const ( + RoomVersionCacheName = "room_versions" + RoomVersionCacheMaxEntries = 1024 + RoomVersionCacheMutable = false +) + +// RoomVersionsCache contains the subset of functions needed for +// a room version cache. +type RoomVersionCache interface { + GetRoomVersion(roomID string) (roomVersion gomatrixserverlib.RoomVersion, ok bool) + StoreRoomVersion(roomID string, roomVersion gomatrixserverlib.RoomVersion) +} + +func (c Caches) GetRoomVersion(roomID string) (gomatrixserverlib.RoomVersion, bool) { + val, found := c.RoomVersions.Get(roomID) + if found && val != nil { + if roomVersion, ok := val.(gomatrixserverlib.RoomVersion); ok { + return roomVersion, true + } + } + return "", false +} + +func (c Caches) StoreRoomVersion(roomID string, roomVersion gomatrixserverlib.RoomVersion) { + c.RoomVersions.Set(roomID, roomVersion) +} diff --git a/internal/caching/cache_serverkeys.go b/internal/caching/cache_serverkeys.go new file mode 100644 index 00000000..8c71ffbd --- /dev/null +++ b/internal/caching/cache_serverkeys.go @@ -0,0 +1,41 @@ +package caching + +import ( + "fmt" + + "github.com/matrix-org/gomatrixserverlib" +) + +const ( + ServerKeyCacheName = "server_key" + ServerKeyCacheMaxEntries = 4096 + ServerKeyCacheMutable = true +) + +// ServerKeyCache contains the subset of functions needed for +// a server key cache. +type ServerKeyCache interface { + GetServerKey(request gomatrixserverlib.PublicKeyLookupRequest) (response gomatrixserverlib.PublicKeyLookupResult, ok bool) + StoreServerKey(request gomatrixserverlib.PublicKeyLookupRequest, response gomatrixserverlib.PublicKeyLookupResult) +} + +func (c Caches) GetServerKey( + request gomatrixserverlib.PublicKeyLookupRequest, +) (gomatrixserverlib.PublicKeyLookupResult, bool) { + key := fmt.Sprintf("%s/%s", request.ServerName, request.KeyID) + val, found := c.ServerKeys.Get(key) + if found && val != nil { + if keyLookupResult, ok := val.(gomatrixserverlib.PublicKeyLookupResult); ok { + return keyLookupResult, true + } + } + return gomatrixserverlib.PublicKeyLookupResult{}, false +} + +func (c Caches) StoreServerKey( + request gomatrixserverlib.PublicKeyLookupRequest, + response gomatrixserverlib.PublicKeyLookupResult, +) { + key := fmt.Sprintf("%s/%s", request.ServerName, request.KeyID) + c.ServerKeys.Set(key, response) +} diff --git a/internal/caching/caches.go b/internal/caching/caches.go new file mode 100644 index 00000000..70f380ba --- /dev/null +++ b/internal/caching/caches.go @@ -0,0 +1,15 @@ +package caching + +// Caches contains a set of references to caches. They may be +// different implementations as long as they satisfy the Cache +// interface. +type Caches struct { + RoomVersions Cache // implements RoomVersionCache + ServerKeys Cache // implements ServerKeyCache +} + +// Cache is the interface that an implementation must satisfy. +type Cache interface { + Get(key string) (value interface{}, ok bool) + Set(key string, value interface{}) +} diff --git a/internal/caching/immutablecache.go b/internal/caching/immutablecache.go deleted file mode 100644 index fea05dd1..00000000 --- a/internal/caching/immutablecache.go +++ /dev/null @@ -1,17 +0,0 @@ -package caching - -import ( - "github.com/matrix-org/gomatrixserverlib" -) - -const ( - RoomVersionMaxCacheEntries = 1024 - ServerKeysMaxCacheEntries = 1024 -) - -type ImmutableCache interface { - GetRoomVersion(roomId string) (gomatrixserverlib.RoomVersion, bool) - StoreRoomVersion(roomId string, roomVersion gomatrixserverlib.RoomVersion) - GetServerKey(request gomatrixserverlib.PublicKeyLookupRequest) (gomatrixserverlib.PublicKeyLookupResult, bool) - StoreServerKey(request gomatrixserverlib.PublicKeyLookupRequest, response gomatrixserverlib.PublicKeyLookupResult) -} diff --git a/internal/caching/immutableinmemorylru.go b/internal/caching/immutableinmemorylru.go deleted file mode 100644 index 36cd56dc..00000000 --- a/internal/caching/immutableinmemorylru.go +++ /dev/null @@ -1,95 +0,0 @@ -package caching - -import ( - "fmt" - - lru "github.com/hashicorp/golang-lru" - "github.com/matrix-org/gomatrixserverlib" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -type ImmutableInMemoryLRUCache struct { - roomVersions *lru.Cache - serverKeys *lru.Cache -} - -func NewImmutableInMemoryLRUCache() (*ImmutableInMemoryLRUCache, error) { - roomVersionCache, rvErr := lru.New(RoomVersionMaxCacheEntries) - if rvErr != nil { - return nil, rvErr - } - serverKeysCache, rvErr := lru.New(ServerKeysMaxCacheEntries) - if rvErr != nil { - return nil, rvErr - } - cache := &ImmutableInMemoryLRUCache{ - roomVersions: roomVersionCache, - serverKeys: serverKeysCache, - } - cache.configureMetrics() - return cache, nil -} - -func (c *ImmutableInMemoryLRUCache) configureMetrics() { - promauto.NewGaugeFunc(prometheus.GaugeOpts{ - Namespace: "dendrite", - Subsystem: "caching", - Name: "number_room_version_entries", - Help: "The number of room version entries cached.", - }, func() float64 { - return float64(c.roomVersions.Len()) - }) - - promauto.NewGaugeFunc(prometheus.GaugeOpts{ - Namespace: "dendrite", - Subsystem: "caching", - Name: "number_server_key_entries", - Help: "The number of server key entries cached.", - }, func() float64 { - return float64(c.serverKeys.Len()) - }) -} - -func checkForInvalidMutation(cache *lru.Cache, key string, value interface{}) { - if peek, ok := cache.Peek(key); ok && peek != value { - panic(fmt.Sprintf("invalid use of immutable cache tries to mutate existing value of %q", key)) - } -} - -func (c *ImmutableInMemoryLRUCache) GetRoomVersion(roomID string) (gomatrixserverlib.RoomVersion, bool) { - val, found := c.roomVersions.Get(roomID) - if found && val != nil { - if roomVersion, ok := val.(gomatrixserverlib.RoomVersion); ok { - return roomVersion, true - } - } - return "", false -} - -func (c *ImmutableInMemoryLRUCache) StoreRoomVersion(roomID string, roomVersion gomatrixserverlib.RoomVersion) { - checkForInvalidMutation(c.roomVersions, roomID, roomVersion) - c.roomVersions.Add(roomID, roomVersion) -} - -func (c *ImmutableInMemoryLRUCache) GetServerKey( - request gomatrixserverlib.PublicKeyLookupRequest, -) (gomatrixserverlib.PublicKeyLookupResult, bool) { - key := fmt.Sprintf("%s/%s", request.ServerName, request.KeyID) - val, found := c.serverKeys.Get(key) - if found && val != nil { - if keyLookupResult, ok := val.(gomatrixserverlib.PublicKeyLookupResult); ok { - return keyLookupResult, true - } - } - return gomatrixserverlib.PublicKeyLookupResult{}, false -} - -func (c *ImmutableInMemoryLRUCache) StoreServerKey( - request gomatrixserverlib.PublicKeyLookupRequest, - response gomatrixserverlib.PublicKeyLookupResult, -) { - key := fmt.Sprintf("%s/%s", request.ServerName, request.KeyID) - checkForInvalidMutation(c.roomVersions, key, response) - c.serverKeys.Add(request, response) -} diff --git a/internal/caching/impl_inmemorylru.go b/internal/caching/impl_inmemorylru.go new file mode 100644 index 00000000..f7901d2e --- /dev/null +++ b/internal/caching/impl_inmemorylru.go @@ -0,0 +1,73 @@ +package caching + +import ( + "fmt" + + lru "github.com/hashicorp/golang-lru" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +func NewInMemoryLRUCache() (*Caches, error) { + roomVersions, err := NewInMemoryLRUCachePartition( + RoomVersionCacheName, + RoomVersionCacheMutable, + RoomVersionCacheMaxEntries, + ) + if err != nil { + return nil, err + } + serverKeys, err := NewInMemoryLRUCachePartition( + ServerKeyCacheName, + ServerKeyCacheMutable, + ServerKeyCacheMaxEntries, + ) + if err != nil { + return nil, err + } + return &Caches{ + RoomVersions: roomVersions, + ServerKeys: serverKeys, + }, nil +} + +type InMemoryLRUCachePartition struct { + name string + mutable bool + maxEntries int + lru *lru.Cache +} + +func NewInMemoryLRUCachePartition(name string, mutable bool, maxEntries int) (*InMemoryLRUCachePartition, error) { + var err error + cache := InMemoryLRUCachePartition{ + name: name, + mutable: mutable, + maxEntries: maxEntries, + } + cache.lru, err = lru.New(maxEntries) + if err != nil { + return nil, err + } + promauto.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "dendrite", + Subsystem: "caching_in_memory_lru", + Name: name, + }, func() float64 { + return float64(cache.lru.Len()) + }) + return &cache, nil +} + +func (c *InMemoryLRUCachePartition) Set(key string, value interface{}) { + if !c.mutable { + if peek, ok := c.lru.Peek(key); ok && peek != value { + panic(fmt.Sprintf("invalid use of immutable cache tries to mutate existing value of %q", key)) + } + } + c.lru.Add(key, value) +} + +func (c *InMemoryLRUCachePartition) Get(key string) (value interface{}, ok bool) { + return c.lru.Get(key) +} diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 3a7b0d76..37a8a39b 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -16,7 +16,7 @@ type RoomserverInternalAPI struct { DB storage.Database Cfg *config.Dendrite Producer sarama.SyncProducer - ImmutableCache caching.ImmutableCache + Cache caching.RoomVersionCache ServerName gomatrixserverlib.ServerName KeyRing gomatrixserverlib.JSONVerifier FedClient *gomatrixserverlib.FederationClient diff --git a/roomserver/internal/query.go b/roomserver/internal/query.go index fce2ae90..9fb67e7e 100644 --- a/roomserver/internal/query.go +++ b/roomserver/internal/query.go @@ -951,7 +951,7 @@ func (r *RoomserverInternalAPI) QueryRoomVersionForRoom( request *api.QueryRoomVersionForRoomRequest, response *api.QueryRoomVersionForRoomResponse, ) error { - if roomVersion, ok := r.ImmutableCache.GetRoomVersion(request.RoomID); ok { + if roomVersion, ok := r.Cache.GetRoomVersion(request.RoomID); ok { response.RoomVersion = roomVersion return nil } @@ -961,6 +961,6 @@ func (r *RoomserverInternalAPI) QueryRoomVersionForRoom( return err } response.RoomVersion = roomVersion - r.ImmutableCache.StoreRoomVersion(request.RoomID, response.RoomVersion) + r.Cache.StoreRoomVersion(request.RoomID, response.RoomVersion) return nil } diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go index 99db9e1e..5cc2537e 100644 --- a/roomserver/inthttp/client.go +++ b/roomserver/inthttp/client.go @@ -43,9 +43,9 @@ const ( ) type httpRoomserverInternalAPI struct { - roomserverURL string - httpClient *http.Client - immutableCache caching.ImmutableCache + roomserverURL string + httpClient *http.Client + cache caching.RoomVersionCache } // NewRoomserverClient creates a RoomserverInputAPI implemented by talking to a HTTP POST API. @@ -53,15 +53,15 @@ type httpRoomserverInternalAPI struct { func NewRoomserverClient( roomserverURL string, httpClient *http.Client, - immutableCache caching.ImmutableCache, + cache caching.RoomVersionCache, ) (api.RoomserverInternalAPI, error) { if httpClient == nil { return nil, errors.New("NewRoomserverInternalAPIHTTP: httpClient is <nil>") } return &httpRoomserverInternalAPI{ - roomserverURL: roomserverURL, - httpClient: httpClient, - immutableCache: immutableCache, + roomserverURL: roomserverURL, + httpClient: httpClient, + cache: cache, }, nil } @@ -320,7 +320,7 @@ func (h *httpRoomserverInternalAPI) QueryRoomVersionForRoom( request *api.QueryRoomVersionForRoomRequest, response *api.QueryRoomVersionForRoomResponse, ) error { - if roomVersion, ok := h.immutableCache.GetRoomVersion(request.RoomID); ok { + if roomVersion, ok := h.cache.GetRoomVersion(request.RoomID); ok { response.RoomVersion = roomVersion return nil } @@ -331,7 +331,7 @@ func (h *httpRoomserverInternalAPI) QueryRoomVersionForRoom( apiURL := h.roomserverURL + RoomserverQueryRoomVersionForRoomPath err := internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) if err == nil { - h.immutableCache.StoreRoomVersion(request.RoomID, response.RoomVersion) + h.cache.StoreRoomVersion(request.RoomID, response.RoomVersion) } return err } diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index ae0b0794..a55b20be 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -44,7 +44,7 @@ func SetupRoomServerComponent( Cfg: base.Cfg, Producer: base.KafkaProducer, OutputRoomEventTopic: string(base.Cfg.Kafka.Topics.OutputRoomEvent), - ImmutableCache: base.ImmutableCache, + Cache: base.Caches, ServerName: base.Cfg.Matrix.ServerName, FedClient: fedClient, KeyRing: keyRing, diff --git a/serverkeyapi/inthttp/client.go b/serverkeyapi/inthttp/client.go index f986634b..f22b0e31 100644 --- a/serverkeyapi/inthttp/client.go +++ b/serverkeyapi/inthttp/client.go @@ -24,7 +24,7 @@ const ( func NewServerKeyClient( serverKeyAPIURL string, httpClient *http.Client, - immutableCache caching.ImmutableCache, + cache caching.ServerKeyCache, ) (api.ServerKeyInternalAPI, error) { if httpClient == nil { return nil, errors.New("NewRoomserverInternalAPIHTTP: httpClient is <nil>") @@ -32,14 +32,14 @@ func NewServerKeyClient( return &httpServerKeyInternalAPI{ serverKeyAPIURL: serverKeyAPIURL, httpClient: httpClient, - immutableCache: immutableCache, + cache: cache, }, nil } type httpServerKeyInternalAPI struct { serverKeyAPIURL string httpClient *http.Client - immutableCache caching.ImmutableCache + cache caching.ServerKeyCache } func (s *httpServerKeyInternalAPI) KeyRing() *gomatrixserverlib.KeyRing { @@ -71,7 +71,7 @@ func (s *httpServerKeyInternalAPI) StoreKeys( response := api.InputPublicKeysResponse{} for req, res := range results { request.Keys[req] = res - s.immutableCache.StoreServerKey(req, res) + s.cache.StoreServerKey(req, res) } return s.InputPublicKeys(ctx, &request, &response) } @@ -92,7 +92,7 @@ func (s *httpServerKeyInternalAPI) FetchKeys( } now := gomatrixserverlib.AsTimestamp(time.Now()) for req, ts := range requests { - if res, ok := s.immutableCache.GetServerKey(req); ok { + if res, ok := s.cache.GetServerKey(req); ok { if now > res.ValidUntilTS && res.ExpiredTS == gomatrixserverlib.PublicKeyNotExpired { continue } @@ -107,7 +107,7 @@ func (s *httpServerKeyInternalAPI) FetchKeys( } for req, res := range response.Results { result[req] = res - s.immutableCache.StoreServerKey(req, res) + s.cache.StoreServerKey(req, res) } return result, nil } diff --git a/serverkeyapi/inthttp/server.go b/serverkeyapi/inthttp/server.go index d5517e14..9efe7d9d 100644 --- a/serverkeyapi/inthttp/server.go +++ b/serverkeyapi/inthttp/server.go @@ -12,7 +12,7 @@ import ( "github.com/matrix-org/util" ) -func AddRoutes(s api.ServerKeyInternalAPI, internalAPIMux *mux.Router, cache caching.ImmutableCache) { +func AddRoutes(s api.ServerKeyInternalAPI, internalAPIMux *mux.Router, cache caching.ServerKeyCache) { internalAPIMux.Handle(ServerKeyQueryPublicKeyPath, internal.MakeInternalAPI("queryPublicKeys", func(req *http.Request) util.JSONResponse { request := api.QueryPublicKeysRequest{} diff --git a/serverkeyapi/serverkeyapi.go b/serverkeyapi/serverkeyapi.go index 5bf8f67d..ad885270 100644 --- a/serverkeyapi/serverkeyapi.go +++ b/serverkeyapi/serverkeyapi.go @@ -29,7 +29,7 @@ func SetupServerKeyAPIComponent( logrus.WithError(err).Panicf("failed to connect to server key database") } - serverKeyDB, err := cache.NewKeyDatabase(innerDB, base.ImmutableCache) + serverKeyDB, err := cache.NewKeyDatabase(innerDB, base.Caches) if err != nil { logrus.WithError(err).Panicf("failed to set up caching wrapper for server key database") } @@ -77,7 +77,7 @@ func SetupServerKeyAPIComponent( }).Info("Enabled perspective key fetcher") } - inthttp.AddRoutes(&internalAPI, base.InternalAPIMux, base.ImmutableCache) + inthttp.AddRoutes(&internalAPI, base.InternalAPIMux, base.Caches) return &internalAPI } diff --git a/serverkeyapi/storage/cache/keydb.go b/serverkeyapi/storage/cache/keydb.go index a0cdb900..b662e4fd 100644 --- a/serverkeyapi/storage/cache/keydb.go +++ b/serverkeyapi/storage/cache/keydb.go @@ -12,10 +12,10 @@ import ( // the public keys for other matrix servers. type KeyDatabase struct { inner gomatrixserverlib.KeyDatabase - cache caching.ImmutableCache + cache caching.ServerKeyCache } -func NewKeyDatabase(inner gomatrixserverlib.KeyDatabase, cache caching.ImmutableCache) (*KeyDatabase, error) { +func NewKeyDatabase(inner gomatrixserverlib.KeyDatabase, cache caching.ServerKeyCache) (*KeyDatabase, error) { if inner == nil { return nil, errors.New("inner database can't be nil") } |