diff options
author | S7evinK <2353100+S7evinK@users.noreply.github.com> | 2022-03-29 14:14:35 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-29 14:14:35 +0200 |
commit | 49dc49b232432d52c082646cc6f778593f4cb8b4 (patch) | |
tree | e809deedcae2127e930b0d382c863561b61fd9d2 /clientapi | |
parent | 7972915806348847ecd9a9b8a1b1ff0609cb883c (diff) |
Remove eduserver (#2306)
* Move receipt sending to own JetStream producer
* Move SendToDevice to producer
* Remove most parts of the EDU server
* Fix SendToDevice & copyrights
* Move structs, cleanup EDU Server traces
* Use HeadersOnly subscription
* Missing file
* Fix linter issues
* Move consumers to own files
* Rename durable consumer; Consumer cleanup
* Docs/config cleanup
Diffstat (limited to 'clientapi')
-rw-r--r-- | clientapi/clientapi.go | 13 | ||||
-rw-r--r-- | clientapi/producers/syncapi.go | 124 | ||||
-rw-r--r-- | clientapi/routing/account_data.go | 5 | ||||
-rw-r--r-- | clientapi/routing/receipt.go | 11 | ||||
-rw-r--r-- | clientapi/routing/routing.go | 12 | ||||
-rw-r--r-- | clientapi/routing/sendtodevice.go | 10 | ||||
-rw-r--r-- | clientapi/routing/sendtyping.go | 11 |
7 files changed, 149 insertions, 37 deletions
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index d0ef368d..d4b417a3 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -20,7 +20,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/api" "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/routing" - eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/transactions" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" @@ -40,7 +39,6 @@ func AddPublicRoutes( cfg *config.ClientAPI, federation *gomatrixserverlib.FederationClient, rsAPI roomserverAPI.RoomserverInternalAPI, - eduInputAPI eduServerAPI.EDUServerInputAPI, asAPI appserviceAPI.AppServiceQueryAPI, transactionsCache *transactions.Cache, fsAPI federationAPI.FederationInternalAPI, @@ -53,12 +51,17 @@ func AddPublicRoutes( js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream) syncProducer := &producers.SyncAPIProducer{ - JetStream: js, - Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), + JetStream: js, + TopicClientData: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), + TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), + TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), + TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), + UserAPI: userAPI, + ServerName: cfg.Matrix.ServerName, } routing.Setup( - router, synapseAdminRouter, cfg, eduInputAPI, rsAPI, asAPI, + router, synapseAdminRouter, cfg, rsAPI, asAPI, userAPI, userDirectoryProvider, federation, syncProducer, transactionsCache, fsAPI, keyAPI, extRoomsProvider, mscCfg, diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go index 9ab90391..2dee04e3 100644 --- a/clientapi/producers/syncapi.go +++ b/clientapi/producers/syncapi.go @@ -15,24 +15,34 @@ package producers import ( + "context" "encoding/json" + "strconv" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" ) // SyncAPIProducer produces events for the sync API server to consume type SyncAPIProducer struct { - Topic string - JetStream nats.JetStreamContext + TopicClientData string + TopicReceiptEvent string + TopicSendToDeviceEvent string + TopicTypingEvent string + JetStream nats.JetStreamContext + ServerName gomatrixserverlib.ServerName + UserAPI userapi.UserInternalAPI } // SendData sends account data to the sync API server func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON) error { m := &nats.Msg{ - Subject: p.Topic, + Subject: p.TopicClientData, Header: nats.Header{}, } m.Header.Set(jetstream.UserID, userID) @@ -52,8 +62,114 @@ func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string "user_id": userID, "room_id": roomID, "data_type": dataType, - }).Tracef("Producing to topic '%s'", p.Topic) + }).Tracef("Producing to topic '%s'", p.TopicClientData) _, err = p.JetStream.PublishMsg(m) return err } + +func (p *SyncAPIProducer) SendReceipt( + ctx context.Context, + userID, roomID, eventID, receiptType string, timestamp gomatrixserverlib.Timestamp, +) error { + m := &nats.Msg{ + Subject: p.TopicReceiptEvent, + Header: nats.Header{}, + } + m.Header.Set(jetstream.UserID, userID) + m.Header.Set(jetstream.RoomID, roomID) + m.Header.Set(jetstream.EventID, eventID) + m.Header.Set("type", receiptType) + m.Header.Set("timestamp", strconv.Itoa(int(timestamp))) + + log.WithFields(log.Fields{}).Tracef("Producing to topic '%s'", p.TopicReceiptEvent) + _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) + return err +} + +func (p *SyncAPIProducer) SendToDevice( + ctx context.Context, sender, userID, deviceID, eventType string, + message interface{}, +) error { + devices := []string{} + _, domain, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return err + } + + // If the event is targeted locally then we want to expand the wildcard + // out into individual device IDs so that we can send them to each respective + // device. If the event isn't targeted locally then we can't expand the + // wildcard as we don't know about the remote devices, so instead we leave it + // as-is, so that the federation sender can send it on with the wildcard intact. + if domain == p.ServerName && deviceID == "*" { + var res userapi.QueryDevicesResponse + err = p.UserAPI.QueryDevices(context.TODO(), &userapi.QueryDevicesRequest{ + UserID: userID, + }, &res) + if err != nil { + return err + } + for _, dev := range res.Devices { + devices = append(devices, dev.ID) + } + } else { + devices = append(devices, deviceID) + } + + js, err := json.Marshal(message) + if err != nil { + return err + } + + log.WithFields(log.Fields{ + "user_id": userID, + "num_devices": len(devices), + "type": eventType, + }).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent) + for _, device := range devices { + ote := &types.OutputSendToDeviceEvent{ + UserID: userID, + DeviceID: device, + SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ + Sender: sender, + Type: eventType, + Content: js, + }, + } + + eventJSON, err := json.Marshal(ote) + if err != nil { + log.WithError(err).Error("sendToDevice failed json.Marshal") + return err + } + m := &nats.Msg{ + Subject: p.TopicSendToDeviceEvent, + Data: eventJSON, + Header: nats.Header{}, + } + m.Header.Set("sender", sender) + m.Header.Set(jetstream.UserID, userID) + if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil { + log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") + return err + } + } + return nil +} + +func (p *SyncAPIProducer) SendTyping( + ctx context.Context, userID, roomID string, typing bool, timeoutMS int64, +) error { + m := &nats.Msg{ + Subject: p.TopicTypingEvent, + Header: nats.Header{}, + } + m.Header.Set(jetstream.UserID, userID) + m.Header.Set(jetstream.RoomID, roomID) + m.Header.Set("typing", strconv.FormatBool(typing)) + m.Header.Set("timeout_ms", strconv.Itoa(int(timeoutMS))) + + _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) + return err +} diff --git a/clientapi/routing/account_data.go b/clientapi/routing/account_data.go index d8e98269..873ffaf5 100644 --- a/clientapi/routing/account_data.go +++ b/clientapi/routing/account_data.go @@ -23,7 +23,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" - eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal/eventutil" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/userapi/api" @@ -146,7 +145,7 @@ type fullyReadEvent struct { // SaveReadMarker implements POST /rooms/{roomId}/read_markers func SaveReadMarker( req *http.Request, - userAPI api.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, eduAPI eduserverAPI.EDUServerInputAPI, + userAPI api.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, syncProducer *producers.SyncAPIProducer, device *api.Device, roomID string, ) util.JSONResponse { // Verify that the user is a member of this room @@ -192,7 +191,7 @@ func SaveReadMarker( // Handle the read receipt that may be included in the read marker if r.Read != "" { - return SetReceipt(req, eduAPI, device, roomID, "m.read", r.Read) + return SetReceipt(req, syncProducer, device, roomID, "m.read", r.Read) } return util.JSONResponse{ diff --git a/clientapi/routing/receipt.go b/clientapi/routing/receipt.go index fe8fe765..0f9b1b4f 100644 --- a/clientapi/routing/receipt.go +++ b/clientapi/routing/receipt.go @@ -19,21 +19,20 @@ import ( "net/http" "time" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/dendrite/eduserver/api" - userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/util" "github.com/sirupsen/logrus" ) -func SetReceipt(req *http.Request, eduAPI api.EDUServerInputAPI, device *userapi.Device, roomId, receiptType, eventId string) util.JSONResponse { +func SetReceipt(req *http.Request, syncProducer *producers.SyncAPIProducer, device *userapi.Device, roomID, receiptType, eventID string) util.JSONResponse { timestamp := gomatrixserverlib.AsTimestamp(time.Now()) logrus.WithFields(logrus.Fields{ - "roomId": roomId, + "roomID": roomID, "receiptType": receiptType, - "eventId": eventId, + "eventID": eventID, "userId": device.UserID, "timestamp": timestamp, }).Debug("Setting receipt") @@ -43,7 +42,7 @@ func SetReceipt(req *http.Request, eduAPI api.EDUServerInputAPI, device *userapi return util.MessageResponse(400, fmt.Sprintf("receipt type must be m.read not '%s'", receiptType)) } - if err := api.SendReceipt(req.Context(), eduAPI, device.UserID, roomId, eventId, receiptType, timestamp); err != nil { + if err := syncProducer.SendReceipt(req.Context(), device.UserID, roomID, eventID, receiptType, timestamp); err != nil { return util.ErrorResponse(err) } diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 218087bb..8afaba56 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -26,7 +26,6 @@ import ( clientutil "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" - eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/transactions" @@ -47,7 +46,6 @@ import ( // nolint: gocyclo func Setup( publicAPIMux, synapseAdminRouter *mux.Router, cfg *config.ClientAPI, - eduAPI eduServerAPI.EDUServerInputAPI, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, userAPI userapi.UserInternalAPI, @@ -467,7 +465,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SendTyping(req, device, vars["roomID"], vars["userID"], eduAPI, rsAPI) + return SendTyping(req, device, vars["roomID"], vars["userID"], rsAPI, syncProducer) }), ).Methods(http.MethodPut, http.MethodOptions) v3mux.Handle("/rooms/{roomID}/redact/{eventID}", @@ -496,7 +494,7 @@ func Setup( return util.ErrorResponse(err) } txnID := vars["txnID"] - return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID) + return SendToDevice(req, device, syncProducer, transactionsCache, vars["eventType"], &txnID) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -510,7 +508,7 @@ func Setup( return util.ErrorResponse(err) } txnID := vars["txnID"] - return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID) + return SendToDevice(req, device, syncProducer, transactionsCache, vars["eventType"], &txnID) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -942,7 +940,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SaveReadMarker(req, userAPI, rsAPI, eduAPI, syncProducer, device, vars["roomID"]) + return SaveReadMarker(req, userAPI, rsAPI, syncProducer, device, vars["roomID"]) }), ).Methods(http.MethodPost, http.MethodOptions) @@ -1297,7 +1295,7 @@ func Setup( return util.ErrorResponse(err) } - return SetReceipt(req, eduAPI, device, vars["roomId"], vars["receiptType"], vars["eventId"]) + return SetReceipt(req, syncProducer, device, vars["roomId"], vars["receiptType"], vars["eventId"]) }), ).Methods(http.MethodPost, http.MethodOptions) } diff --git a/clientapi/routing/sendtodevice.go b/clientapi/routing/sendtodevice.go index 768e8e0e..4a5f0888 100644 --- a/clientapi/routing/sendtodevice.go +++ b/clientapi/routing/sendtodevice.go @@ -18,17 +18,17 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal/transactions" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/util" ) // SendToDevice handles PUT /_matrix/client/r0/sendToDevice/{eventType}/{txnId} -// sends the device events to the EDU Server +// sends the device events to the syncapi & federationsender func SendToDevice( req *http.Request, device *userapi.Device, - eduAPI api.EDUServerInputAPI, + syncProducer *producers.SyncAPIProducer, txnCache *transactions.Cache, eventType string, txnID *string, ) util.JSONResponse { @@ -48,8 +48,8 @@ func SendToDevice( for userID, byUser := range httpReq.Messages { for deviceID, message := range byUser { - if err := api.SendToDevice( - req.Context(), eduAPI, device.UserID, userID, deviceID, eventType, message, + if err := syncProducer.SendToDevice( + req.Context(), device.UserID, userID, deviceID, eventType, message, ); err != nil { util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed") return jsonerror.InternalServerError() diff --git a/clientapi/routing/sendtyping.go b/clientapi/routing/sendtyping.go index abd2061a..6a27ee61 100644 --- a/clientapi/routing/sendtyping.go +++ b/clientapi/routing/sendtyping.go @@ -17,7 +17,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/clientapi/producers" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/util" @@ -32,9 +32,8 @@ type typingContentJSON struct { // sends the typing events to client API typingProducer func SendTyping( req *http.Request, device *userapi.Device, roomID string, - userID string, - eduAPI api.EDUServerInputAPI, - rsAPI roomserverAPI.RoomserverInternalAPI, + userID string, rsAPI roomserverAPI.RoomserverInternalAPI, + syncProducer *producers.SyncAPIProducer, ) util.JSONResponse { if device.UserID != userID { return util.JSONResponse{ @@ -56,9 +55,7 @@ func SendTyping( return *resErr } - if err := api.SendTyping( - req.Context(), eduAPI, userID, roomID, r.Typing, r.Timeout, - ); err != nil { + if err := syncProducer.SendTyping(req.Context(), userID, roomID, r.Typing, r.Timeout); err != nil { util.GetLogger(req.Context()).WithError(err).Error("eduProducer.Send failed") return jsonerror.InternalServerError() } |