diff options
Diffstat (limited to 'eduserver')
-rw-r--r-- | eduserver/api/input.go | 42 | ||||
-rw-r--r-- | eduserver/api/output.go | 19 | ||||
-rw-r--r-- | eduserver/cache/cache.go | 17 | ||||
-rw-r--r-- | eduserver/cache/cache_test.go | 4 | ||||
-rw-r--r-- | eduserver/eduserver.go | 15 | ||||
-rw-r--r-- | eduserver/input/input.go | 98 |
6 files changed, 188 insertions, 7 deletions
diff --git a/eduserver/api/input.go b/eduserver/api/input.go index 8b5b6d76..fa7f30cb 100644 --- a/eduserver/api/input.go +++ b/eduserver/api/input.go @@ -1,3 +1,7 @@ +// Copyright 2017 Vector Creations Ltd +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -37,6 +41,12 @@ type InputTypingEvent struct { OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"` } +type InputSendToDeviceEvent struct { + UserID string `json:"user_id"` + DeviceID string `json:"device_id"` + gomatrixserverlib.SendToDeviceEvent +} + // InputTypingEventRequest is a request to EDUServerInputAPI type InputTypingEventRequest struct { InputTypingEvent InputTypingEvent `json:"input_typing_event"` @@ -45,6 +55,14 @@ type InputTypingEventRequest struct { // InputTypingEventResponse is a response to InputTypingEvents type InputTypingEventResponse struct{} +// InputSendToDeviceEventRequest is a request to EDUServerInputAPI +type InputSendToDeviceEventRequest struct { + InputSendToDeviceEvent InputSendToDeviceEvent `json:"input_send_to_device_event"` +} + +// InputSendToDeviceEventResponse is a response to InputSendToDeviceEventRequest +type InputSendToDeviceEventResponse struct{} + // EDUServerInputAPI is used to write events to the typing server. type EDUServerInputAPI interface { InputTypingEvent( @@ -52,11 +70,20 @@ type EDUServerInputAPI interface { request *InputTypingEventRequest, response *InputTypingEventResponse, ) error + + InputSendToDeviceEvent( + ctx context.Context, + request *InputSendToDeviceEventRequest, + response *InputSendToDeviceEventResponse, + ) error } // EDUServerInputTypingEventPath is the HTTP path for the InputTypingEvent API. const EDUServerInputTypingEventPath = "/eduserver/input" +// EDUServerInputSendToDeviceEventPath is the HTTP path for the InputSendToDeviceEvent API. +const EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice" + // NewEDUServerInputAPIHTTP creates a EDUServerInputAPI implemented by talking to a HTTP POST API. func NewEDUServerInputAPIHTTP(eduServerURL string, httpClient *http.Client) (EDUServerInputAPI, error) { if httpClient == nil { @@ -70,7 +97,7 @@ type httpEDUServerInputAPI struct { httpClient *http.Client } -// InputRoomEvents implements EDUServerInputAPI +// InputTypingEvent implements EDUServerInputAPI func (h *httpEDUServerInputAPI) InputTypingEvent( ctx context.Context, request *InputTypingEventRequest, @@ -82,3 +109,16 @@ func (h *httpEDUServerInputAPI) InputTypingEvent( apiURL := h.eduServerURL + EDUServerInputTypingEventPath return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } + +// InputSendToDeviceEvent implements EDUServerInputAPI +func (h *httpEDUServerInputAPI) InputSendToDeviceEvent( + ctx context.Context, + request *InputSendToDeviceEventRequest, + response *InputSendToDeviceEventResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "InputSendToDeviceEvent") + defer span.Finish() + + apiURL := h.eduServerURL + EDUServerInputSendToDeviceEventPath + return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} diff --git a/eduserver/api/output.go b/eduserver/api/output.go index 8696acf4..e6ded841 100644 --- a/eduserver/api/output.go +++ b/eduserver/api/output.go @@ -1,3 +1,7 @@ +// Copyright 2017 Vector Creations Ltd +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -12,7 +16,11 @@ package api -import "time" +import ( + "time" + + "github.com/matrix-org/gomatrixserverlib" +) // OutputTypingEvent is an entry in typing server output kafka log. // This contains the event with extra fields used to create 'm.typing' event @@ -32,3 +40,12 @@ type TypingEvent struct { UserID string `json:"user_id"` Typing bool `json:"typing"` } + +// OutputSendToDeviceEvent is an entry in the send-to-device output kafka log. +// This contains the full event content, along with the user ID and device ID +// to which it is destined. +type OutputSendToDeviceEvent struct { + UserID string `json:"user_id"` + DeviceID string `json:"device_id"` + gomatrixserverlib.SendToDeviceEvent +} diff --git a/eduserver/cache/cache.go b/eduserver/cache/cache.go index 46f7a2b1..dd535a6d 100644 --- a/eduserver/cache/cache.go +++ b/eduserver/cache/cache.go @@ -1,3 +1,7 @@ +// Copyright 2017 Vector Creations Ltd +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -109,6 +113,19 @@ func (t *EDUCache) AddTypingUser( return t.GetLatestSyncPosition() } +// AddSendToDeviceMessage increases the sync position for +// send-to-device updates. +// Returns the sync position before update, as the caller +// will use this to record the current stream position +// at the time that the send-to-device message was sent. +func (t *EDUCache) AddSendToDeviceMessage() int64 { + t.Lock() + defer t.Unlock() + latestSyncPosition := t.latestSyncPosition + t.latestSyncPosition++ + return latestSyncPosition +} + // addUser with mutex lock & replace the previous timer. // Returns the latest typing sync position after update. func (t *EDUCache) addUser( diff --git a/eduserver/cache/cache_test.go b/eduserver/cache/cache_test.go index d1b2f8bd..c7d01879 100644 --- a/eduserver/cache/cache_test.go +++ b/eduserver/cache/cache_test.go @@ -1,3 +1,7 @@ +// Copyright 2017 Vector Creations Ltd +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index 14fbd332..6f664eb6 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -1,3 +1,7 @@ +// Copyright 2017 Vector Creations Ltd +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -13,6 +17,7 @@ package eduserver import ( + "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/input" @@ -26,11 +31,15 @@ import ( func SetupEDUServerComponent( base *basecomponent.BaseDendrite, eduCache *cache.EDUCache, + deviceDB devices.Database, ) api.EDUServerInputAPI { inputAPI := &input.EDUServerInputAPI{ - Cache: eduCache, - Producer: base.KafkaProducer, - OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent), + Cache: eduCache, + DeviceDB: deviceDB, + Producer: base.KafkaProducer, + OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent), + OutputSendToDeviceEventTopic: string(base.Cfg.Kafka.Topics.OutputSendToDeviceEvent), + ServerName: base.Cfg.Matrix.ServerName, } inputAPI.SetupHTTP(base.InternalAPIMux) diff --git a/eduserver/input/input.go b/eduserver/input/input.go index 73777e32..4e305195 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -1,3 +1,7 @@ +// Copyright 2017 Vector Creations Ltd +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -20,11 +24,13 @@ import ( "github.com/Shopify/sarama" "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/sirupsen/logrus" ) // EDUServerInputAPI implements api.EDUServerInputAPI @@ -33,8 +39,14 @@ type EDUServerInputAPI struct { Cache *cache.EDUCache // The kafka topic to output new typing events to. OutputTypingEventTopic string + // The kafka topic to output new send to device events to. + OutputSendToDeviceEventTopic string // kafka producer Producer sarama.SyncProducer + // device database + DeviceDB devices.Database + // our server name + ServerName gomatrixserverlib.ServerName } // InputTypingEvent implements api.EDUServerInputAPI @@ -54,10 +66,20 @@ func (t *EDUServerInputAPI) InputTypingEvent( t.Cache.RemoveUser(ite.UserID, ite.RoomID) } - return t.sendEvent(ite) + return t.sendTypingEvent(ite) +} + +// InputTypingEvent implements api.EDUServerInputAPI +func (t *EDUServerInputAPI) InputSendToDeviceEvent( + ctx context.Context, + request *api.InputSendToDeviceEventRequest, + response *api.InputSendToDeviceEventResponse, +) error { + ise := &request.InputSendToDeviceEvent + return t.sendToDeviceEvent(ise) } -func (t *EDUServerInputAPI) sendEvent(ite *api.InputTypingEvent) error { +func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { ev := &api.TypingEvent{ Type: gomatrixserverlib.MTyping, RoomID: ite.RoomID, @@ -90,6 +112,65 @@ func (t *EDUServerInputAPI) sendEvent(ite *api.InputTypingEvent) error { return err } +func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) error { + devices := []string{} + localpart, domain, err := gomatrixserverlib.SplitID('@', ise.UserID) + if err != nil { + return err + } + + // If the event is targeted locally then we want to expand the wildcard + // out into individual device IDs so that we can send them to each respective + // device. If the event isn't targeted locally then we can't expand the + // wildcard as we don't know about the remote devices, so instead we leave it + // as-is, so that the federation sender can send it on with the wildcard intact. + if domain == t.ServerName && ise.DeviceID == "*" { + devs, err := t.DeviceDB.GetDevicesByLocalpart(context.TODO(), localpart) + if err != nil { + return err + } + for _, dev := range devs { + devices = append(devices, dev.ID) + } + } else { + devices = append(devices, ise.DeviceID) + } + + for _, device := range devices { + ote := &api.OutputSendToDeviceEvent{ + UserID: ise.UserID, + DeviceID: device, + SendToDeviceEvent: ise.SendToDeviceEvent, + } + + logrus.WithFields(logrus.Fields{ + "user_id": ise.UserID, + "device_id": ise.DeviceID, + "event_type": ise.Type, + }).Info("handling send-to-device message") + + eventJSON, err := json.Marshal(ote) + if err != nil { + logrus.WithError(err).Error("sendToDevice failed json.Marshal") + return err + } + + m := &sarama.ProducerMessage{ + Topic: string(t.OutputSendToDeviceEventTopic), + Key: sarama.StringEncoder(ote.UserID), + Value: sarama.ByteEncoder(eventJSON), + } + + _, _, err = t.Producer.SendMessage(m) + if err != nil { + logrus.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") + return err + } + } + + return nil +} + // SetupHTTP adds the EDUServerInputAPI handlers to the http.ServeMux. func (t *EDUServerInputAPI) SetupHTTP(internalAPIMux *mux.Router) { internalAPIMux.Handle(api.EDUServerInputTypingEventPath, @@ -105,4 +186,17 @@ func (t *EDUServerInputAPI) SetupHTTP(internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle(api.EDUServerInputSendToDeviceEventPath, + internal.MakeInternalAPI("inputSendToDeviceEvents", func(req *http.Request) util.JSONResponse { + var request api.InputSendToDeviceEventRequest + var response api.InputSendToDeviceEventResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := t.InputSendToDeviceEvent(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) } |