aboutsummaryrefslogtreecommitdiff
path: root/internal/caching
diff options
context:
space:
mode:
authorS7evinK <2353100+S7evinK@users.noreply.github.com>2022-03-29 14:14:35 +0200
committerGitHub <noreply@github.com>2022-03-29 14:14:35 +0200
commit49dc49b232432d52c082646cc6f778593f4cb8b4 (patch)
treee809deedcae2127e930b0d382c863561b61fd9d2 /internal/caching
parent7972915806348847ecd9a9b8a1b1ff0609cb883c (diff)
Remove eduserver (#2306)
* Move receipt sending to own JetStream producer * Move SendToDevice to producer * Remove most parts of the EDU server * Fix SendToDevice & copyrights * Move structs, cleanup EDU Server traces * Use HeadersOnly subscription * Missing file * Fix linter issues * Move consumers to own files * Rename durable consumer; Consumer cleanup * Docs/config cleanup
Diffstat (limited to 'internal/caching')
-rw-r--r--internal/caching/cache_typing.go184
-rw-r--r--internal/caching/cache_typing_test.go103
2 files changed, 287 insertions, 0 deletions
diff --git a/internal/caching/cache_typing.go b/internal/caching/cache_typing.go
new file mode 100644
index 00000000..bd6a5fc1
--- /dev/null
+++ b/internal/caching/cache_typing.go
@@ -0,0 +1,184 @@
+// Copyright 2017 Vector Creations Ltd
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package caching
+
+import (
+ "sync"
+ "time"
+)
+
+const defaultTypingTimeout = 10 * time.Second
+
+// userSet is a map of user IDs to a timer, timer fires at expiry.
+type userSet map[string]*time.Timer
+
+// TimeoutCallbackFn is a function called right after the removal of a user
+// from the typing user list due to timeout.
+// latestSyncPosition is the typing sync position after the removal.
+type TimeoutCallbackFn func(userID, roomID string, latestSyncPosition int64)
+
+type roomData struct {
+ syncPosition int64
+ userSet userSet
+}
+
+// EDUCache maintains a list of users typing in each room.
+type EDUCache struct {
+ sync.RWMutex
+ latestSyncPosition int64
+ data map[string]*roomData
+ timeoutCallback TimeoutCallbackFn
+}
+
+// Create a roomData with its sync position set to the latest sync position.
+// Must only be called after locking the cache.
+func (t *EDUCache) newRoomData() *roomData {
+ return &roomData{
+ syncPosition: t.latestSyncPosition,
+ userSet: make(userSet),
+ }
+}
+
+// NewTypingCache returns a new EDUCache initialised for use.
+func NewTypingCache() *EDUCache {
+ return &EDUCache{data: make(map[string]*roomData)}
+}
+
+// SetTimeoutCallback sets a callback function that is called right after
+// a user is removed from the typing user list due to timeout.
+func (t *EDUCache) SetTimeoutCallback(fn TimeoutCallbackFn) {
+ t.timeoutCallback = fn
+}
+
+// GetTypingUsers returns the list of users typing in a room.
+func (t *EDUCache) GetTypingUsers(roomID string) []string {
+ users, _ := t.GetTypingUsersIfUpdatedAfter(roomID, 0)
+ // 0 should work above because the first position used will be 1.
+ return users
+}
+
+// GetTypingUsersIfUpdatedAfter returns all users typing in this room with
+// updated == true if the typing sync position of the room is after the given
+// position. Otherwise, returns an empty slice with updated == false.
+func (t *EDUCache) GetTypingUsersIfUpdatedAfter(
+ roomID string, position int64,
+) (users []string, updated bool) {
+ t.RLock()
+ defer t.RUnlock()
+
+ roomData, ok := t.data[roomID]
+ if ok && roomData.syncPosition > position {
+ updated = true
+ userSet := roomData.userSet
+ users = make([]string, 0, len(userSet))
+ for userID := range userSet {
+ users = append(users, userID)
+ }
+ }
+
+ return
+}
+
+// AddTypingUser sets an user as typing in a room.
+// expire is the time when the user typing should time out.
+// if expire is nil, defaultTypingTimeout is assumed.
+// Returns the latest sync position for typing after update.
+func (t *EDUCache) AddTypingUser(
+ userID, roomID string, expire *time.Time,
+) int64 {
+ expireTime := getExpireTime(expire)
+ if until := time.Until(expireTime); until > 0 {
+ timer := time.AfterFunc(until, func() {
+ latestSyncPosition := t.RemoveUser(userID, roomID)
+ if t.timeoutCallback != nil {
+ t.timeoutCallback(userID, roomID, latestSyncPosition)
+ }
+ })
+ return t.addUser(userID, roomID, timer)
+ }
+ return t.GetLatestSyncPosition()
+}
+
+// addUser with mutex lock & replace the previous timer.
+// Returns the latest typing sync position after update.
+func (t *EDUCache) addUser(
+ userID, roomID string, expiryTimer *time.Timer,
+) int64 {
+ t.Lock()
+ defer t.Unlock()
+
+ t.latestSyncPosition++
+
+ if t.data[roomID] == nil {
+ t.data[roomID] = t.newRoomData()
+ } else {
+ t.data[roomID].syncPosition = t.latestSyncPosition
+ }
+
+ // Stop the timer to cancel the call to timeoutCallback
+ if timer, ok := t.data[roomID].userSet[userID]; ok {
+ // It may happen that at this stage the timer fires, but we now have a lock on
+ // it. Hence the execution of timeoutCallback will happen after we unlock. So
+ // we may lose a typing state, though this is highly unlikely. This can be
+ // mitigated by keeping another time.Time in the map and checking against it
+ // before removing, but its occurrence is so infrequent it does not seem
+ // worthwhile.
+ timer.Stop()
+ }
+
+ t.data[roomID].userSet[userID] = expiryTimer
+
+ return t.latestSyncPosition
+}
+
+// RemoveUser with mutex lock & stop the timer.
+// Returns the latest sync position for typing after update.
+func (t *EDUCache) RemoveUser(userID, roomID string) int64 {
+ t.Lock()
+ defer t.Unlock()
+
+ roomData, ok := t.data[roomID]
+ if !ok {
+ return t.latestSyncPosition
+ }
+
+ timer, ok := roomData.userSet[userID]
+ if !ok {
+ return t.latestSyncPosition
+ }
+
+ timer.Stop()
+ delete(roomData.userSet, userID)
+
+ t.latestSyncPosition++
+ t.data[roomID].syncPosition = t.latestSyncPosition
+
+ return t.latestSyncPosition
+}
+
+func (t *EDUCache) GetLatestSyncPosition() int64 {
+ t.Lock()
+ defer t.Unlock()
+ return t.latestSyncPosition
+}
+
+func getExpireTime(expire *time.Time) time.Time {
+ if expire != nil {
+ return *expire
+ }
+ return time.Now().Add(defaultTypingTimeout)
+}
diff --git a/internal/caching/cache_typing_test.go b/internal/caching/cache_typing_test.go
new file mode 100644
index 00000000..c03d89bc
--- /dev/null
+++ b/internal/caching/cache_typing_test.go
@@ -0,0 +1,103 @@
+// Copyright 2017 Vector Creations Ltd
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package caching
+
+import (
+ "testing"
+ "time"
+
+ "github.com/matrix-org/dendrite/internal/test"
+)
+
+func TestEDUCache(t *testing.T) {
+ tCache := NewTypingCache()
+ if tCache == nil {
+ t.Fatal("NewTypingCache failed")
+ }
+
+ t.Run("AddTypingUser", func(t *testing.T) {
+ testAddTypingUser(t, tCache)
+ })
+
+ t.Run("GetTypingUsers", func(t *testing.T) {
+ testGetTypingUsers(t, tCache)
+ })
+
+ t.Run("RemoveUser", func(t *testing.T) {
+ testRemoveUser(t, tCache)
+ })
+}
+
+func testAddTypingUser(t *testing.T, tCache *EDUCache) { // nolint: unparam
+ present := time.Now()
+ tests := []struct {
+ userID string
+ roomID string
+ expire *time.Time
+ }{ // Set four users typing state to room1
+ {"user1", "room1", nil},
+ {"user2", "room1", nil},
+ {"user3", "room1", nil},
+ {"user4", "room1", nil},
+ //typing state with past expireTime should not take effect or removed.
+ {"user1", "room2", &present},
+ }
+
+ for _, tt := range tests {
+ tCache.AddTypingUser(tt.userID, tt.roomID, tt.expire)
+ }
+}
+
+func testGetTypingUsers(t *testing.T, tCache *EDUCache) {
+ tests := []struct {
+ roomID string
+ wantUsers []string
+ }{
+ {"room1", []string{"user1", "user2", "user3", "user4"}},
+ {"room2", []string{}},
+ }
+
+ for _, tt := range tests {
+ gotUsers := tCache.GetTypingUsers(tt.roomID)
+ if !test.UnsortedStringSliceEqual(gotUsers, tt.wantUsers) {
+ t.Errorf("TypingCache.GetTypingUsers(%s) = %v, want %v", tt.roomID, gotUsers, tt.wantUsers)
+ }
+ }
+}
+
+func testRemoveUser(t *testing.T, tCache *EDUCache) {
+ tests := []struct {
+ roomID string
+ userIDs []string
+ }{
+ {"room3", []string{"user1"}},
+ {"room4", []string{"user1", "user2", "user3"}},
+ }
+
+ for _, tt := range tests {
+ for _, userID := range tt.userIDs {
+ tCache.AddTypingUser(userID, tt.roomID, nil)
+ }
+
+ length := len(tt.userIDs)
+ tCache.RemoveUser(tt.userIDs[length-1], tt.roomID)
+ expLeftUsers := tt.userIDs[:length-1]
+ if leftUsers := tCache.GetTypingUsers(tt.roomID); !test.UnsortedStringSliceEqual(leftUsers, expLeftUsers) {
+ t.Errorf("Response after removal is unexpected. Want = %s, got = %s", leftUsers, expLeftUsers)
+ }
+ }
+}