about summary refs log tree commit diff
path: root/federationsender
diff options
context:
space:
mode:
author Kegsay <kegan@matrix.org> 2020-06-01 18:34:08 +0100
committer GitHub <noreply@github.com> 2020-06-01 18:34:08 +0100
commit cfc137652ed0d783d946836cf4e9e18267a438dc (patch)
tree 8f1840fe84fad01ec010cdb2818c114ec61093b3 /federationsender
parent a5d822004dd93d6f6a7ed73371aeb4bfb163b5ba (diff)
Add a way to force federationsender to retry sending transactions (#1077)
* Add a way to force federationsender to retry sending transactions, and use it in P2P mode when we pick up new nodes.
* Linting
* Use atomic bool to stop us blocking on the channel
Diffstat (limited to 'federationsender')
-rw-r--r-- federationsender/api/api.go 6
-rw-r--r-- federationsender/api/perform.go 22
-rw-r--r-- federationsender/federationsender.go 2
-rw-r--r-- federationsender/internal/api.go 17
-rw-r--r-- federationsender/internal/perform.go 13
-rw-r--r-- federationsender/queue/destinationqueue.go 33
-rw-r--r-- federationsender/queue/queue.go 16
7 files changed, 106 insertions, 3 deletions
diff --git a/federationsender/api/api.go b/federationsender/api/api.go
index 678f02e6..4eb20cb6 100644
--- a/federationsender/api/api.go
+++ b/federationsender/api/api.go
@@ -42,6 +42,12 @@ type FederationSenderInternalAPI interface {
request *PerformLeaveRequest,
response *PerformLeaveResponse,
) error
+ // PerformServersAlive notifies the federation sender that these servers may be online, so it should retry sending messages to them.
+ PerformServersAlive(
+ ctx context.Context,
+ request *PerformServersAliveRequest,
+ response *PerformServersAliveResponse,
+ ) error
}
// NewFederationSenderInternalAPIHTTP creates a FederationSenderInternalAPI implemented by talking to a HTTP POST API.
diff --git a/federationsender/api/perform.go b/federationsender/api/perform.go
index 2a1834b6..5e4d7fe9 100644
--- a/federationsender/api/perform.go
+++ b/federationsender/api/perform.go
@@ -18,6 +18,9 @@ const (
// FederationSenderPerformLeaveRequestPath is the HTTP path for the PerformLeaveRequest API.
FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest"
+
+ // FederationSenderPerformServersAlivePath is the HTTP path for the PerformServersAlive API.
+ FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive"
)
type PerformDirectoryLookupRequest struct {
@@ -88,3 +91,22 @@ func (h *httpFederationSenderInternalAPI) PerformLeave(
apiURL := h.federationSenderURL + FederationSenderPerformLeaveRequestPath
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
+
+type PerformServersAliveRequest struct {
+ Servers []gomatrixserverlib.ServerName
+}
+
+type PerformServersAliveResponse struct {
+}
+
+func (h *httpFederationSenderInternalAPI) PerformServersAlive(
+ ctx context.Context,
+ request *PerformServersAliveRequest,
+ response *PerformServersAliveResponse,
+) error {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "PerformServersAlive")
+ defer span.Finish()
+
+ apiURL := h.federationSenderURL + FederationSenderPerformServersAlivePath
+ return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+}
diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go
index cca847a5..9e5cc8dd 100644
--- a/federationsender/federationsender.go
+++ b/federationsender/federationsender.go
@@ -67,7 +67,7 @@ func SetupFederationSenderComponent(
queryAPI := internal.NewFederationSenderInternalAPI(
federationSenderDB, base.Cfg, roomserverProducer, federation, keyRing,
- statistics,
+ statistics, queues,
)
queryAPI.SetupHTTP(base.InternalAPIMux)
diff --git a/federationsender/internal/api.go b/federationsender/internal/api.go
index dd394258..edf8fb4e 100644
--- a/federationsender/internal/api.go
+++ b/federationsender/internal/api.go
@@ -7,6 +7,7 @@ import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/producers"
+ "github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal"
@@ -24,6 +25,7 @@ type FederationSenderInternalAPI struct {
producer *producers.RoomserverProducer
federation *gomatrixserverlib.FederationClient
keyRing *gomatrixserverlib.KeyRing
+ queues *queue.OutgoingQueues
}
func NewFederationSenderInternalAPI(
@@ -32,6 +34,7 @@ func NewFederationSenderInternalAPI(
federation *gomatrixserverlib.FederationClient,
keyRing *gomatrixserverlib.KeyRing,
statistics *types.Statistics,
+ queues *queue.OutgoingQueues,
) *FederationSenderInternalAPI {
return &FederationSenderInternalAPI{
db: db,
@@ -40,6 +43,7 @@ func NewFederationSenderInternalAPI(
federation: federation,
keyRing: keyRing,
statistics: statistics,
+ queues: queues,
}
}
@@ -112,4 +116,17 @@ func (f *FederationSenderInternalAPI) SetupHTTP(internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
+ internalAPIMux.Handle(api.FederationSenderPerformServersAlivePath,
+ internal.MakeInternalAPI("PerformServersAliveRequest", func(req *http.Request) util.JSONResponse {
+ var request api.PerformServersAliveRequest
+ var response api.PerformServersAliveResponse
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ if err := f.PerformServersAlive(req.Context(), &request, &response); err != nil {
+ return util.ErrorResponse(err)
+ }
+ return util.JSONResponse{Code: http.StatusOK, JSON: &response}
+ }),
+ )
}
diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go
index 383ce488..c601e960 100644
--- a/federationsender/internal/perform.go
+++ b/federationsender/internal/perform.go
@@ -275,3 +275,16 @@ func (r *FederationSenderInternalAPI) PerformLeave(
request.RoomID, len(request.ServerNames),
)
}
+
+// PerformServersAlive implements api.FederationSenderInternalAPI
+func (r *FederationSenderInternalAPI) PerformServersAlive(
+ ctx context.Context,
+ request *api.PerformServersAliveRequest,
+ response *api.PerformServersAliveResponse,
+) (err error) {
+ for _, srv := range request.Servers {
+ r.queues.RetryServer(srv)
+ }
+
+ return nil
+}
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go
index 09dac464..4ab610de 100644
--- a/federationsender/queue/destinationqueue.go
+++ b/federationsender/queue/destinationqueue.go
@@ -39,6 +39,7 @@ type destinationQueue struct {
origin gomatrixserverlib.ServerName // origin of requests
destination gomatrixserverlib.ServerName // destination of requests
running atomic.Bool // is the queue worker running?
+ backingOff atomic.Bool // true if we're backing off
statistics *types.ServerStatistics // statistics about this remote server
incomingPDUs chan *gomatrixserverlib.HeaderedEvent // PDUs to send
incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
@@ -47,6 +48,28 @@ type destinationQueue struct {
pendingPDUs []*gomatrixserverlib.HeaderedEvent // owned by backgroundSend
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
+ retryServerCh chan bool // interrupts backoff
+}
+
+// retry will clear the blacklist state and attempt to send built up events to the server,
+// resetting and interrupting any backoff timers.
+func (oq *destinationQueue) retry() {
+ // TODO: We don't send all events when the server has been blacklisted, because events are
+ // dropped while it is blacklisted. This means we will send the oldest N events (chan size, currently 128)
+ // and then skip ahead a lot, which feels non-ideal, but equally we can't persist thousands of events
+ // in memory to maybe send them one day. Ideally we would just store these pending events in a database
+ // so we can send a lot of events.
+ oq.statistics.Success()
+ // If we were backing off, swap to not backing off and interrupt the select in backgroundSend.
+ // The atomic CAS lets only one retry() caller send on the unbuffered channel, so others don't block.
+ // NOTE(review): that send may still block if the backoff timer fires between the CAS and the send.
+ if oq.backingOff.CAS(true, false) {
+ oq.retryServerCh <- true
+ }
+ if !oq.running.Load() {
+ log.Infof("Restarting queue for %s", oq.destination)
+ go oq.backgroundSend()
+ }
}
// Send event adds the event to the pending queue for the destination.
@@ -155,9 +178,15 @@ func (oq *destinationQueue) backgroundSend() {
}
// If we are backing off this server then wait for the
- // backoff duration to complete first.
+ // backoff duration to complete first, or until explicitly
+ // told to retry.
if backoff, duration := oq.statistics.BackoffDuration(); backoff {
- <-time.After(duration)
+ oq.backingOff.Store(true)
+ select {
+ case <-time.After(duration):
+ case <-oq.retryServerCh:
+ }
+ oq.backingOff.Store(false)
}
// How many things do we have waiting?
diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go
index aae6c53a..386a3397 100644
--- a/federationsender/queue/queue.go
+++ b/federationsender/queue/queue.go
@@ -52,6 +52,12 @@ func NewOutgoingQueues(
}
}
+func (oqs *OutgoingQueues) getQueueIfExists(destination gomatrixserverlib.ServerName) *destinationQueue {
+ oqs.queuesMutex.Lock()
+ defer oqs.queuesMutex.Unlock()
+ return oqs.queues[destination]
+}
+
func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue {
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
@@ -66,6 +72,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
incomingPDUs: make(chan *gomatrixserverlib.HeaderedEvent, 128),
incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
+ retryServerCh: make(chan bool),
}
oqs.queues[destination] = oq
}
@@ -160,6 +167,15 @@ func (oqs *OutgoingQueues) SendEDU(
return nil
}
+// RetryServer attempts to resend events to the given server if we had given up.
+func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) {
+ q := oqs.getQueueIfExists(srv)
+ if q == nil {
+ return
+ }
+ q.retry()
+}
+
// filterAndDedupeDests removes our own server from the list of destinations
// and deduplicates any servers in the list that may appear more than once.
func filterAndDedupeDests(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) (