diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-04-29 15:29:39 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-29 15:29:39 +0100 |
commit | 64e94e9a6f0a138e7fe771f540b57988bc344b59 (patch) | |
tree | 9ef06d96a4c5f0bd59ea55dad8d7f74646898894 | |
parent | a308e61331f549ae0964f83dff88abc282033ed3 (diff) |
Join room support in federation sender (#989)
* Implement PerformJoinRequest
* Rename perform functions
* Check send join response
* Temporary wiring to test federation sender room joins
* Actually pass through the config
* Make sure membership content shows join
-rw-r--r-- | clientapi/routing/joinroom.go | 122 | ||||
-rw-r--r-- | clientapi/routing/routing.go | 3 | ||||
-rw-r--r-- | cmd/dendrite-demo-libp2p/main.go | 2 | ||||
-rw-r--r-- | cmd/dendrite-federation-sender-server/main.go | 5 | ||||
-rw-r--r-- | cmd/dendrite-monolith-server/main.go | 2 | ||||
-rw-r--r-- | cmd/dendritejs/main.go | 2 | ||||
-rw-r--r-- | federationsender/api/api.go | 4 | ||||
-rw-r--r-- | federationsender/api/perform.go | 10 | ||||
-rw-r--r-- | federationsender/federationsender.go | 9 | ||||
-rw-r--r-- | federationsender/producers/roomserver.go | 36 | ||||
-rw-r--r-- | federationsender/query/api.go | 30 | ||||
-rw-r--r-- | federationsender/query/perform.go | 99 | ||||
-rw-r--r-- | federationsender/query/perform/join.go | 70 | ||||
-rw-r--r-- | federationsender/query/query.go | 4 |
14 files changed, 264 insertions, 134 deletions
diff --git a/clientapi/routing/joinroom.go b/clientapi/routing/joinroom.go index d0dee7c2..f55d1b6a 100644 --- a/clientapi/routing/joinroom.go +++ b/clientapi/routing/joinroom.go @@ -27,12 +27,11 @@ import ( "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" - "github.com/matrix-org/dendrite/roomserver/api" + federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - "github.com/sirupsen/logrus" ) // JoinRoomByIDOrAlias implements the "/join/{roomIDOrAlias}" API. @@ -46,6 +45,7 @@ func JoinRoomByIDOrAlias( producer *producers.RoomserverProducer, queryAPI roomserverAPI.RoomserverQueryAPI, aliasAPI roomserverAPI.RoomserverAliasAPI, + fsAPI federationSenderAPI.FederationSenderInternalAPI, keyRing gomatrixserverlib.KeyRing, accountDB accounts.Database, ) util.JSONResponse { @@ -79,7 +79,8 @@ func JoinRoomByIDOrAlias( content["avatar_url"] = profile.AvatarURL r := joinRoomReq{ - req, evTime, content, device.UserID, cfg, federation, producer, queryAPI, aliasAPI, keyRing, + req, evTime, content, device.UserID, cfg, federation, producer, + queryAPI, aliasAPI, fsAPI, keyRing, } if strings.HasPrefix(roomIDOrAlias, "!") { @@ -107,6 +108,7 @@ type joinRoomReq struct { producer *producers.RoomserverProducer queryAPI roomserverAPI.RoomserverQueryAPI aliasAPI roomserverAPI.RoomserverAliasAPI + fsAPI federationSenderAPI.FederationSenderInternalAPI keyRing gomatrixserverlib.KeyRing } @@ -326,72 +328,16 @@ func (r joinRoomReq) joinRoomUsingServers( // server was invalid this returns an error. // Otherwise this returns a JSONResponse. func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib.ServerName) (*util.JSONResponse, error) { - // Ask the room server for information about room versions. - var request api.QueryRoomVersionCapabilitiesRequest - var response api.QueryRoomVersionCapabilitiesResponse - if err := r.queryAPI.QueryRoomVersionCapabilities(r.req.Context(), &request, &response); err != nil { - return nil, err - } - var supportedVersions []gomatrixserverlib.RoomVersion - for version := range response.AvailableRoomVersions { - supportedVersions = append(supportedVersions, version) - } - respMakeJoin, err := r.federation.MakeJoin(r.req.Context(), server, roomID, r.userID, supportedVersions) - if err != nil { - // TODO: Check if the user was not allowed to join the room. - return nil, fmt.Errorf("r.federation.MakeJoin: %w", err) - } - - // Set all the fields to be what they should be, this should be a no-op - // but it's possible that the remote server returned us something "odd" - err = r.writeToBuilder(&respMakeJoin.JoinEvent, roomID) - if err != nil { - return nil, fmt.Errorf("r.writeToBuilder: %w", err) - } - - if respMakeJoin.RoomVersion == "" { - respMakeJoin.RoomVersion = gomatrixserverlib.RoomVersionV1 - } - if _, err = respMakeJoin.RoomVersion.EventFormat(); err != nil { - return &util.JSONResponse{ - Code: http.StatusBadRequest, - JSON: jsonerror.UnsupportedRoomVersion( - fmt.Sprintf("Room version '%s' is not supported", respMakeJoin.RoomVersion), - ), - }, nil - } - - event, err := respMakeJoin.JoinEvent.Build( - r.evTime, r.cfg.Matrix.ServerName, r.cfg.Matrix.KeyID, - r.cfg.Matrix.PrivateKey, respMakeJoin.RoomVersion, - ) - if err != nil { - return nil, fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err) + fedJoinReq := federationSenderAPI.PerformJoinRequest{ + RoomID: roomID, + UserID: r.userID, + ServerName: server, } - - respSendJoin, err := r.federation.SendJoin(r.req.Context(), server, event, respMakeJoin.RoomVersion) - if err != nil { - return nil, fmt.Errorf("r.federation.SendJoin: %w", err) - } - - if err = r.checkSendJoinResponse(event, server, respMakeJoin, respSendJoin); err != nil { + fedJoinRes := federationSenderAPI.PerformJoinResponse{} + if err := r.fsAPI.PerformJoin(r.req.Context(), &fedJoinReq, &fedJoinRes); err != nil { return nil, err } - util.GetLogger(r.req.Context()).WithFields(logrus.Fields{ - "room_id": roomID, - "num_auth_events": len(respSendJoin.AuthEvents), - "num_state_events": len(respSendJoin.StateEvents), - }).Info("Room join signature and auth verification passed") - - if err = r.producer.SendEventWithState( - r.req.Context(), - respSendJoin.ToRespState(), - event.Headered(respMakeJoin.RoomVersion), - ); err != nil { - util.GetLogger(r.req.Context()).WithError(err).Error("r.producer.SendEventWithState") - } - return &util.JSONResponse{ Code: http.StatusOK, // TODO: Put the response struct somewhere common. @@ -400,49 +346,3 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib }{roomID}, }, nil } - -// checkSendJoinResponse checks that all of the signatures are correct -// and that the join is allowed by the supplied state. -func (r joinRoomReq) checkSendJoinResponse( - event gomatrixserverlib.Event, - server gomatrixserverlib.ServerName, - respMakeJoin gomatrixserverlib.RespMakeJoin, - respSendJoin gomatrixserverlib.RespSendJoin, -) error { - // A list of events that we have retried, if they were not included in - // the auth events supplied in the send_join. - retries := map[string]bool{} - -retryCheck: - // TODO: Can we expand Check here to return a list of missing auth - // events rather than failing one at a time? - if err := respSendJoin.Check(r.req.Context(), r.keyRing, event); err != nil { - switch e := err.(type) { - case gomatrixserverlib.MissingAuthEventError: - // Check that we haven't already retried for this event, prevents - // us from ending up in endless loops - if !retries[e.AuthEventID] { - // Ask the server that we're talking to right now for the event - tx, txerr := r.federation.GetEvent(r.req.Context(), server, e.AuthEventID) - if txerr != nil { - return fmt.Errorf("r.federation.GetEvent: %w", txerr) - } - // For each event returned, add it to the auth events. - for _, pdu := range tx.PDUs { - ev, everr := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, respMakeJoin.RoomVersion) - if everr != nil { - return fmt.Errorf("gomatrixserverlib.NewEventFromUntrustedJSON: %w", everr) - } - respSendJoin.AuthEvents = append(respSendJoin.AuthEvents, ev) - } - // Mark the event as retried and then give the check another go. - retries[e.AuthEventID] = true - goto retryCheck - } - return fmt.Errorf("respSendJoin (after retries): %w", e) - default: - return fmt.Errorf("respSendJoin: %w", err) - } - } - return nil -} diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 9ab22cbe..e62b5193 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -101,7 +101,8 @@ func Setup( return util.ErrorResponse(err) } return JoinRoomByIDOrAlias( - req, device, vars["roomIDOrAlias"], cfg, federation, producer, queryAPI, aliasAPI, keyRing, accountDB, + req, device, vars["roomIDOrAlias"], cfg, federation, producer, + queryAPI, aliasAPI, federationSender, keyRing, accountDB, ) }), ).Methods(http.MethodPost, http.MethodOptions) diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index b9fbfc53..a2a4675b 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -153,7 +153,7 @@ func main() { asQuery := appservice.SetupAppServiceAPIComponent( &base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(), ) - fsAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query, input) + fsAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query, input, &keyRing) clientapi.SetupClientAPIComponent( &base.Base, deviceDB, accountDB, diff --git a/cmd/dendrite-federation-sender-server/main.go b/cmd/dendrite-federation-sender-server/main.go index 1593afaa..f8d43b99 100644 --- a/cmd/dendrite-federation-sender-server/main.go +++ b/cmd/dendrite-federation-sender-server/main.go @@ -16,6 +16,7 @@ package main import ( "github.com/matrix-org/dendrite/common/basecomponent" + "github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/dendrite/federationsender" ) @@ -25,11 +26,13 @@ func main() { defer base.Close() // nolint: errcheck federation := base.CreateFederationClient() + keyDB := base.CreateKeyDB() + keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives) _, input, query := base.CreateHTTPRoomserverAPIs() federationsender.SetupFederationSenderComponent( - base, federation, query, input, + base, federation, query, input, &keyRing, ) base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationSender), string(base.Cfg.Listen.FederationSender)) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index e806f6f3..f43f8b04 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -62,7 +62,7 @@ func main() { asQuery := appservice.SetupAppServiceAPIComponent( base, accountDB, deviceDB, federation, alias, query, transactions.New(), ) - fsAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input) + fsAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input, &keyRing) input.SetFederationSenderAPI(fsAPI) clientapi.SetupClientAPIComponent( diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 7665138e..1f2f20fb 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -128,7 +128,7 @@ func main() { asQuery := appservice.SetupAppServiceAPIComponent( base, accountDB, deviceDB, federation, alias, query, transactions.New(), ) - fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input) + fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input, &keyRing) input.SetFederationSenderAPI(fedSenderAPI) clientapi.SetupClientAPIComponent( diff --git a/federationsender/api/api.go b/federationsender/api/api.go index 1340179e..10dc66da 100644 --- a/federationsender/api/api.go +++ b/federationsender/api/api.go @@ -25,13 +25,13 @@ type FederationSenderInternalAPI interface { response *QueryJoinedHostServerNamesInRoomResponse, ) error // Handle an instruction to make_join & send_join with a remote server. - PerformJoinRequest( + PerformJoin( ctx context.Context, request *PerformJoinRequest, response *PerformJoinResponse, ) error // Handle an instruction to make_leave & send_leave with a remote server. - PerformLeaveRequest( + PerformLeave( ctx context.Context, request *PerformLeaveRequest, response *PerformLeaveResponse, diff --git a/federationsender/api/perform.go b/federationsender/api/perform.go index 8c30ecbe..87736f29 100644 --- a/federationsender/api/perform.go +++ b/federationsender/api/perform.go @@ -4,6 +4,7 @@ import ( "context" commonHTTP "github.com/matrix-org/dendrite/common/http" + "github.com/matrix-org/gomatrixserverlib" "github.com/opentracing/opentracing-go" ) @@ -16,14 +17,17 @@ const ( ) type PerformJoinRequest struct { - RoomID string `json:"room_id"` + RoomID string `json:"room_id"` + UserID string `json:"user_id"` + ServerName gomatrixserverlib.ServerName `json:"server_name"` + Content map[string]interface{} `json:"content"` } type PerformJoinResponse struct { } // Handle an instruction to make_join & send_join with a remote server. -func (h *httpFederationSenderInternalAPI) PerformJoinRequest( +func (h *httpFederationSenderInternalAPI) PerformJoin( ctx context.Context, request *PerformJoinRequest, response *PerformJoinResponse, @@ -43,7 +47,7 @@ type PerformLeaveResponse struct { } // Handle an instruction to make_leave & send_leave with a remote server. -func (h *httpFederationSenderInternalAPI) PerformLeaveRequest( +func (h *httpFederationSenderInternalAPI) PerformLeave( ctx context.Context, request *PerformLeaveRequest, response *PerformLeaveResponse, diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index 355775f8..aa9a7bc9 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -36,6 +36,7 @@ func SetupFederationSenderComponent( federation *gomatrixserverlib.FederationClient, rsQueryAPI roomserverAPI.RoomserverQueryAPI, rsInputAPI roomserverAPI.RoomserverInputAPI, + keyRing *gomatrixserverlib.KeyRing, ) api.FederationSenderInternalAPI { federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender)) if err != nil { @@ -61,10 +62,10 @@ func SetupFederationSenderComponent( logrus.WithError(err).Panic("failed to start typing server consumer") } - queryAPI := query.FederationSenderInternalAPI{ - DB: federationSenderDB, - } + queryAPI := query.NewFederationSenderInternalAPI( + federationSenderDB, base.Cfg, roomserverProducer, federation, keyRing, + ) queryAPI.SetupHTTP(http.DefaultServeMux) - return &queryAPI + return queryAPI } diff --git a/federationsender/producers/roomserver.go b/federationsender/producers/roomserver.go index 0395f962..ff4cda5b 100644 --- a/federationsender/producers/roomserver.go +++ b/federationsender/producers/roomserver.go @@ -54,6 +54,42 @@ func (c *RoomserverProducer) SendInviteResponse( return c.SendInputRoomEvents(ctx, []api.InputRoomEvent{ire}) } +// SendEventWithState writes an event with KindNew to the roomserver input log +// with the state at the event as KindOutlier before it. +func (c *RoomserverProducer) SendEventWithState( + ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, +) error { + outliers, err := state.Events() + if err != nil { + return err + } + + var ires []api.InputRoomEvent + for _, outlier := range outliers { + ires = append(ires, api.InputRoomEvent{ + Kind: api.KindOutlier, + Event: outlier.Headered(event.RoomVersion), + AuthEventIDs: outlier.AuthEventIDs(), + }) + } + + stateEventIDs := make([]string, len(state.StateEvents)) + for i := range state.StateEvents { + stateEventIDs[i] = state.StateEvents[i].EventID() + } + + ires = append(ires, api.InputRoomEvent{ + Kind: api.KindNew, + Event: event, + AuthEventIDs: event.AuthEventIDs(), + HasState: true, + StateEventIDs: stateEventIDs, + }) + + _, err = c.SendInputRoomEvents(ctx, ires) + return err +} + // SendInputRoomEvents writes the given input room events to the roomserver input API. func (c *RoomserverProducer) SendInputRoomEvents( ctx context.Context, ires []api.InputRoomEvent, diff --git a/federationsender/query/api.go b/federationsender/query/api.go index e33bcc11..e92453f9 100644 --- a/federationsender/query/api.go +++ b/federationsender/query/api.go @@ -5,17 +5,37 @@ import ( "net/http" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/storage" - rsAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) // FederationSenderInternalAPI is an implementation of api.FederationSenderInternalAPI type FederationSenderInternalAPI struct { api.FederationSenderInternalAPI - DB storage.Database - RoomserverInputAPI rsAPI.RoomserverInputAPI + db storage.Database + cfg *config.Dendrite + producer *producers.RoomserverProducer + federation *gomatrixserverlib.FederationClient + keyRing *gomatrixserverlib.KeyRing +} + +func NewFederationSenderInternalAPI( + db storage.Database, cfg *config.Dendrite, + producer *producers.RoomserverProducer, + federation *gomatrixserverlib.FederationClient, + keyRing *gomatrixserverlib.KeyRing, +) *FederationSenderInternalAPI { + return &FederationSenderInternalAPI{ + db: db, + cfg: cfg, + producer: producer, + federation: federation, + keyRing: keyRing, + } } // SetupHTTP adds the FederationSenderInternalAPI handlers to the http.ServeMux. @@ -55,7 +75,7 @@ func (f *FederationSenderInternalAPI) SetupHTTP(servMux *http.ServeMux) { if err := json.NewDecoder(req.Body).Decode(&request); err != nil { return util.MessageResponse(http.StatusBadRequest, err.Error()) } - if err := f.PerformJoinRequest(req.Context(), &request, &response); err != nil { + if err := f.PerformJoin(req.Context(), &request, &response); err != nil { return util.ErrorResponse(err) } return util.JSONResponse{Code: http.StatusOK, JSON: &response} @@ -68,7 +88,7 @@ func (f *FederationSenderInternalAPI) SetupHTTP(servMux *http.ServeMux) { if err := json.NewDecoder(req.Body).Decode(&request); err != nil { return util.MessageResponse(http.StatusBadRequest, err.Error()) } - if err := f.PerformLeaveRequest(req.Context(), &request, &response); err != nil { + if err := f.PerformLeave(req.Context(), &request, &response); err != nil { return util.ErrorResponse(err) } return util.JSONResponse{Code: http.StatusOK, JSON: &response} diff --git a/federationsender/query/perform.go b/federationsender/query/perform.go index 2486873c..d39fef5e 100644 --- a/federationsender/query/perform.go +++ b/federationsender/query/perform.go @@ -2,21 +2,116 @@ package query import ( "context" + "fmt" + "time" "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/federationsender/query/perform" + "github.com/matrix-org/dendrite/roomserver/version" + "github.com/matrix-org/gomatrixserverlib" ) // PerformJoinRequest implements api.FederationSenderInternalAPI -func (r *FederationSenderInternalAPI) PerformJoinRequest( +func (r *FederationSenderInternalAPI) PerformJoin( ctx context.Context, request *api.PerformJoinRequest, response *api.PerformJoinResponse, ) (err error) { + // Look up the supported room versions. + var supportedVersions []gomatrixserverlib.RoomVersion + for version := range version.SupportedRoomVersions() { + supportedVersions = append(supportedVersions, version) + } + + // Try to perform a make_join using the information supplied in the + // request. + respMakeJoin, err := r.federation.MakeJoin( + ctx, + request.ServerName, + request.RoomID, + request.UserID, + supportedVersions, + ) + if err != nil { + // TODO: Check if the user was not allowed to join the room. + return fmt.Errorf("r.federation.MakeJoin: %w", err) + } + + // Set all the fields to be what they should be, this should be a no-op + // but it's possible that the remote server returned us something "odd" + respMakeJoin.JoinEvent.Type = "m.room.member" + respMakeJoin.JoinEvent.Sender = request.UserID + respMakeJoin.JoinEvent.StateKey = &request.UserID + respMakeJoin.JoinEvent.RoomID = request.RoomID + respMakeJoin.JoinEvent.Redacts = "" + if request.Content == nil { + request.Content = map[string]interface{}{} + } + request.Content["membership"] = "join" + if err = respMakeJoin.JoinEvent.SetContent(request.Content); err != nil { + return fmt.Errorf("respMakeJoin.JoinEvent.SetContent: %w", err) + } + if err = respMakeJoin.JoinEvent.SetUnsigned(struct{}{}); err != nil { + return fmt.Errorf("respMakeJoin.JoinEvent.SetUnsigned: %w", err) + } + + // Work out if we support the room version that has been supplied in + // the make_join response. + if respMakeJoin.RoomVersion == "" { + respMakeJoin.RoomVersion = gomatrixserverlib.RoomVersionV1 + } + if _, err = respMakeJoin.RoomVersion.EventFormat(); err != nil { + return fmt.Errorf("respMakeJoin.RoomVersion.EventFormat: %w", err) + } + + // Build the join event. + event, err := respMakeJoin.JoinEvent.Build( + time.Now(), + r.cfg.Matrix.ServerName, + r.cfg.Matrix.KeyID, + r.cfg.Matrix.PrivateKey, + respMakeJoin.RoomVersion, + ) + if err != nil { + return fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err) + } + + // Try to perform a send_join using the newly built event. + respSendJoin, err := r.federation.SendJoin( + ctx, + request.ServerName, + event, + respMakeJoin.RoomVersion, + ) + if err != nil { + return fmt.Errorf("r.federation.SendJoin: %w", err) + } + + // Check that the send_join response was valid. + joinCtx := perform.JoinContext(r.federation, r.keyRing) + if err = joinCtx.CheckSendJoinResponse( + ctx, event, request.ServerName, respMakeJoin, respSendJoin, + ); err != nil { + return fmt.Errorf("perform.JoinRequest.CheckSendJoinResponse: %w", err) + } + + // If we successfully performed a send_join above then the other + // server now thinks we're a part of the room. Send the newly + // returned state to the roomserver to update our local view. + if err = r.producer.SendEventWithState( + ctx, + respSendJoin.ToRespState(), + event.Headered(respMakeJoin.RoomVersion), + ); err != nil { + return fmt.Errorf("r.producer.SendEventWithState: %w", err) + } + + // Everything went to plan. return nil } // PerformLeaveRequest implements api.FederationSenderInternalAPI -func (r *FederationSenderInternalAPI) PerformLeaveRequest( +func (r *FederationSenderInternalAPI) PerformLeave( ctx context.Context, request *api.PerformLeaveRequest, response *api.PerformLeaveResponse, diff --git a/federationsender/query/perform/join.go b/federationsender/query/perform/join.go new file mode 100644 index 00000000..3c7ef076 --- /dev/null +++ b/federationsender/query/perform/join.go @@ -0,0 +1,70 @@ +package perform + +import ( + "context" + "fmt" + + "github.com/matrix-org/gomatrixserverlib" +) + +// This file contains helpers for the PerformJoin function. + +type joinContext struct { + federation *gomatrixserverlib.FederationClient + keyRing *gomatrixserverlib.KeyRing +} + +// Returns a new join context. +func JoinContext(f *gomatrixserverlib.FederationClient, k *gomatrixserverlib.KeyRing) *joinContext { + return &joinContext{ + federation: f, + keyRing: k, + } +} + +// checkSendJoinResponse checks that all of the signatures are correct +// and that the join is allowed by the supplied state. +func (r joinContext) CheckSendJoinResponse( + ctx context.Context, + event gomatrixserverlib.Event, + server gomatrixserverlib.ServerName, + respMakeJoin gomatrixserverlib.RespMakeJoin, + respSendJoin gomatrixserverlib.RespSendJoin, +) error { + // A list of events that we have retried, if they were not included in + // the auth events supplied in the send_join. + retries := map[string]bool{} + +retryCheck: + // TODO: Can we expand Check here to return a list of missing auth + // events rather than failing one at a time? + if err := respSendJoin.Check(ctx, r.keyRing, event); err != nil { + switch e := err.(type) { + case gomatrixserverlib.MissingAuthEventError: + // Check that we haven't already retried for this event, prevents + // us from ending up in endless loops + if !retries[e.AuthEventID] { + // Ask the server that we're talking to right now for the event + tx, txerr := r.federation.GetEvent(ctx, server, e.AuthEventID) + if txerr != nil { + return fmt.Errorf("r.federation.GetEvent: %w", txerr) + } + // For each event returned, add it to the auth events. + for _, pdu := range tx.PDUs { + ev, everr := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, respMakeJoin.RoomVersion) + if everr != nil { + return fmt.Errorf("gomatrixserverlib.NewEventFromUntrustedJSON: %w", everr) + } + respSendJoin.AuthEvents = append(respSendJoin.AuthEvents, ev) + } + // Mark the event as retried and then give the check another go. + retries[e.AuthEventID] = true + goto retryCheck + } + return fmt.Errorf("respSendJoin (after retries): %w", e) + default: + return fmt.Errorf("respSendJoin: %w", err) + } + } + return nil +} diff --git a/federationsender/query/query.go b/federationsender/query/query.go index ec668204..004ad156 100644 --- a/federationsender/query/query.go +++ b/federationsender/query/query.go @@ -13,7 +13,7 @@ func (f *FederationSenderInternalAPI) QueryJoinedHostsInRoom( request *api.QueryJoinedHostsInRoomRequest, response *api.QueryJoinedHostsInRoomResponse, ) (err error) { - response.JoinedHosts, err = f.DB.GetJoinedHosts(ctx, request.RoomID) + response.JoinedHosts, err = f.db.GetJoinedHosts(ctx, request.RoomID) return } @@ -23,7 +23,7 @@ func (f *FederationSenderInternalAPI) QueryJoinedHostServerNamesInRoom( request *api.QueryJoinedHostServerNamesInRoomRequest, response *api.QueryJoinedHostServerNamesInRoomResponse, ) (err error) { - joinedHosts, err := f.DB.GetJoinedHosts(ctx, request.RoomID) + joinedHosts, err := f.db.GetJoinedHosts(ctx, request.RoomID) if err != nil { return } |