diff options
author | S7evinK <2353100+S7evinK@users.noreply.github.com> | 2022-03-29 14:14:35 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-29 14:14:35 +0200 |
commit | 49dc49b232432d52c082646cc6f778593f4cb8b4 (patch) | |
tree | e809deedcae2127e930b0d382c863561b61fd9d2 /internal/caching | |
parent | 7972915806348847ecd9a9b8a1b1ff0609cb883c (diff) |
Remove eduserver (#2306)
* Move receipt sending to own JetStream producer
* Move SendToDevice to producer
* Remove most parts of the EDU server
* Fix SendToDevice & copyrights
* Move structs, cleanup EDU Server traces
* Use HeadersOnly subscription
* Missing file
* Fix linter issues
* Move consumers to own files
* Rename durable consumer; Consumer cleanup
* Docs/config cleanup
Diffstat (limited to 'internal/caching')
-rw-r--r-- | internal/caching/cache_typing.go | 184 | ||||
-rw-r--r-- | internal/caching/cache_typing_test.go | 103 |
2 files changed, 287 insertions, 0 deletions
diff --git a/internal/caching/cache_typing.go b/internal/caching/cache_typing.go new file mode 100644 index 00000000..bd6a5fc1 --- /dev/null +++ b/internal/caching/cache_typing.go @@ -0,0 +1,184 @@ +// Copyright 2017 Vector Creations Ltd +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package caching + +import ( + "sync" + "time" +) + +const defaultTypingTimeout = 10 * time.Second + +// userSet is a map of user IDs to a timer, timer fires at expiry. +type userSet map[string]*time.Timer + +// TimeoutCallbackFn is a function called right after the removal of a user +// from the typing user list due to timeout. +// latestSyncPosition is the typing sync position after the removal. +type TimeoutCallbackFn func(userID, roomID string, latestSyncPosition int64) + +type roomData struct { + syncPosition int64 + userSet userSet +} + +// EDUCache maintains a list of users typing in each room. +type EDUCache struct { + sync.RWMutex + latestSyncPosition int64 + data map[string]*roomData + timeoutCallback TimeoutCallbackFn +} + +// Create a roomData with its sync position set to the latest sync position. +// Must only be called after locking the cache. +func (t *EDUCache) newRoomData() *roomData { + return &roomData{ + syncPosition: t.latestSyncPosition, + userSet: make(userSet), + } +} + +// NewTypingCache returns a new EDUCache initialised for use. +func NewTypingCache() *EDUCache { + return &EDUCache{data: make(map[string]*roomData)} +} + +// SetTimeoutCallback sets a callback function that is called right after +// a user is removed from the typing user list due to timeout. +func (t *EDUCache) SetTimeoutCallback(fn TimeoutCallbackFn) { + t.timeoutCallback = fn +} + +// GetTypingUsers returns the list of users typing in a room. +func (t *EDUCache) GetTypingUsers(roomID string) []string { + users, _ := t.GetTypingUsersIfUpdatedAfter(roomID, 0) + // 0 should work above because the first position used will be 1. + return users +} + +// GetTypingUsersIfUpdatedAfter returns all users typing in this room with +// updated == true if the typing sync position of the room is after the given +// position. Otherwise, returns an empty slice with updated == false. +func (t *EDUCache) GetTypingUsersIfUpdatedAfter( + roomID string, position int64, +) (users []string, updated bool) { + t.RLock() + defer t.RUnlock() + + roomData, ok := t.data[roomID] + if ok && roomData.syncPosition > position { + updated = true + userSet := roomData.userSet + users = make([]string, 0, len(userSet)) + for userID := range userSet { + users = append(users, userID) + } + } + + return +} + +// AddTypingUser sets an user as typing in a room. +// expire is the time when the user typing should time out. +// if expire is nil, defaultTypingTimeout is assumed. +// Returns the latest sync position for typing after update. +func (t *EDUCache) AddTypingUser( + userID, roomID string, expire *time.Time, +) int64 { + expireTime := getExpireTime(expire) + if until := time.Until(expireTime); until > 0 { + timer := time.AfterFunc(until, func() { + latestSyncPosition := t.RemoveUser(userID, roomID) + if t.timeoutCallback != nil { + t.timeoutCallback(userID, roomID, latestSyncPosition) + } + }) + return t.addUser(userID, roomID, timer) + } + return t.GetLatestSyncPosition() +} + +// addUser with mutex lock & replace the previous timer. +// Returns the latest typing sync position after update. +func (t *EDUCache) addUser( + userID, roomID string, expiryTimer *time.Timer, +) int64 { + t.Lock() + defer t.Unlock() + + t.latestSyncPosition++ + + if t.data[roomID] == nil { + t.data[roomID] = t.newRoomData() + } else { + t.data[roomID].syncPosition = t.latestSyncPosition + } + + // Stop the timer to cancel the call to timeoutCallback + if timer, ok := t.data[roomID].userSet[userID]; ok { + // It may happen that at this stage the timer fires, but we now have a lock on + // it. Hence the execution of timeoutCallback will happen after we unlock. So + // we may lose a typing state, though this is highly unlikely. This can be + // mitigated by keeping another time.Time in the map and checking against it + // before removing, but its occurrence is so infrequent it does not seem + // worthwhile. + timer.Stop() + } + + t.data[roomID].userSet[userID] = expiryTimer + + return t.latestSyncPosition +} + +// RemoveUser with mutex lock & stop the timer. +// Returns the latest sync position for typing after update. +func (t *EDUCache) RemoveUser(userID, roomID string) int64 { + t.Lock() + defer t.Unlock() + + roomData, ok := t.data[roomID] + if !ok { + return t.latestSyncPosition + } + + timer, ok := roomData.userSet[userID] + if !ok { + return t.latestSyncPosition + } + + timer.Stop() + delete(roomData.userSet, userID) + + t.latestSyncPosition++ + t.data[roomID].syncPosition = t.latestSyncPosition + + return t.latestSyncPosition +} + +func (t *EDUCache) GetLatestSyncPosition() int64 { + t.Lock() + defer t.Unlock() + return t.latestSyncPosition +} + +func getExpireTime(expire *time.Time) time.Time { + if expire != nil { + return *expire + } + return time.Now().Add(defaultTypingTimeout) +} diff --git a/internal/caching/cache_typing_test.go b/internal/caching/cache_typing_test.go new file mode 100644 index 00000000..c03d89bc --- /dev/null +++ b/internal/caching/cache_typing_test.go @@ -0,0 +1,103 @@ +// Copyright 2017 Vector Creations Ltd +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package caching + +import ( + "testing" + "time" + + "github.com/matrix-org/dendrite/internal/test" +) + +func TestEDUCache(t *testing.T) { + tCache := NewTypingCache() + if tCache == nil { + t.Fatal("NewTypingCache failed") + } + + t.Run("AddTypingUser", func(t *testing.T) { + testAddTypingUser(t, tCache) + }) + + t.Run("GetTypingUsers", func(t *testing.T) { + testGetTypingUsers(t, tCache) + }) + + t.Run("RemoveUser", func(t *testing.T) { + testRemoveUser(t, tCache) + }) +} + +func testAddTypingUser(t *testing.T, tCache *EDUCache) { // nolint: unparam + present := time.Now() + tests := []struct { + userID string + roomID string + expire *time.Time + }{ // Set four users typing state to room1 + {"user1", "room1", nil}, + {"user2", "room1", nil}, + {"user3", "room1", nil}, + {"user4", "room1", nil}, + //typing state with past expireTime should not take effect or removed. + {"user1", "room2", &present}, + } + + for _, tt := range tests { + tCache.AddTypingUser(tt.userID, tt.roomID, tt.expire) + } +} + +func testGetTypingUsers(t *testing.T, tCache *EDUCache) { + tests := []struct { + roomID string + wantUsers []string + }{ + {"room1", []string{"user1", "user2", "user3", "user4"}}, + {"room2", []string{}}, + } + + for _, tt := range tests { + gotUsers := tCache.GetTypingUsers(tt.roomID) + if !test.UnsortedStringSliceEqual(gotUsers, tt.wantUsers) { + t.Errorf("TypingCache.GetTypingUsers(%s) = %v, want %v", tt.roomID, gotUsers, tt.wantUsers) + } + } +} + +func testRemoveUser(t *testing.T, tCache *EDUCache) { + tests := []struct { + roomID string + userIDs []string + }{ + {"room3", []string{"user1"}}, + {"room4", []string{"user1", "user2", "user3"}}, + } + + for _, tt := range tests { + for _, userID := range tt.userIDs { + tCache.AddTypingUser(userID, tt.roomID, nil) + } + + length := len(tt.userIDs) + tCache.RemoveUser(tt.userIDs[length-1], tt.roomID) + expLeftUsers := tt.userIDs[:length-1] + if leftUsers := tCache.GetTypingUsers(tt.roomID); !test.UnsortedStringSliceEqual(leftUsers, expLeftUsers) { + t.Errorf("Response after removal is unexpected. Want = %s, got = %s", leftUsers, expLeftUsers) + } + } +} |