aboutsummaryrefslogtreecommitdiff
path: root/federationsender
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-08-20 17:03:07 +0100
committerGitHub <noreply@github.com>2020-08-20 17:03:07 +0100
commit6d6bb7513710db1009c474eff031434916feda1b (patch)
tree62eb40ba6944580a7cf6da1541d7070ad65bd362 /federationsender
parent068a3d3c9f9be3473b68e3a13912182caf1c7117 (diff)
Add FederationClient interface to federationsender (#1284)
* Add FederationClient interface to federationsender - Use a shim struct in HTTP mode to keep the same API as `FederationClient`. - Use `federationsender` instead of `FederationClient` in `keyserver`. * Pointers not values * Review comments * Fix unit tests * Rejig backoff * Unbreak test * Remove debug logs * Review comments and linting
Diffstat (limited to 'federationsender')
-rw-r--r--federationsender/api/api.go24
-rw-r--r--federationsender/internal/api.go104
-rw-r--r--federationsender/inthttp/client.go95
-rw-r--r--federationsender/inthttp/server.go66
-rw-r--r--federationsender/statistics/statistics.go11
5 files changed, 297 insertions, 3 deletions
diff --git a/federationsender/api/api.go b/federationsender/api/api.go
index 9f9c2645..cea0010d 100644
--- a/federationsender/api/api.go
+++ b/federationsender/api/api.go
@@ -2,14 +2,38 @@ package api
import (
"context"
+ "fmt"
+ "time"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
)
+// FederationClient is a subset of gomatrixserverlib.FederationClient functions which the fedsender
+// implements as proxy calls, with built-in backoff/retries/etc. Errors returned from functions in
+// this interface are of type FederationClientError
+type FederationClient interface {
+ GetUserDevices(ctx context.Context, s gomatrixserverlib.ServerName, userID string) (res gomatrixserverlib.RespUserDevices, err error)
+ ClaimKeys(ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string) (res gomatrixserverlib.RespClaimKeys, err error)
+ QueryKeys(ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string) (res gomatrixserverlib.RespQueryKeys, err error)
+}
+
+// FederationClientError is returned from FederationClient methods in the event of a problem.
+type FederationClientError struct {
+ Err string
+ RetryAfter time.Duration
+ Blacklisted bool
+}
+
+func (e *FederationClientError) Error() string {
+ return fmt.Sprintf("%s - (retry_after=%d, blacklisted=%v)", e.Err, e.RetryAfter, e.Blacklisted)
+}
+
// FederationSenderInternalAPI is used to query information from the federation sender.
type FederationSenderInternalAPI interface {
+ FederationClient
+
// PerformDirectoryLookup looks up a remote room ID from a room alias.
PerformDirectoryLookup(
ctx context.Context,
diff --git a/federationsender/internal/api.go b/federationsender/internal/api.go
index 647e3fcb..6b5f4c34 100644
--- a/federationsender/internal/api.go
+++ b/federationsender/internal/api.go
@@ -1,11 +1,16 @@
package internal
import (
+ "context"
+ "time"
+
+ "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/statistics"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/internal/config"
- "github.com/matrix-org/dendrite/roomserver/api"
+ roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -14,7 +19,7 @@ type FederationSenderInternalAPI struct {
db storage.Database
cfg *config.FederationSender
statistics *statistics.Statistics
- rsAPI api.RoomserverInternalAPI
+ rsAPI roomserverAPI.RoomserverInternalAPI
federation *gomatrixserverlib.FederationClient
keyRing *gomatrixserverlib.KeyRing
queues *queue.OutgoingQueues
@@ -22,7 +27,7 @@ type FederationSenderInternalAPI struct {
func NewFederationSenderInternalAPI(
db storage.Database, cfg *config.FederationSender,
- rsAPI api.RoomserverInternalAPI,
+ rsAPI roomserverAPI.RoomserverInternalAPI,
federation *gomatrixserverlib.FederationClient,
keyRing *gomatrixserverlib.KeyRing,
statistics *statistics.Statistics,
@@ -38,3 +43,96 @@ func NewFederationSenderInternalAPI(
queues: queues,
}
}
+
+func (a *FederationSenderInternalAPI) isBlacklistedOrBackingOff(s gomatrixserverlib.ServerName) (*statistics.ServerStatistics, error) {
+ stats := a.statistics.ForServer(s)
+ until, blacklisted := stats.BackoffInfo()
+ if blacklisted {
+ return stats, &api.FederationClientError{
+ Blacklisted: true,
+ }
+ }
+ now := time.Now()
+ if until != nil && now.Before(*until) {
+ return stats, &api.FederationClientError{
+ RetryAfter: time.Until(*until),
+ }
+ }
+
+ return stats, nil
+}
+
+func failBlacklistableError(err error, stats *statistics.ServerStatistics) (until time.Time, blacklisted bool) {
+ if err == nil {
+ return
+ }
+ mxerr, ok := err.(gomatrix.HTTPError)
+ if !ok {
+ return stats.Failure()
+ }
+ if mxerr.Code >= 500 && mxerr.Code < 600 {
+ return stats.Failure()
+ }
+ return
+}
+
+func (a *FederationSenderInternalAPI) doRequest(
+ s gomatrixserverlib.ServerName, request func() (interface{}, error),
+) (interface{}, error) {
+ stats, err := a.isBlacklistedOrBackingOff(s)
+ if err != nil {
+ return nil, err
+ }
+ res, err := request()
+ if err != nil {
+ until, blacklisted := failBlacklistableError(err, stats)
+ now := time.Now()
+ var retryAfter time.Duration
+ if until.After(now) {
+ retryAfter = time.Until(until)
+ }
+ return res, &api.FederationClientError{
+ Err: err.Error(),
+ Blacklisted: blacklisted,
+ RetryAfter: retryAfter,
+ }
+ }
+ stats.Success()
+ return res, nil
+}
+
+func (a *FederationSenderInternalAPI) GetUserDevices(
+ ctx context.Context, s gomatrixserverlib.ServerName, userID string,
+) (gomatrixserverlib.RespUserDevices, error) {
+ ires, err := a.doRequest(s, func() (interface{}, error) {
+ return a.federation.GetUserDevices(ctx, s, userID)
+ })
+ if err != nil {
+ return gomatrixserverlib.RespUserDevices{}, err
+ }
+ return ires.(gomatrixserverlib.RespUserDevices), nil
+}
+
+func (a *FederationSenderInternalAPI) ClaimKeys(
+ ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string,
+) (gomatrixserverlib.RespClaimKeys, error) {
+ ires, err := a.doRequest(s, func() (interface{}, error) {
+ return a.federation.ClaimKeys(ctx, s, oneTimeKeys)
+ })
+ if err != nil {
+ return gomatrixserverlib.RespClaimKeys{}, err
+ }
+ return ires.(gomatrixserverlib.RespClaimKeys), nil
+}
+
+func (a *FederationSenderInternalAPI) QueryKeys(
+ ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string,
+) (gomatrixserverlib.RespQueryKeys, error) {
+ ires, err := a.doRequest(s, func() (interface{}, error) {
+ return a.federation.QueryKeys(ctx, s, keys)
+ })
+ if err != nil {
+ return gomatrixserverlib.RespQueryKeys{}, err
+ }
+ return ires.(gomatrixserverlib.RespQueryKeys), nil
+}
diff --git a/federationsender/inthttp/client.go b/federationsender/inthttp/client.go
index 13c2c45a..79e220c3 100644
--- a/federationsender/inthttp/client.go
+++ b/federationsender/inthttp/client.go
@@ -8,6 +8,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/gomatrix"
+ "github.com/matrix-org/gomatrixserverlib"
"github.com/opentracing/opentracing-go"
)
@@ -21,6 +22,10 @@ const (
FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest"
FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive"
FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU"
+
+ FederationSenderGetUserDevicesPath = "/federationsender/client/getUserDevices"
+ FederationSenderClaimKeysPath = "/federationsender/client/claimKeys"
+ FederationSenderQueryKeysPath = "/federationsender/client/queryKeys"
)
// NewFederationSenderClient creates a FederationSenderInternalAPI implemented by talking to a HTTP POST API.
@@ -133,3 +138,93 @@ func (h *httpFederationSenderInternalAPI) PerformBroadcastEDU(
apiURL := h.federationSenderURL + FederationSenderPerformBroadcastEDUPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
+
+type getUserDevices struct {
+ S gomatrixserverlib.ServerName
+ UserID string
+ Res *gomatrixserverlib.RespUserDevices
+ Err *api.FederationClientError
+}
+
+func (h *httpFederationSenderInternalAPI) GetUserDevices(
+ ctx context.Context, s gomatrixserverlib.ServerName, userID string,
+) (gomatrixserverlib.RespUserDevices, error) {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "GetUserDevices")
+ defer span.Finish()
+
+ var result gomatrixserverlib.RespUserDevices
+ request := getUserDevices{
+ S: s,
+ UserID: userID,
+ }
+ var response getUserDevices
+ apiURL := h.federationSenderURL + FederationSenderGetUserDevicesPath
+ err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response)
+ if err != nil {
+ return result, err
+ }
+ if response.Err != nil {
+ return result, response.Err
+ }
+ return *response.Res, nil
+}
+
+type claimKeys struct {
+ S gomatrixserverlib.ServerName
+ OneTimeKeys map[string]map[string]string
+ Res *gomatrixserverlib.RespClaimKeys
+ Err *api.FederationClientError
+}
+
+func (h *httpFederationSenderInternalAPI) ClaimKeys(
+ ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string,
+) (gomatrixserverlib.RespClaimKeys, error) {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "ClaimKeys")
+ defer span.Finish()
+
+ var result gomatrixserverlib.RespClaimKeys
+ request := claimKeys{
+ S: s,
+ OneTimeKeys: oneTimeKeys,
+ }
+ var response claimKeys
+ apiURL := h.federationSenderURL + FederationSenderClaimKeysPath
+ err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response)
+ if err != nil {
+ return result, err
+ }
+ if response.Err != nil {
+ return result, response.Err
+ }
+ return *response.Res, nil
+}
+
+type queryKeys struct {
+ S gomatrixserverlib.ServerName
+ Keys map[string][]string
+ Res *gomatrixserverlib.RespQueryKeys
+ Err *api.FederationClientError
+}
+
+func (h *httpFederationSenderInternalAPI) QueryKeys(
+ ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string,
+) (gomatrixserverlib.RespQueryKeys, error) {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "QueryKeys")
+ defer span.Finish()
+
+ var result gomatrixserverlib.RespQueryKeys
+ request := queryKeys{
+ S: s,
+ Keys: keys,
+ }
+ var response queryKeys
+ apiURL := h.federationSenderURL + FederationSenderQueryKeysPath
+ err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response)
+ if err != nil {
+ return result, err
+ }
+ if response.Err != nil {
+ return result, response.Err
+ }
+ return *response.Res, nil
+}
diff --git a/federationsender/inthttp/server.go b/federationsender/inthttp/server.go
index f02cbd12..b1825576 100644
--- a/federationsender/inthttp/server.go
+++ b/federationsender/inthttp/server.go
@@ -109,4 +109,70 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
+ internalAPIMux.Handle(
+ FederationSenderGetUserDevicesPath,
+ httputil.MakeInternalAPI("GetUserDevices", func(req *http.Request) util.JSONResponse {
+ var request getUserDevices
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ res, err := intAPI.GetUserDevices(req.Context(), request.S, request.UserID)
+ if err != nil {
+ ferr, ok := err.(*api.FederationClientError)
+ if ok {
+ request.Err = ferr
+ } else {
+ request.Err = &api.FederationClientError{
+ Err: err.Error(),
+ }
+ }
+ }
+ request.Res = &res
+ return util.JSONResponse{Code: http.StatusOK, JSON: request}
+ }),
+ )
+ internalAPIMux.Handle(
+ FederationSenderClaimKeysPath,
+ httputil.MakeInternalAPI("ClaimKeys", func(req *http.Request) util.JSONResponse {
+ var request claimKeys
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ res, err := intAPI.ClaimKeys(req.Context(), request.S, request.OneTimeKeys)
+ if err != nil {
+ ferr, ok := err.(*api.FederationClientError)
+ if ok {
+ request.Err = ferr
+ } else {
+ request.Err = &api.FederationClientError{
+ Err: err.Error(),
+ }
+ }
+ }
+ request.Res = &res
+ return util.JSONResponse{Code: http.StatusOK, JSON: request}
+ }),
+ )
+ internalAPIMux.Handle(
+ FederationSenderQueryKeysPath,
+ httputil.MakeInternalAPI("QueryKeys", func(req *http.Request) util.JSONResponse {
+ var request queryKeys
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ res, err := intAPI.QueryKeys(req.Context(), request.S, request.Keys)
+ if err != nil {
+ ferr, ok := err.(*api.FederationClientError)
+ if ok {
+ request.Err = ferr
+ } else {
+ request.Err = &api.FederationClientError{
+ Err: err.Error(),
+ }
+ }
+ }
+ request.Res = &res
+ return util.JSONResponse{Code: http.StatusOK, JSON: request}
+ }),
+ )
}
diff --git a/federationsender/statistics/statistics.go b/federationsender/statistics/statistics.go
index a574ceff..03ef64e9 100644
--- a/federationsender/statistics/statistics.go
+++ b/federationsender/statistics/statistics.go
@@ -126,8 +126,19 @@ func (s *ServerStatistics) Failure() (time.Time, bool) {
return until, false
}
+// BackoffInfo returns information about the current or previous backoff.
+// Returns the last backoffUntil time and whether the server is currently blacklisted or not.
+func (s *ServerStatistics) BackoffInfo() (*time.Time, bool) {
+ until, ok := s.backoffUntil.Load().(time.Time)
+ if ok {
+ return &until, s.blacklisted.Load()
+ }
+ return nil, s.blacklisted.Load()
+}
+
// BackoffIfRequired will block for as long as the current
// backoff requires, if needed. Otherwise it will do nothing.
+// Returns the amount of time to backoff for and whether to give up or not.
func (s *ServerStatistics) BackoffIfRequired(backingOff atomic.Bool, interrupt <-chan bool) (time.Duration, bool) {
if started := s.backoffStarted.Load(); !started {
return 0, false