aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-04-29 15:29:39 +0100
committerGitHub <noreply@github.com>2020-04-29 15:29:39 +0100
commit64e94e9a6f0a138e7fe771f540b57988bc344b59 (patch)
tree9ef06d96a4c5f0bd59ea55dad8d7f74646898894
parenta308e61331f549ae0964f83dff88abc282033ed3 (diff)
Join room support in federation sender (#989)
* Implement PerformJoinRequest * Rename perform functions * Check send join response * Temporary wiring to test federation sender room joins * Actually pass through the config * Make sure membership content shows join
-rw-r--r--clientapi/routing/joinroom.go122
-rw-r--r--clientapi/routing/routing.go3
-rw-r--r--cmd/dendrite-demo-libp2p/main.go2
-rw-r--r--cmd/dendrite-federation-sender-server/main.go5
-rw-r--r--cmd/dendrite-monolith-server/main.go2
-rw-r--r--cmd/dendritejs/main.go2
-rw-r--r--federationsender/api/api.go4
-rw-r--r--federationsender/api/perform.go10
-rw-r--r--federationsender/federationsender.go9
-rw-r--r--federationsender/producers/roomserver.go36
-rw-r--r--federationsender/query/api.go30
-rw-r--r--federationsender/query/perform.go99
-rw-r--r--federationsender/query/perform/join.go70
-rw-r--r--federationsender/query/query.go4
14 files changed, 264 insertions, 134 deletions
diff --git a/clientapi/routing/joinroom.go b/clientapi/routing/joinroom.go
index d0dee7c2..f55d1b6a 100644
--- a/clientapi/routing/joinroom.go
+++ b/clientapi/routing/joinroom.go
@@ -27,12 +27,11 @@ import (
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
- "github.com/matrix-org/dendrite/roomserver/api"
+ federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
- "github.com/sirupsen/logrus"
)
// JoinRoomByIDOrAlias implements the "/join/{roomIDOrAlias}" API.
@@ -46,6 +45,7 @@ func JoinRoomByIDOrAlias(
producer *producers.RoomserverProducer,
queryAPI roomserverAPI.RoomserverQueryAPI,
aliasAPI roomserverAPI.RoomserverAliasAPI,
+ fsAPI federationSenderAPI.FederationSenderInternalAPI,
keyRing gomatrixserverlib.KeyRing,
accountDB accounts.Database,
) util.JSONResponse {
@@ -79,7 +79,8 @@ func JoinRoomByIDOrAlias(
content["avatar_url"] = profile.AvatarURL
r := joinRoomReq{
- req, evTime, content, device.UserID, cfg, federation, producer, queryAPI, aliasAPI, keyRing,
+ req, evTime, content, device.UserID, cfg, federation, producer,
+ queryAPI, aliasAPI, fsAPI, keyRing,
}
if strings.HasPrefix(roomIDOrAlias, "!") {
@@ -107,6 +108,7 @@ type joinRoomReq struct {
producer *producers.RoomserverProducer
queryAPI roomserverAPI.RoomserverQueryAPI
aliasAPI roomserverAPI.RoomserverAliasAPI
+ fsAPI federationSenderAPI.FederationSenderInternalAPI
keyRing gomatrixserverlib.KeyRing
}
@@ -326,72 +328,16 @@ func (r joinRoomReq) joinRoomUsingServers(
// server was invalid this returns an error.
// Otherwise this returns a JSONResponse.
func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib.ServerName) (*util.JSONResponse, error) {
- // Ask the room server for information about room versions.
- var request api.QueryRoomVersionCapabilitiesRequest
- var response api.QueryRoomVersionCapabilitiesResponse
- if err := r.queryAPI.QueryRoomVersionCapabilities(r.req.Context(), &request, &response); err != nil {
- return nil, err
- }
- var supportedVersions []gomatrixserverlib.RoomVersion
- for version := range response.AvailableRoomVersions {
- supportedVersions = append(supportedVersions, version)
- }
- respMakeJoin, err := r.federation.MakeJoin(r.req.Context(), server, roomID, r.userID, supportedVersions)
- if err != nil {
- // TODO: Check if the user was not allowed to join the room.
- return nil, fmt.Errorf("r.federation.MakeJoin: %w", err)
- }
-
- // Set all the fields to be what they should be, this should be a no-op
- // but it's possible that the remote server returned us something "odd"
- err = r.writeToBuilder(&respMakeJoin.JoinEvent, roomID)
- if err != nil {
- return nil, fmt.Errorf("r.writeToBuilder: %w", err)
- }
-
- if respMakeJoin.RoomVersion == "" {
- respMakeJoin.RoomVersion = gomatrixserverlib.RoomVersionV1
- }
- if _, err = respMakeJoin.RoomVersion.EventFormat(); err != nil {
- return &util.JSONResponse{
- Code: http.StatusBadRequest,
- JSON: jsonerror.UnsupportedRoomVersion(
- fmt.Sprintf("Room version '%s' is not supported", respMakeJoin.RoomVersion),
- ),
- }, nil
- }
-
- event, err := respMakeJoin.JoinEvent.Build(
- r.evTime, r.cfg.Matrix.ServerName, r.cfg.Matrix.KeyID,
- r.cfg.Matrix.PrivateKey, respMakeJoin.RoomVersion,
- )
- if err != nil {
- return nil, fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err)
+ fedJoinReq := federationSenderAPI.PerformJoinRequest{
+ RoomID: roomID,
+ UserID: r.userID,
+ ServerName: server,
}
-
- respSendJoin, err := r.federation.SendJoin(r.req.Context(), server, event, respMakeJoin.RoomVersion)
- if err != nil {
- return nil, fmt.Errorf("r.federation.SendJoin: %w", err)
- }
-
- if err = r.checkSendJoinResponse(event, server, respMakeJoin, respSendJoin); err != nil {
+ fedJoinRes := federationSenderAPI.PerformJoinResponse{}
+ if err := r.fsAPI.PerformJoin(r.req.Context(), &fedJoinReq, &fedJoinRes); err != nil {
return nil, err
}
- util.GetLogger(r.req.Context()).WithFields(logrus.Fields{
- "room_id": roomID,
- "num_auth_events": len(respSendJoin.AuthEvents),
- "num_state_events": len(respSendJoin.StateEvents),
- }).Info("Room join signature and auth verification passed")
-
- if err = r.producer.SendEventWithState(
- r.req.Context(),
- respSendJoin.ToRespState(),
- event.Headered(respMakeJoin.RoomVersion),
- ); err != nil {
- util.GetLogger(r.req.Context()).WithError(err).Error("r.producer.SendEventWithState")
- }
-
return &util.JSONResponse{
Code: http.StatusOK,
// TODO: Put the response struct somewhere common.
@@ -400,49 +346,3 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib
}{roomID},
}, nil
}
-
-// checkSendJoinResponse checks that all of the signatures are correct
-// and that the join is allowed by the supplied state.
-func (r joinRoomReq) checkSendJoinResponse(
- event gomatrixserverlib.Event,
- server gomatrixserverlib.ServerName,
- respMakeJoin gomatrixserverlib.RespMakeJoin,
- respSendJoin gomatrixserverlib.RespSendJoin,
-) error {
- // A list of events that we have retried, if they were not included in
- // the auth events supplied in the send_join.
- retries := map[string]bool{}
-
-retryCheck:
- // TODO: Can we expand Check here to return a list of missing auth
- // events rather than failing one at a time?
- if err := respSendJoin.Check(r.req.Context(), r.keyRing, event); err != nil {
- switch e := err.(type) {
- case gomatrixserverlib.MissingAuthEventError:
- // Check that we haven't already retried for this event, prevents
- // us from ending up in endless loops
- if !retries[e.AuthEventID] {
- // Ask the server that we're talking to right now for the event
- tx, txerr := r.federation.GetEvent(r.req.Context(), server, e.AuthEventID)
- if txerr != nil {
- return fmt.Errorf("r.federation.GetEvent: %w", txerr)
- }
- // For each event returned, add it to the auth events.
- for _, pdu := range tx.PDUs {
- ev, everr := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, respMakeJoin.RoomVersion)
- if everr != nil {
- return fmt.Errorf("gomatrixserverlib.NewEventFromUntrustedJSON: %w", everr)
- }
- respSendJoin.AuthEvents = append(respSendJoin.AuthEvents, ev)
- }
- // Mark the event as retried and then give the check another go.
- retries[e.AuthEventID] = true
- goto retryCheck
- }
- return fmt.Errorf("respSendJoin (after retries): %w", e)
- default:
- return fmt.Errorf("respSendJoin: %w", err)
- }
- }
- return nil
-}
diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go
index 9ab22cbe..e62b5193 100644
--- a/clientapi/routing/routing.go
+++ b/clientapi/routing/routing.go
@@ -101,7 +101,8 @@ func Setup(
return util.ErrorResponse(err)
}
return JoinRoomByIDOrAlias(
- req, device, vars["roomIDOrAlias"], cfg, federation, producer, queryAPI, aliasAPI, keyRing, accountDB,
+ req, device, vars["roomIDOrAlias"], cfg, federation, producer,
+ queryAPI, aliasAPI, federationSender, keyRing, accountDB,
)
}),
).Methods(http.MethodPost, http.MethodOptions)
diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go
index b9fbfc53..a2a4675b 100644
--- a/cmd/dendrite-demo-libp2p/main.go
+++ b/cmd/dendrite-demo-libp2p/main.go
@@ -153,7 +153,7 @@ func main() {
asQuery := appservice.SetupAppServiceAPIComponent(
&base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(),
)
- fsAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query, input)
+ fsAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query, input, &keyRing)
clientapi.SetupClientAPIComponent(
&base.Base, deviceDB, accountDB,
diff --git a/cmd/dendrite-federation-sender-server/main.go b/cmd/dendrite-federation-sender-server/main.go
index 1593afaa..f8d43b99 100644
--- a/cmd/dendrite-federation-sender-server/main.go
+++ b/cmd/dendrite-federation-sender-server/main.go
@@ -16,6 +16,7 @@ package main
import (
"github.com/matrix-org/dendrite/common/basecomponent"
+ "github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/federationsender"
)
@@ -25,11 +26,13 @@ func main() {
defer base.Close() // nolint: errcheck
federation := base.CreateFederationClient()
+ keyDB := base.CreateKeyDB()
+ keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives)
_, input, query := base.CreateHTTPRoomserverAPIs()
federationsender.SetupFederationSenderComponent(
- base, federation, query, input,
+ base, federation, query, input, &keyRing,
)
base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationSender), string(base.Cfg.Listen.FederationSender))
diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go
index e806f6f3..f43f8b04 100644
--- a/cmd/dendrite-monolith-server/main.go
+++ b/cmd/dendrite-monolith-server/main.go
@@ -62,7 +62,7 @@ func main() {
asQuery := appservice.SetupAppServiceAPIComponent(
base, accountDB, deviceDB, federation, alias, query, transactions.New(),
)
- fsAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input)
+ fsAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input, &keyRing)
input.SetFederationSenderAPI(fsAPI)
clientapi.SetupClientAPIComponent(
diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go
index 7665138e..1f2f20fb 100644
--- a/cmd/dendritejs/main.go
+++ b/cmd/dendritejs/main.go
@@ -128,7 +128,7 @@ func main() {
asQuery := appservice.SetupAppServiceAPIComponent(
base, accountDB, deviceDB, federation, alias, query, transactions.New(),
)
- fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input)
+ fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input, &keyRing)
input.SetFederationSenderAPI(fedSenderAPI)
clientapi.SetupClientAPIComponent(
diff --git a/federationsender/api/api.go b/federationsender/api/api.go
index 1340179e..10dc66da 100644
--- a/federationsender/api/api.go
+++ b/federationsender/api/api.go
@@ -25,13 +25,13 @@ type FederationSenderInternalAPI interface {
response *QueryJoinedHostServerNamesInRoomResponse,
) error
// Handle an instruction to make_join & send_join with a remote server.
- PerformJoinRequest(
+ PerformJoin(
ctx context.Context,
request *PerformJoinRequest,
response *PerformJoinResponse,
) error
// Handle an instruction to make_leave & send_leave with a remote server.
- PerformLeaveRequest(
+ PerformLeave(
ctx context.Context,
request *PerformLeaveRequest,
response *PerformLeaveResponse,
diff --git a/federationsender/api/perform.go b/federationsender/api/perform.go
index 8c30ecbe..87736f29 100644
--- a/federationsender/api/perform.go
+++ b/federationsender/api/perform.go
@@ -4,6 +4,7 @@ import (
"context"
commonHTTP "github.com/matrix-org/dendrite/common/http"
+ "github.com/matrix-org/gomatrixserverlib"
"github.com/opentracing/opentracing-go"
)
@@ -16,14 +17,17 @@ const (
)
type PerformJoinRequest struct {
- RoomID string `json:"room_id"`
+ RoomID string `json:"room_id"`
+ UserID string `json:"user_id"`
+ ServerName gomatrixserverlib.ServerName `json:"server_name"`
+ Content map[string]interface{} `json:"content"`
}
type PerformJoinResponse struct {
}
// Handle an instruction to make_join & send_join with a remote server.
-func (h *httpFederationSenderInternalAPI) PerformJoinRequest(
+func (h *httpFederationSenderInternalAPI) PerformJoin(
ctx context.Context,
request *PerformJoinRequest,
response *PerformJoinResponse,
@@ -43,7 +47,7 @@ type PerformLeaveResponse struct {
}
// Handle an instruction to make_leave & send_leave with a remote server.
-func (h *httpFederationSenderInternalAPI) PerformLeaveRequest(
+func (h *httpFederationSenderInternalAPI) PerformLeave(
ctx context.Context,
request *PerformLeaveRequest,
response *PerformLeaveResponse,
diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go
index 355775f8..aa9a7bc9 100644
--- a/federationsender/federationsender.go
+++ b/federationsender/federationsender.go
@@ -36,6 +36,7 @@ func SetupFederationSenderComponent(
federation *gomatrixserverlib.FederationClient,
rsQueryAPI roomserverAPI.RoomserverQueryAPI,
rsInputAPI roomserverAPI.RoomserverInputAPI,
+ keyRing *gomatrixserverlib.KeyRing,
) api.FederationSenderInternalAPI {
federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender))
if err != nil {
@@ -61,10 +62,10 @@ func SetupFederationSenderComponent(
logrus.WithError(err).Panic("failed to start typing server consumer")
}
- queryAPI := query.FederationSenderInternalAPI{
- DB: federationSenderDB,
- }
+ queryAPI := query.NewFederationSenderInternalAPI(
+ federationSenderDB, base.Cfg, roomserverProducer, federation, keyRing,
+ )
queryAPI.SetupHTTP(http.DefaultServeMux)
- return &queryAPI
+ return queryAPI
}
diff --git a/federationsender/producers/roomserver.go b/federationsender/producers/roomserver.go
index 0395f962..ff4cda5b 100644
--- a/federationsender/producers/roomserver.go
+++ b/federationsender/producers/roomserver.go
@@ -54,6 +54,42 @@ func (c *RoomserverProducer) SendInviteResponse(
return c.SendInputRoomEvents(ctx, []api.InputRoomEvent{ire})
}
+// SendEventWithState writes an event with KindNew to the roomserver input log
+// with the state at the event as KindOutlier before it.
+func (c *RoomserverProducer) SendEventWithState(
+ ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent,
+) error {
+ outliers, err := state.Events()
+ if err != nil {
+ return err
+ }
+
+ var ires []api.InputRoomEvent
+ for _, outlier := range outliers {
+ ires = append(ires, api.InputRoomEvent{
+ Kind: api.KindOutlier,
+ Event: outlier.Headered(event.RoomVersion),
+ AuthEventIDs: outlier.AuthEventIDs(),
+ })
+ }
+
+ stateEventIDs := make([]string, len(state.StateEvents))
+ for i := range state.StateEvents {
+ stateEventIDs[i] = state.StateEvents[i].EventID()
+ }
+
+ ires = append(ires, api.InputRoomEvent{
+ Kind: api.KindNew,
+ Event: event,
+ AuthEventIDs: event.AuthEventIDs(),
+ HasState: true,
+ StateEventIDs: stateEventIDs,
+ })
+
+ _, err = c.SendInputRoomEvents(ctx, ires)
+ return err
+}
+
// SendInputRoomEvents writes the given input room events to the roomserver input API.
func (c *RoomserverProducer) SendInputRoomEvents(
ctx context.Context, ires []api.InputRoomEvent,
diff --git a/federationsender/query/api.go b/federationsender/query/api.go
index e33bcc11..e92453f9 100644
--- a/federationsender/query/api.go
+++ b/federationsender/query/api.go
@@ -5,17 +5,37 @@ import (
"net/http"
"github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/federationsender/api"
+ "github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/storage"
- rsAPI "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
// FederationSenderInternalAPI is an implementation of api.FederationSenderInternalAPI
type FederationSenderInternalAPI struct {
api.FederationSenderInternalAPI
- DB storage.Database
- RoomserverInputAPI rsAPI.RoomserverInputAPI
+ db storage.Database
+ cfg *config.Dendrite
+ producer *producers.RoomserverProducer
+ federation *gomatrixserverlib.FederationClient
+ keyRing *gomatrixserverlib.KeyRing
+}
+
+func NewFederationSenderInternalAPI(
+ db storage.Database, cfg *config.Dendrite,
+ producer *producers.RoomserverProducer,
+ federation *gomatrixserverlib.FederationClient,
+ keyRing *gomatrixserverlib.KeyRing,
+) *FederationSenderInternalAPI {
+ return &FederationSenderInternalAPI{
+ db: db,
+ cfg: cfg,
+ producer: producer,
+ federation: federation,
+ keyRing: keyRing,
+ }
}
// SetupHTTP adds the FederationSenderInternalAPI handlers to the http.ServeMux.
@@ -55,7 +75,7 @@ func (f *FederationSenderInternalAPI) SetupHTTP(servMux *http.ServeMux) {
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
- if err := f.PerformJoinRequest(req.Context(), &request, &response); err != nil {
+ if err := f.PerformJoin(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
@@ -68,7 +88,7 @@ func (f *FederationSenderInternalAPI) SetupHTTP(servMux *http.ServeMux) {
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
- if err := f.PerformLeaveRequest(req.Context(), &request, &response); err != nil {
+ if err := f.PerformLeave(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
diff --git a/federationsender/query/perform.go b/federationsender/query/perform.go
index 2486873c..d39fef5e 100644
--- a/federationsender/query/perform.go
+++ b/federationsender/query/perform.go
@@ -2,21 +2,116 @@ package query
import (
"context"
+ "fmt"
+ "time"
"github.com/matrix-org/dendrite/federationsender/api"
+ "github.com/matrix-org/dendrite/federationsender/query/perform"
+ "github.com/matrix-org/dendrite/roomserver/version"
+ "github.com/matrix-org/gomatrixserverlib"
)
// PerformJoinRequest implements api.FederationSenderInternalAPI
-func (r *FederationSenderInternalAPI) PerformJoinRequest(
+func (r *FederationSenderInternalAPI) PerformJoin(
ctx context.Context,
request *api.PerformJoinRequest,
response *api.PerformJoinResponse,
) (err error) {
+ // Look up the supported room versions.
+ var supportedVersions []gomatrixserverlib.RoomVersion
+ for version := range version.SupportedRoomVersions() {
+ supportedVersions = append(supportedVersions, version)
+ }
+
+ // Try to perform a make_join using the information supplied in the
+ // request.
+ respMakeJoin, err := r.federation.MakeJoin(
+ ctx,
+ request.ServerName,
+ request.RoomID,
+ request.UserID,
+ supportedVersions,
+ )
+ if err != nil {
+ // TODO: Check if the user was not allowed to join the room.
+ return fmt.Errorf("r.federation.MakeJoin: %w", err)
+ }
+
+ // Set all the fields to be what they should be, this should be a no-op
+ // but it's possible that the remote server returned us something "odd"
+ respMakeJoin.JoinEvent.Type = "m.room.member"
+ respMakeJoin.JoinEvent.Sender = request.UserID
+ respMakeJoin.JoinEvent.StateKey = &request.UserID
+ respMakeJoin.JoinEvent.RoomID = request.RoomID
+ respMakeJoin.JoinEvent.Redacts = ""
+ if request.Content == nil {
+ request.Content = map[string]interface{}{}
+ }
+ request.Content["membership"] = "join"
+ if err = respMakeJoin.JoinEvent.SetContent(request.Content); err != nil {
+ return fmt.Errorf("respMakeJoin.JoinEvent.SetContent: %w", err)
+ }
+ if err = respMakeJoin.JoinEvent.SetUnsigned(struct{}{}); err != nil {
+ return fmt.Errorf("respMakeJoin.JoinEvent.SetUnsigned: %w", err)
+ }
+
+ // Work out if we support the room version that has been supplied in
+ // the make_join response.
+ if respMakeJoin.RoomVersion == "" {
+ respMakeJoin.RoomVersion = gomatrixserverlib.RoomVersionV1
+ }
+ if _, err = respMakeJoin.RoomVersion.EventFormat(); err != nil {
+ return fmt.Errorf("respMakeJoin.RoomVersion.EventFormat: %w", err)
+ }
+
+ // Build the join event.
+ event, err := respMakeJoin.JoinEvent.Build(
+ time.Now(),
+ r.cfg.Matrix.ServerName,
+ r.cfg.Matrix.KeyID,
+ r.cfg.Matrix.PrivateKey,
+ respMakeJoin.RoomVersion,
+ )
+ if err != nil {
+ return fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err)
+ }
+
+ // Try to perform a send_join using the newly built event.
+ respSendJoin, err := r.federation.SendJoin(
+ ctx,
+ request.ServerName,
+ event,
+ respMakeJoin.RoomVersion,
+ )
+ if err != nil {
+ return fmt.Errorf("r.federation.SendJoin: %w", err)
+ }
+
+ // Check that the send_join response was valid.
+ joinCtx := perform.JoinContext(r.federation, r.keyRing)
+ if err = joinCtx.CheckSendJoinResponse(
+ ctx, event, request.ServerName, respMakeJoin, respSendJoin,
+ ); err != nil {
+ return fmt.Errorf("perform.JoinRequest.CheckSendJoinResponse: %w", err)
+ }
+
+ // If we successfully performed a send_join above then the other
+ // server now thinks we're a part of the room. Send the newly
+ // returned state to the roomserver to update our local view.
+ if err = r.producer.SendEventWithState(
+ ctx,
+ respSendJoin.ToRespState(),
+ event.Headered(respMakeJoin.RoomVersion),
+ ); err != nil {
+ return fmt.Errorf("r.producer.SendEventWithState: %w", err)
+ }
+
+ // Everything went to plan.
return nil
}
// PerformLeaveRequest implements api.FederationSenderInternalAPI
-func (r *FederationSenderInternalAPI) PerformLeaveRequest(
+func (r *FederationSenderInternalAPI) PerformLeave(
ctx context.Context,
request *api.PerformLeaveRequest,
response *api.PerformLeaveResponse,
diff --git a/federationsender/query/perform/join.go b/federationsender/query/perform/join.go
new file mode 100644
index 00000000..3c7ef076
--- /dev/null
+++ b/federationsender/query/perform/join.go
@@ -0,0 +1,70 @@
+package perform
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// This file contains helpers for the PerformJoin function.
+
+type joinContext struct {
+ federation *gomatrixserverlib.FederationClient
+ keyRing *gomatrixserverlib.KeyRing
+}
+
+// Returns a new join context.
+func JoinContext(f *gomatrixserverlib.FederationClient, k *gomatrixserverlib.KeyRing) *joinContext {
+ return &joinContext{
+ federation: f,
+ keyRing: k,
+ }
+}
+
+// checkSendJoinResponse checks that all of the signatures are correct
+// and that the join is allowed by the supplied state.
+func (r joinContext) CheckSendJoinResponse(
+ ctx context.Context,
+ event gomatrixserverlib.Event,
+ server gomatrixserverlib.ServerName,
+ respMakeJoin gomatrixserverlib.RespMakeJoin,
+ respSendJoin gomatrixserverlib.RespSendJoin,
+) error {
+ // A list of events that we have retried, if they were not included in
+ // the auth events supplied in the send_join.
+ retries := map[string]bool{}
+
+retryCheck:
+ // TODO: Can we expand Check here to return a list of missing auth
+ // events rather than failing one at a time?
+ if err := respSendJoin.Check(ctx, r.keyRing, event); err != nil {
+ switch e := err.(type) {
+ case gomatrixserverlib.MissingAuthEventError:
+ // Check that we haven't already retried for this event, prevents
+ // us from ending up in endless loops
+ if !retries[e.AuthEventID] {
+ // Ask the server that we're talking to right now for the event
+ tx, txerr := r.federation.GetEvent(ctx, server, e.AuthEventID)
+ if txerr != nil {
+ return fmt.Errorf("r.federation.GetEvent: %w", txerr)
+ }
+ // For each event returned, add it to the auth events.
+ for _, pdu := range tx.PDUs {
+ ev, everr := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, respMakeJoin.RoomVersion)
+ if everr != nil {
+ return fmt.Errorf("gomatrixserverlib.NewEventFromUntrustedJSON: %w", everr)
+ }
+ respSendJoin.AuthEvents = append(respSendJoin.AuthEvents, ev)
+ }
+ // Mark the event as retried and then give the check another go.
+ retries[e.AuthEventID] = true
+ goto retryCheck
+ }
+ return fmt.Errorf("respSendJoin (after retries): %w", e)
+ default:
+ return fmt.Errorf("respSendJoin: %w", err)
+ }
+ }
+ return nil
+}
diff --git a/federationsender/query/query.go b/federationsender/query/query.go
index ec668204..004ad156 100644
--- a/federationsender/query/query.go
+++ b/federationsender/query/query.go
@@ -13,7 +13,7 @@ func (f *FederationSenderInternalAPI) QueryJoinedHostsInRoom(
request *api.QueryJoinedHostsInRoomRequest,
response *api.QueryJoinedHostsInRoomResponse,
) (err error) {
- response.JoinedHosts, err = f.DB.GetJoinedHosts(ctx, request.RoomID)
+ response.JoinedHosts, err = f.db.GetJoinedHosts(ctx, request.RoomID)
return
}
@@ -23,7 +23,7 @@ func (f *FederationSenderInternalAPI) QueryJoinedHostServerNamesInRoom(
request *api.QueryJoinedHostServerNamesInRoomRequest,
response *api.QueryJoinedHostServerNamesInRoomResponse,
) (err error) {
- joinedHosts, err := f.DB.GetJoinedHosts(ctx, request.RoomID)
+ joinedHosts, err := f.db.GetJoinedHosts(ctx, request.RoomID)
if err != nil {
return
}