diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-08-17 11:40:49 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-17 11:40:49 +0100 |
commit | 6cb1a65809ccfbeaede6ff164c281ba0ddf90ab7 (patch) | |
tree | 56406ef5a5d4335d03e7805c19fa35aad805871e /federationsender | |
parent | 6820b3e024474e2c44f7a0632261d2dc86257d77 (diff) |
Synchronous invites (#1273)
* Refactor invites to be synchronous
* Fix synchronous invites
* Fix client API return type for send invite error
* Linter
* Restore PerformError on rsAPI.PerformInvite
* Update sytest-whitelist
* Don't override PerformError with normal errors
* Fix error passing
* Un-whitelist a couple of tests
* Update sytest-whitelist
* Try to handle multiple invite rejections better
* nolint
* Update gomatrixserverlib
* Fix /v1/invite test
* Remove replace from go.mod
Diffstat (limited to 'federationsender')
-rw-r--r-- | federationsender/api/api.go | 16 | ||||
-rw-r--r-- | federationsender/consumers/roomserver.go | 62 | ||||
-rw-r--r-- | federationsender/internal/perform.go | 37 | ||||
-rw-r--r-- | federationsender/inthttp/client.go | 14 | ||||
-rw-r--r-- | federationsender/inthttp/server.go | 15 | ||||
-rw-r--r-- | federationsender/queue/destinationqueue.go | 145 | ||||
-rw-r--r-- | federationsender/queue/queue.go | 46 |
7 files changed, 94 insertions, 241 deletions
diff --git a/federationsender/api/api.go b/federationsender/api/api.go index b87af0eb..9f9c2645 100644 --- a/federationsender/api/api.go +++ b/federationsender/api/api.go @@ -36,6 +36,12 @@ type FederationSenderInternalAPI interface { request *PerformLeaveRequest, response *PerformLeaveResponse, ) error + // Handle sending an invite to a remote server. + PerformInvite( + ctx context.Context, + request *PerformInviteRequest, + response *PerformInviteResponse, + ) error // Notifies the federation sender that these servers may be online and to retry sending messages. PerformServersAlive( ctx context.Context, @@ -81,6 +87,16 @@ type PerformLeaveRequest struct { type PerformLeaveResponse struct { } +type PerformInviteRequest struct { + RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"` + Event gomatrixserverlib.HeaderedEvent `json:"event"` + InviteRoomState []gomatrixserverlib.InviteV2StrippedState `json:"invite_room_state"` +} + +type PerformInviteResponse struct { + Event gomatrixserverlib.HeaderedEvent `json:"event"` +} + type PerformServersAliveRequest struct { Servers []gomatrixserverlib.ServerName } diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index e09350f8..92b4d6f4 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -28,7 +28,6 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" - "github.com/tidwall/gjson" ) // OutputRoomEventConsumer consumes events that originated in the room server. @@ -97,22 +96,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: write room event failure") return nil } - case api.OutputTypeNewInviteEvent: - ev := &output.NewInviteEvent.Event - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "room_id": ev.RoomID(), - "state_key": ev.StateKey(), - }).Info("received invite event from roomserver") - - if err := s.processInvite(*output.NewInviteEvent); err != nil { - // panic rather than continue with an inconsistent database - log.WithFields(log.Fields{ - "event": string(ev.JSON()), - log.ErrorKey: err, - }).Panicf("roomserver output log: write invite event failure") - return nil - } default: log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", @@ -172,51 +155,6 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err ) } -// processInvite handles an invite event for sending over federation. -func (s *OutputRoomEventConsumer) processInvite(oie api.OutputNewInviteEvent) error { - // Don't try to reflect and resend invites that didn't originate from us. - if s.cfg.Matrix.ServerName != oie.Event.Origin() { - return nil - } - - // Ignore invites that don't have state keys - they are invalid. - if oie.Event.StateKey() == nil { - return fmt.Errorf("event %q doesn't have state key", oie.Event.EventID()) - } - - // Don't try to handle events that are actually destined for us. - stateKey := *oie.Event.StateKey() - _, destination, err := gomatrixserverlib.SplitID('@', stateKey) - if err != nil { - log.WithFields(log.Fields{ - "event_id": oie.Event.EventID(), - "state_key": stateKey, - }).Info("failed to split destination from state key") - return nil - } - if s.cfg.Matrix.ServerName == destination { - return nil - } - - // Try to extract the room invite state. The roomserver will have stashed - // this for us in invite_room_state if it didn't already exist. - strippedState := []gomatrixserverlib.InviteV2StrippedState{} - if inviteRoomState := gjson.GetBytes(oie.Event.Unsigned(), "invite_room_state"); inviteRoomState.Exists() { - if err = json.Unmarshal([]byte(inviteRoomState.Raw), &strippedState); err != nil { - log.WithError(err).Warn("failed to extract invite_room_state from event unsigned") - } - } - - // Build the invite request with the info we've got. - inviteReq, err := gomatrixserverlib.NewInviteV2Request(&oie.Event, strippedState) - if err != nil { - return fmt.Errorf("gomatrixserverlib.NewInviteV2Request: %w", err) - } - - // Send the event. - return s.queues.SendInvite(&inviteReq) -} - // joinedHostsAtEvent works out a list of matrix servers that were joined to // the room at the event. // It is important to use the state at the event for sending messages because: diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go index 1b8e360c..da8d41a7 100644 --- a/federationsender/internal/perform.go +++ b/federationsender/internal/perform.go @@ -296,6 +296,43 @@ func (r *FederationSenderInternalAPI) PerformLeave( ) } +// PerformLeaveRequest implements api.FederationSenderInternalAPI +func (r *FederationSenderInternalAPI) PerformInvite( + ctx context.Context, + request *api.PerformInviteRequest, + response *api.PerformInviteResponse, +) (err error) { + if request.Event.StateKey() == nil { + return errors.New("invite must be a state event") + } + + _, destination, err := gomatrixserverlib.SplitID('@', *request.Event.StateKey()) + if err != nil { + return fmt.Errorf("gomatrixserverlib.SplitID: %w", err) + } + + logrus.WithFields(logrus.Fields{ + "event_id": request.Event.EventID(), + "user_id": *request.Event.StateKey(), + "room_id": request.Event.RoomID(), + "room_version": request.RoomVersion, + "destination": destination, + }).Info("Sending invite") + + inviteReq, err := gomatrixserverlib.NewInviteV2Request(&request.Event, request.InviteRoomState) + if err != nil { + return fmt.Errorf("gomatrixserverlib.NewInviteV2Request: %w", err) + } + + inviteRes, err := r.federation.SendInviteV2(ctx, destination, inviteReq) + if err != nil { + return fmt.Errorf("r.federation.SendInviteV2: %w", err) + } + + response.Event = inviteRes.Event.Headered(request.RoomVersion) + return nil +} + // PerformServersAlive implements api.FederationSenderInternalAPI func (r *FederationSenderInternalAPI) PerformServersAlive( ctx context.Context, diff --git a/federationsender/inthttp/client.go b/federationsender/inthttp/client.go index 4d968919..13c2c45a 100644 --- a/federationsender/inthttp/client.go +++ b/federationsender/inthttp/client.go @@ -18,6 +18,7 @@ const ( FederationSenderPerformDirectoryLookupRequestPath = "/federationsender/performDirectoryLookup" FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest" FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest" + FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest" FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive" FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU" ) @@ -49,6 +50,19 @@ func (h *httpFederationSenderInternalAPI) PerformLeave( return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } +// Handle sending an invite to a remote server. +func (h *httpFederationSenderInternalAPI) PerformInvite( + ctx context.Context, + request *api.PerformInviteRequest, + response *api.PerformInviteResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformInviteRequest") + defer span.Finish() + + apiURL := h.federationSenderURL + FederationSenderPerformInviteRequestPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + func (h *httpFederationSenderInternalAPI) PerformServersAlive( ctx context.Context, request *api.PerformServersAliveRequest, diff --git a/federationsender/inthttp/server.go b/federationsender/inthttp/server.go index 16ef4b09..f02cbd12 100644 --- a/federationsender/inthttp/server.go +++ b/federationsender/inthttp/server.go @@ -11,6 +11,7 @@ import ( ) // AddRoutes adds the FederationSenderInternalAPI handlers to the http.ServeMux. +// nolint:gocyclo func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Router) { internalAPIMux.Handle( FederationSenderQueryJoinedHostServerNamesInRoomPath, @@ -53,6 +54,20 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( + FederationSenderPerformInviteRequestPath, + httputil.MakeInternalAPI("PerformInviteRequest", func(req *http.Request) util.JSONResponse { + var request api.PerformInviteRequest + var response api.PerformInviteResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := intAPI.PerformInvite(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) + internalAPIMux.Handle( FederationSenderPerformDirectoryLookupRequestPath, httputil.MakeInternalAPI("PerformDirectoryLookupRequest", func(req *http.Request) util.JSONResponse { var request api.PerformDirectoryLookupRequest diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 9ccfbace..e9e117a7 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -46,20 +46,18 @@ type destinationQueue struct { db storage.Database signing *SigningInfo rsAPI api.RoomserverInternalAPI - client *gomatrixserverlib.FederationClient // federation client - origin gomatrixserverlib.ServerName // origin of requests - destination gomatrixserverlib.ServerName // destination of requests - running atomic.Bool // is the queue worker running? - backingOff atomic.Bool // true if we're backing off - statistics *statistics.ServerStatistics // statistics about this remote server - incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send - transactionIDMutex sync.Mutex // protects transactionID - transactionID gomatrixserverlib.TransactionID // last transaction ID - transactionCount atomic.Int32 // how many events in this transaction so far - pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend - notifyPDUs chan bool // interrupts idle wait for PDUs - notifyEDUs chan bool // interrupts idle wait for EDUs - interruptBackoff chan bool // interrupts backoff + client *gomatrixserverlib.FederationClient // federation client + origin gomatrixserverlib.ServerName // origin of requests + destination gomatrixserverlib.ServerName // destination of requests + running atomic.Bool // is the queue worker running? + backingOff atomic.Bool // true if we're backing off + statistics *statistics.ServerStatistics // statistics about this remote server + transactionIDMutex sync.Mutex // protects transactionID + transactionID gomatrixserverlib.TransactionID // last transaction ID + transactionCount atomic.Int32 // how many events in this transaction so far + notifyPDUs chan bool // interrupts idle wait for PDUs + notifyEDUs chan bool // interrupts idle wait for EDUs + interruptBackoff chan bool // interrupts backoff } // Send event adds the event to the pending queue for the destination. @@ -138,18 +136,6 @@ func (oq *destinationQueue) sendEDU(receipt *shared.Receipt) { } } -// sendInvite adds the invite event to the pending queue for the -// destination. If the queue is empty then it starts a background -// goroutine to start sending events to that destination. -func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) { - if oq.statistics.Blacklisted() { - // If the destination is blacklisted then drop the event. - return - } - oq.wakeQueueIfNeeded() - oq.incomingInvites <- ev -} - // wakeQueueIfNeeded will wake up the destination queue if it is // not already running. If it is running but it is backing off // then we will interrupt the backoff, causing any federation @@ -234,23 +220,6 @@ func (oq *destinationQueue) backgroundSend() { // We were woken up because there are new PDUs waiting in the // database. pendingEDUs = true - case invite := <-oq.incomingInvites: - // There's no strict ordering requirement for invites like - // there is for transactions, so we put the invite onto the - // front of the queue. This means that if an invite that is - // stuck failing already, that it won't block our new invite - // from being sent. - oq.pendingInvites = append( - []*gomatrixserverlib.InviteV2Request{invite}, - oq.pendingInvites..., - ) - // If there are any more things waiting in the channel queue - // then read them. This is safe because we guarantee only - // having one goroutine per destination queue, so the channel - // isn't being consumed anywhere else. - for len(oq.incomingInvites) > 0 { - oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites) - } case <-time.After(queueIdleTimeout): // The worker is idle so stop the goroutine. It'll get // restarted automatically the next time we have an event to @@ -266,7 +235,6 @@ func (oq *destinationQueue) backgroundSend() { // It's been suggested that we should give up because the backoff // has exceeded a maximum allowable value. Clean up the in-memory // buffers at this point. The PDU clean-up is already on a defer. - oq.cleanPendingInvites() log.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination) return } @@ -284,33 +252,7 @@ func (oq *destinationQueue) backgroundSend() { oq.statistics.Success() } } - - // Try sending the next invite and see what happens. - if len(oq.pendingInvites) > 0 { - sent, ierr := oq.nextInvites(oq.pendingInvites) - if ierr != nil { - // We failed to send the transaction. Mark it as a failure. - oq.statistics.Failure() - } else if sent > 0 { - // If we successfully sent the invites then clear out - // the pending invites. - oq.statistics.Success() - // Reallocate so that the underlying array can be GC'd, as - // opposed to growing forever. - oq.cleanPendingInvites() - } - } - } -} - -// cleanPendingInvites cleans out the pending invite buffer, -// removing all references so that the underlying objects can -// be GC'd. -func (oq *destinationQueue) cleanPendingInvites() { - for i := 0; i < len(oq.pendingInvites); i++ { - oq.pendingInvites[i] = nil } - oq.pendingInvites = []*gomatrixserverlib.InviteV2Request{} } // nextTransaction creates a new transaction from the pending event @@ -427,66 +369,3 @@ func (oq *destinationQueue) nextTransaction() (bool, error) { return false, err } } - -// nextInvite takes pending invite events from the queue and sends -// them. Returns true if a transaction was sent or false otherwise. -func (oq *destinationQueue) nextInvites( - pendingInvites []*gomatrixserverlib.InviteV2Request, -) (int, error) { - done := 0 - for _, inviteReq := range pendingInvites { - ev, roomVersion := inviteReq.Event(), inviteReq.RoomVersion() - - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "room_version": roomVersion, - "destination": oq.destination, - }).Info("sending invite") - - inviteRes, err := oq.client.SendInviteV2( - context.TODO(), - oq.destination, - *inviteReq, - ) - switch e := err.(type) { - case nil: - done++ - case gomatrix.HTTPError: - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "state_key": ev.StateKey(), - "destination": oq.destination, - "status_code": e.Code, - }).WithError(err).Error("failed to send invite due to HTTP error") - // Check whether we should do something about the error or - // just accept it as unavoidable. - if e.Code >= 400 && e.Code <= 499 { - // We tried but the remote side has sent back a client error. - // It's no use retrying because it will happen again. - done++ - continue - } - return done, err - default: - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "state_key": ev.StateKey(), - "destination": oq.destination, - }).WithError(err).Error("failed to send invite") - return done, err - } - - invEv := inviteRes.Event.Sign(string(oq.signing.ServerName), oq.signing.KeyID, oq.signing.PrivateKey).Headered(roomVersion) - _, err = api.SendEvents(context.TODO(), oq.rsAPI, []gomatrixserverlib.HeaderedEvent{invEv}, oq.signing.ServerName, nil) - if err != nil { - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "state_key": ev.StateKey(), - "destination": oq.destination, - }).WithError(err).Error("failed to return signed invite to roomserver") - return done, err - } - } - - return done, nil -} diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 6d856fe2..6561251d 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -108,7 +108,6 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d destination: destination, client: oqs.client, statistics: oqs.statistics.ForServer(destination), - incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128), notifyPDUs: make(chan bool, 1), notifyEDUs: make(chan bool, 1), interruptBackoff: make(chan bool), @@ -178,51 +177,6 @@ func (oqs *OutgoingQueues) SendEvent( return nil } -// SendEvent sends an event to the destinations -func (oqs *OutgoingQueues) SendInvite( - inviteReq *gomatrixserverlib.InviteV2Request, -) error { - ev := inviteReq.Event() - stateKey := ev.StateKey() - if stateKey == nil { - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - }).Info("Invite had no state key, dropping") - return nil - } - - _, destination, err := gomatrixserverlib.SplitID('@', *stateKey) - if err != nil { - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "state_key": stateKey, - }).Info("Failed to split destination from state key") - return nil - } - - if stateapi.IsServerBannedFromRoom( - context.TODO(), - oqs.stateAPI, - ev.RoomID(), - destination, - ) { - log.WithFields(log.Fields{ - "room_id": ev.RoomID(), - "destination": destination, - }).Info("Dropping invite to server which is prohibited by ACLs") - return nil - } - - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "server_name": destination, - }).Info("Sending invite") - - oqs.getQueue(destination).sendInvite(inviteReq) - - return nil -} - // SendEDU sends an EDU event to the destinations. func (oqs *OutgoingQueues) SendEDU( e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName, |