aboutsummaryrefslogtreecommitdiff
path: root/internal/transactions
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-05-21 14:40:13 +0100
committerGitHub <noreply@github.com>2020-05-21 14:40:13 +0100
commit24d8df664c21fa8bd68d80b5585a496e264c410a (patch)
tree0a176d6dfd7f81522c5739b53313366b552b0ce1 /internal/transactions
parent3fdb045116c9cd2f2a3badfebec0645d0381bacb (diff)
Fix #897 and shuffle directory around (#1054)
* Fix #897 and shuffle directory around * Update find-lint * goimports Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'internal/transactions')
-rw-r--r--internal/transactions/transactions.go95
-rw-r--r--internal/transactions/transactions_test.go77
2 files changed, 172 insertions, 0 deletions
diff --git a/internal/transactions/transactions.go b/internal/transactions/transactions.go
new file mode 100644
index 00000000..d2eb0f27
--- /dev/null
+++ b/internal/transactions/transactions.go
@@ -0,0 +1,95 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package transactions
+
+import (
+ "sync"
+ "time"
+
+ "github.com/matrix-org/util"
+)
+
+// DefaultCleanupPeriod represents the default time duration after which cacheCleanService runs.
+const DefaultCleanupPeriod time.Duration = 30 * time.Minute
+
+type txnsMap map[CacheKey]*util.JSONResponse
+
+// CacheKey is the type for the key in a transactions cache.
+// This is needed because the spec requires transaction IDs to have a per-access token scope.
+type CacheKey struct {
+ AccessToken string
+ TxnID string
+}
+
+// Cache represents a temporary store for response entries.
+// Entries are evicted after a certain period, defined by cleanupPeriod.
+// This works by keeping two maps of entries, and cycling the maps after the cleanupPeriod.
+type Cache struct {
+ sync.RWMutex
+ txnsMaps [2]txnsMap
+ cleanupPeriod time.Duration
+}
+
+// New is a wrapper which calls NewWithCleanupPeriod with DefaultCleanupPeriod as argument.
+func New() *Cache {
+ return NewWithCleanupPeriod(DefaultCleanupPeriod)
+}
+
+// NewWithCleanupPeriod creates a new Cache object, starts cacheCleanService.
+// Takes cleanupPeriod as argument.
+// Returns a reference to newly created Cache.
+func NewWithCleanupPeriod(cleanupPeriod time.Duration) *Cache {
+ t := Cache{txnsMaps: [2]txnsMap{make(txnsMap), make(txnsMap)}}
+ t.cleanupPeriod = cleanupPeriod
+
+ // Start clean service as the Cache is created
+ go cacheCleanService(&t)
+ return &t
+}
+
+// FetchTransaction looks up an entry for the (accessToken, txnID) tuple in Cache.
+// Looks in both the txnMaps.
+// Returns (JSON response, true) if txnID is found, else the returned bool is false.
+func (t *Cache) FetchTransaction(accessToken, txnID string) (*util.JSONResponse, bool) {
+ t.RLock()
+ defer t.RUnlock()
+ for _, txns := range t.txnsMaps {
+ res, ok := txns[CacheKey{accessToken, txnID}]
+ if ok {
+ return res, true
+ }
+ }
+ return nil, false
+}
+
+// AddTransaction adds an entry for the (accessToken, txnID) tuple in Cache.
+// Adds to the front txnMap.
+func (t *Cache) AddTransaction(accessToken, txnID string, res *util.JSONResponse) {
+ t.Lock()
+ defer t.Unlock()
+
+ t.txnsMaps[0][CacheKey{accessToken, txnID}] = res
+}
+
+// cacheCleanService is responsible for cleaning up entries after cleanupPeriod.
+// It guarantees that an entry will be present in cache for at least cleanupPeriod & at most 2 * cleanupPeriod.
+// This cycles the txnMaps forward, i.e. back map is assigned the front and front is assigned an empty map.
+func cacheCleanService(t *Cache) {
+ ticker := time.NewTicker(t.cleanupPeriod).C
+ for range ticker {
+ t.Lock()
+ t.txnsMaps[1] = t.txnsMaps[0]
+ t.txnsMaps[0] = make(txnsMap)
+ t.Unlock()
+ }
+}
diff --git a/internal/transactions/transactions_test.go b/internal/transactions/transactions_test.go
new file mode 100644
index 00000000..f565e484
--- /dev/null
+++ b/internal/transactions/transactions_test.go
@@ -0,0 +1,77 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package transactions
+
+import (
+ "net/http"
+ "testing"
+
+ "github.com/matrix-org/util"
+)
+
+type fakeType struct {
+ ID string `json:"ID"`
+}
+
+var (
+ fakeAccessToken = "aRandomAccessToken"
+ fakeAccessToken2 = "anotherRandomAccessToken"
+ fakeTxnID = "aRandomTxnID"
+ fakeResponse = &util.JSONResponse{
+ Code: http.StatusOK, JSON: fakeType{ID: "0"},
+ }
+ fakeResponse2 = &util.JSONResponse{
+ Code: http.StatusOK, JSON: fakeType{ID: "1"},
+ }
+)
+
+// TestCache creates a New Cache and tests AddTransaction & FetchTransaction
+func TestCache(t *testing.T) {
+ fakeTxnCache := New()
+ fakeTxnCache.AddTransaction(fakeAccessToken, fakeTxnID, fakeResponse)
+
+ // Add entries for noise.
+ for i := 1; i <= 100; i++ {
+ fakeTxnCache.AddTransaction(
+ fakeAccessToken,
+ fakeTxnID+string(i),
+ &util.JSONResponse{Code: http.StatusOK, JSON: fakeType{ID: string(i)}},
+ )
+ }
+
+ testResponse, ok := fakeTxnCache.FetchTransaction(fakeAccessToken, fakeTxnID)
+ if !ok {
+ t.Error("Failed to retrieve entry for txnID: ", fakeTxnID)
+ } else if testResponse.JSON != fakeResponse.JSON {
+ t.Error("Fetched response incorrect. Expected: ", fakeResponse.JSON, " got: ", testResponse.JSON)
+ }
+}
+
+// TestCacheScope ensures transactions with the same transaction ID are not shared
+// across multiple access tokens.
+func TestCacheScope(t *testing.T) {
+ cache := New()
+ cache.AddTransaction(fakeAccessToken, fakeTxnID, fakeResponse)
+ cache.AddTransaction(fakeAccessToken2, fakeTxnID, fakeResponse2)
+
+ if res, ok := cache.FetchTransaction(fakeAccessToken, fakeTxnID); !ok {
+ t.Errorf("failed to retrieve entry for (%s, %s)", fakeAccessToken, fakeTxnID)
+ } else if res.JSON != fakeResponse.JSON {
+ t.Errorf("Wrong cache entry for (%s, %s). Expected: %v; got: %v", fakeAccessToken, fakeTxnID, fakeResponse.JSON, res.JSON)
+ }
+ if res, ok := cache.FetchTransaction(fakeAccessToken2, fakeTxnID); !ok {
+ t.Errorf("failed to retrieve entry for (%s, %s)", fakeAccessToken, fakeTxnID)
+ } else if res.JSON != fakeResponse2.JSON {
+ t.Errorf("Wrong cache entry for (%s, %s). Expected: %v; got: %v", fakeAccessToken, fakeTxnID, fakeResponse2.JSON, res.JSON)
+ }
+}