aboutsummaryrefslogtreecommitdiff
path: root/roomserver
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2020-09-10 14:39:18 +0100
committerGitHub <noreply@github.com>2020-09-10 14:39:18 +0100
commit39507bacc3dbfc532e0d69b42957c87f27af4c77 (patch)
tree7ad845e1b25e03e7b7d7cd2d49278fe843c2ff86 /roomserver
parent35564dd73c48b16b97cd1a972a9b9bc65ec6d7ef (diff)
Peeking via MSC2753 (#1370)
Initial implementation of MSC2753, as tested by https://github.com/matrix-org/sytest/pull/944. Doesn't yet handle unpeeks, peeked EDUs, or history viz changing during a peek - these will follow. https://github.com/matrix-org/dendrite/pull/1370 has full details.
Diffstat (limited to 'roomserver')
-rw-r--r--roomserver/api/api.go6
-rw-r--r--roomserver/api/api_trace.go9
-rw-r--r--roomserver/api/output.go15
-rw-r--r--roomserver/api/perform.go14
-rw-r--r--roomserver/internal/api.go8
-rw-r--r--roomserver/internal/perform/perform_peek.go206
-rw-r--r--roomserver/inthttp/client.go18
-rw-r--r--roomserver/inthttp/server.go11
-rw-r--r--roomserver/storage/shared/storage.go4
9 files changed, 288 insertions, 3 deletions
diff --git a/roomserver/api/api.go b/roomserver/api/api.go
index 96bdc767..eecefe32 100644
--- a/roomserver/api/api.go
+++ b/roomserver/api/api.go
@@ -36,6 +36,12 @@ type RoomserverInternalAPI interface {
res *PerformLeaveResponse,
) error
+ PerformPeek(
+ ctx context.Context,
+ req *PerformPeekRequest,
+ res *PerformPeekResponse,
+ )
+
PerformPublish(
ctx context.Context,
req *PerformPublishRequest,
diff --git a/roomserver/api/api_trace.go b/roomserver/api/api_trace.go
index 25da2e8e..64330930 100644
--- a/roomserver/api/api_trace.go
+++ b/roomserver/api/api_trace.go
@@ -38,6 +38,15 @@ func (t *RoomserverInternalAPITrace) PerformInvite(
return t.Impl.PerformInvite(ctx, req, res)
}
+func (t *RoomserverInternalAPITrace) PerformPeek(
+ ctx context.Context,
+ req *PerformPeekRequest,
+ res *PerformPeekResponse,
+) {
+ t.Impl.PerformPeek(ctx, req, res)
+ util.GetLogger(ctx).Infof("PerformPeek req=%+v res=%+v", js(req), js(res))
+}
+
func (t *RoomserverInternalAPITrace) PerformJoin(
ctx context.Context,
req *PerformJoinRequest,
diff --git a/roomserver/api/output.go b/roomserver/api/output.go
index d6c09f9e..013ebdc8 100644
--- a/roomserver/api/output.go
+++ b/roomserver/api/output.go
@@ -46,6 +46,9 @@ const (
// - Redact the event and set the corresponding `unsigned` fields to indicate it as redacted.
// - Replace the event in the database.
OutputTypeRedactedEvent OutputType = "redacted_event"
+
+ // OutputTypeNewPeek indicates that the kafka event is an OutputNewPeek
+ OutputTypeNewPeek OutputType = "new_peek"
)
// An OutputEvent is an entry in the roomserver output kafka log.
@@ -59,8 +62,10 @@ type OutputEvent struct {
NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"`
// The content of event with type OutputTypeRetireInviteEvent
RetireInviteEvent *OutputRetireInviteEvent `json:"retire_invite_event,omitempty"`
- // The content of event with type OutputTypeRedactedEvent
+ // The content of event with type OutputTypeRedactedEvent
RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"`
+ // The content of event with type OutputTypeNewPeek
+ NewPeek *OutputNewPeek `json:"new_peek,omitempty"`
}
// An OutputNewRoomEvent is written when the roomserver receives a new event.
@@ -195,3 +200,11 @@ type OutputRedactedEvent struct {
// The value of `unsigned.redacted_because` - the redaction event itself
RedactedBecause gomatrixserverlib.HeaderedEvent
}
+
+// An OutputNewPeek is written whenever a user starts peeking into a room
+// using a given device.
+type OutputNewPeek struct {
+ RoomID string
+ UserID string
+ DeviceID string
+}
diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go
index 24e958bb..0c2d96a7 100644
--- a/roomserver/api/perform.go
+++ b/roomserver/api/perform.go
@@ -108,6 +108,20 @@ type PerformInviteResponse struct {
Error *PerformError
}
+type PerformPeekRequest struct {
+ RoomIDOrAlias string `json:"room_id_or_alias"`
+ UserID string `json:"user_id"`
+ DeviceID string `json:"device_id"`
+ ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
+}
+
+type PerformPeekResponse struct {
+ // The room ID, populated on success.
+ RoomID string `json:"room_id"`
+ // If non-nil, the join request failed. Contains more information why it failed.
+ Error *PerformError
+}
+
// PerformBackfillRequest is a request to PerformBackfill.
type PerformBackfillRequest struct {
// The room to backfill
diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go
index 4299dd47..8dc1a170 100644
--- a/roomserver/internal/api.go
+++ b/roomserver/internal/api.go
@@ -22,6 +22,7 @@ type RoomserverInternalAPI struct {
*query.Queryer
*perform.Inviter
*perform.Joiner
+ *perform.Peeker
*perform.Leaver
*perform.Publisher
*perform.Backfiller
@@ -83,6 +84,13 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
FSAPI: r.fsAPI,
Inputer: r.Inputer,
}
+ r.Peeker = &perform.Peeker{
+ ServerName: r.Cfg.Matrix.ServerName,
+ Cfg: r.Cfg,
+ DB: r.DB,
+ FSAPI: r.fsAPI,
+ Inputer: r.Inputer,
+ }
r.Leaver = &perform.Leaver{
Cfg: r.Cfg,
DB: r.DB,
diff --git a/roomserver/internal/perform/perform_peek.go b/roomserver/internal/perform/perform_peek.go
new file mode 100644
index 00000000..ab6d17b0
--- /dev/null
+++ b/roomserver/internal/perform/perform_peek.go
@@ -0,0 +1,206 @@
+// Copyright 2020 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package perform
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "strings"
+
+ fsAPI "github.com/matrix-org/dendrite/federationsender/api"
+ "github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/roomserver/internal/input"
+ "github.com/matrix-org/dendrite/roomserver/storage"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ "github.com/sirupsen/logrus"
+)
+
+type Peeker struct {
+ ServerName gomatrixserverlib.ServerName
+ Cfg *config.RoomServer
+ FSAPI fsAPI.FederationSenderInternalAPI
+ DB storage.Database
+
+ Inputer *input.Inputer
+}
+
+// PerformPeek handles peeking into matrix rooms, including over federation by talking to the federationsender.
+func (r *Peeker) PerformPeek(
+ ctx context.Context,
+ req *api.PerformPeekRequest,
+ res *api.PerformPeekResponse,
+) {
+ roomID, err := r.performPeek(ctx, req)
+ if err != nil {
+ perr, ok := err.(*api.PerformError)
+ if ok {
+ res.Error = perr
+ } else {
+ res.Error = &api.PerformError{
+ Msg: err.Error(),
+ }
+ }
+ }
+ res.RoomID = roomID
+}
+
+func (r *Peeker) performPeek(
+ ctx context.Context,
+ req *api.PerformPeekRequest,
+) (string, error) {
+ // FIXME: there's way too much duplication with performJoin
+ _, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
+ if err != nil {
+ return "", &api.PerformError{
+ Code: api.PerformErrorBadRequest,
+ Msg: fmt.Sprintf("Supplied user ID %q in incorrect format", req.UserID),
+ }
+ }
+ if domain != r.Cfg.Matrix.ServerName {
+ return "", &api.PerformError{
+ Code: api.PerformErrorBadRequest,
+ Msg: fmt.Sprintf("User %q does not belong to this homeserver", req.UserID),
+ }
+ }
+ if strings.HasPrefix(req.RoomIDOrAlias, "!") {
+ return r.performPeekRoomByID(ctx, req)
+ }
+ if strings.HasPrefix(req.RoomIDOrAlias, "#") {
+ return r.performPeekRoomByAlias(ctx, req)
+ }
+ return "", &api.PerformError{
+ Code: api.PerformErrorBadRequest,
+ Msg: fmt.Sprintf("Room ID or alias %q is invalid", req.RoomIDOrAlias),
+ }
+}
+
+func (r *Peeker) performPeekRoomByAlias(
+ ctx context.Context,
+ req *api.PerformPeekRequest,
+) (string, error) {
+ // Get the domain part of the room alias.
+ _, domain, err := gomatrixserverlib.SplitID('#', req.RoomIDOrAlias)
+ if err != nil {
+ return "", fmt.Errorf("Alias %q is not in the correct format", req.RoomIDOrAlias)
+ }
+ req.ServerNames = append(req.ServerNames, domain)
+
+ // Check if this alias matches our own server configuration. If it
+ // doesn't then we'll need to try a federated peek.
+ var roomID string
+ if domain != r.Cfg.Matrix.ServerName {
+ // The alias isn't owned by us, so we will need to try peeking using
+ // a remote server.
+ dirReq := fsAPI.PerformDirectoryLookupRequest{
+ RoomAlias: req.RoomIDOrAlias, // the room alias to lookup
+ ServerName: domain, // the server to ask
+ }
+ dirRes := fsAPI.PerformDirectoryLookupResponse{}
+ err = r.FSAPI.PerformDirectoryLookup(ctx, &dirReq, &dirRes)
+ if err != nil {
+ logrus.WithError(err).Errorf("error looking up alias %q", req.RoomIDOrAlias)
+ return "", fmt.Errorf("Looking up alias %q over federation failed: %w", req.RoomIDOrAlias, err)
+ }
+ roomID = dirRes.RoomID
+ req.ServerNames = append(req.ServerNames, dirRes.ServerNames...)
+ } else {
+ // Otherwise, look up if we know this room alias locally.
+ roomID, err = r.DB.GetRoomIDForAlias(ctx, req.RoomIDOrAlias)
+ if err != nil {
+ return "", fmt.Errorf("Lookup room alias %q failed: %w", req.RoomIDOrAlias, err)
+ }
+ }
+
+ // If the room ID is empty then we failed to look up the alias.
+ if roomID == "" {
+ return "", fmt.Errorf("Alias %q not found", req.RoomIDOrAlias)
+ }
+
+ // If we do, then pluck out the room ID and continue the peek.
+ req.RoomIDOrAlias = roomID
+ return r.performPeekRoomByID(ctx, req)
+}
+
+func (r *Peeker) performPeekRoomByID(
+ ctx context.Context,
+ req *api.PerformPeekRequest,
+) (roomID string, err error) {
+ roomID = req.RoomIDOrAlias
+
+ // Get the domain part of the room ID.
+ _, domain, err := gomatrixserverlib.SplitID('!', roomID)
+ if err != nil {
+ return "", &api.PerformError{
+ Code: api.PerformErrorBadRequest,
+ Msg: fmt.Sprintf("Room ID %q is invalid: %s", roomID, err),
+ }
+ }
+
+ // If the server name in the room ID isn't ours then it's a
+ // possible candidate for finding the room via federation. Add
+ // it to the list of servers to try.
+ if domain != r.Cfg.Matrix.ServerName {
+ req.ServerNames = append(req.ServerNames, domain)
+ }
+
+ // If this room isn't world_readable, we reject.
+ // XXX: would be nicer to call this with NIDs
+ // XXX: we should probably factor out history_visibility checks into a common utility method somewhere
+ // which handles the default value etc.
+ var worldReadable = false
+ ev, _ := r.DB.GetStateEvent(ctx, roomID, "m.room.history_visibility", "")
+ if ev != nil {
+ content := map[string]string{}
+ if err = json.Unmarshal(ev.Content(), &content); err != nil {
+ util.GetLogger(ctx).WithError(err).Error("json.Unmarshal for history visibility failed")
+ return
+ }
+ if visibility, ok := content["history_visibility"]; ok {
+ worldReadable = visibility == "world_readable"
+ }
+ }
+
+ if !worldReadable {
+ return "", &api.PerformError{
+ Code: api.PerformErrorNotAllowed,
+ Msg: "Room is not world-readable",
+ }
+ }
+
+ // TODO: handle federated peeks
+
+ err = r.Inputer.WriteOutputEvents(roomID, []api.OutputEvent{
+ {
+ Type: api.OutputTypeNewPeek,
+ NewPeek: &api.OutputNewPeek{
+ RoomID: roomID,
+ UserID: req.UserID,
+ DeviceID: req.DeviceID,
+ },
+ },
+ })
+ if err != nil {
+ return
+ }
+
+ // By this point, if req.RoomIDOrAlias contained an alias, then
+ // it will have been overwritten with a room ID by performPeekRoomByAlias.
+ // We should now include this in the response so that the CS API can
+ // return the right room ID.
+ return roomID, nil
+}
diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go
index b414b0d8..1ff1fc82 100644
--- a/roomserver/inthttp/client.go
+++ b/roomserver/inthttp/client.go
@@ -26,6 +26,7 @@ const (
// Perform operations
RoomserverPerformInvitePath = "/roomserver/performInvite"
+ RoomserverPerformPeekPath = "/roomserver/performPeek"
RoomserverPerformJoinPath = "/roomserver/performJoin"
RoomserverPerformLeavePath = "/roomserver/performLeave"
RoomserverPerformBackfillPath = "/roomserver/performBackfill"
@@ -185,6 +186,23 @@ func (h *httpRoomserverInternalAPI) PerformJoin(
}
}
+func (h *httpRoomserverInternalAPI) PerformPeek(
+ ctx context.Context,
+ request *api.PerformPeekRequest,
+ response *api.PerformPeekResponse,
+) {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "PerformPeek")
+ defer span.Finish()
+
+ apiURL := h.roomserverURL + RoomserverPerformPeekPath
+ err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+ if err != nil {
+ response.Error = &api.PerformError{
+ Msg: fmt.Sprintf("failed to communicate with roomserver: %s", err),
+ }
+ }
+}
+
func (h *httpRoomserverInternalAPI) PerformLeave(
ctx context.Context,
request *api.PerformLeaveRequest,
diff --git a/roomserver/inthttp/server.go b/roomserver/inthttp/server.go
index 97f2a360..5816d4d8 100644
--- a/roomserver/inthttp/server.go
+++ b/roomserver/inthttp/server.go
@@ -63,6 +63,17 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
+ internalAPIMux.Handle(RoomserverPerformPeekPath,
+ httputil.MakeInternalAPI("performPeek", func(req *http.Request) util.JSONResponse {
+ var request api.PerformPeekRequest
+ var response api.PerformPeekResponse
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ r.PerformPeek(req.Context(), &request, &response)
+ return util.JSONResponse{Code: http.StatusOK, JSON: &response}
+ }),
+ )
internalAPIMux.Handle(RoomserverPerformPublishPath,
httputil.MakeInternalAPI("performPublish", func(req *http.Request) util.JSONResponse {
var request api.PerformPublishRequest
diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go
index 5c18c725..262b0f2f 100644
--- a/roomserver/storage/shared/storage.go
+++ b/roomserver/storage/shared/storage.go
@@ -359,7 +359,7 @@ func (d *Database) MembershipUpdater(
var updater *MembershipUpdater
_ = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
updater, err = NewMembershipUpdater(ctx, d, txn, roomID, targetUserID, targetLocal, roomVersion)
- return nil
+ return err
})
return updater, err
}
@@ -374,7 +374,7 @@ func (d *Database) GetLatestEventsForUpdate(
var updater *LatestEventsUpdater
_ = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
updater, err = NewLatestEventsUpdater(ctx, d, txn, roomInfo)
- return nil
+ return err
})
return updater, err
}