diff options
Diffstat (limited to 'setup/mscs/msc2836/msc2836.go')
-rw-r--r-- | setup/mscs/msc2836/msc2836.go | 598 |
1 files changed, 460 insertions, 138 deletions
diff --git a/setup/mscs/msc2836/msc2836.go b/setup/mscs/msc2836/msc2836.go index 33a65c8f..95473f97 100644 --- a/setup/mscs/msc2836/msc2836.go +++ b/setup/mscs/msc2836/msc2836.go @@ -18,10 +18,13 @@ package msc2836 import ( "bytes" "context" + "crypto/sha256" "encoding/json" "fmt" "io" "net/http" + "sort" + "strings" "time" "github.com/matrix-org/dendrite/clientapi/jsonerror" @@ -36,13 +39,12 @@ import ( ) const ( - constRelType = "m.reference" - constRoomIDKey = "relationship_room_id" - constRoomServers = "relationship_servers" + constRelType = "m.reference" ) type EventRelationshipRequest struct { EventID string `json:"event_id"` + RoomID string `json:"room_id"` MaxDepth int `json:"max_depth"` MaxBreadth int `json:"max_breadth"` Limit int `json:"limit"` @@ -52,7 +54,6 @@ type EventRelationshipRequest struct { IncludeChildren bool `json:"include_children"` Direction string `json:"direction"` Batch string `json:"batch"` - AutoJoin bool `json:"auto_join"` } func NewEventRelationshipRequest(body io.Reader) (*EventRelationshipRequest, error) { @@ -81,8 +82,16 @@ type EventRelationshipResponse struct { Limited bool `json:"limited"` } +func toClientResponse(res *gomatrixserverlib.MSC2836EventRelationshipsResponse) *EventRelationshipResponse { + out := &EventRelationshipResponse{ + Events: gomatrixserverlib.ToClientEvents(res.Events, gomatrixserverlib.FormatAll), + Limited: res.Limited, + NextBatch: res.NextBatch, + } + return out +} + // Enable this MSC -// nolint:gocyclo func Enable( base *setup.BaseDendrite, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI, userAPI userapi.UserInternalAPI, keyRing gomatrixserverlib.JSONVerifier, @@ -96,63 +105,22 @@ func Enable( he := headeredEvent.(*gomatrixserverlib.HeaderedEvent) hookErr := db.StoreRelation(context.Background(), he) if hookErr != nil { - util.GetLogger(context.Background()).WithError(hookErr).Error( + util.GetLogger(context.Background()).WithError(hookErr).WithField("event_id", he.EventID()).Error( "failed to StoreRelation", ) } - }) - hooks.Attach(hooks.KindNewEventReceived, func(headeredEvent interface{}) { - he := headeredEvent.(*gomatrixserverlib.HeaderedEvent) - ctx := context.Background() - // we only inject metadata for events our server sends - userID := he.Sender() - _, domain, err := gomatrixserverlib.SplitID('@', userID) - if err != nil { - return - } - if domain != base.Cfg.Global.ServerName { - return - } - // if this event has an m.relationship, add on the room_id and servers to unsigned - parent, child, relType := parentChildEventIDs(he) - if parent == "" || child == "" || relType == "" { - return - } - event, joinedToRoom := getEventIfVisible(ctx, rsAPI, parent, userID) - if !joinedToRoom { - return - } - err = he.SetUnsignedField(constRoomIDKey, event.RoomID()) - if err != nil { - util.GetLogger(context.Background()).WithError(err).Warn("Failed to SetUnsignedField") - return - } - - var servers []gomatrixserverlib.ServerName - if fsAPI != nil { - var res fs.QueryJoinedHostServerNamesInRoomResponse - err = fsAPI.QueryJoinedHostServerNamesInRoom(ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{ - RoomID: event.RoomID(), - }, &res) - if err != nil { - util.GetLogger(context.Background()).WithError(err).Warn("Failed to QueryJoinedHostServerNamesInRoom") - return - } - servers = res.ServerNames - } else { - servers = []gomatrixserverlib.ServerName{ - base.Cfg.Global.ServerName, - } - } - err = he.SetUnsignedField(constRoomServers, servers) - if err != nil { - util.GetLogger(context.Background()).WithError(err).Warn("Failed to SetUnsignedField") - return + // we need to update child metadata here as well as after doing remote /event_relationships requests + // so we catch child metadata originating from /send transactions + hookErr = db.UpdateChildMetadata(context.Background(), he) + if hookErr != nil { + util.GetLogger(context.Background()).WithError(err).WithField("event_id", he.EventID()).Warn( + "failed to update child metadata for event", + ) } }) base.PublicClientAPIMux.Handle("/unstable/event_relationships", - httputil.MakeAuthAPI("eventRelationships", userAPI, eventRelationshipHandler(db, rsAPI)), + httputil.MakeAuthAPI("eventRelationships", userAPI, eventRelationshipHandler(db, rsAPI, fsAPI)), ).Methods(http.MethodPost, http.MethodOptions) base.PublicFederationAPIMux.Handle("/unstable/event_relationships", httputil.MakeExternalAPI( @@ -163,22 +131,27 @@ func Enable( if fedReq == nil { return errResp } - return federatedEventRelationship(req.Context(), fedReq, db, rsAPI) + return federatedEventRelationship(req.Context(), fedReq, db, rsAPI, fsAPI) }, )).Methods(http.MethodPost, http.MethodOptions) return nil } type reqCtx struct { - ctx context.Context - rsAPI roomserver.RoomserverInternalAPI - db Database - req *EventRelationshipRequest - userID string + ctx context.Context + rsAPI roomserver.RoomserverInternalAPI + db Database + req *EventRelationshipRequest + userID string + roomVersion gomatrixserverlib.RoomVersion + + // federated request args isFederatedRequest bool + serverName gomatrixserverlib.ServerName + fsAPI fs.FederationSenderInternalAPI } -func eventRelationshipHandler(db Database, rsAPI roomserver.RoomserverInternalAPI) func(*http.Request, *userapi.Device) util.JSONResponse { +func eventRelationshipHandler(db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI) func(*http.Request, *userapi.Device) util.JSONResponse { return func(req *http.Request, device *userapi.Device) util.JSONResponse { relation, err := NewEventRelationshipRequest(req.Body) if err != nil { @@ -193,6 +166,7 @@ func eventRelationshipHandler(db Database, rsAPI roomserver.RoomserverInternalAP req: relation, userID: device.UserID, rsAPI: rsAPI, + fsAPI: fsAPI, isFederatedRequest: false, db: db, } @@ -203,12 +177,14 @@ func eventRelationshipHandler(db Database, rsAPI roomserver.RoomserverInternalAP return util.JSONResponse{ Code: 200, - JSON: res, + JSON: toClientResponse(res), } } } -func federatedEventRelationship(ctx context.Context, fedReq *gomatrixserverlib.FederationRequest, db Database, rsAPI roomserver.RoomserverInternalAPI) util.JSONResponse { +func federatedEventRelationship( + ctx context.Context, fedReq *gomatrixserverlib.FederationRequest, db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI, +) util.JSONResponse { relation, err := NewEventRelationshipRequest(bytes.NewBuffer(fedReq.Content())) if err != nil { util.GetLogger(ctx).WithError(err).Error("failed to decode HTTP request as JSON") @@ -218,17 +194,43 @@ func federatedEventRelationship(ctx context.Context, fedReq *gomatrixserverlib.F } } rc := reqCtx{ - ctx: ctx, - req: relation, - userID: "", - rsAPI: rsAPI, + ctx: ctx, + req: relation, + rsAPI: rsAPI, + db: db, + // federation args isFederatedRequest: true, - db: db, + fsAPI: fsAPI, + serverName: fedReq.Origin(), } res, resErr := rc.process() if resErr != nil { return *resErr } + // add auth chain information + requiredAuthEventsSet := make(map[string]bool) + var requiredAuthEvents []string + for _, ev := range res.Events { + for _, a := range ev.AuthEventIDs() { + if requiredAuthEventsSet[a] { + continue + } + requiredAuthEvents = append(requiredAuthEvents, a) + requiredAuthEventsSet[a] = true + } + } + var queryRes roomserver.QueryAuthChainResponse + err = rsAPI.QueryAuthChain(ctx, &roomserver.QueryAuthChainRequest{ + EventIDs: requiredAuthEvents, + }, &queryRes) + if err != nil { + // they may already have the auth events so don't fail this request + util.GetLogger(ctx).WithError(err).Error("Failed to QueryAuthChain") + } + res.AuthChain = make([]*gomatrixserverlib.Event, len(queryRes.AuthChain)) + for i := range queryRes.AuthChain { + res.AuthChain[i] = queryRes.AuthChain[i].Unwrap() + } return util.JSONResponse{ Code: 200, @@ -236,18 +238,25 @@ func federatedEventRelationship(ctx context.Context, fedReq *gomatrixserverlib.F } } -func (rc *reqCtx) process() (*EventRelationshipResponse, *util.JSONResponse) { - var res EventRelationshipResponse +// nolint:gocyclo +func (rc *reqCtx) process() (*gomatrixserverlib.MSC2836EventRelationshipsResponse, *util.JSONResponse) { + var res gomatrixserverlib.MSC2836EventRelationshipsResponse var returnEvents []*gomatrixserverlib.HeaderedEvent // Can the user see (according to history visibility) event_id? If no, reject the request, else continue. - // We should have the event being referenced so don't give any claimed room ID / servers - event := rc.getEventIfVisible(rc.req.EventID, "", nil) + event := rc.getLocalEvent(rc.req.EventID) if event == nil { + event = rc.fetchUnknownEvent(rc.req.EventID, rc.req.RoomID) + } + if rc.req.RoomID == "" && event != nil { + rc.req.RoomID = event.RoomID() + } + if event == nil || !rc.authorisedToSeeEvent(event) { return nil, &util.JSONResponse{ Code: 403, JSON: jsonerror.Forbidden("Event does not exist or you are not authorised to see it"), } } + rc.roomVersion = event.Version() // Retrieve the event. Add it to response array. returnEvents = append(returnEvents, event) @@ -282,29 +291,122 @@ func (rc *reqCtx) process() (*EventRelationshipResponse, *util.JSONResponse) { ) returnEvents = append(returnEvents, events...) } - res.Events = make([]gomatrixserverlib.ClientEvent, len(returnEvents)) + res.Events = make([]*gomatrixserverlib.Event, len(returnEvents)) for i, ev := range returnEvents { - res.Events[i] = gomatrixserverlib.HeaderedToClientEvent(ev, gomatrixserverlib.FormatAll) + // for each event, extract the children_count | hash and add it as unsigned data. + rc.addChildMetadata(ev) + res.Events[i] = ev.Unwrap() } res.Limited = remaining == 0 || walkLimited return &res, nil } +// fetchUnknownEvent retrieves an unknown event from the room specified. This server must +// be joined to the room in question. This has the side effect of injecting surround threaded +// events into the roomserver. +func (rc *reqCtx) fetchUnknownEvent(eventID, roomID string) *gomatrixserverlib.HeaderedEvent { + if rc.isFederatedRequest || roomID == "" { + // we don't do fed hits for fed requests, and we can't ask servers without a room ID! + return nil + } + logger := util.GetLogger(rc.ctx).WithField("room_id", roomID) + // if they supplied a room_id, check the room exists. + var queryVerRes roomserver.QueryRoomVersionForRoomResponse + err := rc.rsAPI.QueryRoomVersionForRoom(rc.ctx, &roomserver.QueryRoomVersionForRoomRequest{ + RoomID: roomID, + }, &queryVerRes) + if err != nil { + logger.WithError(err).Warn("failed to query room version for room, does this room exist?") + return nil + } + + // check the user is joined to that room + var queryMemRes roomserver.QueryMembershipForUserResponse + err = rc.rsAPI.QueryMembershipForUser(rc.ctx, &roomserver.QueryMembershipForUserRequest{ + RoomID: roomID, + UserID: rc.userID, + }, &queryMemRes) + if err != nil { + logger.WithError(err).Warn("failed to query membership for user in room") + return nil + } + if !queryMemRes.IsInRoom { + return nil + } + + // ask one of the servers in the room for the event + var queryRes fs.QueryJoinedHostServerNamesInRoomResponse + err = rc.fsAPI.QueryJoinedHostServerNamesInRoom(rc.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{ + RoomID: roomID, + }, &queryRes) + if err != nil { + logger.WithError(err).Error("failed to QueryJoinedHostServerNamesInRoom") + return nil + } + // query up to 5 servers + serversToQuery := queryRes.ServerNames + if len(serversToQuery) > 5 { + serversToQuery = serversToQuery[:5] + } + + // fetch the event, along with some of the surrounding thread (if it's threaded) and the auth chain. + // Inject the response into the roomserver to remember the event across multiple calls and to set + // unexplored flags correctly. + for _, srv := range serversToQuery { + res, err := rc.MSC2836EventRelationships(eventID, srv, queryVerRes.RoomVersion) + if err != nil { + continue + } + rc.injectResponseToRoomserver(res) + for _, ev := range res.Events { + if ev.EventID() == eventID { + return ev.Headered(ev.Version()) + } + } + } + logger.WithField("servers", serversToQuery).Warn("failed to query event relationships") + return nil +} + // If include_parent: true and there is a valid m.relationship field in the event, // retrieve the referenced event. Apply history visibility check to that event and if it passes, add it to the response array. -func (rc *reqCtx) includeParent(event *gomatrixserverlib.HeaderedEvent) (parent *gomatrixserverlib.HeaderedEvent) { - parentID, _, _ := parentChildEventIDs(event) +func (rc *reqCtx) includeParent(childEvent *gomatrixserverlib.HeaderedEvent) (parent *gomatrixserverlib.HeaderedEvent) { + parentID, _, _ := parentChildEventIDs(childEvent) if parentID == "" { return nil } - claimedRoomID, claimedServers := roomIDAndServers(event) - return rc.getEventIfVisible(parentID, claimedRoomID, claimedServers) + return rc.lookForEvent(parentID) } // If include_children: true, lookup all events which have event_id as an m.relationship // Apply history visibility checks to all these events and add the ones which pass into the response array, // honouring the recent_first flag and the limit. func (rc *reqCtx) includeChildren(db Database, parentID string, limit int, recentFirst bool) ([]*gomatrixserverlib.HeaderedEvent, *util.JSONResponse) { + if rc.hasUnexploredChildren(parentID) { + // we need to do a remote request to pull in the children as we are missing them locally. + serversToQuery := rc.getServersForEventID(parentID) + var result *gomatrixserverlib.MSC2836EventRelationshipsResponse + for _, srv := range serversToQuery { + res, err := rc.fsAPI.MSC2836EventRelationships(rc.ctx, srv, gomatrixserverlib.MSC2836EventRelationshipsRequest{ + EventID: parentID, + Direction: "down", + Limit: 100, + MaxBreadth: -1, + MaxDepth: 1, // we just want the children from this parent + RecentFirst: true, + }, rc.roomVersion) + if err != nil { + util.GetLogger(rc.ctx).WithError(err).WithField("server", srv).Error("includeChildren: failed to call MSC2836EventRelationships") + } else { + result = &res + break + } + } + if result != nil { + rc.injectResponseToRoomserver(result) + } + // fallthrough to pull these new events from the DB + } children, err := db.ChildrenForParent(rc.ctx, parentID, constRelType, recentFirst) if err != nil { util.GetLogger(rc.ctx).WithError(err).Error("failed to get ChildrenForParent") @@ -313,8 +415,7 @@ func (rc *reqCtx) includeChildren(db Database, parentID string, limit int, recen } var childEvents []*gomatrixserverlib.HeaderedEvent for _, child := range children { - // in order for us to even know about the children the server must be joined to those rooms, hence pass no claimed room ID or servers. - childEvent := rc.getEventIfVisible(child.EventID, "", nil) + childEvent := rc.lookForEvent(child.EventID) if childEvent != nil { childEvents = append(childEvents, childEvent) } @@ -327,14 +428,9 @@ func (rc *reqCtx) includeChildren(db Database, parentID string, limit int, recen // Begin to walk the thread DAG in the direction specified, either depth or breadth first according to the depth_first flag, // honouring the limit, max_depth and max_breadth values according to the following rules -// nolint: unparam func walkThread( ctx context.Context, db Database, rc *reqCtx, included map[string]bool, limit int, ) ([]*gomatrixserverlib.HeaderedEvent, bool) { - if rc.req.Direction != "down" { - util.GetLogger(ctx).Error("not implemented: direction=up") - return nil, false - } var result []*gomatrixserverlib.HeaderedEvent eventWalker := walker{ ctx: ctx, @@ -352,8 +448,11 @@ func walkThread( } // Process the event. - // TODO: Include edge information: room ID and servers - event := rc.getEventIfVisible(wi.EventID, "", nil) + // if event is not found, use remoteEventRelationships to explore that part of the thread remotely. + // This will probably be easiest if the event relationships response is directly pumped into the database + // so the next walk will do the right thing. This requires those events to be authed and likely injected as + // outliers into the roomserver DB, which will de-dupe appropriately. + event := rc.lookForEvent(wi.EventID) if event != nil { result = append(result, event) } @@ -368,74 +467,280 @@ func walkThread( return result, limited } -func (rc *reqCtx) getEventIfVisible(eventID string, claimedRoomID string, claimedServers []string) *gomatrixserverlib.HeaderedEvent { - event, joinedToRoom := getEventIfVisible(rc.ctx, rc.rsAPI, eventID, rc.userID) - if event != nil && joinedToRoom { - return event +// MSC2836EventRelationships performs an /event_relationships request to a remote server +func (rc *reqCtx) MSC2836EventRelationships(eventID string, srv gomatrixserverlib.ServerName, ver gomatrixserverlib.RoomVersion) (*gomatrixserverlib.MSC2836EventRelationshipsResponse, error) { + res, err := rc.fsAPI.MSC2836EventRelationships(rc.ctx, srv, gomatrixserverlib.MSC2836EventRelationshipsRequest{ + EventID: eventID, + DepthFirst: rc.req.DepthFirst, + Direction: rc.req.Direction, + Limit: rc.req.Limit, + MaxBreadth: rc.req.MaxBreadth, + MaxDepth: rc.req.MaxDepth, + RecentFirst: rc.req.RecentFirst, + }, ver) + if err != nil { + util.GetLogger(rc.ctx).WithError(err).Error("Failed to call MSC2836EventRelationships") + return nil, err + } + return &res, nil + +} + +// authorisedToSeeEvent checks that the user or server is allowed to see this event. Returns true if allowed to +// see this request. This only needs to be done once per room at present as we just check for joined status. +func (rc *reqCtx) authorisedToSeeEvent(event *gomatrixserverlib.HeaderedEvent) bool { + if rc.isFederatedRequest { + // make sure the server is in this room + var res fs.QueryJoinedHostServerNamesInRoomResponse + err := rc.fsAPI.QueryJoinedHostServerNamesInRoom(rc.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{ + RoomID: event.RoomID(), + }, &res) + if err != nil { + util.GetLogger(rc.ctx).WithError(err).Error("authorisedToSeeEvent: failed to QueryJoinedHostServerNamesInRoom") + return false + } + for _, srv := range res.ServerNames { + if srv == rc.serverName { + return true + } + } + return false } - // either we don't have the event or we aren't joined to the room, regardless we should try joining if auto join is enabled - if !rc.req.AutoJoin { + // make sure the user is in this room + // Allow events if the member is in the room + // TODO: This does not honour history_visibility + // TODO: This does not honour m.room.create content + var queryMembershipRes roomserver.QueryMembershipForUserResponse + err := rc.rsAPI.QueryMembershipForUser(rc.ctx, &roomserver.QueryMembershipForUserRequest{ + RoomID: event.RoomID(), + UserID: rc.userID, + }, &queryMembershipRes) + if err != nil { + util.GetLogger(rc.ctx).WithError(err).Error("authorisedToSeeEvent: failed to QueryMembershipForUser") + return false + } + return queryMembershipRes.IsInRoom +} + +func (rc *reqCtx) getServersForEventID(eventID string) []gomatrixserverlib.ServerName { + if rc.req.RoomID == "" { + util.GetLogger(rc.ctx).WithField("event_id", eventID).Error( + "getServersForEventID: event exists in unknown room", + ) return nil } - // if we're doing this on behalf of a random server don't auto-join rooms regardless of what the request says - if rc.isFederatedRequest { + if rc.roomVersion == "" { + util.GetLogger(rc.ctx).WithField("event_id", eventID).Errorf( + "getServersForEventID: event exists in %s with unknown room version", rc.req.RoomID, + ) return nil } - roomID := claimedRoomID - var servers []gomatrixserverlib.ServerName - if event != nil { - roomID = event.RoomID() - } - for _, s := range claimedServers { - servers = append(servers, gomatrixserverlib.ServerName(s)) - } - var joinRes roomserver.PerformJoinResponse - rc.rsAPI.PerformJoin(rc.ctx, &roomserver.PerformJoinRequest{ - UserID: rc.userID, - Content: map[string]interface{}{}, - RoomIDOrAlias: roomID, - ServerNames: servers, - }, &joinRes) - if joinRes.Error != nil { - util.GetLogger(rc.ctx).WithError(joinRes.Error).WithField("room_id", roomID).Error("Failed to auto-join room") + var queryRes fs.QueryJoinedHostServerNamesInRoomResponse + err := rc.fsAPI.QueryJoinedHostServerNamesInRoom(rc.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{ + RoomID: rc.req.RoomID, + }, &queryRes) + if err != nil { + util.GetLogger(rc.ctx).WithError(err).Error("getServersForEventID: failed to QueryJoinedHostServerNamesInRoom") return nil } - if event != nil { + // query up to 5 servers + serversToQuery := queryRes.ServerNames + if len(serversToQuery) > 5 { + serversToQuery = serversToQuery[:5] + } + return serversToQuery +} + +func (rc *reqCtx) remoteEventRelationships(eventID string) *gomatrixserverlib.MSC2836EventRelationshipsResponse { + if rc.isFederatedRequest { + return nil // we don't query remote servers for remote requests + } + serversToQuery := rc.getServersForEventID(eventID) + var res *gomatrixserverlib.MSC2836EventRelationshipsResponse + var err error + for _, srv := range serversToQuery { + res, err = rc.MSC2836EventRelationships(eventID, srv, rc.roomVersion) + if err != nil { + util.GetLogger(rc.ctx).WithError(err).WithField("server", srv).Error("remoteEventRelationships: failed to call MSC2836EventRelationships") + } else { + break + } + } + return res +} + +// lookForEvent returns the event for the event ID given, by trying to query remote servers +// if the event ID is unknown via /event_relationships. +func (rc *reqCtx) lookForEvent(eventID string) *gomatrixserverlib.HeaderedEvent { + event := rc.getLocalEvent(eventID) + if event == nil { + queryRes := rc.remoteEventRelationships(eventID) + if queryRes != nil { + // inject all the events into the roomserver then return the event in question + rc.injectResponseToRoomserver(queryRes) + for _, ev := range queryRes.Events { + if ev.EventID() == eventID && rc.req.RoomID == ev.RoomID() { + return ev.Headered(ev.Version()) + } + } + } + } else if rc.hasUnexploredChildren(eventID) { + // we have the local event but we may need to do a remote hit anyway if we are exploring the thread and have unknown children. + // If we don't do this then we risk never fetching the children. + queryRes := rc.remoteEventRelationships(eventID) + if queryRes != nil { + rc.injectResponseToRoomserver(queryRes) + err := rc.db.MarkChildrenExplored(context.Background(), eventID) + if err != nil { + util.GetLogger(rc.ctx).WithError(err).Warnf("failed to mark children of %s as explored", eventID) + } + } + } + if rc.req.RoomID == event.RoomID() { return event } - // TODO: hit /event_relationships on the server we joined via - util.GetLogger(rc.ctx).Infof("joined room but need to fetch event TODO") return nil } -func getEventIfVisible(ctx context.Context, rsAPI roomserver.RoomserverInternalAPI, eventID, userID string) (*gomatrixserverlib.HeaderedEvent, bool) { +func (rc *reqCtx) getLocalEvent(eventID string) *gomatrixserverlib.HeaderedEvent { var queryEventsRes roomserver.QueryEventsByIDResponse - err := rsAPI.QueryEventsByID(ctx, &roomserver.QueryEventsByIDRequest{ + err := rc.rsAPI.QueryEventsByID(rc.ctx, &roomserver.QueryEventsByIDRequest{ EventIDs: []string{eventID}, }, &queryEventsRes) if err != nil { - util.GetLogger(ctx).WithError(err).Error("getEventIfVisible: failed to QueryEventsByID") - return nil, false + util.GetLogger(rc.ctx).WithError(err).Error("getLocalEvent: failed to QueryEventsByID") + return nil } if len(queryEventsRes.Events) == 0 { - util.GetLogger(ctx).Infof("event does not exist") - return nil, false // event does not exist + util.GetLogger(rc.ctx).WithField("event_id", eventID).Infof("getLocalEvent: event does not exist") + return nil // event does not exist } - event := queryEventsRes.Events[0] + return queryEventsRes.Events[0] +} - // Allow events if the member is in the room - // TODO: This does not honour history_visibility - // TODO: This does not honour m.room.create content - var queryMembershipRes roomserver.QueryMembershipForUserResponse - err = rsAPI.QueryMembershipForUser(ctx, &roomserver.QueryMembershipForUserRequest{ - RoomID: event.RoomID(), - UserID: userID, - }, &queryMembershipRes) +// injectResponseToRoomserver injects the events +// into the roomserver as KindOutlier, with auth chains. +func (rc *reqCtx) injectResponseToRoomserver(res *gomatrixserverlib.MSC2836EventRelationshipsResponse) { + var stateEvents []*gomatrixserverlib.Event + var messageEvents []*gomatrixserverlib.Event + for _, ev := range res.Events { + if ev.StateKey() != nil { + stateEvents = append(stateEvents, ev) + } else { + messageEvents = append(messageEvents, ev) + } + } + respState := gomatrixserverlib.RespState{ + AuthEvents: res.AuthChain, + StateEvents: stateEvents, + } + eventsInOrder, err := respState.Events() + if err != nil { + util.GetLogger(rc.ctx).WithError(err).Error("failed to calculate order to send events in MSC2836EventRelationshipsResponse") + return + } + // everything gets sent as an outlier because auth chain events may be disjoint from the DAG + // as may the threaded events. + var ires []roomserver.InputRoomEvent + for _, outlier := range append(eventsInOrder, messageEvents...) { + ires = append(ires, roomserver.InputRoomEvent{ + Kind: roomserver.KindOutlier, + Event: outlier.Headered(outlier.Version()), + AuthEventIDs: outlier.AuthEventIDs(), + }) + } + // we've got the data by this point so use a background context + err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires) + if err != nil { + util.GetLogger(rc.ctx).WithError(err).Error("failed to inject MSC2836EventRelationshipsResponse into the roomserver") + } + // update the child count / hash columns for these nodes. We need to do this here because not all events will make it + // through to the KindNewEventPersisted hook because the roomserver will ignore duplicates. Duplicates have meaning though + // as the `unsigned` field may differ (if the number of children changes). + for _, ev := range ires { + err = rc.db.UpdateChildMetadata(context.Background(), ev.Event) + if err != nil { + util.GetLogger(rc.ctx).WithError(err).WithField("event_id", ev.Event.EventID()).Warn("failed to update child metadata for event") + } + } +} + +func (rc *reqCtx) addChildMetadata(ev *gomatrixserverlib.HeaderedEvent) { + count, hash := rc.getChildMetadata(ev.EventID()) + if count == 0 { + return + } + err := ev.SetUnsignedField("children_hash", gomatrixserverlib.Base64Bytes(hash)) if err != nil { - util.GetLogger(ctx).WithError(err).Error("getEventIfVisible: failed to QueryMembershipForUser") - return nil, false + util.GetLogger(rc.ctx).WithError(err).Warn("Failed to set children_hash") + } + err = ev.SetUnsignedField("children", map[string]int{ + constRelType: count, + }) + if err != nil { + util.GetLogger(rc.ctx).WithError(err).Warn("Failed to set children count") } - return event, queryMembershipRes.IsInRoom +} + +func (rc *reqCtx) getChildMetadata(eventID string) (count int, hash []byte) { + children, err := rc.db.ChildrenForParent(rc.ctx, eventID, constRelType, false) + if err != nil { + util.GetLogger(rc.ctx).WithError(err).Warn("Failed to get ChildrenForParent for getting child metadata") + return + } + if len(children) == 0 { + return + } + // sort it lexiographically + sort.Slice(children, func(i, j int) bool { + return children[i].EventID < children[j].EventID + }) + // hash it + var eventIDs strings.Builder + for _, c := range children { + _, _ = eventIDs.WriteString(c.EventID) + } + hashValBytes := sha256.Sum256([]byte(eventIDs.String())) + + count = len(children) + hash = hashValBytes[:] + return +} + +// hasUnexploredChildren returns true if this event has unexplored children. +// "An event has unexplored children if the `unsigned` child count on the parent does not match +// how many children the server believes the parent to have. In addition, if the counts match but +// the hashes do not match, then the event is unexplored." +func (rc *reqCtx) hasUnexploredChildren(eventID string) bool { + if rc.isFederatedRequest { + return false // we only explore children for clients, not servers. + } + // extract largest child count from event + eventCount, eventHash, explored, err := rc.db.ChildMetadata(rc.ctx, eventID) + if err != nil { + util.GetLogger(rc.ctx).WithError(err).WithField("event_id", eventID).Warn( + "failed to get ChildMetadata from db", + ) + return false + } + // if there are no recorded children then we know we have >= children. + // if the event has already been explored (read: we hit /event_relationships successfully) + // then don't do it again. We'll only re-do this if we get an even bigger children count, + // see Database.UpdateChildMetadata + if eventCount == 0 || explored { + return false // short-circuit + } + + // calculate child count for event + calcCount, calcHash := rc.getChildMetadata(eventID) + + if eventCount < calcCount { + return false // we have more children + } else if eventCount > calcCount { + return true // the event has more children than we know about + } + // we have the same count, so a mismatched hash means some children are different + return !bytes.Equal(eventHash, calcHash) } type walkInfo struct { @@ -453,9 +758,9 @@ type walker struct { // WalkFrom the event ID given func (w *walker) WalkFrom(eventID string) (limited bool, err error) { - children, err := w.db.ChildrenForParent(w.ctx, eventID, constRelType, w.req.RecentFirst) + children, err := w.childrenForParent(eventID) if err != nil { - util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() ChildrenForParent failed, cannot walk") + util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() childrenForParent failed, cannot walk") return false, err } var next *walkInfo @@ -467,9 +772,9 @@ func (w *walker) WalkFrom(eventID string) (limited bool, err error) { return true, nil } // find the children's children - children, err = w.db.ChildrenForParent(w.ctx, next.EventID, constRelType, w.req.RecentFirst) + children, err = w.childrenForParent(next.EventID) if err != nil { - util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() ChildrenForParent failed, cannot walk") + util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() childrenForParent failed, cannot walk") return false, err } toWalk = w.addChildren(toWalk, children, next.Depth+1) @@ -528,3 +833,20 @@ func (w *walker) nextChild(toWalk []walkInfo) (*walkInfo, []walkInfo) { child, toWalk = toWalk[0], toWalk[1:] return &child, toWalk } + +// childrenForParent returns the children events for this event ID, honouring the direction: up|down flags +// meaning this can actually be returning the parent for the event instead of the children. +func (w *walker) childrenForParent(eventID string) ([]eventInfo, error) { + if w.req.Direction == "down" { + return w.db.ChildrenForParent(w.ctx, eventID, constRelType, w.req.RecentFirst) + } + // find the event to pull out the parent + ei, err := w.db.ParentForChild(w.ctx, eventID, constRelType) + if err != nil { + return nil, err + } + if ei != nil { + return []eventInfo{*ei}, nil + } + return nil, nil +} |