diff options
author | Kegsay <kegan@matrix.org> | 2020-06-01 18:34:08 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-06-01 18:34:08 +0100 |
commit | cfc137652ed0d783d946836cf4e9e18267a438dc (patch) | |
tree | 8f1840fe84fad01ec010cdb2818c114ec61093b3 /federationsender | |
parent | a5d822004dd93d6f6a7ed73371aeb4bfb163b5ba (diff) |
Add a way to force federationsender to retry sending transactions (#1077)
* Add a way to force federationsender to retry sending transactions
And use it in P2P mode when we pick up new nodes.
* Linting
* Use atomic bool to stop us blocking on the channel
Diffstat (limited to 'federationsender')
-rw-r--r-- | federationsender/api/api.go | 6 | ||||
-rw-r--r-- | federationsender/api/perform.go | 22 | ||||
-rw-r--r-- | federationsender/federationsender.go | 2 | ||||
-rw-r--r-- | federationsender/internal/api.go | 17 | ||||
-rw-r--r-- | federationsender/internal/perform.go | 13 | ||||
-rw-r--r-- | federationsender/queue/destinationqueue.go | 33 | ||||
-rw-r--r-- | federationsender/queue/queue.go | 16 |
7 files changed, 106 insertions, 3 deletions
diff --git a/federationsender/api/api.go b/federationsender/api/api.go index 678f02e6..4eb20cb6 100644 --- a/federationsender/api/api.go +++ b/federationsender/api/api.go @@ -42,6 +42,12 @@ type FederationSenderInternalAPI interface { request *PerformLeaveRequest, response *PerformLeaveResponse, ) error + // Notifies the federation sender that these servers may be online and to retry sending messages. + PerformServersAlive( + ctx context.Context, + request *PerformServersAliveRequest, + response *PerformServersAliveResponse, + ) error } // NewFederationSenderInternalAPIHTTP creates a FederationSenderInternalAPI implemented by talking to a HTTP POST API. diff --git a/federationsender/api/perform.go b/federationsender/api/perform.go index 2a1834b6..5e4d7fe9 100644 --- a/federationsender/api/perform.go +++ b/federationsender/api/perform.go @@ -18,6 +18,9 @@ const ( // FederationSenderPerformLeaveRequestPath is the HTTP path for the PerformLeaveRequest API. FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest" + + // FederationSenderPerformServersAlivePath is the HTTP path for the PerformServersAlive API. + FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive" ) type PerformDirectoryLookupRequest struct { @@ -88,3 +91,22 @@ func (h *httpFederationSenderInternalAPI) PerformLeave( apiURL := h.federationSenderURL + FederationSenderPerformLeaveRequestPath return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } + +type PerformServersAliveRequest struct { + Servers []gomatrixserverlib.ServerName +} + +type PerformServersAliveResponse struct { +} + +func (h *httpFederationSenderInternalAPI) PerformServersAlive( + ctx context.Context, + request *PerformServersAliveRequest, + response *PerformServersAliveResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformServersAlive") + defer span.Finish() + + apiURL := h.federationSenderURL + FederationSenderPerformServersAlivePath + return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index cca847a5..9e5cc8dd 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -67,7 +67,7 @@ func SetupFederationSenderComponent( queryAPI := internal.NewFederationSenderInternalAPI( federationSenderDB, base.Cfg, roomserverProducer, federation, keyRing, - statistics, + statistics, queues, ) queryAPI.SetupHTTP(base.InternalAPIMux) diff --git a/federationsender/internal/api.go b/federationsender/internal/api.go index dd394258..edf8fb4e 100644 --- a/federationsender/internal/api.go +++ b/federationsender/internal/api.go @@ -7,6 +7,7 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/producers" + "github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/internal" @@ -24,6 +25,7 @@ type FederationSenderInternalAPI struct { producer *producers.RoomserverProducer federation *gomatrixserverlib.FederationClient keyRing *gomatrixserverlib.KeyRing + queues *queue.OutgoingQueues } func NewFederationSenderInternalAPI( @@ -32,6 +34,7 @@ func NewFederationSenderInternalAPI( federation *gomatrixserverlib.FederationClient, keyRing *gomatrixserverlib.KeyRing, statistics *types.Statistics, + queues *queue.OutgoingQueues, ) *FederationSenderInternalAPI { return &FederationSenderInternalAPI{ db: db, @@ -40,6 +43,7 @@ func NewFederationSenderInternalAPI( federation: federation, keyRing: keyRing, statistics: statistics, + queues: queues, } } @@ -112,4 +116,17 @@ func (f *FederationSenderInternalAPI) SetupHTTP(internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle(api.FederationSenderPerformServersAlivePath, + internal.MakeInternalAPI("PerformServersAliveRequest", func(req *http.Request) util.JSONResponse { + var request api.PerformServersAliveRequest + var response api.PerformServersAliveResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := f.PerformServersAlive(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) } diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go index 383ce488..c601e960 100644 --- a/federationsender/internal/perform.go +++ b/federationsender/internal/perform.go @@ -275,3 +275,16 @@ func (r *FederationSenderInternalAPI) PerformLeave( request.RoomID, len(request.ServerNames), ) } + +// PerformServersAlive implements api.FederationSenderInternalAPI +func (r *FederationSenderInternalAPI) PerformServersAlive( + ctx context.Context, + request *api.PerformServersAliveRequest, + response *api.PerformServersAliveResponse, +) (err error) { + for _, srv := range request.Servers { + r.queues.RetryServer(srv) + } + + return nil +} diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 09dac464..4ab610de 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -39,6 +39,7 @@ type destinationQueue struct { origin gomatrixserverlib.ServerName // origin of requests destination gomatrixserverlib.ServerName // destination of requests running atomic.Bool // is the queue worker running? + backingOff atomic.Bool // true if we're backing off statistics *types.ServerStatistics // statistics about this remote server incomingPDUs chan *gomatrixserverlib.HeaderedEvent // PDUs to send incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send @@ -47,6 +48,28 @@ type destinationQueue struct { pendingPDUs []*gomatrixserverlib.HeaderedEvent // owned by backgroundSend pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend + retryServerCh chan bool // interrupts backoff +} + +// retry will clear the blacklist state and attempt to send built up events to the server, +// resetting and interrupting any backoff timers. +func (oq *destinationQueue) retry() { + // TODO: We don't send all events in the case where the server has been blacklisted as we + // drop events instead then. This means we will send the oldest N events (chan size, currently 128) + // and then skip ahead a lot which feels non-ideal but equally we can't persist thousands of events + // in-memory to maybe-send it one day. Ideally we would just shove these pending events in a database + // so we can send a lot of events. + oq.statistics.Success() + // if we were backing off, swap to not backing off and interrupt the select. + // We need to use an atomic bool here to prevent multiple calls to retry() blocking on the channel + // as it is unbuffered. + if oq.backingOff.CAS(true, false) { + oq.retryServerCh <- true + } + if !oq.running.Load() { + log.Infof("Restarting queue for %s", oq.destination) + go oq.backgroundSend() + } } // Send event adds the event to the pending queue for the destination. @@ -155,9 +178,15 @@ func (oq *destinationQueue) backgroundSend() { } // If we are backing off this server then wait for the - // backoff duration to complete first. + // backoff duration to complete first, or until explicitly + // told to retry. if backoff, duration := oq.statistics.BackoffDuration(); backoff { - <-time.After(duration) + oq.backingOff.Store(true) + select { + case <-time.After(duration): + case <-oq.retryServerCh: + } + oq.backingOff.Store(false) } // How many things do we have waiting? diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index aae6c53a..386a3397 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -52,6 +52,12 @@ func NewOutgoingQueues( } } +func (oqs *OutgoingQueues) getQueueIfExists(destination gomatrixserverlib.ServerName) *destinationQueue { + oqs.queuesMutex.Lock() + defer oqs.queuesMutex.Unlock() + return oqs.queues[destination] +} + func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue { oqs.queuesMutex.Lock() defer oqs.queuesMutex.Unlock() @@ -66,6 +72,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d incomingPDUs: make(chan *gomatrixserverlib.HeaderedEvent, 128), incomingEDUs: make(chan *gomatrixserverlib.EDU, 128), incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128), + retryServerCh: make(chan bool), } oqs.queues[destination] = oq } @@ -160,6 +167,15 @@ func (oqs *OutgoingQueues) SendEDU( return nil } +// RetryServer attempts to resend events to the given server if we had given up. +func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) { + q := oqs.getQueueIfExists(srv) + if q == nil { + return + } + q.retry() +} + // filterAndDedupeDests removes our own server from the list of destinations // and deduplicates any servers in the list that may appear more than once. func filterAndDedupeDests(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) ( |