diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-03-27 16:28:22 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-27 16:28:22 +0000 |
commit | 05e1ae8745725245ee3b85a588000b0d26bae96c (patch) | |
tree | a54f2dfbc751fca5cf9b02327825bc7e9c340b3c /federationapi | |
parent | 314da91f1dff5e4c3921b06180110e7a15b38f22 (diff) |
Further room version wiring (#936)
* Room version 2 by default, other wiring updates, update gomatrixserverlib
* Fix nil pointer exception
* Fix some more nil pointer exceptions hopefully
* Update gomatrixserverlib
* Send all room versions when joining, not just stable ones
* Remove room version cquery
* Get room version when getting events from the roomserver database
* Reset default back to room version 2
* Don't generate event IDs unless needed
* Revert "Remove room version cquery"
This reverts commit a170d5873360dd059614460acc8b21ab2cda9767.
* Query room version in federation API, client API as needed
* Improvements to make_join send_join dance
* Make room server producers use headered events
* Lint tweaks
* Update gomatrixserverlib
* Versioned SendJoin
* Query room version in syncapi backfill
* Handle transaction marshalling/unmarshalling within Dendrite
* Sorta fix federation (kinda)
* whoops commit federation API too
* Use NewEventFromTrustedJSON when getting events from the database
* Update gomatrixserverlib
* Strip headers on federationapi endpoints
* Fix bug in clientapi profile room version query
* Update gomatrixserverlib
* Return more useful error if room version query doesn't find the room
* Update gomatrixserverlib
* Update gomatrixserverlib
* Maybe fix federation
* Fix formatting directive
* Update sytest whitelist and blacklist
* Temporarily disable room versions 3 and 4 until gmsl is fixed
* Fix count of EDUs in logging
* Update gomatrixserverlib
* Update gomatrixserverlib
* Update gomatrixserverlib
* Rely on EventBuilder in gmsl to generate the event IDs for us
* Some review comments fixed
* Move function out of common and into gmsl
* Comment in federationsender destinationqueue
* Update gomatrixserverlib
Diffstat (limited to 'federationapi')
-rw-r--r-- | federationapi/federationapi.go | 2 | ||||
-rw-r--r-- | federationapi/routing/backfill.go | 8 | ||||
-rw-r--r-- | federationapi/routing/invite.go | 19 | ||||
-rw-r--r-- | federationapi/routing/join.go | 50 | ||||
-rw-r--r-- | federationapi/routing/leave.go | 27 | ||||
-rw-r--r-- | federationapi/routing/send.go | 68 | ||||
-rw-r--r-- | federationapi/routing/state.go | 12 | ||||
-rw-r--r-- | federationapi/routing/threepid.go | 41 |
8 files changed, 175 insertions, 52 deletions
diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index ef57da88..90db95b3 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -42,7 +42,7 @@ func SetupFederationAPIComponent( asAPI appserviceAPI.AppServiceQueryAPI, federationSenderAPI federationSenderAPI.FederationSenderQueryAPI, ) { - roomserverProducer := producers.NewRoomserverProducer(inputAPI) + roomserverProducer := producers.NewRoomserverProducer(inputAPI, queryAPI) routing.Setup( base.APIMux, base.Cfg, queryAPI, aliasAPI, asAPI, diff --git a/federationapi/routing/backfill.go b/federationapi/routing/backfill.go index a4bc3c67..72ce0c66 100644 --- a/federationapi/routing/backfill.go +++ b/federationapi/routing/backfill.go @@ -15,6 +15,7 @@ package routing import ( + "encoding/json" "net/http" "strconv" "time" @@ -91,9 +92,14 @@ func Backfill( } } + var eventJSONs []json.RawMessage + for _, e := range evs { + eventJSONs = append(eventJSONs, e.JSON()) + } + txn := gomatrixserverlib.Transaction{ Origin: cfg.Matrix.ServerName, - PDUs: evs, + PDUs: eventJSONs, OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()), } diff --git a/federationapi/routing/invite.go b/federationapi/routing/invite.go index 94610346..09c3734b 100644 --- a/federationapi/routing/invite.go +++ b/federationapi/routing/invite.go @@ -15,12 +15,13 @@ package routing import ( - "encoding/json" + "context" "net/http" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -35,10 +36,19 @@ func Invite( producer *producers.RoomserverProducer, keys gomatrixserverlib.KeyRing, ) util.JSONResponse { + // Look up the room version for the room. + verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} + verRes := api.QueryRoomVersionForRoomResponse{} + if err := producer.QueryAPI.QueryRoomVersionForRoom(context.Background(), &verReq, &verRes); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.UnsupportedRoomVersion(err.Error()), + } + } // Decode the event JSON from the request. - var event gomatrixserverlib.Event - if err := json.Unmarshal(request.Content(), &event); err != nil { + event, err := gomatrixserverlib.NewEventFromUntrustedJSON(request.Content(), verRes.RoomVersion) + if err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()), @@ -70,9 +80,10 @@ func Invite( } // Check that the event is signed by the server sending the request. + redacted := event.Redact() verifyRequests := []gomatrixserverlib.VerifyJSONRequest{{ ServerName: event.Origin(), - Message: event.Redact().JSON(), + Message: redacted.JSON(), AtTS: event.OriginServerTS(), }} verifyResults, err := keys.VerifyJSONs(httpReq.Context(), verifyRequests) diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go index 7d48c86d..a39ff639 100644 --- a/federationapi/routing/join.go +++ b/federationapi/routing/join.go @@ -15,7 +15,6 @@ package routing import ( - "encoding/json" "net/http" "time" @@ -36,6 +35,15 @@ func MakeJoin( query api.RoomserverQueryAPI, roomID, userID string, ) util.JSONResponse { + verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} + verRes := api.QueryRoomVersionForRoomResponse{} + if err := query.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil { + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: jsonerror.InternalServerError(), + } + } + _, domain, err := gomatrixserverlib.SplitID('@', userID) if err != nil { return util.JSONResponse{ @@ -63,7 +71,9 @@ func MakeJoin( return jsonerror.InternalServerError() } - var queryRes api.QueryLatestEventsAndStateResponse + queryRes := api.QueryLatestEventsAndStateResponse{ + RoomVersion: verRes.RoomVersion, + } event, err := common.BuildEvent(httpReq.Context(), &builder, cfg, time.Now(), query, &queryRes) if err == common.ErrRoomNoExists { return util.JSONResponse{ @@ -80,6 +90,7 @@ func MakeJoin( for i := range queryRes.StateEvents { stateEvents[i] = &queryRes.StateEvents[i].Event } + provider := gomatrixserverlib.NewAuthEvents(stateEvents) if err = gomatrixserverlib.Allowed(*event, &provider); err != nil { return util.JSONResponse{ @@ -90,7 +101,10 @@ func MakeJoin( return util.JSONResponse{ Code: http.StatusOK, - JSON: map[string]interface{}{"event": builder}, + JSON: map[string]interface{}{ + "event": builder, + "room_version": verRes.RoomVersion, + }, } } @@ -104,8 +118,18 @@ func SendJoin( keys gomatrixserverlib.KeyRing, roomID, eventID string, ) util.JSONResponse { - var event gomatrixserverlib.Event - if err := json.Unmarshal(request.Content(), &event); err != nil { + verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} + verRes := api.QueryRoomVersionForRoomResponse{} + if err := query.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil { + util.GetLogger(httpReq.Context()).WithError(err).Error("query.QueryRoomVersionForRoom failed") + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: jsonerror.InternalServerError(), + } + } + + event, err := gomatrixserverlib.NewEventFromUntrustedJSON(request.Content(), verRes.RoomVersion) + if err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()), @@ -137,9 +161,10 @@ func SendJoin( } // Check that the event is signed by the server sending the request. + redacted := event.Redact() verifyRequests := []gomatrixserverlib.VerifyJSONRequest{{ ServerName: event.Origin(), - Message: event.Redact().JSON(), + Message: redacted.JSON(), AtTS: event.OriginServerTS(), }} verifyResults, err := keys.VerifyJSONs(httpReq.Context(), verifyRequests) @@ -150,7 +175,7 @@ func SendJoin( if verifyResults[0].Error != nil { return util.JSONResponse{ Code: http.StatusForbidden, - JSON: jsonerror.Forbidden("The join must be signed by the server it originated on"), + JSON: jsonerror.Forbidden("Signature check failed: " + verifyResults[0].Error.Error()), } } @@ -178,7 +203,12 @@ func SendJoin( // We are responsible for notifying other servers that the user has joined // the room, so set SendAsServer to cfg.Matrix.ServerName _, err = producer.SendEvents( - httpReq.Context(), []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName, nil, + httpReq.Context(), + []gomatrixserverlib.HeaderedEvent{ + event.Headered(stateAndAuthChainResponse.RoomVersion), + }, + cfg.Matrix.ServerName, + nil, ) if err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") @@ -188,8 +218,8 @@ func SendJoin( return util.JSONResponse{ Code: http.StatusOK, JSON: map[string]interface{}{ - "state": stateAndAuthChainResponse.StateEvents, - "auth_chain": stateAndAuthChainResponse.AuthChainEvents, + "state": gomatrixserverlib.UnwrapEventHeaders(stateAndAuthChainResponse.StateEvents), + "auth_chain": gomatrixserverlib.UnwrapEventHeaders(stateAndAuthChainResponse.AuthChainEvents), }, } } diff --git a/federationapi/routing/leave.go b/federationapi/routing/leave.go index 3eceb6f2..e0a14263 100644 --- a/federationapi/routing/leave.go +++ b/federationapi/routing/leave.go @@ -13,7 +13,6 @@ package routing import ( - "encoding/json" "net/http" "time" @@ -101,8 +100,18 @@ func SendLeave( keys gomatrixserverlib.KeyRing, roomID, eventID string, ) util.JSONResponse { - var event gomatrixserverlib.Event - if err := json.Unmarshal(request.Content(), &event); err != nil { + verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} + verRes := api.QueryRoomVersionForRoomResponse{} + if err := producer.QueryAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.UnsupportedRoomVersion(err.Error()), + } + } + + // Decode the event JSON from the request. + event, err := gomatrixserverlib.NewEventFromUntrustedJSON(request.Content(), verRes.RoomVersion) + if err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()), @@ -134,9 +143,10 @@ func SendLeave( } // Check that the event is signed by the server sending the request. + redacted := event.Redact() verifyRequests := []gomatrixserverlib.VerifyJSONRequest{{ ServerName: event.Origin(), - Message: event.Redact().JSON(), + Message: redacted.JSON(), AtTS: event.OriginServerTS(), }} verifyResults, err := keys.VerifyJSONs(httpReq.Context(), verifyRequests) @@ -166,7 +176,14 @@ func SendLeave( // Send the events to the room server. // We are responsible for notifying other servers that the user has left // the room, so set SendAsServer to cfg.Matrix.ServerName - _, err = producer.SendEvents(httpReq.Context(), []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName, nil) + _, err = producer.SendEvents( + httpReq.Context(), + []gomatrixserverlib.HeaderedEvent{ + event.Headered(verRes.RoomVersion), + }, + cfg.Matrix.ServerName, + nil, + ) if err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") return jsonerror.InternalServerError() diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index d3e060ac..4c92c7e5 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -39,7 +39,6 @@ func Send( keys gomatrixserverlib.KeyRing, federation *gomatrixserverlib.FederationClient, ) util.JSONResponse { - t := txnReq{ context: httpReq.Context(), query: query, @@ -47,17 +46,26 @@ func Send( keys: keys, federation: federation, } - if err := json.Unmarshal(request.Content(), &t); err != nil { + + var txnEvents struct { + PDUs []json.RawMessage `json:"pdus"` + EDUs []json.RawMessage `json:"edus"` + } + + if err := json.Unmarshal(request.Content(), &txnEvents); err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()), } } + t.PDUs = txnEvents.PDUs t.Origin = request.Origin() t.TransactionID = txnID t.Destination = cfg.Matrix.ServerName + util.GetLogger(httpReq.Context()).Infof("Received transaction %q containing %d PDUs, %d EDUs", txnID, len(t.PDUs), len(t.EDUs)) + resp, err := t.processTransaction() if err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("t.processTransaction failed") @@ -80,15 +88,37 @@ type txnReq struct { } func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { - // Check the event signatures - if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, t.PDUs, t.keys); err != nil { - return nil, err + var pdus []gomatrixserverlib.HeaderedEvent + for _, pdu := range t.PDUs { + var header struct { + RoomID string `json:"room_id"` + } + if err := json.Unmarshal(pdu, &header); err != nil { + util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to extract room ID from event") + return nil, err + } + verReq := api.QueryRoomVersionForRoomRequest{RoomID: header.RoomID} + verRes := api.QueryRoomVersionForRoomResponse{} + if err := t.query.QueryRoomVersionForRoom(t.context, &verReq, &verRes); err != nil { + util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to query room version for room", verReq.RoomID) + return nil, err + } + event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, verRes.RoomVersion) + if err != nil { + util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID()) + return nil, err + } + if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil { + util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID()) + return nil, err + } + pdus = append(pdus, event.Headered(verRes.RoomVersion)) } // Process the events. results := map[string]gomatrixserverlib.PDUResult{} - for _, e := range t.PDUs { - err := t.processEvent(e) + for _, e := range pdus { + err := t.processEvent(e.Unwrap()) if err != nil { // If the error is due to the event itself being bad then we skip // it and move onto the next event. We report an error so that the @@ -123,7 +153,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { } // TODO: Process the EDUs. - + util.GetLogger(t.context).Infof("Processed %d PDUs from transaction %q", len(results), t.TransactionID) return &gomatrixserverlib.RespSend{PDUs: results}, nil } @@ -159,13 +189,13 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { } if !stateResp.PrevEventsExist { - return t.processEventWithMissingState(e) + return t.processEventWithMissingState(e, stateResp.RoomVersion) } // Check that the event is allowed by the state at the event. var events []gomatrixserverlib.Event for _, headeredEvent := range stateResp.StateEvents { - events = append(events, headeredEvent.Event) + events = append(events, headeredEvent.Unwrap()) } if err := checkAllowedByState(e, events); err != nil { return err @@ -175,7 +205,14 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { // TODO: Check that the event is allowed by its auth_events. // pass the event to the roomserver - _, err := t.producer.SendEvents(t.context, []gomatrixserverlib.Event{e}, api.DoNotSendToOtherServers, nil) + _, err := t.producer.SendEvents( + t.context, + []gomatrixserverlib.HeaderedEvent{ + e.Headered(stateResp.RoomVersion), + }, + api.DoNotSendToOtherServers, + nil, + ) return err } @@ -190,7 +227,7 @@ func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserver return gomatrixserverlib.Allowed(e, &authUsingState) } -func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event) error { +func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error { // We are missing the previous events for this events. // This means that there is a gap in our view of the history of the // room. There two ways that we can handle such a gap: @@ -207,7 +244,7 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event) error { // need to fallback to /state. // TODO: Attempt to fill in the gap using /get_missing_events // TODO: Attempt to fetch the state using /state_ids and /events - state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID()) + state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID(), roomVersion) if err != nil { return err } @@ -225,7 +262,7 @@ retryAllowedState: if s.EventID() != missing.AuthEventID { continue } - err = t.processEventWithMissingState(s) + err = t.processEventWithMissingState(s, roomVersion) // If there was no error retrieving the event from federation then // we assume that it succeeded, so retry the original state check if err == nil { @@ -236,6 +273,7 @@ retryAllowedState: } return err } + // pass the event along with the state to the roomserver - return t.producer.SendEventWithState(t.context, state, e) + return t.producer.SendEventWithState(t.context, state, e.Headered(roomVersion)) } diff --git a/federationapi/routing/state.go b/federationapi/routing/state.go index 86a1e08d..6a47882b 100644 --- a/federationapi/routing/state.go +++ b/federationapi/routing/state.go @@ -129,17 +129,9 @@ func getState( return nil, &util.JSONResponse{Code: http.StatusNotFound, JSON: nil} } - var stateEvents, authEvents []gomatrixserverlib.Event - for _, headeredEvent := range response.StateEvents { - stateEvents = append(stateEvents, headeredEvent.Event) - } - for _, headeredEvent := range response.AuthChainEvents { - authEvents = append(authEvents, headeredEvent.Event) - } - return &gomatrixserverlib.RespState{ - StateEvents: stateEvents, - AuthEvents: authEvents, + StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents), + AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents), }, nil } diff --git a/federationapi/routing/threepid.go b/federationapi/routing/threepid.go index 18ebc07e..da717473 100644 --- a/federationapi/routing/threepid.go +++ b/federationapi/routing/threepid.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "net/http" "time" @@ -28,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -68,8 +68,17 @@ func CreateInvitesFrom3PIDInvites( return *reqErr } - evs := []gomatrixserverlib.Event{} + evs := []gomatrixserverlib.HeaderedEvent{} for _, inv := range body.Invites { + verReq := api.QueryRoomVersionForRoomRequest{RoomID: inv.RoomID} + verRes := api.QueryRoomVersionForRoomResponse{} + if err := queryAPI.QueryRoomVersionForRoom(req.Context(), &verReq, &verRes); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.UnsupportedRoomVersion(err.Error()), + } + } + event, err := createInviteFrom3PIDInvite( req.Context(), queryAPI, asAPI, cfg, inv, federation, accountDB, ) @@ -78,7 +87,7 @@ func CreateInvitesFrom3PIDInvites( return jsonerror.InternalServerError() } if event != nil { - evs = append(evs, *event) + evs = append(evs, (*event).Headered(verRes.RoomVersion)) } } @@ -137,6 +146,15 @@ func ExchangeThirdPartyInvite( } } + verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} + verRes := api.QueryRoomVersionForRoomResponse{} + if err = queryAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.UnsupportedRoomVersion(err.Error()), + } + } + // Auth and build the event from what the remote server sent us event, err := buildMembershipEvent(httpReq.Context(), &builder, queryAPI, cfg) if err == errNotInRoom { @@ -159,7 +177,12 @@ func ExchangeThirdPartyInvite( // Send the event to the roomserver if _, err = producer.SendEvents( - httpReq.Context(), []gomatrixserverlib.Event{signedEvent.Event}, cfg.Matrix.ServerName, nil, + httpReq.Context(), + []gomatrixserverlib.HeaderedEvent{ + signedEvent.Event.Headered(verRes.RoomVersion), + }, + cfg.Matrix.ServerName, + nil, ); err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") return jsonerror.InternalServerError() @@ -181,6 +204,12 @@ func createInviteFrom3PIDInvite( inv invite, federation *gomatrixserverlib.FederationClient, accountDB accounts.Database, ) (*gomatrixserverlib.Event, error) { + verReq := api.QueryRoomVersionForRoomRequest{RoomID: inv.RoomID} + verRes := api.QueryRoomVersionForRoomResponse{} + if err := queryAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil { + return nil, err + } + _, server, err := gomatrixserverlib.SplitID('@', inv.MXID) if err != nil { return nil, err @@ -280,9 +309,9 @@ func buildMembershipEvent( } builder.AuthEvents = refs - eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.Matrix.ServerName) event, err := builder.Build( - eventID, time.Now(), cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey, + time.Now(), cfg.Matrix.ServerName, cfg.Matrix.KeyID, + cfg.Matrix.PrivateKey, queryRes.RoomVersion, ) return &event, err |