aboutsummaryrefslogtreecommitdiff
path: root/federationsender
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-08-17 11:40:49 +0100
committerGitHub <noreply@github.com>2020-08-17 11:40:49 +0100
commit6cb1a65809ccfbeaede6ff164c281ba0ddf90ab7 (patch)
tree56406ef5a5d4335d03e7805c19fa35aad805871e /federationsender
parent6820b3e024474e2c44f7a0632261d2dc86257d77 (diff)
Synchronous invites (#1273)
* Refactor invites to be synchronous * Fix synchronous invites * Fix client API return type for send invite error * Linter * Restore PerformError on rsAPI.PerformInvite * Update sytest-whitelist * Don't override PerformError with normal errors * Fix error passing * Un-whitelist a couple of tests * Update sytest-whitelist * Try to handle multiple invite rejections better * nolint * Update gomatrixserverlib * Fix /v1/invite test * Remove replace from go.mod
Diffstat (limited to 'federationsender')
-rw-r--r--federationsender/api/api.go16
-rw-r--r--federationsender/consumers/roomserver.go62
-rw-r--r--federationsender/internal/perform.go37
-rw-r--r--federationsender/inthttp/client.go14
-rw-r--r--federationsender/inthttp/server.go15
-rw-r--r--federationsender/queue/destinationqueue.go145
-rw-r--r--federationsender/queue/queue.go46
7 files changed, 94 insertions, 241 deletions
diff --git a/federationsender/api/api.go b/federationsender/api/api.go
index b87af0eb..9f9c2645 100644
--- a/federationsender/api/api.go
+++ b/federationsender/api/api.go
@@ -36,6 +36,12 @@ type FederationSenderInternalAPI interface {
request *PerformLeaveRequest,
response *PerformLeaveResponse,
) error
+ // Handle sending an invite to a remote server.
+ PerformInvite(
+ ctx context.Context,
+ request *PerformInviteRequest,
+ response *PerformInviteResponse,
+ ) error
// Notifies the federation sender that these servers may be online and to retry sending messages.
PerformServersAlive(
ctx context.Context,
@@ -81,6 +87,16 @@ type PerformLeaveRequest struct {
type PerformLeaveResponse struct {
}
+type PerformInviteRequest struct {
+ RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
+ Event gomatrixserverlib.HeaderedEvent `json:"event"`
+ InviteRoomState []gomatrixserverlib.InviteV2StrippedState `json:"invite_room_state"`
+}
+
+type PerformInviteResponse struct {
+ Event gomatrixserverlib.HeaderedEvent `json:"event"`
+}
+
type PerformServersAliveRequest struct {
Servers []gomatrixserverlib.ServerName
}
diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go
index e09350f8..92b4d6f4 100644
--- a/federationsender/consumers/roomserver.go
+++ b/federationsender/consumers/roomserver.go
@@ -28,7 +28,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
- "github.com/tidwall/gjson"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
@@ -97,22 +96,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}).Panicf("roomserver output log: write room event failure")
return nil
}
- case api.OutputTypeNewInviteEvent:
- ev := &output.NewInviteEvent.Event
- log.WithFields(log.Fields{
- "event_id": ev.EventID(),
- "room_id": ev.RoomID(),
- "state_key": ev.StateKey(),
- }).Info("received invite event from roomserver")
-
- if err := s.processInvite(*output.NewInviteEvent); err != nil {
- // panic rather than continue with an inconsistent database
- log.WithFields(log.Fields{
- "event": string(ev.JSON()),
- log.ErrorKey: err,
- }).Panicf("roomserver output log: write invite event failure")
- return nil
- }
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
@@ -172,51 +155,6 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
)
}
-// processInvite handles an invite event for sending over federation.
-func (s *OutputRoomEventConsumer) processInvite(oie api.OutputNewInviteEvent) error {
- // Don't try to reflect and resend invites that didn't originate from us.
- if s.cfg.Matrix.ServerName != oie.Event.Origin() {
- return nil
- }
-
- // Ignore invites that don't have state keys - they are invalid.
- if oie.Event.StateKey() == nil {
- return fmt.Errorf("event %q doesn't have state key", oie.Event.EventID())
- }
-
- // Don't try to handle events that are actually destined for us.
- stateKey := *oie.Event.StateKey()
- _, destination, err := gomatrixserverlib.SplitID('@', stateKey)
- if err != nil {
- log.WithFields(log.Fields{
- "event_id": oie.Event.EventID(),
- "state_key": stateKey,
- }).Info("failed to split destination from state key")
- return nil
- }
- if s.cfg.Matrix.ServerName == destination {
- return nil
- }
-
- // Try to extract the room invite state. The roomserver will have stashed
- // this for us in invite_room_state if it didn't already exist.
- strippedState := []gomatrixserverlib.InviteV2StrippedState{}
- if inviteRoomState := gjson.GetBytes(oie.Event.Unsigned(), "invite_room_state"); inviteRoomState.Exists() {
- if err = json.Unmarshal([]byte(inviteRoomState.Raw), &strippedState); err != nil {
- log.WithError(err).Warn("failed to extract invite_room_state from event unsigned")
- }
- }
-
- // Build the invite request with the info we've got.
- inviteReq, err := gomatrixserverlib.NewInviteV2Request(&oie.Event, strippedState)
- if err != nil {
- return fmt.Errorf("gomatrixserverlib.NewInviteV2Request: %w", err)
- }
-
- // Send the event.
- return s.queues.SendInvite(&inviteReq)
-}
-
// joinedHostsAtEvent works out a list of matrix servers that were joined to
// the room at the event.
// It is important to use the state at the event for sending messages because:
diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go
index 1b8e360c..da8d41a7 100644
--- a/federationsender/internal/perform.go
+++ b/federationsender/internal/perform.go
@@ -296,6 +296,43 @@ func (r *FederationSenderInternalAPI) PerformLeave(
)
}
+// PerformLeaveRequest implements api.FederationSenderInternalAPI
+func (r *FederationSenderInternalAPI) PerformInvite(
+ ctx context.Context,
+ request *api.PerformInviteRequest,
+ response *api.PerformInviteResponse,
+) (err error) {
+ if request.Event.StateKey() == nil {
+ return errors.New("invite must be a state event")
+ }
+
+ _, destination, err := gomatrixserverlib.SplitID('@', *request.Event.StateKey())
+ if err != nil {
+ return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
+ }
+
+ logrus.WithFields(logrus.Fields{
+ "event_id": request.Event.EventID(),
+ "user_id": *request.Event.StateKey(),
+ "room_id": request.Event.RoomID(),
+ "room_version": request.RoomVersion,
+ "destination": destination,
+ }).Info("Sending invite")
+
+ inviteReq, err := gomatrixserverlib.NewInviteV2Request(&request.Event, request.InviteRoomState)
+ if err != nil {
+ return fmt.Errorf("gomatrixserverlib.NewInviteV2Request: %w", err)
+ }
+
+ inviteRes, err := r.federation.SendInviteV2(ctx, destination, inviteReq)
+ if err != nil {
+ return fmt.Errorf("r.federation.SendInviteV2: %w", err)
+ }
+
+ response.Event = inviteRes.Event.Headered(request.RoomVersion)
+ return nil
+}
+
// PerformServersAlive implements api.FederationSenderInternalAPI
func (r *FederationSenderInternalAPI) PerformServersAlive(
ctx context.Context,
diff --git a/federationsender/inthttp/client.go b/federationsender/inthttp/client.go
index 4d968919..13c2c45a 100644
--- a/federationsender/inthttp/client.go
+++ b/federationsender/inthttp/client.go
@@ -18,6 +18,7 @@ const (
FederationSenderPerformDirectoryLookupRequestPath = "/federationsender/performDirectoryLookup"
FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest"
FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest"
+ FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest"
FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive"
FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU"
)
@@ -49,6 +50,19 @@ func (h *httpFederationSenderInternalAPI) PerformLeave(
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
+// Handle sending an invite to a remote server.
+func (h *httpFederationSenderInternalAPI) PerformInvite(
+ ctx context.Context,
+ request *api.PerformInviteRequest,
+ response *api.PerformInviteResponse,
+) error {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "PerformInviteRequest")
+ defer span.Finish()
+
+ apiURL := h.federationSenderURL + FederationSenderPerformInviteRequestPath
+ return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+}
+
func (h *httpFederationSenderInternalAPI) PerformServersAlive(
ctx context.Context,
request *api.PerformServersAliveRequest,
diff --git a/federationsender/inthttp/server.go b/federationsender/inthttp/server.go
index 16ef4b09..f02cbd12 100644
--- a/federationsender/inthttp/server.go
+++ b/federationsender/inthttp/server.go
@@ -11,6 +11,7 @@ import (
)
// AddRoutes adds the FederationSenderInternalAPI handlers to the http.ServeMux.
+// nolint:gocyclo
func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Router) {
internalAPIMux.Handle(
FederationSenderQueryJoinedHostServerNamesInRoomPath,
@@ -53,6 +54,20 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route
}),
)
internalAPIMux.Handle(
+ FederationSenderPerformInviteRequestPath,
+ httputil.MakeInternalAPI("PerformInviteRequest", func(req *http.Request) util.JSONResponse {
+ var request api.PerformInviteRequest
+ var response api.PerformInviteResponse
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ if err := intAPI.PerformInvite(req.Context(), &request, &response); err != nil {
+ return util.ErrorResponse(err)
+ }
+ return util.JSONResponse{Code: http.StatusOK, JSON: &response}
+ }),
+ )
+ internalAPIMux.Handle(
FederationSenderPerformDirectoryLookupRequestPath,
httputil.MakeInternalAPI("PerformDirectoryLookupRequest", func(req *http.Request) util.JSONResponse {
var request api.PerformDirectoryLookupRequest
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go
index 9ccfbace..e9e117a7 100644
--- a/federationsender/queue/destinationqueue.go
+++ b/federationsender/queue/destinationqueue.go
@@ -46,20 +46,18 @@ type destinationQueue struct {
db storage.Database
signing *SigningInfo
rsAPI api.RoomserverInternalAPI
- client *gomatrixserverlib.FederationClient // federation client
- origin gomatrixserverlib.ServerName // origin of requests
- destination gomatrixserverlib.ServerName // destination of requests
- running atomic.Bool // is the queue worker running?
- backingOff atomic.Bool // true if we're backing off
- statistics *statistics.ServerStatistics // statistics about this remote server
- incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send
- transactionIDMutex sync.Mutex // protects transactionID
- transactionID gomatrixserverlib.TransactionID // last transaction ID
- transactionCount atomic.Int32 // how many events in this transaction so far
- pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
- notifyPDUs chan bool // interrupts idle wait for PDUs
- notifyEDUs chan bool // interrupts idle wait for EDUs
- interruptBackoff chan bool // interrupts backoff
+ client *gomatrixserverlib.FederationClient // federation client
+ origin gomatrixserverlib.ServerName // origin of requests
+ destination gomatrixserverlib.ServerName // destination of requests
+ running atomic.Bool // is the queue worker running?
+ backingOff atomic.Bool // true if we're backing off
+ statistics *statistics.ServerStatistics // statistics about this remote server
+ transactionIDMutex sync.Mutex // protects transactionID
+ transactionID gomatrixserverlib.TransactionID // last transaction ID
+ transactionCount atomic.Int32 // how many events in this transaction so far
+ notifyPDUs chan bool // interrupts idle wait for PDUs
+ notifyEDUs chan bool // interrupts idle wait for EDUs
+ interruptBackoff chan bool // interrupts backoff
}
// Send event adds the event to the pending queue for the destination.
@@ -138,18 +136,6 @@ func (oq *destinationQueue) sendEDU(receipt *shared.Receipt) {
}
}
-// sendInvite adds the invite event to the pending queue for the
-// destination. If the queue is empty then it starts a background
-// goroutine to start sending events to that destination.
-func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) {
- if oq.statistics.Blacklisted() {
- // If the destination is blacklisted then drop the event.
- return
- }
- oq.wakeQueueIfNeeded()
- oq.incomingInvites <- ev
-}
-
// wakeQueueIfNeeded will wake up the destination queue if it is
// not already running. If it is running but it is backing off
// then we will interrupt the backoff, causing any federation
@@ -234,23 +220,6 @@ func (oq *destinationQueue) backgroundSend() {
// We were woken up because there are new PDUs waiting in the
// database.
pendingEDUs = true
- case invite := <-oq.incomingInvites:
- // There's no strict ordering requirement for invites like
- // there is for transactions, so we put the invite onto the
- // front of the queue. This means that if an invite that is
- // stuck failing already, that it won't block our new invite
- // from being sent.
- oq.pendingInvites = append(
- []*gomatrixserverlib.InviteV2Request{invite},
- oq.pendingInvites...,
- )
- // If there are any more things waiting in the channel queue
- // then read them. This is safe because we guarantee only
- // having one goroutine per destination queue, so the channel
- // isn't being consumed anywhere else.
- for len(oq.incomingInvites) > 0 {
- oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
- }
case <-time.After(queueIdleTimeout):
// The worker is idle so stop the goroutine. It'll get
// restarted automatically the next time we have an event to
@@ -266,7 +235,6 @@ func (oq *destinationQueue) backgroundSend() {
// It's been suggested that we should give up because the backoff
// has exceeded a maximum allowable value. Clean up the in-memory
// buffers at this point. The PDU clean-up is already on a defer.
- oq.cleanPendingInvites()
log.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination)
return
}
@@ -284,33 +252,7 @@ func (oq *destinationQueue) backgroundSend() {
oq.statistics.Success()
}
}
-
- // Try sending the next invite and see what happens.
- if len(oq.pendingInvites) > 0 {
- sent, ierr := oq.nextInvites(oq.pendingInvites)
- if ierr != nil {
- // We failed to send the transaction. Mark it as a failure.
- oq.statistics.Failure()
- } else if sent > 0 {
- // If we successfully sent the invites then clear out
- // the pending invites.
- oq.statistics.Success()
- // Reallocate so that the underlying array can be GC'd, as
- // opposed to growing forever.
- oq.cleanPendingInvites()
- }
- }
- }
-}
-
-// cleanPendingInvites cleans out the pending invite buffer,
-// removing all references so that the underlying objects can
-// be GC'd.
-func (oq *destinationQueue) cleanPendingInvites() {
- for i := 0; i < len(oq.pendingInvites); i++ {
- oq.pendingInvites[i] = nil
}
- oq.pendingInvites = []*gomatrixserverlib.InviteV2Request{}
}
// nextTransaction creates a new transaction from the pending event
@@ -427,66 +369,3 @@ func (oq *destinationQueue) nextTransaction() (bool, error) {
return false, err
}
}
-
-// nextInvite takes pending invite events from the queue and sends
-// them. Returns true if a transaction was sent or false otherwise.
-func (oq *destinationQueue) nextInvites(
- pendingInvites []*gomatrixserverlib.InviteV2Request,
-) (int, error) {
- done := 0
- for _, inviteReq := range pendingInvites {
- ev, roomVersion := inviteReq.Event(), inviteReq.RoomVersion()
-
- log.WithFields(log.Fields{
- "event_id": ev.EventID(),
- "room_version": roomVersion,
- "destination": oq.destination,
- }).Info("sending invite")
-
- inviteRes, err := oq.client.SendInviteV2(
- context.TODO(),
- oq.destination,
- *inviteReq,
- )
- switch e := err.(type) {
- case nil:
- done++
- case gomatrix.HTTPError:
- log.WithFields(log.Fields{
- "event_id": ev.EventID(),
- "state_key": ev.StateKey(),
- "destination": oq.destination,
- "status_code": e.Code,
- }).WithError(err).Error("failed to send invite due to HTTP error")
- // Check whether we should do something about the error or
- // just accept it as unavoidable.
- if e.Code >= 400 && e.Code <= 499 {
- // We tried but the remote side has sent back a client error.
- // It's no use retrying because it will happen again.
- done++
- continue
- }
- return done, err
- default:
- log.WithFields(log.Fields{
- "event_id": ev.EventID(),
- "state_key": ev.StateKey(),
- "destination": oq.destination,
- }).WithError(err).Error("failed to send invite")
- return done, err
- }
-
- invEv := inviteRes.Event.Sign(string(oq.signing.ServerName), oq.signing.KeyID, oq.signing.PrivateKey).Headered(roomVersion)
- _, err = api.SendEvents(context.TODO(), oq.rsAPI, []gomatrixserverlib.HeaderedEvent{invEv}, oq.signing.ServerName, nil)
- if err != nil {
- log.WithFields(log.Fields{
- "event_id": ev.EventID(),
- "state_key": ev.StateKey(),
- "destination": oq.destination,
- }).WithError(err).Error("failed to return signed invite to roomserver")
- return done, err
- }
- }
-
- return done, nil
-}
diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go
index 6d856fe2..6561251d 100644
--- a/federationsender/queue/queue.go
+++ b/federationsender/queue/queue.go
@@ -108,7 +108,6 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
destination: destination,
client: oqs.client,
statistics: oqs.statistics.ForServer(destination),
- incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
notifyPDUs: make(chan bool, 1),
notifyEDUs: make(chan bool, 1),
interruptBackoff: make(chan bool),
@@ -178,51 +177,6 @@ func (oqs *OutgoingQueues) SendEvent(
return nil
}
-// SendEvent sends an event to the destinations
-func (oqs *OutgoingQueues) SendInvite(
- inviteReq *gomatrixserverlib.InviteV2Request,
-) error {
- ev := inviteReq.Event()
- stateKey := ev.StateKey()
- if stateKey == nil {
- log.WithFields(log.Fields{
- "event_id": ev.EventID(),
- }).Info("Invite had no state key, dropping")
- return nil
- }
-
- _, destination, err := gomatrixserverlib.SplitID('@', *stateKey)
- if err != nil {
- log.WithFields(log.Fields{
- "event_id": ev.EventID(),
- "state_key": stateKey,
- }).Info("Failed to split destination from state key")
- return nil
- }
-
- if stateapi.IsServerBannedFromRoom(
- context.TODO(),
- oqs.stateAPI,
- ev.RoomID(),
- destination,
- ) {
- log.WithFields(log.Fields{
- "room_id": ev.RoomID(),
- "destination": destination,
- }).Info("Dropping invite to server which is prohibited by ACLs")
- return nil
- }
-
- log.WithFields(log.Fields{
- "event_id": ev.EventID(),
- "server_name": destination,
- }).Info("Sending invite")
-
- oqs.getQueue(destination).sendInvite(inviteReq)
-
- return nil
-}
-
// SendEDU sends an EDU event to the destinations.
func (oqs *OutgoingQueues) SendEDU(
e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName,