diff options
author | devonh <devon.dmytro@gmail.com> | 2023-04-27 00:43:46 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-27 00:43:46 +0000 |
commit | dd5e47a9a75f717381c27adebdee18aa80a1f256 (patch) | |
tree | 5bc3a07edf710e7756537f0461cc90ef944e695a | |
parent | ed19efc5d751446a57727c0880437d95be26ba9d (diff) |
Move high level room joining logic to GMSL (#3065)
GMSL PR: https://github.com/matrix-org/gomatrixserverlib/pull/372
-rw-r--r-- | federationapi/api/api.go | 1 | ||||
-rw-r--r-- | federationapi/federationapi_test.go | 4 | ||||
-rw-r--r-- | federationapi/internal/federationclient.go | 42 | ||||
-rw-r--r-- | federationapi/internal/perform.go | 217 | ||||
-rw-r--r-- | go.mod | 2 | ||||
-rw-r--r-- | go.sum | 4 |
6 files changed, 77 insertions, 193 deletions
diff --git a/federationapi/api/api.go b/federationapi/api/api.go index 0048b4b0..c223f504 100644 --- a/federationapi/api/api.go +++ b/federationapi/api/api.go @@ -16,6 +16,7 @@ import ( // FederationInternalAPI is used to query information from the federation sender. type FederationInternalAPI interface { gomatrixserverlib.FederatedStateClient + gomatrixserverlib.FederatedJoinClient KeyserverFederationAPI gomatrixserverlib.KeyDatabase ClientFederationAPI diff --git a/federationapi/federationapi_test.go b/federationapi/federationapi_test.go index ca144491..46b67aa2 100644 --- a/federationapi/federationapi_test.go +++ b/federationapi/federationapi_test.go @@ -106,7 +106,9 @@ func (f *fedClient) GetServerKeys(ctx context.Context, matrixServer spec.ServerN return keys, nil } -func (f *fedClient) MakeJoin(ctx context.Context, origin, s spec.ServerName, roomID, userID string, roomVersions []gomatrixserverlib.RoomVersion) (res fclient.RespMakeJoin, err error) { +func (f *fedClient) MakeJoin(ctx context.Context, origin, s spec.ServerName, roomID, userID string) (res fclient.RespMakeJoin, err error) { + f.fedClientMutex.Lock() + defer f.fedClientMutex.Unlock() for _, r := range f.allowJoins { if r.ID == roomID { res.RoomVersion = r.Version diff --git a/federationapi/internal/federationclient.go b/federationapi/internal/federationclient.go index e4288a20..dd329057 100644 --- a/federationapi/internal/federationclient.go +++ b/federationapi/internal/federationclient.go @@ -9,14 +9,40 @@ import ( "github.com/matrix-org/gomatrixserverlib/spec" ) +const defaultTimeout = time.Second * 30 + // Functions here are "proxying" calls to the gomatrixserverlib federation // client. +func (a *FederationInternalAPI) MakeJoin( + ctx context.Context, origin, s spec.ServerName, roomID, userID string, +) (res gomatrixserverlib.MakeJoinResponse, err error) { + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + ires, err := a.federation.MakeJoin(ctx, origin, s, roomID, userID) + if err != nil { + return &fclient.RespMakeJoin{}, err + } + return &ires, nil +} + +func (a *FederationInternalAPI) SendJoin( + ctx context.Context, origin, s spec.ServerName, event *gomatrixserverlib.Event, +) (res gomatrixserverlib.SendJoinResponse, err error) { + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + ires, err := a.federation.SendJoin(ctx, origin, s, event) + if err != nil { + return &fclient.RespSendJoin{}, err + } + return &ires, nil +} + func (a *FederationInternalAPI) GetEventAuth( ctx context.Context, origin, s spec.ServerName, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string, ) (res fclient.RespEventAuth, err error) { - ctx, cancel := context.WithTimeout(ctx, time.Second*30) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.GetEventAuth(ctx, origin, s, roomVersion, roomID, eventID) @@ -30,7 +56,7 @@ func (a *FederationInternalAPI) GetEventAuth( func (a *FederationInternalAPI) GetUserDevices( ctx context.Context, origin, s spec.ServerName, userID string, ) (fclient.RespUserDevices, error) { - ctx, cancel := context.WithTimeout(ctx, time.Second*30) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.GetUserDevices(ctx, origin, s, userID) @@ -44,7 +70,7 @@ func (a *FederationInternalAPI) GetUserDevices( func (a *FederationInternalAPI) ClaimKeys( ctx context.Context, origin, s spec.ServerName, oneTimeKeys map[string]map[string]string, ) (fclient.RespClaimKeys, error) { - ctx, cancel := context.WithTimeout(ctx, time.Second*30) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.ClaimKeys(ctx, origin, s, oneTimeKeys) @@ -70,7 +96,7 @@ func (a *FederationInternalAPI) QueryKeys( func (a *FederationInternalAPI) Backfill( ctx context.Context, origin, s spec.ServerName, roomID string, limit int, eventIDs []string, ) (res gomatrixserverlib.Transaction, err error) { - ctx, cancel := context.WithTimeout(ctx, time.Second*30) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.Backfill(ctx, origin, s, roomID, limit, eventIDs) @@ -84,7 +110,7 @@ func (a *FederationInternalAPI) Backfill( func (a *FederationInternalAPI) LookupState( ctx context.Context, origin, s spec.ServerName, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion, ) (res gomatrixserverlib.StateResponse, err error) { - ctx, cancel := context.WithTimeout(ctx, time.Second*30) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.LookupState(ctx, origin, s, roomID, eventID, roomVersion) @@ -99,7 +125,7 @@ func (a *FederationInternalAPI) LookupState( func (a *FederationInternalAPI) LookupStateIDs( ctx context.Context, origin, s spec.ServerName, roomID, eventID string, ) (res gomatrixserverlib.StateIDResponse, err error) { - ctx, cancel := context.WithTimeout(ctx, time.Second*30) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.LookupStateIDs(ctx, origin, s, roomID, eventID) @@ -114,7 +140,7 @@ func (a *FederationInternalAPI) LookupMissingEvents( ctx context.Context, origin, s spec.ServerName, roomID string, missing fclient.MissingEvents, roomVersion gomatrixserverlib.RoomVersion, ) (res fclient.RespMissingEvents, err error) { - ctx, cancel := context.WithTimeout(ctx, time.Second*30) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.LookupMissingEvents(ctx, origin, s, roomID, missing, roomVersion) @@ -128,7 +154,7 @@ func (a *FederationInternalAPI) LookupMissingEvents( func (a *FederationInternalAPI) GetEvent( ctx context.Context, origin, s spec.ServerName, eventID string, ) (res gomatrixserverlib.Transaction, err error) { - ctx, cancel := context.WithTimeout(ctx, time.Second*30) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.GetEvent(ctx, origin, s, eventID) diff --git a/federationapi/internal/perform.go b/federationapi/internal/perform.go index c73b69d9..2f9b0a54 100644 --- a/federationapi/internal/perform.go +++ b/federationapi/internal/perform.go @@ -73,12 +73,6 @@ func (r *FederationInternalAPI) PerformJoin( r.joins.Store(j, nil) defer r.joins.Delete(j) - // Look up the supported room versions. - var supportedVersions []gomatrixserverlib.RoomVersion - for version := range version.SupportedRoomVersions() { - supportedVersions = append(supportedVersions, version) - } - // Deduplicate the server names we were provided but keep the ordering // as this encodes useful information about which servers are most likely // to respond. @@ -103,7 +97,6 @@ func (r *FederationInternalAPI) PerformJoin( request.UserID, request.Content, serverName, - supportedVersions, request.Unsigned, ); err != nil { logrus.WithError(err).WithFields(logrus.Fields{ @@ -146,128 +139,41 @@ func (r *FederationInternalAPI) performJoinUsingServer( roomID, userID string, content map[string]interface{}, serverName spec.ServerName, - supportedVersions []gomatrixserverlib.RoomVersion, unsigned map[string]interface{}, ) error { if !r.shouldAttemptDirectFederation(serverName) { return fmt.Errorf("relay servers have no meaningful response for join.") } - _, origin, err := r.cfg.Matrix.SplitLocalID('@', userID) + user, err := spec.NewUserID(userID, true) if err != nil { return err } - // Try to perform a make_join using the information supplied in the - // request. - respMakeJoin, err := r.federation.MakeJoin( - ctx, - origin, - serverName, - roomID, - userID, - supportedVersions, - ) - if err != nil { - // TODO: Check if the user was not allowed to join the room. - r.statistics.ForServer(serverName).Failure() - return fmt.Errorf("r.federation.MakeJoin: %w", err) - } - r.statistics.ForServer(serverName).Success(statistics.SendDirect) - - // Set all the fields to be what they should be, this should be a no-op - // but it's possible that the remote server returned us something "odd" - respMakeJoin.JoinEvent.Type = spec.MRoomMember - respMakeJoin.JoinEvent.Sender = userID - respMakeJoin.JoinEvent.StateKey = &userID - respMakeJoin.JoinEvent.RoomID = roomID - respMakeJoin.JoinEvent.Redacts = "" - if content == nil { - content = map[string]interface{}{} - } - _ = json.Unmarshal(respMakeJoin.JoinEvent.Content, &content) - content["membership"] = spec.Join - if err = respMakeJoin.JoinEvent.SetContent(content); err != nil { - return fmt.Errorf("respMakeJoin.JoinEvent.SetContent: %w", err) - } - if err = respMakeJoin.JoinEvent.SetUnsigned(struct{}{}); err != nil { - return fmt.Errorf("respMakeJoin.JoinEvent.SetUnsigned: %w", err) - } - - // Work out if we support the room version that has been supplied in - // the make_join response. - // "If not provided, the room version is assumed to be either "1" or "2"." - // https://matrix.org/docs/spec/server_server/unstable#get-matrix-federation-v1-make-join-roomid-userid - if respMakeJoin.RoomVersion == "" { - respMakeJoin.RoomVersion = setDefaultRoomVersionFromJoinEvent(respMakeJoin.JoinEvent) - } - verImpl, err := gomatrixserverlib.GetRoomVersion(respMakeJoin.RoomVersion) - if err != nil { - return err - } - - // Build the join event. - event, err := respMakeJoin.JoinEvent.Build( - time.Now(), - origin, - r.cfg.Matrix.KeyID, - r.cfg.Matrix.PrivateKey, - respMakeJoin.RoomVersion, - ) - if err != nil { - return fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err) - } - - // Try to perform a send_join using the newly built event. - respSendJoin, err := r.federation.SendJoin( - context.Background(), - origin, - serverName, - event, - ) - if err != nil { - r.statistics.ForServer(serverName).Failure() - return fmt.Errorf("r.federation.SendJoin: %w", err) + joinInput := gomatrixserverlib.PerformJoinInput{ + UserID: user, + RoomID: roomID, + ServerName: serverName, + Content: content, + Unsigned: unsigned, + PrivateKey: r.cfg.Matrix.PrivateKey, + KeyID: r.cfg.Matrix.KeyID, + KeyRing: r.keyRing, + EventProvider: federatedEventProvider(ctx, r.federation, r.keyRing, user.Domain(), serverName), } - r.statistics.ForServer(serverName).Success(statistics.SendDirect) + response, joinErr := gomatrixserverlib.PerformJoin(ctx, r, joinInput) - // If the remote server returned an event in the "event" key of - // the send_join request then we should use that instead. It may - // contain signatures that we don't know about. - if len(respSendJoin.Event) > 0 { - var remoteEvent *gomatrixserverlib.Event - remoteEvent, err = verImpl.NewEventFromUntrustedJSON(respSendJoin.Event) - if err == nil && isWellFormedMembershipEvent( - remoteEvent, roomID, userID, - ) { - event = remoteEvent + if joinErr != nil { + if !joinErr.Reachable { + r.statistics.ForServer(joinErr.ServerName).Failure() + } else { + r.statistics.ForServer(joinErr.ServerName).Success(statistics.SendDirect) } + return joinErr.Err } - - // Sanity-check the join response to ensure that it has a create - // event, that the room version is known, etc. - authEvents := respSendJoin.AuthEvents.UntrustedEvents(respMakeJoin.RoomVersion) - if err = sanityCheckAuthChain(authEvents); err != nil { - return fmt.Errorf("sanityCheckAuthChain: %w", err) - } - - // Process the join response in a goroutine. The idea here is - // that we'll try and wait for as long as possible for the work - // to complete, but if the client does give up waiting, we'll - // still continue to process the join anyway so that we don't - // waste the effort. - // TODO: Can we expand Check here to return a list of missing auth - // events rather than failing one at a time? - var respState gomatrixserverlib.StateResponse - respState, err = gomatrixserverlib.CheckSendJoinResponse( - context.Background(), - respMakeJoin.RoomVersion, &respSendJoin, - r.keyRing, - event, - federatedAuthProvider(ctx, r.federation, r.keyRing, origin, serverName), - ) - if err != nil { - return fmt.Errorf("respSendJoin.Check: %w", err) + r.statistics.ForServer(serverName).Success(statistics.SendDirect) + if response == nil { + return fmt.Errorf("Received nil response from gomatrixserverlib.PerformJoin") } // We need to immediately update our list of joined hosts for this room now as we are technically @@ -276,60 +182,33 @@ func (r *FederationInternalAPI) performJoinUsingServer( // joining a room, waiting for 200 OK then changing device keys and have those keys not be sent // to other servers (this was a cause of a flakey sytest "Local device key changes get to remote servers") // The events are trusted now as we performed auth checks above. - joinedHosts, err := consumers.JoinedHostsFromEvents(respState.GetStateEvents().TrustedEvents(respMakeJoin.RoomVersion, false)) + joinedHosts, err := consumers.JoinedHostsFromEvents(response.StateSnapshot.GetStateEvents().TrustedEvents(response.JoinEvent.Version(), false)) if err != nil { return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err) } + logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts)) if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil { return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err) } - // If we successfully performed a send_join above then the other - // server now thinks we're a part of the room. Send the newly - // returned state to the roomserver to update our local view. - if unsigned != nil { - event, err = event.SetUnsigned(unsigned) - if err != nil { - // non-fatal, log and continue - logrus.WithError(err).Errorf("Failed to set unsigned content") - } - } - + // TODO: Can I change this to not take respState but instead just take an opaque list of events? if err = roomserverAPI.SendEventWithState( context.Background(), r.rsAPI, - origin, + user.Domain(), roomserverAPI.KindNew, - respState, - event.Headered(respMakeJoin.RoomVersion), + response.StateSnapshot, + response.JoinEvent.Headered(response.JoinEvent.Version()), serverName, nil, false, ); err != nil { return fmt.Errorf("roomserverAPI.SendEventWithState: %w", err) } - return nil } -// isWellFormedMembershipEvent returns true if the event looks like a legitimate -// membership event. -func isWellFormedMembershipEvent(event *gomatrixserverlib.Event, roomID, userID string) bool { - if membership, err := event.Membership(); err != nil { - return false - } else if membership != spec.Join { - return false - } - if event.RoomID() != roomID { - return false - } - if !event.StateKeyEquals(userID) { - return false - } - return true -} - // PerformOutboundPeekRequest implements api.FederationInternalAPI func (r *FederationInternalAPI) PerformOutboundPeek( ctx context.Context, @@ -475,12 +354,12 @@ func (r *FederationInternalAPI) performOutboundPeekUsingServer( // authenticate the state returned (check its auth events etc) // the equivalent of CheckSendJoinResponse() authEvents, stateEvents, err := gomatrixserverlib.CheckStateResponse( - ctx, &respPeek, respPeek.RoomVersion, r.keyRing, federatedAuthProvider(ctx, r.federation, r.keyRing, r.cfg.Matrix.ServerName, serverName), + ctx, &respPeek, respPeek.RoomVersion, r.keyRing, federatedEventProvider(ctx, r.federation, r.keyRing, r.cfg.Matrix.ServerName, serverName), ) if err != nil { return fmt.Errorf("error checking state returned from peeking: %w", err) } - if err = sanityCheckAuthChain(authEvents); err != nil { + if err = checkEventsContainCreateEvent(authEvents); err != nil { return fmt.Errorf("sanityCheckAuthChain: %w", err) } @@ -719,9 +598,9 @@ func (r *FederationInternalAPI) MarkServersAlive(destinations []spec.ServerName) } } -func sanityCheckAuthChain(authChain []*gomatrixserverlib.Event) error { +func checkEventsContainCreateEvent(events []*gomatrixserverlib.Event) error { // sanity check we have a create event and it has a known room version - for _, ev := range authChain { + for _, ev := range events { if ev.Type() == spec.MRoomCreate && ev.StateKeyEquals("") { // make sure the room version is known content := ev.Content() @@ -739,43 +618,19 @@ func sanityCheckAuthChain(authChain []*gomatrixserverlib.Event) error { } knownVersions := gomatrixserverlib.RoomVersions() if _, ok := knownVersions[gomatrixserverlib.RoomVersion(verBody.Version)]; !ok { - return fmt.Errorf("auth chain m.room.create event has an unknown room version: %s", verBody.Version) + return fmt.Errorf("m.room.create event has an unknown room version: %s", verBody.Version) } return nil } } - return fmt.Errorf("auth chain response is missing m.room.create event") -} - -func setDefaultRoomVersionFromJoinEvent( - joinEvent gomatrixserverlib.EventBuilder, -) gomatrixserverlib.RoomVersion { - // if auth events are not event references we know it must be v3+ - // we have to do these shenanigans to satisfy sytest, specifically for: - // "Outbound federation rejects m.room.create events with an unknown room version" - hasEventRefs := true - authEvents, ok := joinEvent.AuthEvents.([]interface{}) - if ok { - if len(authEvents) > 0 { - _, ok = authEvents[0].(string) - if ok { - // event refs are objects, not strings, so we know we must be dealing with a v3+ room. - hasEventRefs = false - } - } - } - - if hasEventRefs { - return gomatrixserverlib.RoomVersionV1 - } - return gomatrixserverlib.RoomVersionV4 + return fmt.Errorf("response is missing m.room.create event") } -// FederatedAuthProvider is an auth chain provider which fetches events from the server provided -func federatedAuthProvider( +// federatedEventProvider is an event provider which fetches events from the server provided +func federatedEventProvider( ctx context.Context, federation fclient.FederationClient, keyRing gomatrixserverlib.JSONVerifier, origin, server spec.ServerName, -) gomatrixserverlib.AuthChainProvider { +) gomatrixserverlib.EventProvider { // A list of events that we have retried, if they were not included in // the auth events supplied in the send_join. retries := map[string][]*gomatrixserverlib.Event{} @@ -22,7 +22,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 - github.com/matrix-org/gomatrixserverlib v0.0.0-20230424155704-8daeaebaa0bc + github.com/matrix-org/gomatrixserverlib v0.0.0-20230427002343-809b162d0e4f github.com/matrix-org/pinecone v0.11.1-0.20230210171230-8c3b24f2649a github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 github.com/mattn/go-sqlite3 v1.14.16 @@ -321,8 +321,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 h1:s7fexw github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 h1:kHKxCOLcHH8r4Fzarl4+Y3K5hjothkVW5z7T1dUM11U= github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20230424155704-8daeaebaa0bc h1:F73iHhpTZxWVO6qbyGZxd7Ch44v1gK6xNQZ7QVos/Es= -github.com/matrix-org/gomatrixserverlib v0.0.0-20230424155704-8daeaebaa0bc/go.mod h1:7HTbSZe+CIdmeqVyFMekwD5dFU8khWQyngKATvd12FU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20230427002343-809b162d0e4f h1:nck6OTEVtxXoF9mDsvZRXaXjNkz03DuhNgrl462xOso= +github.com/matrix-org/gomatrixserverlib v0.0.0-20230427002343-809b162d0e4f/go.mod h1:7HTbSZe+CIdmeqVyFMekwD5dFU8khWQyngKATvd12FU= github.com/matrix-org/pinecone v0.11.1-0.20230210171230-8c3b24f2649a h1:awrPDf9LEFySxTLKYBMCiObelNx/cBuv/wzllvCCH3A= github.com/matrix-org/pinecone v0.11.1-0.20230210171230-8c3b24f2649a/go.mod h1:HchJX9oKMXaT2xYFs0Ha/6Zs06mxLU8k6F1ODnrGkeQ= github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 h1:6z4KxomXSIGWqhHcfzExgkH3Z3UkIXry4ibJS4Aqz2Y= |