aboutsummaryrefslogtreecommitdiff
path: root/setup/mscs/msc2836/msc2836.go
diff options
context:
space:
mode:
Diffstat (limited to 'setup/mscs/msc2836/msc2836.go')
-rw-r--r--setup/mscs/msc2836/msc2836.go598
1 files changed, 460 insertions, 138 deletions
diff --git a/setup/mscs/msc2836/msc2836.go b/setup/mscs/msc2836/msc2836.go
index 33a65c8f..95473f97 100644
--- a/setup/mscs/msc2836/msc2836.go
+++ b/setup/mscs/msc2836/msc2836.go
@@ -18,10 +18,13 @@ package msc2836
import (
"bytes"
"context"
+ "crypto/sha256"
"encoding/json"
"fmt"
"io"
"net/http"
+ "sort"
+ "strings"
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
@@ -36,13 +39,12 @@ import (
)
const (
- constRelType = "m.reference"
- constRoomIDKey = "relationship_room_id"
- constRoomServers = "relationship_servers"
+ constRelType = "m.reference"
)
type EventRelationshipRequest struct {
EventID string `json:"event_id"`
+ RoomID string `json:"room_id"`
MaxDepth int `json:"max_depth"`
MaxBreadth int `json:"max_breadth"`
Limit int `json:"limit"`
@@ -52,7 +54,6 @@ type EventRelationshipRequest struct {
IncludeChildren bool `json:"include_children"`
Direction string `json:"direction"`
Batch string `json:"batch"`
- AutoJoin bool `json:"auto_join"`
}
func NewEventRelationshipRequest(body io.Reader) (*EventRelationshipRequest, error) {
@@ -81,8 +82,16 @@ type EventRelationshipResponse struct {
Limited bool `json:"limited"`
}
+func toClientResponse(res *gomatrixserverlib.MSC2836EventRelationshipsResponse) *EventRelationshipResponse {
+ out := &EventRelationshipResponse{
+ Events: gomatrixserverlib.ToClientEvents(res.Events, gomatrixserverlib.FormatAll),
+ Limited: res.Limited,
+ NextBatch: res.NextBatch,
+ }
+ return out
+}
+
// Enable this MSC
-// nolint:gocyclo
func Enable(
base *setup.BaseDendrite, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI,
userAPI userapi.UserInternalAPI, keyRing gomatrixserverlib.JSONVerifier,
@@ -96,63 +105,22 @@ func Enable(
he := headeredEvent.(*gomatrixserverlib.HeaderedEvent)
hookErr := db.StoreRelation(context.Background(), he)
if hookErr != nil {
- util.GetLogger(context.Background()).WithError(hookErr).Error(
+ util.GetLogger(context.Background()).WithError(hookErr).WithField("event_id", he.EventID()).Error(
"failed to StoreRelation",
)
}
- })
- hooks.Attach(hooks.KindNewEventReceived, func(headeredEvent interface{}) {
- he := headeredEvent.(*gomatrixserverlib.HeaderedEvent)
- ctx := context.Background()
- // we only inject metadata for events our server sends
- userID := he.Sender()
- _, domain, err := gomatrixserverlib.SplitID('@', userID)
- if err != nil {
- return
- }
- if domain != base.Cfg.Global.ServerName {
- return
- }
- // if this event has an m.relationship, add on the room_id and servers to unsigned
- parent, child, relType := parentChildEventIDs(he)
- if parent == "" || child == "" || relType == "" {
- return
- }
- event, joinedToRoom := getEventIfVisible(ctx, rsAPI, parent, userID)
- if !joinedToRoom {
- return
- }
- err = he.SetUnsignedField(constRoomIDKey, event.RoomID())
- if err != nil {
- util.GetLogger(context.Background()).WithError(err).Warn("Failed to SetUnsignedField")
- return
- }
-
- var servers []gomatrixserverlib.ServerName
- if fsAPI != nil {
- var res fs.QueryJoinedHostServerNamesInRoomResponse
- err = fsAPI.QueryJoinedHostServerNamesInRoom(ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{
- RoomID: event.RoomID(),
- }, &res)
- if err != nil {
- util.GetLogger(context.Background()).WithError(err).Warn("Failed to QueryJoinedHostServerNamesInRoom")
- return
- }
- servers = res.ServerNames
- } else {
- servers = []gomatrixserverlib.ServerName{
- base.Cfg.Global.ServerName,
- }
- }
- err = he.SetUnsignedField(constRoomServers, servers)
- if err != nil {
- util.GetLogger(context.Background()).WithError(err).Warn("Failed to SetUnsignedField")
- return
+ // we need to update child metadata here as well as after doing remote /event_relationships requests
+ // so we catch child metadata originating from /send transactions
+ hookErr = db.UpdateChildMetadata(context.Background(), he)
+ if hookErr != nil {
+ util.GetLogger(context.Background()).WithError(err).WithField("event_id", he.EventID()).Warn(
+ "failed to update child metadata for event",
+ )
}
})
base.PublicClientAPIMux.Handle("/unstable/event_relationships",
- httputil.MakeAuthAPI("eventRelationships", userAPI, eventRelationshipHandler(db, rsAPI)),
+ httputil.MakeAuthAPI("eventRelationships", userAPI, eventRelationshipHandler(db, rsAPI, fsAPI)),
).Methods(http.MethodPost, http.MethodOptions)
base.PublicFederationAPIMux.Handle("/unstable/event_relationships", httputil.MakeExternalAPI(
@@ -163,22 +131,27 @@ func Enable(
if fedReq == nil {
return errResp
}
- return federatedEventRelationship(req.Context(), fedReq, db, rsAPI)
+ return federatedEventRelationship(req.Context(), fedReq, db, rsAPI, fsAPI)
},
)).Methods(http.MethodPost, http.MethodOptions)
return nil
}
type reqCtx struct {
- ctx context.Context
- rsAPI roomserver.RoomserverInternalAPI
- db Database
- req *EventRelationshipRequest
- userID string
+ ctx context.Context
+ rsAPI roomserver.RoomserverInternalAPI
+ db Database
+ req *EventRelationshipRequest
+ userID string
+ roomVersion gomatrixserverlib.RoomVersion
+
+ // federated request args
isFederatedRequest bool
+ serverName gomatrixserverlib.ServerName
+ fsAPI fs.FederationSenderInternalAPI
}
-func eventRelationshipHandler(db Database, rsAPI roomserver.RoomserverInternalAPI) func(*http.Request, *userapi.Device) util.JSONResponse {
+func eventRelationshipHandler(db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI) func(*http.Request, *userapi.Device) util.JSONResponse {
return func(req *http.Request, device *userapi.Device) util.JSONResponse {
relation, err := NewEventRelationshipRequest(req.Body)
if err != nil {
@@ -193,6 +166,7 @@ func eventRelationshipHandler(db Database, rsAPI roomserver.RoomserverInternalAP
req: relation,
userID: device.UserID,
rsAPI: rsAPI,
+ fsAPI: fsAPI,
isFederatedRequest: false,
db: db,
}
@@ -203,12 +177,14 @@ func eventRelationshipHandler(db Database, rsAPI roomserver.RoomserverInternalAP
return util.JSONResponse{
Code: 200,
- JSON: res,
+ JSON: toClientResponse(res),
}
}
}
-func federatedEventRelationship(ctx context.Context, fedReq *gomatrixserverlib.FederationRequest, db Database, rsAPI roomserver.RoomserverInternalAPI) util.JSONResponse {
+func federatedEventRelationship(
+ ctx context.Context, fedReq *gomatrixserverlib.FederationRequest, db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI,
+) util.JSONResponse {
relation, err := NewEventRelationshipRequest(bytes.NewBuffer(fedReq.Content()))
if err != nil {
util.GetLogger(ctx).WithError(err).Error("failed to decode HTTP request as JSON")
@@ -218,17 +194,43 @@ func federatedEventRelationship(ctx context.Context, fedReq *gomatrixserverlib.F
}
}
rc := reqCtx{
- ctx: ctx,
- req: relation,
- userID: "",
- rsAPI: rsAPI,
+ ctx: ctx,
+ req: relation,
+ rsAPI: rsAPI,
+ db: db,
+ // federation args
isFederatedRequest: true,
- db: db,
+ fsAPI: fsAPI,
+ serverName: fedReq.Origin(),
}
res, resErr := rc.process()
if resErr != nil {
return *resErr
}
+ // add auth chain information
+ requiredAuthEventsSet := make(map[string]bool)
+ var requiredAuthEvents []string
+ for _, ev := range res.Events {
+ for _, a := range ev.AuthEventIDs() {
+ if requiredAuthEventsSet[a] {
+ continue
+ }
+ requiredAuthEvents = append(requiredAuthEvents, a)
+ requiredAuthEventsSet[a] = true
+ }
+ }
+ var queryRes roomserver.QueryAuthChainResponse
+ err = rsAPI.QueryAuthChain(ctx, &roomserver.QueryAuthChainRequest{
+ EventIDs: requiredAuthEvents,
+ }, &queryRes)
+ if err != nil {
+ // they may already have the auth events so don't fail this request
+ util.GetLogger(ctx).WithError(err).Error("Failed to QueryAuthChain")
+ }
+ res.AuthChain = make([]*gomatrixserverlib.Event, len(queryRes.AuthChain))
+ for i := range queryRes.AuthChain {
+ res.AuthChain[i] = queryRes.AuthChain[i].Unwrap()
+ }
return util.JSONResponse{
Code: 200,
@@ -236,18 +238,25 @@ func federatedEventRelationship(ctx context.Context, fedReq *gomatrixserverlib.F
}
}
-func (rc *reqCtx) process() (*EventRelationshipResponse, *util.JSONResponse) {
- var res EventRelationshipResponse
+// nolint:gocyclo
+func (rc *reqCtx) process() (*gomatrixserverlib.MSC2836EventRelationshipsResponse, *util.JSONResponse) {
+ var res gomatrixserverlib.MSC2836EventRelationshipsResponse
var returnEvents []*gomatrixserverlib.HeaderedEvent
// Can the user see (according to history visibility) event_id? If no, reject the request, else continue.
- // We should have the event being referenced so don't give any claimed room ID / servers
- event := rc.getEventIfVisible(rc.req.EventID, "", nil)
+ event := rc.getLocalEvent(rc.req.EventID)
if event == nil {
+ event = rc.fetchUnknownEvent(rc.req.EventID, rc.req.RoomID)
+ }
+ if rc.req.RoomID == "" && event != nil {
+ rc.req.RoomID = event.RoomID()
+ }
+ if event == nil || !rc.authorisedToSeeEvent(event) {
return nil, &util.JSONResponse{
Code: 403,
JSON: jsonerror.Forbidden("Event does not exist or you are not authorised to see it"),
}
}
+ rc.roomVersion = event.Version()
// Retrieve the event. Add it to response array.
returnEvents = append(returnEvents, event)
@@ -282,29 +291,122 @@ func (rc *reqCtx) process() (*EventRelationshipResponse, *util.JSONResponse) {
)
returnEvents = append(returnEvents, events...)
}
- res.Events = make([]gomatrixserverlib.ClientEvent, len(returnEvents))
+ res.Events = make([]*gomatrixserverlib.Event, len(returnEvents))
for i, ev := range returnEvents {
- res.Events[i] = gomatrixserverlib.HeaderedToClientEvent(ev, gomatrixserverlib.FormatAll)
+ // for each event, extract the children_count | hash and add it as unsigned data.
+ rc.addChildMetadata(ev)
+ res.Events[i] = ev.Unwrap()
}
res.Limited = remaining == 0 || walkLimited
return &res, nil
}
+// fetchUnknownEvent retrieves an unknown event from the room specified. This server must
+// be joined to the room in question. This has the side effect of injecting surround threaded
+// events into the roomserver.
+func (rc *reqCtx) fetchUnknownEvent(eventID, roomID string) *gomatrixserverlib.HeaderedEvent {
+ if rc.isFederatedRequest || roomID == "" {
+ // we don't do fed hits for fed requests, and we can't ask servers without a room ID!
+ return nil
+ }
+ logger := util.GetLogger(rc.ctx).WithField("room_id", roomID)
+ // if they supplied a room_id, check the room exists.
+ var queryVerRes roomserver.QueryRoomVersionForRoomResponse
+ err := rc.rsAPI.QueryRoomVersionForRoom(rc.ctx, &roomserver.QueryRoomVersionForRoomRequest{
+ RoomID: roomID,
+ }, &queryVerRes)
+ if err != nil {
+ logger.WithError(err).Warn("failed to query room version for room, does this room exist?")
+ return nil
+ }
+
+ // check the user is joined to that room
+ var queryMemRes roomserver.QueryMembershipForUserResponse
+ err = rc.rsAPI.QueryMembershipForUser(rc.ctx, &roomserver.QueryMembershipForUserRequest{
+ RoomID: roomID,
+ UserID: rc.userID,
+ }, &queryMemRes)
+ if err != nil {
+ logger.WithError(err).Warn("failed to query membership for user in room")
+ return nil
+ }
+ if !queryMemRes.IsInRoom {
+ return nil
+ }
+
+ // ask one of the servers in the room for the event
+ var queryRes fs.QueryJoinedHostServerNamesInRoomResponse
+ err = rc.fsAPI.QueryJoinedHostServerNamesInRoom(rc.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{
+ RoomID: roomID,
+ }, &queryRes)
+ if err != nil {
+ logger.WithError(err).Error("failed to QueryJoinedHostServerNamesInRoom")
+ return nil
+ }
+ // query up to 5 servers
+ serversToQuery := queryRes.ServerNames
+ if len(serversToQuery) > 5 {
+ serversToQuery = serversToQuery[:5]
+ }
+
+ // fetch the event, along with some of the surrounding thread (if it's threaded) and the auth chain.
+ // Inject the response into the roomserver to remember the event across multiple calls and to set
+ // unexplored flags correctly.
+ for _, srv := range serversToQuery {
+ res, err := rc.MSC2836EventRelationships(eventID, srv, queryVerRes.RoomVersion)
+ if err != nil {
+ continue
+ }
+ rc.injectResponseToRoomserver(res)
+ for _, ev := range res.Events {
+ if ev.EventID() == eventID {
+ return ev.Headered(ev.Version())
+ }
+ }
+ }
+ logger.WithField("servers", serversToQuery).Warn("failed to query event relationships")
+ return nil
+}
+
// If include_parent: true and there is a valid m.relationship field in the event,
// retrieve the referenced event. Apply history visibility check to that event and if it passes, add it to the response array.
-func (rc *reqCtx) includeParent(event *gomatrixserverlib.HeaderedEvent) (parent *gomatrixserverlib.HeaderedEvent) {
- parentID, _, _ := parentChildEventIDs(event)
+func (rc *reqCtx) includeParent(childEvent *gomatrixserverlib.HeaderedEvent) (parent *gomatrixserverlib.HeaderedEvent) {
+ parentID, _, _ := parentChildEventIDs(childEvent)
if parentID == "" {
return nil
}
- claimedRoomID, claimedServers := roomIDAndServers(event)
- return rc.getEventIfVisible(parentID, claimedRoomID, claimedServers)
+ return rc.lookForEvent(parentID)
}
// If include_children: true, lookup all events which have event_id as an m.relationship
// Apply history visibility checks to all these events and add the ones which pass into the response array,
// honouring the recent_first flag and the limit.
func (rc *reqCtx) includeChildren(db Database, parentID string, limit int, recentFirst bool) ([]*gomatrixserverlib.HeaderedEvent, *util.JSONResponse) {
+ if rc.hasUnexploredChildren(parentID) {
+ // we need to do a remote request to pull in the children as we are missing them locally.
+ serversToQuery := rc.getServersForEventID(parentID)
+ var result *gomatrixserverlib.MSC2836EventRelationshipsResponse
+ for _, srv := range serversToQuery {
+ res, err := rc.fsAPI.MSC2836EventRelationships(rc.ctx, srv, gomatrixserverlib.MSC2836EventRelationshipsRequest{
+ EventID: parentID,
+ Direction: "down",
+ Limit: 100,
+ MaxBreadth: -1,
+ MaxDepth: 1, // we just want the children from this parent
+ RecentFirst: true,
+ }, rc.roomVersion)
+ if err != nil {
+ util.GetLogger(rc.ctx).WithError(err).WithField("server", srv).Error("includeChildren: failed to call MSC2836EventRelationships")
+ } else {
+ result = &res
+ break
+ }
+ }
+ if result != nil {
+ rc.injectResponseToRoomserver(result)
+ }
+ // fallthrough to pull these new events from the DB
+ }
children, err := db.ChildrenForParent(rc.ctx, parentID, constRelType, recentFirst)
if err != nil {
util.GetLogger(rc.ctx).WithError(err).Error("failed to get ChildrenForParent")
@@ -313,8 +415,7 @@ func (rc *reqCtx) includeChildren(db Database, parentID string, limit int, recen
}
var childEvents []*gomatrixserverlib.HeaderedEvent
for _, child := range children {
- // in order for us to even know about the children the server must be joined to those rooms, hence pass no claimed room ID or servers.
- childEvent := rc.getEventIfVisible(child.EventID, "", nil)
+ childEvent := rc.lookForEvent(child.EventID)
if childEvent != nil {
childEvents = append(childEvents, childEvent)
}
@@ -327,14 +428,9 @@ func (rc *reqCtx) includeChildren(db Database, parentID string, limit int, recen
// Begin to walk the thread DAG in the direction specified, either depth or breadth first according to the depth_first flag,
// honouring the limit, max_depth and max_breadth values according to the following rules
-// nolint: unparam
func walkThread(
ctx context.Context, db Database, rc *reqCtx, included map[string]bool, limit int,
) ([]*gomatrixserverlib.HeaderedEvent, bool) {
- if rc.req.Direction != "down" {
- util.GetLogger(ctx).Error("not implemented: direction=up")
- return nil, false
- }
var result []*gomatrixserverlib.HeaderedEvent
eventWalker := walker{
ctx: ctx,
@@ -352,8 +448,11 @@ func walkThread(
}
// Process the event.
- // TODO: Include edge information: room ID and servers
- event := rc.getEventIfVisible(wi.EventID, "", nil)
+ // if event is not found, use remoteEventRelationships to explore that part of the thread remotely.
+ // This will probably be easiest if the event relationships response is directly pumped into the database
+ // so the next walk will do the right thing. This requires those events to be authed and likely injected as
+ // outliers into the roomserver DB, which will de-dupe appropriately.
+ event := rc.lookForEvent(wi.EventID)
if event != nil {
result = append(result, event)
}
@@ -368,74 +467,280 @@ func walkThread(
return result, limited
}
-func (rc *reqCtx) getEventIfVisible(eventID string, claimedRoomID string, claimedServers []string) *gomatrixserverlib.HeaderedEvent {
- event, joinedToRoom := getEventIfVisible(rc.ctx, rc.rsAPI, eventID, rc.userID)
- if event != nil && joinedToRoom {
- return event
+// MSC2836EventRelationships performs an /event_relationships request to a remote server
+func (rc *reqCtx) MSC2836EventRelationships(eventID string, srv gomatrixserverlib.ServerName, ver gomatrixserverlib.RoomVersion) (*gomatrixserverlib.MSC2836EventRelationshipsResponse, error) {
+ res, err := rc.fsAPI.MSC2836EventRelationships(rc.ctx, srv, gomatrixserverlib.MSC2836EventRelationshipsRequest{
+ EventID: eventID,
+ DepthFirst: rc.req.DepthFirst,
+ Direction: rc.req.Direction,
+ Limit: rc.req.Limit,
+ MaxBreadth: rc.req.MaxBreadth,
+ MaxDepth: rc.req.MaxDepth,
+ RecentFirst: rc.req.RecentFirst,
+ }, ver)
+ if err != nil {
+ util.GetLogger(rc.ctx).WithError(err).Error("Failed to call MSC2836EventRelationships")
+ return nil, err
+ }
+ return &res, nil
+
+}
+
+// authorisedToSeeEvent checks that the user or server is allowed to see this event. Returns true if allowed to
+// see this request. This only needs to be done once per room at present as we just check for joined status.
+func (rc *reqCtx) authorisedToSeeEvent(event *gomatrixserverlib.HeaderedEvent) bool {
+ if rc.isFederatedRequest {
+ // make sure the server is in this room
+ var res fs.QueryJoinedHostServerNamesInRoomResponse
+ err := rc.fsAPI.QueryJoinedHostServerNamesInRoom(rc.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{
+ RoomID: event.RoomID(),
+ }, &res)
+ if err != nil {
+ util.GetLogger(rc.ctx).WithError(err).Error("authorisedToSeeEvent: failed to QueryJoinedHostServerNamesInRoom")
+ return false
+ }
+ for _, srv := range res.ServerNames {
+ if srv == rc.serverName {
+ return true
+ }
+ }
+ return false
}
- // either we don't have the event or we aren't joined to the room, regardless we should try joining if auto join is enabled
- if !rc.req.AutoJoin {
+ // make sure the user is in this room
+ // Allow events if the member is in the room
+ // TODO: This does not honour history_visibility
+ // TODO: This does not honour m.room.create content
+ var queryMembershipRes roomserver.QueryMembershipForUserResponse
+ err := rc.rsAPI.QueryMembershipForUser(rc.ctx, &roomserver.QueryMembershipForUserRequest{
+ RoomID: event.RoomID(),
+ UserID: rc.userID,
+ }, &queryMembershipRes)
+ if err != nil {
+ util.GetLogger(rc.ctx).WithError(err).Error("authorisedToSeeEvent: failed to QueryMembershipForUser")
+ return false
+ }
+ return queryMembershipRes.IsInRoom
+}
+
+func (rc *reqCtx) getServersForEventID(eventID string) []gomatrixserverlib.ServerName {
+ if rc.req.RoomID == "" {
+ util.GetLogger(rc.ctx).WithField("event_id", eventID).Error(
+ "getServersForEventID: event exists in unknown room",
+ )
return nil
}
- // if we're doing this on behalf of a random server don't auto-join rooms regardless of what the request says
- if rc.isFederatedRequest {
+ if rc.roomVersion == "" {
+ util.GetLogger(rc.ctx).WithField("event_id", eventID).Errorf(
+ "getServersForEventID: event exists in %s with unknown room version", rc.req.RoomID,
+ )
return nil
}
- roomID := claimedRoomID
- var servers []gomatrixserverlib.ServerName
- if event != nil {
- roomID = event.RoomID()
- }
- for _, s := range claimedServers {
- servers = append(servers, gomatrixserverlib.ServerName(s))
- }
- var joinRes roomserver.PerformJoinResponse
- rc.rsAPI.PerformJoin(rc.ctx, &roomserver.PerformJoinRequest{
- UserID: rc.userID,
- Content: map[string]interface{}{},
- RoomIDOrAlias: roomID,
- ServerNames: servers,
- }, &joinRes)
- if joinRes.Error != nil {
- util.GetLogger(rc.ctx).WithError(joinRes.Error).WithField("room_id", roomID).Error("Failed to auto-join room")
+ var queryRes fs.QueryJoinedHostServerNamesInRoomResponse
+ err := rc.fsAPI.QueryJoinedHostServerNamesInRoom(rc.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{
+ RoomID: rc.req.RoomID,
+ }, &queryRes)
+ if err != nil {
+ util.GetLogger(rc.ctx).WithError(err).Error("getServersForEventID: failed to QueryJoinedHostServerNamesInRoom")
return nil
}
- if event != nil {
+ // query up to 5 servers
+ serversToQuery := queryRes.ServerNames
+ if len(serversToQuery) > 5 {
+ serversToQuery = serversToQuery[:5]
+ }
+ return serversToQuery
+}
+
+func (rc *reqCtx) remoteEventRelationships(eventID string) *gomatrixserverlib.MSC2836EventRelationshipsResponse {
+ if rc.isFederatedRequest {
+ return nil // we don't query remote servers for remote requests
+ }
+ serversToQuery := rc.getServersForEventID(eventID)
+ var res *gomatrixserverlib.MSC2836EventRelationshipsResponse
+ var err error
+ for _, srv := range serversToQuery {
+ res, err = rc.MSC2836EventRelationships(eventID, srv, rc.roomVersion)
+ if err != nil {
+ util.GetLogger(rc.ctx).WithError(err).WithField("server", srv).Error("remoteEventRelationships: failed to call MSC2836EventRelationships")
+ } else {
+ break
+ }
+ }
+ return res
+}
+
+// lookForEvent returns the event for the event ID given, by trying to query remote servers
+// if the event ID is unknown via /event_relationships.
+func (rc *reqCtx) lookForEvent(eventID string) *gomatrixserverlib.HeaderedEvent {
+ event := rc.getLocalEvent(eventID)
+ if event == nil {
+ queryRes := rc.remoteEventRelationships(eventID)
+ if queryRes != nil {
+ // inject all the events into the roomserver then return the event in question
+ rc.injectResponseToRoomserver(queryRes)
+ for _, ev := range queryRes.Events {
+ if ev.EventID() == eventID && rc.req.RoomID == ev.RoomID() {
+ return ev.Headered(ev.Version())
+ }
+ }
+ }
+ } else if rc.hasUnexploredChildren(eventID) {
+ // we have the local event but we may need to do a remote hit anyway if we are exploring the thread and have unknown children.
+ // If we don't do this then we risk never fetching the children.
+ queryRes := rc.remoteEventRelationships(eventID)
+ if queryRes != nil {
+ rc.injectResponseToRoomserver(queryRes)
+ err := rc.db.MarkChildrenExplored(context.Background(), eventID)
+ if err != nil {
+ util.GetLogger(rc.ctx).WithError(err).Warnf("failed to mark children of %s as explored", eventID)
+ }
+ }
+ }
+ if rc.req.RoomID == event.RoomID() {
return event
}
- // TODO: hit /event_relationships on the server we joined via
- util.GetLogger(rc.ctx).Infof("joined room but need to fetch event TODO")
return nil
}
-func getEventIfVisible(ctx context.Context, rsAPI roomserver.RoomserverInternalAPI, eventID, userID string) (*gomatrixserverlib.HeaderedEvent, bool) {
+func (rc *reqCtx) getLocalEvent(eventID string) *gomatrixserverlib.HeaderedEvent {
var queryEventsRes roomserver.QueryEventsByIDResponse
- err := rsAPI.QueryEventsByID(ctx, &roomserver.QueryEventsByIDRequest{
+ err := rc.rsAPI.QueryEventsByID(rc.ctx, &roomserver.QueryEventsByIDRequest{
EventIDs: []string{eventID},
}, &queryEventsRes)
if err != nil {
- util.GetLogger(ctx).WithError(err).Error("getEventIfVisible: failed to QueryEventsByID")
- return nil, false
+ util.GetLogger(rc.ctx).WithError(err).Error("getLocalEvent: failed to QueryEventsByID")
+ return nil
}
if len(queryEventsRes.Events) == 0 {
- util.GetLogger(ctx).Infof("event does not exist")
- return nil, false // event does not exist
+ util.GetLogger(rc.ctx).WithField("event_id", eventID).Infof("getLocalEvent: event does not exist")
+ return nil // event does not exist
}
- event := queryEventsRes.Events[0]
+ return queryEventsRes.Events[0]
+}
- // Allow events if the member is in the room
- // TODO: This does not honour history_visibility
- // TODO: This does not honour m.room.create content
- var queryMembershipRes roomserver.QueryMembershipForUserResponse
- err = rsAPI.QueryMembershipForUser(ctx, &roomserver.QueryMembershipForUserRequest{
- RoomID: event.RoomID(),
- UserID: userID,
- }, &queryMembershipRes)
+// injectResponseToRoomserver injects the events
+// into the roomserver as KindOutlier, with auth chains.
+func (rc *reqCtx) injectResponseToRoomserver(res *gomatrixserverlib.MSC2836EventRelationshipsResponse) {
+ var stateEvents []*gomatrixserverlib.Event
+ var messageEvents []*gomatrixserverlib.Event
+ for _, ev := range res.Events {
+ if ev.StateKey() != nil {
+ stateEvents = append(stateEvents, ev)
+ } else {
+ messageEvents = append(messageEvents, ev)
+ }
+ }
+ respState := gomatrixserverlib.RespState{
+ AuthEvents: res.AuthChain,
+ StateEvents: stateEvents,
+ }
+ eventsInOrder, err := respState.Events()
+ if err != nil {
+ util.GetLogger(rc.ctx).WithError(err).Error("failed to calculate order to send events in MSC2836EventRelationshipsResponse")
+ return
+ }
+ // everything gets sent as an outlier because auth chain events may be disjoint from the DAG
+ // as may the threaded events.
+ var ires []roomserver.InputRoomEvent
+ for _, outlier := range append(eventsInOrder, messageEvents...) {
+ ires = append(ires, roomserver.InputRoomEvent{
+ Kind: roomserver.KindOutlier,
+ Event: outlier.Headered(outlier.Version()),
+ AuthEventIDs: outlier.AuthEventIDs(),
+ })
+ }
+ // we've got the data by this point so use a background context
+ err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires)
+ if err != nil {
+ util.GetLogger(rc.ctx).WithError(err).Error("failed to inject MSC2836EventRelationshipsResponse into the roomserver")
+ }
+ // update the child count / hash columns for these nodes. We need to do this here because not all events will make it
+ // through to the KindNewEventPersisted hook because the roomserver will ignore duplicates. Duplicates have meaning though
+ // as the `unsigned` field may differ (if the number of children changes).
+ for _, ev := range ires {
+ err = rc.db.UpdateChildMetadata(context.Background(), ev.Event)
+ if err != nil {
+ util.GetLogger(rc.ctx).WithError(err).WithField("event_id", ev.Event.EventID()).Warn("failed to update child metadata for event")
+ }
+ }
+}
+
+func (rc *reqCtx) addChildMetadata(ev *gomatrixserverlib.HeaderedEvent) {
+ count, hash := rc.getChildMetadata(ev.EventID())
+ if count == 0 {
+ return
+ }
+ err := ev.SetUnsignedField("children_hash", gomatrixserverlib.Base64Bytes(hash))
if err != nil {
- util.GetLogger(ctx).WithError(err).Error("getEventIfVisible: failed to QueryMembershipForUser")
- return nil, false
+ util.GetLogger(rc.ctx).WithError(err).Warn("Failed to set children_hash")
+ }
+ err = ev.SetUnsignedField("children", map[string]int{
+ constRelType: count,
+ })
+ if err != nil {
+ util.GetLogger(rc.ctx).WithError(err).Warn("Failed to set children count")
}
- return event, queryMembershipRes.IsInRoom
+}
+
+func (rc *reqCtx) getChildMetadata(eventID string) (count int, hash []byte) {
+ children, err := rc.db.ChildrenForParent(rc.ctx, eventID, constRelType, false)
+ if err != nil {
+ util.GetLogger(rc.ctx).WithError(err).Warn("Failed to get ChildrenForParent for getting child metadata")
+ return
+ }
+ if len(children) == 0 {
+ return
+ }
+ // sort it lexiographically
+ sort.Slice(children, func(i, j int) bool {
+ return children[i].EventID < children[j].EventID
+ })
+ // hash it
+ var eventIDs strings.Builder
+ for _, c := range children {
+ _, _ = eventIDs.WriteString(c.EventID)
+ }
+ hashValBytes := sha256.Sum256([]byte(eventIDs.String()))
+
+ count = len(children)
+ hash = hashValBytes[:]
+ return
+}
+
+// hasUnexploredChildren returns true if this event has unexplored children.
+// "An event has unexplored children if the `unsigned` child count on the parent does not match
+// how many children the server believes the parent to have. In addition, if the counts match but
+// the hashes do not match, then the event is unexplored."
+func (rc *reqCtx) hasUnexploredChildren(eventID string) bool {
+ if rc.isFederatedRequest {
+ return false // we only explore children for clients, not servers.
+ }
+ // extract largest child count from event
+ eventCount, eventHash, explored, err := rc.db.ChildMetadata(rc.ctx, eventID)
+ if err != nil {
+ util.GetLogger(rc.ctx).WithError(err).WithField("event_id", eventID).Warn(
+ "failed to get ChildMetadata from db",
+ )
+ return false
+ }
+ // if there are no recorded children then we know we have >= children.
+ // if the event has already been explored (read: we hit /event_relationships successfully)
+ // then don't do it again. We'll only re-do this if we get an even bigger children count,
+ // see Database.UpdateChildMetadata
+ if eventCount == 0 || explored {
+ return false // short-circuit
+ }
+
+ // calculate child count for event
+ calcCount, calcHash := rc.getChildMetadata(eventID)
+
+ if eventCount < calcCount {
+ return false // we have more children
+ } else if eventCount > calcCount {
+ return true // the event has more children than we know about
+ }
+ // we have the same count, so a mismatched hash means some children are different
+ return !bytes.Equal(eventHash, calcHash)
}
type walkInfo struct {
@@ -453,9 +758,9 @@ type walker struct {
// WalkFrom the event ID given
func (w *walker) WalkFrom(eventID string) (limited bool, err error) {
- children, err := w.db.ChildrenForParent(w.ctx, eventID, constRelType, w.req.RecentFirst)
+ children, err := w.childrenForParent(eventID)
if err != nil {
- util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() ChildrenForParent failed, cannot walk")
+ util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() childrenForParent failed, cannot walk")
return false, err
}
var next *walkInfo
@@ -467,9 +772,9 @@ func (w *walker) WalkFrom(eventID string) (limited bool, err error) {
return true, nil
}
// find the children's children
- children, err = w.db.ChildrenForParent(w.ctx, next.EventID, constRelType, w.req.RecentFirst)
+ children, err = w.childrenForParent(next.EventID)
if err != nil {
- util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() ChildrenForParent failed, cannot walk")
+ util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() childrenForParent failed, cannot walk")
return false, err
}
toWalk = w.addChildren(toWalk, children, next.Depth+1)
@@ -528,3 +833,20 @@ func (w *walker) nextChild(toWalk []walkInfo) (*walkInfo, []walkInfo) {
child, toWalk = toWalk[0], toWalk[1:]
return &child, toWalk
}
+
+// childrenForParent returns the children events for this event ID, honouring the direction: up|down flags
+// meaning this can actually be returning the parent for the event instead of the children.
+func (w *walker) childrenForParent(eventID string) ([]eventInfo, error) {
+ if w.req.Direction == "down" {
+ return w.db.ChildrenForParent(w.ctx, eventID, constRelType, w.req.RecentFirst)
+ }
+ // find the event to pull out the parent
+ ei, err := w.db.ParentForChild(w.ctx, eventID, constRelType)
+ if err != nil {
+ return nil, err
+ }
+ if ei != nil {
+ return []eventInfo{*ei}, nil
+ }
+ return nil, nil
+}