diff options
author | Matthew Hodgson <matthew@matrix.org> | 2020-09-10 14:39:18 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-10 14:39:18 +0100 |
commit | 39507bacc3dbfc532e0d69b42957c87f27af4c77 (patch) | |
tree | 7ad845e1b25e03e7b7d7cd2d49278fe843c2ff86 /roomserver | |
parent | 35564dd73c48b16b97cd1a972a9b9bc65ec6d7ef (diff) |
Peeking via MSC2753 (#1370)
Initial implementation of MSC2753, as tested by https://github.com/matrix-org/sytest/pull/944.
Doesn't yet handle unpeeks, peeked EDUs, or history viz changing during a peek - these will follow.
https://github.com/matrix-org/dendrite/pull/1370 has full details.
Diffstat (limited to 'roomserver')
-rw-r--r-- | roomserver/api/api.go | 6 | ||||
-rw-r--r-- | roomserver/api/api_trace.go | 9 | ||||
-rw-r--r-- | roomserver/api/output.go | 15 | ||||
-rw-r--r-- | roomserver/api/perform.go | 14 | ||||
-rw-r--r-- | roomserver/internal/api.go | 8 | ||||
-rw-r--r-- | roomserver/internal/perform/perform_peek.go | 206 | ||||
-rw-r--r-- | roomserver/inthttp/client.go | 18 | ||||
-rw-r--r-- | roomserver/inthttp/server.go | 11 | ||||
-rw-r--r-- | roomserver/storage/shared/storage.go | 4 |
9 files changed, 288 insertions, 3 deletions
diff --git a/roomserver/api/api.go b/roomserver/api/api.go index 96bdc767..eecefe32 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -36,6 +36,12 @@ type RoomserverInternalAPI interface { res *PerformLeaveResponse, ) error + PerformPeek( + ctx context.Context, + req *PerformPeekRequest, + res *PerformPeekResponse, + ) + PerformPublish( ctx context.Context, req *PerformPublishRequest, diff --git a/roomserver/api/api_trace.go b/roomserver/api/api_trace.go index 25da2e8e..64330930 100644 --- a/roomserver/api/api_trace.go +++ b/roomserver/api/api_trace.go @@ -38,6 +38,15 @@ func (t *RoomserverInternalAPITrace) PerformInvite( return t.Impl.PerformInvite(ctx, req, res) } +func (t *RoomserverInternalAPITrace) PerformPeek( + ctx context.Context, + req *PerformPeekRequest, + res *PerformPeekResponse, +) { + t.Impl.PerformPeek(ctx, req, res) + util.GetLogger(ctx).Infof("PerformPeek req=%+v res=%+v", js(req), js(res)) +} + func (t *RoomserverInternalAPITrace) PerformJoin( ctx context.Context, req *PerformJoinRequest, diff --git a/roomserver/api/output.go b/roomserver/api/output.go index d6c09f9e..013ebdc8 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -46,6 +46,9 @@ const ( // - Redact the event and set the corresponding `unsigned` fields to indicate it as redacted. // - Replace the event in the database. OutputTypeRedactedEvent OutputType = "redacted_event" + + // OutputTypeNewPeek indicates that the kafka event is an OutputNewPeek + OutputTypeNewPeek OutputType = "new_peek" ) // An OutputEvent is an entry in the roomserver output kafka log. @@ -59,8 +62,10 @@ type OutputEvent struct { NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"` // The content of event with type OutputTypeRetireInviteEvent RetireInviteEvent *OutputRetireInviteEvent `json:"retire_invite_event,omitempty"` - // The content of event with type OutputTypeRedactedEvent + // The content of event with type OutputTypeRedactedEvent RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"` + // The content of event with type OutputTypeNewPeek + NewPeek *OutputNewPeek `json:"new_peek,omitempty"` } // An OutputNewRoomEvent is written when the roomserver receives a new event. @@ -195,3 +200,11 @@ type OutputRedactedEvent struct { // The value of `unsigned.redacted_because` - the redaction event itself RedactedBecause gomatrixserverlib.HeaderedEvent } + +// An OutputNewPeek is written whenever a user starts peeking into a room +// using a given device. +type OutputNewPeek struct { + RoomID string + UserID string + DeviceID string +} diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go index 24e958bb..0c2d96a7 100644 --- a/roomserver/api/perform.go +++ b/roomserver/api/perform.go @@ -108,6 +108,20 @@ type PerformInviteResponse struct { Error *PerformError } +type PerformPeekRequest struct { + RoomIDOrAlias string `json:"room_id_or_alias"` + UserID string `json:"user_id"` + DeviceID string `json:"device_id"` + ServerNames []gomatrixserverlib.ServerName `json:"server_names"` +} + +type PerformPeekResponse struct { + // The room ID, populated on success. + RoomID string `json:"room_id"` + // If non-nil, the join request failed. Contains more information why it failed. + Error *PerformError +} + // PerformBackfillRequest is a request to PerformBackfill. type PerformBackfillRequest struct { // The room to backfill diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 4299dd47..8dc1a170 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -22,6 +22,7 @@ type RoomserverInternalAPI struct { *query.Queryer *perform.Inviter *perform.Joiner + *perform.Peeker *perform.Leaver *perform.Publisher *perform.Backfiller @@ -83,6 +84,13 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen FSAPI: r.fsAPI, Inputer: r.Inputer, } + r.Peeker = &perform.Peeker{ + ServerName: r.Cfg.Matrix.ServerName, + Cfg: r.Cfg, + DB: r.DB, + FSAPI: r.fsAPI, + Inputer: r.Inputer, + } r.Leaver = &perform.Leaver{ Cfg: r.Cfg, DB: r.DB, diff --git a/roomserver/internal/perform/perform_peek.go b/roomserver/internal/perform/perform_peek.go new file mode 100644 index 00000000..ab6d17b0 --- /dev/null +++ b/roomserver/internal/perform/perform_peek.go @@ -0,0 +1,206 @@ +// Copyright 2020 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package perform + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + fsAPI "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/internal/input" + "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/sirupsen/logrus" +) + +type Peeker struct { + ServerName gomatrixserverlib.ServerName + Cfg *config.RoomServer + FSAPI fsAPI.FederationSenderInternalAPI + DB storage.Database + + Inputer *input.Inputer +} + +// PerformPeek handles peeking into matrix rooms, including over federation by talking to the federationsender. +func (r *Peeker) PerformPeek( + ctx context.Context, + req *api.PerformPeekRequest, + res *api.PerformPeekResponse, +) { + roomID, err := r.performPeek(ctx, req) + if err != nil { + perr, ok := err.(*api.PerformError) + if ok { + res.Error = perr + } else { + res.Error = &api.PerformError{ + Msg: err.Error(), + } + } + } + res.RoomID = roomID +} + +func (r *Peeker) performPeek( + ctx context.Context, + req *api.PerformPeekRequest, +) (string, error) { + // FIXME: there's way too much duplication with performJoin + _, domain, err := gomatrixserverlib.SplitID('@', req.UserID) + if err != nil { + return "", &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("Supplied user ID %q in incorrect format", req.UserID), + } + } + if domain != r.Cfg.Matrix.ServerName { + return "", &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("User %q does not belong to this homeserver", req.UserID), + } + } + if strings.HasPrefix(req.RoomIDOrAlias, "!") { + return r.performPeekRoomByID(ctx, req) + } + if strings.HasPrefix(req.RoomIDOrAlias, "#") { + return r.performPeekRoomByAlias(ctx, req) + } + return "", &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("Room ID or alias %q is invalid", req.RoomIDOrAlias), + } +} + +func (r *Peeker) performPeekRoomByAlias( + ctx context.Context, + req *api.PerformPeekRequest, +) (string, error) { + // Get the domain part of the room alias. + _, domain, err := gomatrixserverlib.SplitID('#', req.RoomIDOrAlias) + if err != nil { + return "", fmt.Errorf("Alias %q is not in the correct format", req.RoomIDOrAlias) + } + req.ServerNames = append(req.ServerNames, domain) + + // Check if this alias matches our own server configuration. If it + // doesn't then we'll need to try a federated peek. + var roomID string + if domain != r.Cfg.Matrix.ServerName { + // The alias isn't owned by us, so we will need to try peeking using + // a remote server. + dirReq := fsAPI.PerformDirectoryLookupRequest{ + RoomAlias: req.RoomIDOrAlias, // the room alias to lookup + ServerName: domain, // the server to ask + } + dirRes := fsAPI.PerformDirectoryLookupResponse{} + err = r.FSAPI.PerformDirectoryLookup(ctx, &dirReq, &dirRes) + if err != nil { + logrus.WithError(err).Errorf("error looking up alias %q", req.RoomIDOrAlias) + return "", fmt.Errorf("Looking up alias %q over federation failed: %w", req.RoomIDOrAlias, err) + } + roomID = dirRes.RoomID + req.ServerNames = append(req.ServerNames, dirRes.ServerNames...) + } else { + // Otherwise, look up if we know this room alias locally. + roomID, err = r.DB.GetRoomIDForAlias(ctx, req.RoomIDOrAlias) + if err != nil { + return "", fmt.Errorf("Lookup room alias %q failed: %w", req.RoomIDOrAlias, err) + } + } + + // If the room ID is empty then we failed to look up the alias. + if roomID == "" { + return "", fmt.Errorf("Alias %q not found", req.RoomIDOrAlias) + } + + // If we do, then pluck out the room ID and continue the peek. + req.RoomIDOrAlias = roomID + return r.performPeekRoomByID(ctx, req) +} + +func (r *Peeker) performPeekRoomByID( + ctx context.Context, + req *api.PerformPeekRequest, +) (roomID string, err error) { + roomID = req.RoomIDOrAlias + + // Get the domain part of the room ID. + _, domain, err := gomatrixserverlib.SplitID('!', roomID) + if err != nil { + return "", &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("Room ID %q is invalid: %s", roomID, err), + } + } + + // If the server name in the room ID isn't ours then it's a + // possible candidate for finding the room via federation. Add + // it to the list of servers to try. + if domain != r.Cfg.Matrix.ServerName { + req.ServerNames = append(req.ServerNames, domain) + } + + // If this room isn't world_readable, we reject. + // XXX: would be nicer to call this with NIDs + // XXX: we should probably factor out history_visibility checks into a common utility method somewhere + // which handles the default value etc. + var worldReadable = false + ev, _ := r.DB.GetStateEvent(ctx, roomID, "m.room.history_visibility", "") + if ev != nil { + content := map[string]string{} + if err = json.Unmarshal(ev.Content(), &content); err != nil { + util.GetLogger(ctx).WithError(err).Error("json.Unmarshal for history visibility failed") + return + } + if visibility, ok := content["history_visibility"]; ok { + worldReadable = visibility == "world_readable" + } + } + + if !worldReadable { + return "", &api.PerformError{ + Code: api.PerformErrorNotAllowed, + Msg: "Room is not world-readable", + } + } + + // TODO: handle federated peeks + + err = r.Inputer.WriteOutputEvents(roomID, []api.OutputEvent{ + { + Type: api.OutputTypeNewPeek, + NewPeek: &api.OutputNewPeek{ + RoomID: roomID, + UserID: req.UserID, + DeviceID: req.DeviceID, + }, + }, + }) + if err != nil { + return + } + + // By this point, if req.RoomIDOrAlias contained an alias, then + // it will have been overwritten with a room ID by performPeekRoomByAlias. + // We should now include this in the response so that the CS API can + // return the right room ID. + return roomID, nil +} diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go index b414b0d8..1ff1fc82 100644 --- a/roomserver/inthttp/client.go +++ b/roomserver/inthttp/client.go @@ -26,6 +26,7 @@ const ( // Perform operations RoomserverPerformInvitePath = "/roomserver/performInvite" + RoomserverPerformPeekPath = "/roomserver/performPeek" RoomserverPerformJoinPath = "/roomserver/performJoin" RoomserverPerformLeavePath = "/roomserver/performLeave" RoomserverPerformBackfillPath = "/roomserver/performBackfill" @@ -185,6 +186,23 @@ func (h *httpRoomserverInternalAPI) PerformJoin( } } +func (h *httpRoomserverInternalAPI) PerformPeek( + ctx context.Context, + request *api.PerformPeekRequest, + response *api.PerformPeekResponse, +) { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformPeek") + defer span.Finish() + + apiURL := h.roomserverURL + RoomserverPerformPeekPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) + if err != nil { + response.Error = &api.PerformError{ + Msg: fmt.Sprintf("failed to communicate with roomserver: %s", err), + } + } +} + func (h *httpRoomserverInternalAPI) PerformLeave( ctx context.Context, request *api.PerformLeaveRequest, diff --git a/roomserver/inthttp/server.go b/roomserver/inthttp/server.go index 97f2a360..5816d4d8 100644 --- a/roomserver/inthttp/server.go +++ b/roomserver/inthttp/server.go @@ -63,6 +63,17 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle(RoomserverPerformPeekPath, + httputil.MakeInternalAPI("performPeek", func(req *http.Request) util.JSONResponse { + var request api.PerformPeekRequest + var response api.PerformPeekResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + r.PerformPeek(req.Context(), &request, &response) + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) internalAPIMux.Handle(RoomserverPerformPublishPath, httputil.MakeInternalAPI("performPublish", func(req *http.Request) util.JSONResponse { var request api.PerformPublishRequest diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 5c18c725..262b0f2f 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -359,7 +359,7 @@ func (d *Database) MembershipUpdater( var updater *MembershipUpdater _ = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error { updater, err = NewMembershipUpdater(ctx, d, txn, roomID, targetUserID, targetLocal, roomVersion) - return nil + return err }) return updater, err } @@ -374,7 +374,7 @@ func (d *Database) GetLatestEventsForUpdate( var updater *LatestEventsUpdater _ = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error { updater, err = NewLatestEventsUpdater(ctx, d, txn, roomInfo) - return nil + return err }) return updater, err } |