aboutsummaryrefslogtreecommitdiff
path: root/eduserver
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-03-30 15:02:20 +0100
committerGitHub <noreply@github.com>2020-03-30 15:02:20 +0100
commit11a8059bba9dbdc855f10e3b380c4a2f245635a2 (patch)
tree61b2ffaa53eecb6d4eef8e275f432734cbb9e606 /eduserver
parentf72b759426c1895b76c16ace526cc788713c2fea (diff)
Rename the typing server to EDU server (#948)
* Blunt move and sed rename * Rename common/ refs to typing * Rename internal stuff in eduserver * Rename docs and scripts * Rename constants/filenames, goimports everything to re-order imports
Diffstat (limited to 'eduserver')
-rw-r--r--eduserver/api/input.go83
-rw-r--r--eduserver/api/output.go34
-rw-r--r--eduserver/cache/cache.go180
-rw-r--r--eduserver/cache/cache_test.go99
-rw-r--r--eduserver/eduserver.go40
-rw-r--r--eduserver/input/input.go107
6 files changed, 543 insertions, 0 deletions
diff --git a/eduserver/api/input.go b/eduserver/api/input.go
new file mode 100644
index 00000000..c95acaf1
--- /dev/null
+++ b/eduserver/api/input.go
@@ -0,0 +1,83 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package api provides the types that are used to communicate with the typing server.
+package api
+
+import (
+ "context"
+ "net/http"
+
+ commonHTTP "github.com/matrix-org/dendrite/common/http"
+ "github.com/matrix-org/gomatrixserverlib"
+ opentracing "github.com/opentracing/opentracing-go"
+)
+
+// InputTypingEvent is an event for notifying the typing server about typing updates.
+type InputTypingEvent struct {
+ // UserID of the user to update typing status.
+ UserID string `json:"user_id"`
+ // RoomID of the room the user is typing (or has stopped).
+ RoomID string `json:"room_id"`
+ // Typing is true if the user is typing, false if they have stopped.
+ Typing bool `json:"typing"`
+ // Timeout is the interval for which the user should be marked as typing.
+ Timeout int64 `json:"timeout"`
+ // OriginServerTS when the server received the update.
+ OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"`
+}
+
+// InputTypingEventRequest is a request to EDUServerInputAPI
+type InputTypingEventRequest struct {
+ InputTypingEvent InputTypingEvent `json:"input_typing_event"`
+}
+
+// InputTypingEventResponse is a response to InputTypingEvents
+type InputTypingEventResponse struct{}
+
+// EDUServerInputAPI is used to write events to the typing server.
+type EDUServerInputAPI interface {
+ InputTypingEvent(
+ ctx context.Context,
+ request *InputTypingEventRequest,
+ response *InputTypingEventResponse,
+ ) error
+}
+
+// EDUServerInputTypingEventPath is the HTTP path for the InputTypingEvent API.
+const EDUServerInputTypingEventPath = "/api/eduserver/input"
+
+// NewEDUServerInputAPIHTTP creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
+func NewEDUServerInputAPIHTTP(eduServerURL string, httpClient *http.Client) EDUServerInputAPI {
+ if httpClient == nil {
+ httpClient = http.DefaultClient
+ }
+ return &httpEDUServerInputAPI{eduServerURL, httpClient}
+}
+
+type httpEDUServerInputAPI struct {
+ eduServerURL string
+ httpClient *http.Client
+}
+
+// InputRoomEvents implements EDUServerInputAPI
+func (h *httpEDUServerInputAPI) InputTypingEvent(
+ ctx context.Context,
+ request *InputTypingEventRequest,
+ response *InputTypingEventResponse,
+) error {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "InputTypingEvent")
+ defer span.Finish()
+
+ apiURL := h.eduServerURL + EDUServerInputTypingEventPath
+ return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+}
diff --git a/eduserver/api/output.go b/eduserver/api/output.go
new file mode 100644
index 00000000..8696acf4
--- /dev/null
+++ b/eduserver/api/output.go
@@ -0,0 +1,34 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package api
+
+import "time"
+
+// OutputTypingEvent is an entry in typing server output kafka log.
+// This contains the event with extra fields used to create 'm.typing' event
+// in clientapi & federation.
+type OutputTypingEvent struct {
+ // The Event for the typing edu event.
+ Event TypingEvent `json:"event"`
+ // ExpireTime is the interval after which the user should no longer be
+ // considered typing. Only available if Event.Typing is true.
+ ExpireTime *time.Time
+}
+
+// TypingEvent represents a matrix edu event of type 'm.typing'.
+type TypingEvent struct {
+ Type string `json:"type"`
+ RoomID string `json:"room_id"`
+ UserID string `json:"user_id"`
+ Typing bool `json:"typing"`
+}
diff --git a/eduserver/cache/cache.go b/eduserver/cache/cache.go
new file mode 100644
index 00000000..46f7a2b1
--- /dev/null
+++ b/eduserver/cache/cache.go
@@ -0,0 +1,180 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cache
+
+import (
+ "sync"
+ "time"
+)
+
+const defaultTypingTimeout = 10 * time.Second
+
+// userSet is a map of user IDs to a timer, timer fires at expiry.
+type userSet map[string]*time.Timer
+
+// TimeoutCallbackFn is a function called right after the removal of a user
+// from the typing user list due to timeout.
+// latestSyncPosition is the typing sync position after the removal.
+type TimeoutCallbackFn func(userID, roomID string, latestSyncPosition int64)
+
+type roomData struct {
+ syncPosition int64
+ userSet userSet
+}
+
+// EDUCache maintains a list of users typing in each room.
+type EDUCache struct {
+ sync.RWMutex
+ latestSyncPosition int64
+ data map[string]*roomData
+ timeoutCallback TimeoutCallbackFn
+}
+
+// Create a roomData with its sync position set to the latest sync position.
+// Must only be called after locking the cache.
+func (t *EDUCache) newRoomData() *roomData {
+ return &roomData{
+ syncPosition: t.latestSyncPosition,
+ userSet: make(userSet),
+ }
+}
+
+// New returns a new EDUCache initialised for use.
+func New() *EDUCache {
+ return &EDUCache{data: make(map[string]*roomData)}
+}
+
+// SetTimeoutCallback sets a callback function that is called right after
+// a user is removed from the typing user list due to timeout.
+func (t *EDUCache) SetTimeoutCallback(fn TimeoutCallbackFn) {
+ t.timeoutCallback = fn
+}
+
+// GetTypingUsers returns the list of users typing in a room.
+func (t *EDUCache) GetTypingUsers(roomID string) []string {
+ users, _ := t.GetTypingUsersIfUpdatedAfter(roomID, 0)
+ // 0 should work above because the first position used will be 1.
+ return users
+}
+
+// GetTypingUsersIfUpdatedAfter returns all users typing in this room with
+// updated == true if the typing sync position of the room is after the given
+// position. Otherwise, returns an empty slice with updated == false.
+func (t *EDUCache) GetTypingUsersIfUpdatedAfter(
+ roomID string, position int64,
+) (users []string, updated bool) {
+ t.RLock()
+ defer t.RUnlock()
+
+ roomData, ok := t.data[roomID]
+ if ok && roomData.syncPosition > position {
+ updated = true
+ userSet := roomData.userSet
+ users = make([]string, 0, len(userSet))
+ for userID := range userSet {
+ users = append(users, userID)
+ }
+ }
+
+ return
+}
+
+// AddTypingUser sets an user as typing in a room.
+// expire is the time when the user typing should time out.
+// if expire is nil, defaultTypingTimeout is assumed.
+// Returns the latest sync position for typing after update.
+func (t *EDUCache) AddTypingUser(
+ userID, roomID string, expire *time.Time,
+) int64 {
+ expireTime := getExpireTime(expire)
+ if until := time.Until(expireTime); until > 0 {
+ timer := time.AfterFunc(until, func() {
+ latestSyncPosition := t.RemoveUser(userID, roomID)
+ if t.timeoutCallback != nil {
+ t.timeoutCallback(userID, roomID, latestSyncPosition)
+ }
+ })
+ return t.addUser(userID, roomID, timer)
+ }
+ return t.GetLatestSyncPosition()
+}
+
+// addUser with mutex lock & replace the previous timer.
+// Returns the latest typing sync position after update.
+func (t *EDUCache) addUser(
+ userID, roomID string, expiryTimer *time.Timer,
+) int64 {
+ t.Lock()
+ defer t.Unlock()
+
+ t.latestSyncPosition++
+
+ if t.data[roomID] == nil {
+ t.data[roomID] = t.newRoomData()
+ } else {
+ t.data[roomID].syncPosition = t.latestSyncPosition
+ }
+
+ // Stop the timer to cancel the call to timeoutCallback
+ if timer, ok := t.data[roomID].userSet[userID]; ok {
+ // It may happen that at this stage the timer fires, but we now have a lock on
+ // it. Hence the execution of timeoutCallback will happen after we unlock. So
+ // we may lose a typing state, though this is highly unlikely. This can be
+ // mitigated by keeping another time.Time in the map and checking against it
+ // before removing, but its occurrence is so infrequent it does not seem
+ // worthwhile.
+ timer.Stop()
+ }
+
+ t.data[roomID].userSet[userID] = expiryTimer
+
+ return t.latestSyncPosition
+}
+
+// RemoveUser with mutex lock & stop the timer.
+// Returns the latest sync position for typing after update.
+func (t *EDUCache) RemoveUser(userID, roomID string) int64 {
+ t.Lock()
+ defer t.Unlock()
+
+ roomData, ok := t.data[roomID]
+ if !ok {
+ return t.latestSyncPosition
+ }
+
+ timer, ok := roomData.userSet[userID]
+ if !ok {
+ return t.latestSyncPosition
+ }
+
+ timer.Stop()
+ delete(roomData.userSet, userID)
+
+ t.latestSyncPosition++
+ t.data[roomID].syncPosition = t.latestSyncPosition
+
+ return t.latestSyncPosition
+}
+
+func (t *EDUCache) GetLatestSyncPosition() int64 {
+ t.Lock()
+ defer t.Unlock()
+ return t.latestSyncPosition
+}
+
+func getExpireTime(expire *time.Time) time.Time {
+ if expire != nil {
+ return *expire
+ }
+ return time.Now().Add(defaultTypingTimeout)
+}
diff --git a/eduserver/cache/cache_test.go b/eduserver/cache/cache_test.go
new file mode 100644
index 00000000..8a1b6f79
--- /dev/null
+++ b/eduserver/cache/cache_test.go
@@ -0,0 +1,99 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cache
+
+import (
+ "testing"
+ "time"
+
+ "github.com/matrix-org/dendrite/common/test"
+)
+
+func TestEDUCache(t *testing.T) {
+ tCache := New()
+ if tCache == nil {
+ t.Fatal("New failed")
+ }
+
+ t.Run("AddTypingUser", func(t *testing.T) {
+ testAddTypingUser(t, tCache)
+ })
+
+ t.Run("GetTypingUsers", func(t *testing.T) {
+ testGetTypingUsers(t, tCache)
+ })
+
+ t.Run("RemoveUser", func(t *testing.T) {
+ testRemoveUser(t, tCache)
+ })
+}
+
+func testAddTypingUser(t *testing.T, tCache *EDUCache) { // nolint: unparam
+ present := time.Now()
+ tests := []struct {
+ userID string
+ roomID string
+ expire *time.Time
+ }{ // Set four users typing state to room1
+ {"user1", "room1", nil},
+ {"user2", "room1", nil},
+ {"user3", "room1", nil},
+ {"user4", "room1", nil},
+ //typing state with past expireTime should not take effect or removed.
+ {"user1", "room2", &present},
+ }
+
+ for _, tt := range tests {
+ tCache.AddTypingUser(tt.userID, tt.roomID, tt.expire)
+ }
+}
+
+func testGetTypingUsers(t *testing.T, tCache *EDUCache) {
+ tests := []struct {
+ roomID string
+ wantUsers []string
+ }{
+ {"room1", []string{"user1", "user2", "user3", "user4"}},
+ {"room2", []string{}},
+ }
+
+ for _, tt := range tests {
+ gotUsers := tCache.GetTypingUsers(tt.roomID)
+ if !test.UnsortedStringSliceEqual(gotUsers, tt.wantUsers) {
+ t.Errorf("TypingCache.GetTypingUsers(%s) = %v, want %v", tt.roomID, gotUsers, tt.wantUsers)
+ }
+ }
+}
+
+func testRemoveUser(t *testing.T, tCache *EDUCache) {
+ tests := []struct {
+ roomID string
+ userIDs []string
+ }{
+ {"room3", []string{"user1"}},
+ {"room4", []string{"user1", "user2", "user3"}},
+ }
+
+ for _, tt := range tests {
+ for _, userID := range tt.userIDs {
+ tCache.AddTypingUser(userID, tt.roomID, nil)
+ }
+
+ length := len(tt.userIDs)
+ tCache.RemoveUser(tt.userIDs[length-1], tt.roomID)
+ expLeftUsers := tt.userIDs[:length-1]
+ if leftUsers := tCache.GetTypingUsers(tt.roomID); !test.UnsortedStringSliceEqual(leftUsers, expLeftUsers) {
+ t.Errorf("Response after removal is unexpected. Want = %s, got = %s", leftUsers, expLeftUsers)
+ }
+ }
+}
diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go
new file mode 100644
index 00000000..8ddd2c52
--- /dev/null
+++ b/eduserver/eduserver.go
@@ -0,0 +1,40 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package eduserver
+
+import (
+ "net/http"
+
+ "github.com/matrix-org/dendrite/common/basecomponent"
+ "github.com/matrix-org/dendrite/eduserver/api"
+ "github.com/matrix-org/dendrite/eduserver/cache"
+ "github.com/matrix-org/dendrite/eduserver/input"
+)
+
+// SetupEDUServerComponent sets up and registers HTTP handlers for the
+// EDUServer component. Returns instances of the various roomserver APIs,
+// allowing other components running in the same process to hit the query the
+// APIs directly instead of having to use HTTP.
+func SetupEDUServerComponent(
+ base *basecomponent.BaseDendrite,
+ eduCache *cache.EDUCache,
+) api.EDUServerInputAPI {
+ inputAPI := &input.EDUServerInputAPI{
+ Cache: eduCache,
+ Producer: base.KafkaProducer,
+ OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent),
+ }
+
+ inputAPI.SetupHTTP(http.DefaultServeMux)
+ return inputAPI
+}
diff --git a/eduserver/input/input.go b/eduserver/input/input.go
new file mode 100644
index 00000000..e0cc6922
--- /dev/null
+++ b/eduserver/input/input.go
@@ -0,0 +1,107 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package input
+
+import (
+ "context"
+ "encoding/json"
+ "net/http"
+ "time"
+
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/eduserver/api"
+ "github.com/matrix-org/dendrite/eduserver/cache"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ "gopkg.in/Shopify/sarama.v1"
+)
+
+// EDUServerInputAPI implements api.EDUServerInputAPI
+type EDUServerInputAPI struct {
+ // Cache to store the current typing members in each room.
+ Cache *cache.EDUCache
+ // The kafka topic to output new typing events to.
+ OutputTypingEventTopic string
+ // kafka producer
+ Producer sarama.SyncProducer
+}
+
+// InputTypingEvent implements api.EDUServerInputAPI
+func (t *EDUServerInputAPI) InputTypingEvent(
+ ctx context.Context,
+ request *api.InputTypingEventRequest,
+ response *api.InputTypingEventResponse,
+) error {
+ ite := &request.InputTypingEvent
+ if ite.Typing {
+ // user is typing, update our current state of users typing.
+ expireTime := ite.OriginServerTS.Time().Add(
+ time.Duration(ite.Timeout) * time.Millisecond,
+ )
+ t.Cache.AddTypingUser(ite.UserID, ite.RoomID, &expireTime)
+ } else {
+ t.Cache.RemoveUser(ite.UserID, ite.RoomID)
+ }
+
+ return t.sendEvent(ite)
+}
+
+func (t *EDUServerInputAPI) sendEvent(ite *api.InputTypingEvent) error {
+ ev := &api.TypingEvent{
+ Type: gomatrixserverlib.MTyping,
+ RoomID: ite.RoomID,
+ UserID: ite.UserID,
+ Typing: ite.Typing,
+ }
+ ote := &api.OutputTypingEvent{
+ Event: *ev,
+ }
+
+ if ev.Typing {
+ expireTime := ite.OriginServerTS.Time().Add(
+ time.Duration(ite.Timeout) * time.Millisecond,
+ )
+ ote.ExpireTime = &expireTime
+ }
+
+ eventJSON, err := json.Marshal(ote)
+ if err != nil {
+ return err
+ }
+
+ m := &sarama.ProducerMessage{
+ Topic: string(t.OutputTypingEventTopic),
+ Key: sarama.StringEncoder(ite.RoomID),
+ Value: sarama.ByteEncoder(eventJSON),
+ }
+
+ _, _, err = t.Producer.SendMessage(m)
+ return err
+}
+
+// SetupHTTP adds the EDUServerInputAPI handlers to the http.ServeMux.
+func (t *EDUServerInputAPI) SetupHTTP(servMux *http.ServeMux) {
+ servMux.Handle(api.EDUServerInputTypingEventPath,
+ common.MakeInternalAPI("inputTypingEvents", func(req *http.Request) util.JSONResponse {
+ var request api.InputTypingEventRequest
+ var response api.InputTypingEventResponse
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ if err := t.InputTypingEvent(req.Context(), &request, &response); err != nil {
+ return util.ErrorResponse(err)
+ }
+ return util.JSONResponse{Code: http.StatusOK, JSON: &response}
+ }),
+ )
+}