diff options
author | Kegsay <kegan@matrix.org> | 2020-03-30 15:02:20 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-30 15:02:20 +0100 |
commit | 11a8059bba9dbdc855f10e3b380c4a2f245635a2 (patch) | |
tree | 61b2ffaa53eecb6d4eef8e275f432734cbb9e606 /eduserver | |
parent | f72b759426c1895b76c16ace526cc788713c2fea (diff) |
Rename the typing server to EDU server (#948)
* Blunt move and sed rename
* Rename common/ refs to typing
* Rename internal stuff in eduserver
* Rename docs and scripts
* Rename constants/filenames, goimports everything to re-order imports
Diffstat (limited to 'eduserver')
-rw-r--r-- | eduserver/api/input.go | 83 | ||||
-rw-r--r-- | eduserver/api/output.go | 34 | ||||
-rw-r--r-- | eduserver/cache/cache.go | 180 | ||||
-rw-r--r-- | eduserver/cache/cache_test.go | 99 | ||||
-rw-r--r-- | eduserver/eduserver.go | 40 | ||||
-rw-r--r-- | eduserver/input/input.go | 107 |
6 files changed, 543 insertions, 0 deletions
diff --git a/eduserver/api/input.go b/eduserver/api/input.go new file mode 100644 index 00000000..c95acaf1 --- /dev/null +++ b/eduserver/api/input.go @@ -0,0 +1,83 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package api provides the types that are used to communicate with the typing server. +package api + +import ( + "context" + "net/http" + + commonHTTP "github.com/matrix-org/dendrite/common/http" + "github.com/matrix-org/gomatrixserverlib" + opentracing "github.com/opentracing/opentracing-go" +) + +// InputTypingEvent is an event for notifying the typing server about typing updates. +type InputTypingEvent struct { + // UserID of the user to update typing status. + UserID string `json:"user_id"` + // RoomID of the room the user is typing (or has stopped). + RoomID string `json:"room_id"` + // Typing is true if the user is typing, false if they have stopped. + Typing bool `json:"typing"` + // Timeout is the interval for which the user should be marked as typing. + Timeout int64 `json:"timeout"` + // OriginServerTS when the server received the update. + OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"` +} + +// InputTypingEventRequest is a request to EDUServerInputAPI +type InputTypingEventRequest struct { + InputTypingEvent InputTypingEvent `json:"input_typing_event"` +} + +// InputTypingEventResponse is a response to InputTypingEvents +type InputTypingEventResponse struct{} + +// EDUServerInputAPI is used to write events to the typing server. +type EDUServerInputAPI interface { + InputTypingEvent( + ctx context.Context, + request *InputTypingEventRequest, + response *InputTypingEventResponse, + ) error +} + +// EDUServerInputTypingEventPath is the HTTP path for the InputTypingEvent API. +const EDUServerInputTypingEventPath = "/api/eduserver/input" + +// NewEDUServerInputAPIHTTP creates a EDUServerInputAPI implemented by talking to a HTTP POST API. +func NewEDUServerInputAPIHTTP(eduServerURL string, httpClient *http.Client) EDUServerInputAPI { + if httpClient == nil { + httpClient = http.DefaultClient + } + return &httpEDUServerInputAPI{eduServerURL, httpClient} +} + +type httpEDUServerInputAPI struct { + eduServerURL string + httpClient *http.Client +} + +// InputRoomEvents implements EDUServerInputAPI +func (h *httpEDUServerInputAPI) InputTypingEvent( + ctx context.Context, + request *InputTypingEventRequest, + response *InputTypingEventResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "InputTypingEvent") + defer span.Finish() + + apiURL := h.eduServerURL + EDUServerInputTypingEventPath + return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} diff --git a/eduserver/api/output.go b/eduserver/api/output.go new file mode 100644 index 00000000..8696acf4 --- /dev/null +++ b/eduserver/api/output.go @@ -0,0 +1,34 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import "time" + +// OutputTypingEvent is an entry in typing server output kafka log. +// This contains the event with extra fields used to create 'm.typing' event +// in clientapi & federation. +type OutputTypingEvent struct { + // The Event for the typing edu event. + Event TypingEvent `json:"event"` + // ExpireTime is the interval after which the user should no longer be + // considered typing. Only available if Event.Typing is true. + ExpireTime *time.Time +} + +// TypingEvent represents a matrix edu event of type 'm.typing'. +type TypingEvent struct { + Type string `json:"type"` + RoomID string `json:"room_id"` + UserID string `json:"user_id"` + Typing bool `json:"typing"` +} diff --git a/eduserver/cache/cache.go b/eduserver/cache/cache.go new file mode 100644 index 00000000..46f7a2b1 --- /dev/null +++ b/eduserver/cache/cache.go @@ -0,0 +1,180 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "sync" + "time" +) + +const defaultTypingTimeout = 10 * time.Second + +// userSet is a map of user IDs to a timer, timer fires at expiry. +type userSet map[string]*time.Timer + +// TimeoutCallbackFn is a function called right after the removal of a user +// from the typing user list due to timeout. +// latestSyncPosition is the typing sync position after the removal. +type TimeoutCallbackFn func(userID, roomID string, latestSyncPosition int64) + +type roomData struct { + syncPosition int64 + userSet userSet +} + +// EDUCache maintains a list of users typing in each room. +type EDUCache struct { + sync.RWMutex + latestSyncPosition int64 + data map[string]*roomData + timeoutCallback TimeoutCallbackFn +} + +// Create a roomData with its sync position set to the latest sync position. +// Must only be called after locking the cache. +func (t *EDUCache) newRoomData() *roomData { + return &roomData{ + syncPosition: t.latestSyncPosition, + userSet: make(userSet), + } +} + +// New returns a new EDUCache initialised for use. +func New() *EDUCache { + return &EDUCache{data: make(map[string]*roomData)} +} + +// SetTimeoutCallback sets a callback function that is called right after +// a user is removed from the typing user list due to timeout. +func (t *EDUCache) SetTimeoutCallback(fn TimeoutCallbackFn) { + t.timeoutCallback = fn +} + +// GetTypingUsers returns the list of users typing in a room. +func (t *EDUCache) GetTypingUsers(roomID string) []string { + users, _ := t.GetTypingUsersIfUpdatedAfter(roomID, 0) + // 0 should work above because the first position used will be 1. + return users +} + +// GetTypingUsersIfUpdatedAfter returns all users typing in this room with +// updated == true if the typing sync position of the room is after the given +// position. Otherwise, returns an empty slice with updated == false. +func (t *EDUCache) GetTypingUsersIfUpdatedAfter( + roomID string, position int64, +) (users []string, updated bool) { + t.RLock() + defer t.RUnlock() + + roomData, ok := t.data[roomID] + if ok && roomData.syncPosition > position { + updated = true + userSet := roomData.userSet + users = make([]string, 0, len(userSet)) + for userID := range userSet { + users = append(users, userID) + } + } + + return +} + +// AddTypingUser sets an user as typing in a room. +// expire is the time when the user typing should time out. +// if expire is nil, defaultTypingTimeout is assumed. +// Returns the latest sync position for typing after update. +func (t *EDUCache) AddTypingUser( + userID, roomID string, expire *time.Time, +) int64 { + expireTime := getExpireTime(expire) + if until := time.Until(expireTime); until > 0 { + timer := time.AfterFunc(until, func() { + latestSyncPosition := t.RemoveUser(userID, roomID) + if t.timeoutCallback != nil { + t.timeoutCallback(userID, roomID, latestSyncPosition) + } + }) + return t.addUser(userID, roomID, timer) + } + return t.GetLatestSyncPosition() +} + +// addUser with mutex lock & replace the previous timer. +// Returns the latest typing sync position after update. +func (t *EDUCache) addUser( + userID, roomID string, expiryTimer *time.Timer, +) int64 { + t.Lock() + defer t.Unlock() + + t.latestSyncPosition++ + + if t.data[roomID] == nil { + t.data[roomID] = t.newRoomData() + } else { + t.data[roomID].syncPosition = t.latestSyncPosition + } + + // Stop the timer to cancel the call to timeoutCallback + if timer, ok := t.data[roomID].userSet[userID]; ok { + // It may happen that at this stage the timer fires, but we now have a lock on + // it. Hence the execution of timeoutCallback will happen after we unlock. So + // we may lose a typing state, though this is highly unlikely. This can be + // mitigated by keeping another time.Time in the map and checking against it + // before removing, but its occurrence is so infrequent it does not seem + // worthwhile. + timer.Stop() + } + + t.data[roomID].userSet[userID] = expiryTimer + + return t.latestSyncPosition +} + +// RemoveUser with mutex lock & stop the timer. +// Returns the latest sync position for typing after update. +func (t *EDUCache) RemoveUser(userID, roomID string) int64 { + t.Lock() + defer t.Unlock() + + roomData, ok := t.data[roomID] + if !ok { + return t.latestSyncPosition + } + + timer, ok := roomData.userSet[userID] + if !ok { + return t.latestSyncPosition + } + + timer.Stop() + delete(roomData.userSet, userID) + + t.latestSyncPosition++ + t.data[roomID].syncPosition = t.latestSyncPosition + + return t.latestSyncPosition +} + +func (t *EDUCache) GetLatestSyncPosition() int64 { + t.Lock() + defer t.Unlock() + return t.latestSyncPosition +} + +func getExpireTime(expire *time.Time) time.Time { + if expire != nil { + return *expire + } + return time.Now().Add(defaultTypingTimeout) +} diff --git a/eduserver/cache/cache_test.go b/eduserver/cache/cache_test.go new file mode 100644 index 00000000..8a1b6f79 --- /dev/null +++ b/eduserver/cache/cache_test.go @@ -0,0 +1,99 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "testing" + "time" + + "github.com/matrix-org/dendrite/common/test" +) + +func TestEDUCache(t *testing.T) { + tCache := New() + if tCache == nil { + t.Fatal("New failed") + } + + t.Run("AddTypingUser", func(t *testing.T) { + testAddTypingUser(t, tCache) + }) + + t.Run("GetTypingUsers", func(t *testing.T) { + testGetTypingUsers(t, tCache) + }) + + t.Run("RemoveUser", func(t *testing.T) { + testRemoveUser(t, tCache) + }) +} + +func testAddTypingUser(t *testing.T, tCache *EDUCache) { // nolint: unparam + present := time.Now() + tests := []struct { + userID string + roomID string + expire *time.Time + }{ // Set four users typing state to room1 + {"user1", "room1", nil}, + {"user2", "room1", nil}, + {"user3", "room1", nil}, + {"user4", "room1", nil}, + //typing state with past expireTime should not take effect or removed. + {"user1", "room2", &present}, + } + + for _, tt := range tests { + tCache.AddTypingUser(tt.userID, tt.roomID, tt.expire) + } +} + +func testGetTypingUsers(t *testing.T, tCache *EDUCache) { + tests := []struct { + roomID string + wantUsers []string + }{ + {"room1", []string{"user1", "user2", "user3", "user4"}}, + {"room2", []string{}}, + } + + for _, tt := range tests { + gotUsers := tCache.GetTypingUsers(tt.roomID) + if !test.UnsortedStringSliceEqual(gotUsers, tt.wantUsers) { + t.Errorf("TypingCache.GetTypingUsers(%s) = %v, want %v", tt.roomID, gotUsers, tt.wantUsers) + } + } +} + +func testRemoveUser(t *testing.T, tCache *EDUCache) { + tests := []struct { + roomID string + userIDs []string + }{ + {"room3", []string{"user1"}}, + {"room4", []string{"user1", "user2", "user3"}}, + } + + for _, tt := range tests { + for _, userID := range tt.userIDs { + tCache.AddTypingUser(userID, tt.roomID, nil) + } + + length := len(tt.userIDs) + tCache.RemoveUser(tt.userIDs[length-1], tt.roomID) + expLeftUsers := tt.userIDs[:length-1] + if leftUsers := tCache.GetTypingUsers(tt.roomID); !test.UnsortedStringSliceEqual(leftUsers, expLeftUsers) { + t.Errorf("Response after removal is unexpected. Want = %s, got = %s", leftUsers, expLeftUsers) + } + } +} diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go new file mode 100644 index 00000000..8ddd2c52 --- /dev/null +++ b/eduserver/eduserver.go @@ -0,0 +1,40 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eduserver + +import ( + "net/http" + + "github.com/matrix-org/dendrite/common/basecomponent" + "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/eduserver/input" +) + +// SetupEDUServerComponent sets up and registers HTTP handlers for the +// EDUServer component. Returns instances of the various roomserver APIs, +// allowing other components running in the same process to hit the query the +// APIs directly instead of having to use HTTP. +func SetupEDUServerComponent( + base *basecomponent.BaseDendrite, + eduCache *cache.EDUCache, +) api.EDUServerInputAPI { + inputAPI := &input.EDUServerInputAPI{ + Cache: eduCache, + Producer: base.KafkaProducer, + OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent), + } + + inputAPI.SetupHTTP(http.DefaultServeMux) + return inputAPI +} diff --git a/eduserver/input/input.go b/eduserver/input/input.go new file mode 100644 index 00000000..e0cc6922 --- /dev/null +++ b/eduserver/input/input.go @@ -0,0 +1,107 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package input + +import ( + "context" + "encoding/json" + "net/http" + "time" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "gopkg.in/Shopify/sarama.v1" +) + +// EDUServerInputAPI implements api.EDUServerInputAPI +type EDUServerInputAPI struct { + // Cache to store the current typing members in each room. + Cache *cache.EDUCache + // The kafka topic to output new typing events to. + OutputTypingEventTopic string + // kafka producer + Producer sarama.SyncProducer +} + +// InputTypingEvent implements api.EDUServerInputAPI +func (t *EDUServerInputAPI) InputTypingEvent( + ctx context.Context, + request *api.InputTypingEventRequest, + response *api.InputTypingEventResponse, +) error { + ite := &request.InputTypingEvent + if ite.Typing { + // user is typing, update our current state of users typing. + expireTime := ite.OriginServerTS.Time().Add( + time.Duration(ite.Timeout) * time.Millisecond, + ) + t.Cache.AddTypingUser(ite.UserID, ite.RoomID, &expireTime) + } else { + t.Cache.RemoveUser(ite.UserID, ite.RoomID) + } + + return t.sendEvent(ite) +} + +func (t *EDUServerInputAPI) sendEvent(ite *api.InputTypingEvent) error { + ev := &api.TypingEvent{ + Type: gomatrixserverlib.MTyping, + RoomID: ite.RoomID, + UserID: ite.UserID, + Typing: ite.Typing, + } + ote := &api.OutputTypingEvent{ + Event: *ev, + } + + if ev.Typing { + expireTime := ite.OriginServerTS.Time().Add( + time.Duration(ite.Timeout) * time.Millisecond, + ) + ote.ExpireTime = &expireTime + } + + eventJSON, err := json.Marshal(ote) + if err != nil { + return err + } + + m := &sarama.ProducerMessage{ + Topic: string(t.OutputTypingEventTopic), + Key: sarama.StringEncoder(ite.RoomID), + Value: sarama.ByteEncoder(eventJSON), + } + + _, _, err = t.Producer.SendMessage(m) + return err +} + +// SetupHTTP adds the EDUServerInputAPI handlers to the http.ServeMux. +func (t *EDUServerInputAPI) SetupHTTP(servMux *http.ServeMux) { + servMux.Handle(api.EDUServerInputTypingEventPath, + common.MakeInternalAPI("inputTypingEvents", func(req *http.Request) util.JSONResponse { + var request api.InputTypingEventRequest + var response api.InputTypingEventResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := t.InputTypingEvent(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) +} |