aboutsummaryrefslogtreecommitdiff
path: root/setup
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2021-01-19 17:14:25 +0000
committerGitHub <noreply@github.com>2021-01-19 17:14:25 +0000
commit80aa9aa8b053655683cbdae1aeccb083166bc714 (patch)
treee1be2d3ead37283424d8b47482bf4175028279b5 /setup
parentccfcb2d2808f4daa960921174a40b52b956d3a2a (diff)
Implement MSC2946 over federation (#1722)
* Add fedsender dep on msc2946 * Add MSC2946Spaces to fsAPI * Add exclude_rooms impl * Implement fed spaces handler * Use stripped state not room version * Call federated spaces at the right time
Diffstat (limited to 'setup')
-rw-r--r--setup/mscs/msc2946/msc2946.go317
-rw-r--r--setup/mscs/msc2946/msc2946_test.go22
-rw-r--r--setup/mscs/mscs.go2
3 files changed, 289 insertions, 52 deletions
diff --git a/setup/mscs/msc2946/msc2946.go b/setup/mscs/msc2946/msc2946.go
index 2b547737..c3a68632 100644
--- a/setup/mscs/msc2946/msc2946.go
+++ b/setup/mscs/msc2946/msc2946.go
@@ -17,13 +17,17 @@ package msc2946
import (
"context"
+ "encoding/json"
"fmt"
"net/http"
"strings"
"sync"
+ "time"
"github.com/gorilla/mux"
chttputil "github.com/matrix-org/dendrite/clientapi/httputil"
+ "github.com/matrix-org/dendrite/clientapi/jsonerror"
+ fs "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/hooks"
"github.com/matrix-org/dendrite/internal/httputil"
roomserver "github.com/matrix-org/dendrite/roomserver/api"
@@ -40,38 +44,16 @@ const (
ConstSpaceParentEventType = "org.matrix.msc1772.space.parent"
)
-// SpacesRequest is the request body to POST /_matrix/client/r0/rooms/{roomID}/spaces
-type SpacesRequest struct {
- MaxRoomsPerSpace int `json:"max_rooms_per_space"`
- Limit int `json:"limit"`
- Batch string `json:"batch"`
-}
-
// Defaults sets the request defaults
-func (r *SpacesRequest) Defaults() {
+func Defaults(r *gomatrixserverlib.MSC2946SpacesRequest) {
r.Limit = 100
r.MaxRoomsPerSpace = -1
}
-// SpacesResponse is the response body to POST /_matrix/client/r0/rooms/{roomID}/spaces
-type SpacesResponse struct {
- NextBatch string `json:"next_batch"`
- // Rooms are nodes on the space graph.
- Rooms []Room `json:"rooms"`
- // Events are edges on the space graph, exclusively m.space.child or m.space.parent events
- Events []gomatrixserverlib.ClientEvent `json:"events"`
-}
-
-// Room is a node on the space graph
-type Room struct {
- gomatrixserverlib.PublicRoom
- NumRefs int `json:"num_refs"`
- RoomType string `json:"room_type"`
-}
-
// Enable this MSC
func Enable(
base *setup.BaseDendrite, rsAPI roomserver.RoomserverInternalAPI, userAPI userapi.UserInternalAPI,
+ fsAPI fs.FederationSenderInternalAPI, keyRing gomatrixserverlib.JSONVerifier,
) error {
db, err := NewDatabase(&base.Cfg.MSCs.Database)
if err != nil {
@@ -89,12 +71,69 @@ func Enable(
})
base.PublicClientAPIMux.Handle("/unstable/rooms/{roomID}/spaces",
- httputil.MakeAuthAPI("spaces", userAPI, spacesHandler(db, rsAPI)),
+ httputil.MakeAuthAPI("spaces", userAPI, spacesHandler(db, rsAPI, fsAPI, base.Cfg.Global.ServerName)),
).Methods(http.MethodPost, http.MethodOptions)
+
+ base.PublicFederationAPIMux.Handle("/unstable/spaces/{roomID}", httputil.MakeExternalAPI(
+ "msc2946_fed_spaces", func(req *http.Request) util.JSONResponse {
+ fedReq, errResp := gomatrixserverlib.VerifyHTTPRequest(
+ req, time.Now(), base.Cfg.Global.ServerName, keyRing,
+ )
+ if fedReq == nil {
+ return errResp
+ }
+ // Extract the room ID from the request. Sanity check request data.
+ params, err := httputil.URLDecodeMapValues(mux.Vars(req))
+ if err != nil {
+ return util.ErrorResponse(err)
+ }
+ roomID := params["roomID"]
+ return federatedSpacesHandler(req.Context(), fedReq, roomID, db, rsAPI, fsAPI, base.Cfg.Global.ServerName)
+ },
+ )).Methods(http.MethodPost, http.MethodOptions)
return nil
}
-func spacesHandler(db Database, rsAPI roomserver.RoomserverInternalAPI) func(*http.Request, *userapi.Device) util.JSONResponse {
+func federatedSpacesHandler(
+ ctx context.Context, fedReq *gomatrixserverlib.FederationRequest, roomID string, db Database,
+ rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI,
+ thisServer gomatrixserverlib.ServerName,
+) util.JSONResponse {
+ inMemoryBatchCache := make(map[string]set)
+ var r gomatrixserverlib.MSC2946SpacesRequest
+ Defaults(&r)
+ if err := json.Unmarshal(fedReq.Content(), &r); err != nil {
+ return util.JSONResponse{
+ Code: http.StatusBadRequest,
+ JSON: jsonerror.BadJSON("The request body could not be decoded into valid JSON. " + err.Error()),
+ }
+ }
+ if r.Limit > 100 {
+ r.Limit = 100
+ }
+ w := walker{
+ req: &r,
+ rootRoomID: roomID,
+ serverName: fedReq.Origin(),
+ thisServer: thisServer,
+ ctx: ctx,
+
+ db: db,
+ rsAPI: rsAPI,
+ fsAPI: fsAPI,
+ inMemoryBatchCache: inMemoryBatchCache,
+ }
+ res := w.walk()
+ return util.JSONResponse{
+ Code: 200,
+ JSON: res,
+ }
+}
+
+func spacesHandler(
+ db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI,
+ thisServer gomatrixserverlib.ServerName,
+) func(*http.Request, *userapi.Device) util.JSONResponse {
return func(req *http.Request, device *userapi.Device) util.JSONResponse {
inMemoryBatchCache := make(map[string]set)
// Extract the room ID from the request. Sanity check request data.
@@ -103,8 +142,8 @@ func spacesHandler(db Database, rsAPI roomserver.RoomserverInternalAPI) func(*ht
return util.ErrorResponse(err)
}
roomID := params["roomID"]
- var r SpacesRequest
- r.Defaults()
+ var r gomatrixserverlib.MSC2946SpacesRequest
+ Defaults(&r)
if resErr := chttputil.UnmarshalJSONRequest(req, &r); resErr != nil {
return *resErr
}
@@ -115,10 +154,12 @@ func spacesHandler(db Database, rsAPI roomserver.RoomserverInternalAPI) func(*ht
req: &r,
rootRoomID: roomID,
caller: device,
+ thisServer: thisServer,
ctx: req.Context(),
db: db,
rsAPI: rsAPI,
+ fsAPI: fsAPI,
inMemoryBatchCache: inMemoryBatchCache,
}
res := w.walk()
@@ -130,11 +171,14 @@ func spacesHandler(db Database, rsAPI roomserver.RoomserverInternalAPI) func(*ht
}
type walker struct {
- req *SpacesRequest
+ req *gomatrixserverlib.MSC2946SpacesRequest
rootRoomID string
caller *userapi.Device
+ serverName gomatrixserverlib.ServerName
+ thisServer gomatrixserverlib.ServerName
db Database
rsAPI roomserver.RoomserverInternalAPI
+ fsAPI fs.FederationSenderInternalAPI
ctx context.Context
// user ID|device ID|batch_num => event/room IDs sent to client
@@ -142,10 +186,26 @@ type walker struct {
mu sync.Mutex
}
+func (w *walker) roomIsExcluded(roomID string) bool {
+ for _, exclRoom := range w.req.ExcludeRooms {
+ if exclRoom == roomID {
+ return true
+ }
+ }
+ return false
+}
+
+func (w *walker) callerID() string {
+ if w.caller != nil {
+ return w.caller.UserID + "|" + w.caller.ID
+ }
+ return string(w.serverName)
+}
+
func (w *walker) alreadySent(id string) bool {
w.mu.Lock()
defer w.mu.Unlock()
- m, ok := w.inMemoryBatchCache[w.caller.UserID+"|"+w.caller.ID]
+ m, ok := w.inMemoryBatchCache[w.callerID()]
if !ok {
return false
}
@@ -155,17 +215,17 @@ func (w *walker) alreadySent(id string) bool {
func (w *walker) markSent(id string) {
w.mu.Lock()
defer w.mu.Unlock()
- m := w.inMemoryBatchCache[w.caller.UserID+"|"+w.caller.ID]
+ m := w.inMemoryBatchCache[w.callerID()]
if m == nil {
m = make(set)
}
m[id] = true
- w.inMemoryBatchCache[w.caller.UserID+"|"+w.caller.ID] = m
+ w.inMemoryBatchCache[w.callerID()] = m
}
// nolint:gocyclo
-func (w *walker) walk() *SpacesResponse {
- var res SpacesResponse
+func (w *walker) walk() *gomatrixserverlib.MSC2946SpacesResponse {
+ var res gomatrixserverlib.MSC2946SpacesResponse
// Begin walking the graph starting with the room ID in the request in a queue of unvisited rooms
unvisited := []string{w.rootRoomID}
processed := make(set)
@@ -178,9 +238,20 @@ func (w *walker) walk() *SpacesResponse {
}
// Mark this room as processed.
processed[roomID] = true
+
// Is the caller currently joined to the room or is the room `world_readable`
// If no, skip this room. If yes, continue.
- if !w.authorised(roomID) {
+ if !w.roomExists(roomID) || !w.authorised(roomID) {
+ // attempt to query this room over federation, as either we've never heard of it before
+ // or we've left it and hence are not authorised (but info may be exposed regardless)
+ fedRes, err := w.federatedRoomInfo(roomID)
+ if err != nil {
+ util.GetLogger(w.ctx).WithError(err).WithField("room_id", roomID).Errorf("failed to query federated spaces")
+ continue
+ }
+ if fedRes != nil {
+ res = combineResponses(res, *fedRes)
+ }
continue
}
// Get all `m.space.child` and `m.space.parent` state events for the room. *In addition*, get
@@ -194,7 +265,7 @@ func (w *walker) walk() *SpacesResponse {
// If this room has not ever been in `rooms` (across multiple requests), extract the
// `PublicRoomsChunk` for this room.
- if !w.alreadySent(roomID) {
+ if !w.alreadySent(roomID) && !w.roomIsExcluded(roomID) {
pubRoom := w.publicRoomsChunk(roomID)
roomType := ""
create := w.stateEvent(roomID, gomatrixserverlib.MRoomCreate, "")
@@ -204,11 +275,12 @@ func (w *walker) walk() *SpacesResponse {
}
// Add the total number of events to `PublicRoomsChunk` under `num_refs`. Add `PublicRoomsChunk` to `rooms`.
- res.Rooms = append(res.Rooms, Room{
+ res.Rooms = append(res.Rooms, gomatrixserverlib.MSC2946Room{
PublicRoom: *pubRoom,
NumRefs: refs.len(),
RoomType: roomType,
})
+ w.markSent(roomID)
}
uniqueRooms := make(set)
@@ -218,9 +290,11 @@ func (w *walker) walk() *SpacesResponse {
if w.rootRoomID == roomID {
for _, ev := range refs.events() {
if !w.alreadySent(ev.EventID()) {
- res.Events = append(res.Events, gomatrixserverlib.HeaderedToClientEvent(
- ev, gomatrixserverlib.FormatAll,
- ))
+ strip := stripped(ev.Event)
+ if strip == nil {
+ continue
+ }
+ res.Events = append(res.Events, *strip)
uniqueRooms[ev.RoomID()] = true
uniqueRooms[SpaceTarget(ev)] = true
w.markSent(ev.EventID())
@@ -240,9 +314,16 @@ func (w *walker) walk() *SpacesResponse {
if w.alreadySent(ev.EventID()) {
continue
}
- res.Events = append(res.Events, gomatrixserverlib.HeaderedToClientEvent(
- ev, gomatrixserverlib.FormatAll,
- ))
+ // Skip the room if it's part of exclude_rooms but ONLY IF the source matches, as we still
+ // want to catch arrows which point to excluded rooms.
+ if w.roomIsExcluded(ev.RoomID()) {
+ continue
+ }
+ strip := stripped(ev.Event)
+ if strip == nil {
+ continue
+ }
+ res.Events = append(res.Events, *strip)
uniqueRooms[ev.RoomID()] = true
uniqueRooms[SpaceTarget(ev)] = true
w.markSent(ev.EventID())
@@ -289,8 +370,120 @@ func (w *walker) publicRoomsChunk(roomID string) *gomatrixserverlib.PublicRoom {
return &pubRooms[0]
}
+// federatedRoomInfo returns more of the spaces graph from another server. Returns nil if this was
+// unsuccessful.
+func (w *walker) federatedRoomInfo(roomID string) (*gomatrixserverlib.MSC2946SpacesResponse, error) {
+ // only do federated requests for client requests
+ if w.caller == nil {
+ return nil, nil
+ }
+ // extract events which point to this room ID and extract their vias
+ events, err := w.db.References(w.ctx, roomID)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get References events: %w", err)
+ }
+ vias := make(set)
+ for _, ev := range events {
+ if ev.StateKeyEquals(roomID) {
+ // event points at this room, extract vias
+ content := struct {
+ Vias []string `json:"via"`
+ }{}
+ if err = json.Unmarshal(ev.Content(), &content); err != nil {
+ continue // silently ignore corrupted state events
+ }
+ for _, v := range content.Vias {
+ vias[v] = true
+ }
+ }
+ }
+ util.GetLogger(w.ctx).Infof("Querying federatedRoomInfo via %+v", vias)
+ ctx := context.Background()
+ // query more of the spaces graph using these servers
+ for serverName := range vias {
+ if serverName == string(w.thisServer) {
+ continue
+ }
+ res, err := w.fsAPI.MSC2946Spaces(ctx, gomatrixserverlib.ServerName(serverName), roomID, gomatrixserverlib.MSC2946SpacesRequest{
+ Limit: w.req.Limit,
+ MaxRoomsPerSpace: w.req.MaxRoomsPerSpace,
+ })
+ if err != nil {
+ util.GetLogger(w.ctx).WithError(err).Warnf("failed to call MSC2946Spaces on server %s", serverName)
+ continue
+ }
+ return &res, nil
+ }
+ return nil, nil
+}
+
+func (w *walker) roomExists(roomID string) bool {
+ var queryRes roomserver.QueryServerJoinedToRoomResponse
+ err := w.rsAPI.QueryServerJoinedToRoom(w.ctx, &roomserver.QueryServerJoinedToRoomRequest{
+ RoomID: roomID,
+ ServerName: w.thisServer,
+ }, &queryRes)
+ if err != nil {
+ util.GetLogger(w.ctx).WithError(err).Error("failed to QueryServerJoinedToRoom")
+ return false
+ }
+ // if the room exists but we aren't in the room then we might have stale data so we want to fetch
+ // it fresh via federation
+ return queryRes.RoomExists && queryRes.IsInRoom
+}
+
// authorised returns true iff the user is joined this room or the room is world_readable
func (w *walker) authorised(roomID string) bool {
+ if w.caller != nil {
+ return w.authorisedUser(roomID)
+ }
+ return w.authorisedServer(roomID)
+}
+
+// authorisedServer returns true iff the server is joined this room or the room is world_readable
+func (w *walker) authorisedServer(roomID string) bool {
+ // Check history visibility first
+ hisVisTuple := gomatrixserverlib.StateKeyTuple{
+ EventType: gomatrixserverlib.MRoomHistoryVisibility,
+ StateKey: "",
+ }
+ var queryRoomRes roomserver.QueryCurrentStateResponse
+ err := w.rsAPI.QueryCurrentState(w.ctx, &roomserver.QueryCurrentStateRequest{
+ RoomID: roomID,
+ StateTuples: []gomatrixserverlib.StateKeyTuple{
+ hisVisTuple,
+ },
+ }, &queryRoomRes)
+ if err != nil {
+ util.GetLogger(w.ctx).WithError(err).Error("failed to QueryCurrentState")
+ return false
+ }
+ hisVisEv := queryRoomRes.StateEvents[hisVisTuple]
+ if hisVisEv != nil {
+ hisVis, _ := hisVisEv.HistoryVisibility()
+ if hisVis == "world_readable" {
+ return true
+ }
+ }
+ // check if server is joined to the room
+ var queryRes fs.QueryJoinedHostServerNamesInRoomResponse
+ err = w.fsAPI.QueryJoinedHostServerNamesInRoom(w.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{
+ RoomID: roomID,
+ }, &queryRes)
+ if err != nil {
+ util.GetLogger(w.ctx).WithError(err).Error("failed to QueryJoinedHostServerNamesInRoom")
+ return false
+ }
+ for _, srv := range queryRes.ServerNames {
+ if srv == w.serverName {
+ return true
+ }
+ }
+ return false
+}
+
+// authorisedUser returns true iff the user is joined this room or the room is world_readable
+func (w *walker) authorisedUser(roomID string) bool {
hisVisTuple := gomatrixserverlib.StateKeyTuple{
EventType: gomatrixserverlib.MRoomHistoryVisibility,
StateKey: "",
@@ -374,3 +567,41 @@ func (el eventLookup) events() (events []*gomatrixserverlib.HeaderedEvent) {
}
type set map[string]bool
+
+func stripped(ev *gomatrixserverlib.Event) *gomatrixserverlib.MSC2946StrippedEvent {
+ if ev.StateKey() == nil {
+ return nil
+ }
+ return &gomatrixserverlib.MSC2946StrippedEvent{
+ Type: ev.Type(),
+ StateKey: *ev.StateKey(),
+ Content: ev.Content(),
+ Sender: ev.Sender(),
+ RoomID: ev.RoomID(),
+ }
+}
+
+func combineResponses(local, remote gomatrixserverlib.MSC2946SpacesResponse) gomatrixserverlib.MSC2946SpacesResponse {
+ knownRooms := make(set)
+ for _, room := range local.Rooms {
+ knownRooms[room.RoomID] = true
+ }
+ knownEvents := make(set)
+ for _, event := range local.Events {
+ knownEvents[event.RoomID+event.Type+event.StateKey] = true
+ }
+ // mux in remote entries if and only if they aren't present already
+ for _, room := range remote.Rooms {
+ if knownRooms[room.RoomID] {
+ continue
+ }
+ local.Rooms = append(local.Rooms, room)
+ }
+ for _, event := range remote.Events {
+ if knownEvents[event.RoomID+event.Type+event.StateKey] {
+ continue
+ }
+ local.Events = append(local.Events, event)
+ }
+ return local
+}
diff --git a/setup/mscs/msc2946/msc2946_test.go b/setup/mscs/msc2946/msc2946_test.go
index d2d935e8..4f180a98 100644
--- a/setup/mscs/msc2946/msc2946_test.go
+++ b/setup/mscs/msc2946/msc2946_test.go
@@ -41,6 +41,7 @@ var (
client = &http.Client{
Timeout: 10 * time.Second,
}
+ roomVer = gomatrixserverlib.RoomVersionV6
)
// Basic sanity check of MSC2946 logic. Tests a single room with a few state events
@@ -269,13 +270,13 @@ func TestMSC2946(t *testing.T) {
})
}
-func newReq(t *testing.T, jsonBody map[string]interface{}) *msc2946.SpacesRequest {
+func newReq(t *testing.T, jsonBody map[string]interface{}) *gomatrixserverlib.MSC2946SpacesRequest {
t.Helper()
b, err := json.Marshal(jsonBody)
if err != nil {
t.Fatalf("Failed to marshal request: %s", err)
}
- var r msc2946.SpacesRequest
+ var r gomatrixserverlib.MSC2946SpacesRequest
if err := json.Unmarshal(b, &r); err != nil {
t.Fatalf("Failed to unmarshal request: %s", err)
}
@@ -299,10 +300,10 @@ func runServer(t *testing.T, router *mux.Router) func() {
}
}
-func postSpaces(t *testing.T, expectCode int, accessToken, roomID string, req *msc2946.SpacesRequest) *msc2946.SpacesResponse {
+func postSpaces(t *testing.T, expectCode int, accessToken, roomID string, req *gomatrixserverlib.MSC2946SpacesRequest) *gomatrixserverlib.MSC2946SpacesResponse {
t.Helper()
- var r msc2946.SpacesRequest
- r.Defaults()
+ var r gomatrixserverlib.MSC2946SpacesRequest
+ msc2946.Defaults(&r)
data, err := json.Marshal(req)
if err != nil {
t.Fatalf("failed to marshal request: %s", err)
@@ -324,7 +325,7 @@ func postSpaces(t *testing.T, expectCode int, accessToken, roomID string, req *m
t.Fatalf("wrong response code, got %d want %d - body: %s", res.StatusCode, expectCode, string(body))
}
if res.StatusCode == 200 {
- var result msc2946.SpacesResponse
+ var result gomatrixserverlib.MSC2946SpacesResponse
body, err := ioutil.ReadAll(res.Body)
if err != nil {
t.Fatalf("response 200 OK but failed to read response body: %s", err)
@@ -400,6 +401,12 @@ type testRoomserverAPI struct {
pubRoomState map[string]map[gomatrixserverlib.StateKeyTuple]string
}
+func (r *testRoomserverAPI) QueryServerJoinedToRoom(ctx context.Context, req *roomserver.QueryServerJoinedToRoomRequest, res *roomserver.QueryServerJoinedToRoomResponse) error {
+ res.IsInRoom = true
+ res.RoomExists = true
+ return nil
+}
+
func (r *testRoomserverAPI) QueryBulkStateContent(ctx context.Context, req *roomserver.QueryBulkStateContentRequest, res *roomserver.QueryBulkStateContentResponse) error {
res.Rooms = make(map[string]map[gomatrixserverlib.StateKeyTuple]string)
for _, roomID := range req.RoomIDs {
@@ -452,7 +459,7 @@ func injectEvents(t *testing.T, userAPI userapi.UserInternalAPI, rsAPI roomserve
PublicFederationAPIMux: mux.NewRouter().PathPrefix(httputil.PublicFederationPathPrefix).Subrouter(),
}
- err := msc2946.Enable(base, rsAPI, userAPI)
+ err := msc2946.Enable(base, rsAPI, userAPI, nil, nil)
if err != nil {
t.Fatalf("failed to enable MSC2946: %s", err)
}
@@ -472,7 +479,6 @@ type fledglingEvent struct {
func mustCreateEvent(t *testing.T, ev fledglingEvent) (result *gomatrixserverlib.HeaderedEvent) {
t.Helper()
- roomVer := gomatrixserverlib.RoomVersionV6
seed := make([]byte, ed25519.SeedSize) // zero seed
key := ed25519.NewKeyFromSeed(seed)
eb := gomatrixserverlib.EventBuilder{
diff --git a/setup/mscs/mscs.go b/setup/mscs/mscs.go
index bf210362..027885c8 100644
--- a/setup/mscs/mscs.go
+++ b/setup/mscs/mscs.go
@@ -41,7 +41,7 @@ func EnableMSC(base *setup.BaseDendrite, monolith *setup.Monolith, msc string) e
case "msc2836":
return msc2836.Enable(base, monolith.RoomserverAPI, monolith.FederationSenderAPI, monolith.UserAPI, monolith.KeyRing)
case "msc2946":
- return msc2946.Enable(base, monolith.RoomserverAPI, monolith.UserAPI)
+ return msc2946.Enable(base, monolith.RoomserverAPI, monolith.UserAPI, monolith.FederationSenderAPI, monolith.KeyRing)
default:
return fmt.Errorf("EnableMSC: unknown msc '%s'", msc)
}