diff options
author | Kegsay <kegan@matrix.org> | 2020-08-20 17:03:07 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-20 17:03:07 +0100 |
commit | 6d6bb7513710db1009c474eff031434916feda1b (patch) | |
tree | 62eb40ba6944580a7cf6da1541d7070ad65bd362 /federationsender | |
parent | 068a3d3c9f9be3473b68e3a13912182caf1c7117 (diff) |
Add FederationClient interface to federationsender (#1284)
* Add FederationClient interface to federationsender
- Use a shim struct in HTTP mode to keep the same API as `FederationClient`.
- Use `federationsender` instead of `FederationClient` in `keyserver`.
* Pointers not values
* Review comments
* Fix unit tests
* Rejig backoff
* Unbreak test
* Remove debug logs
* Review comments and linting
Diffstat (limited to 'federationsender')
-rw-r--r-- | federationsender/api/api.go | 24 | ||||
-rw-r--r-- | federationsender/internal/api.go | 104 | ||||
-rw-r--r-- | federationsender/inthttp/client.go | 95 | ||||
-rw-r--r-- | federationsender/inthttp/server.go | 66 | ||||
-rw-r--r-- | federationsender/statistics/statistics.go | 11 |
5 files changed, 297 insertions, 3 deletions
diff --git a/federationsender/api/api.go b/federationsender/api/api.go index 9f9c2645..cea0010d 100644 --- a/federationsender/api/api.go +++ b/federationsender/api/api.go @@ -2,14 +2,38 @@ package api import ( "context" + "fmt" + "time" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" ) +// FederationClient is a subset of gomatrixserverlib.FederationClient functions which the fedsender +// implements as proxy calls, with built-in backoff/retries/etc. Errors returned from functions in +// this interface are of type FederationClientError +type FederationClient interface { + GetUserDevices(ctx context.Context, s gomatrixserverlib.ServerName, userID string) (res gomatrixserverlib.RespUserDevices, err error) + ClaimKeys(ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string) (res gomatrixserverlib.RespClaimKeys, err error) + QueryKeys(ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string) (res gomatrixserverlib.RespQueryKeys, err error) +} + +// FederationClientError is returned from FederationClient methods in the event of a problem. +type FederationClientError struct { + Err string + RetryAfter time.Duration + Blacklisted bool +} + +func (e *FederationClientError) Error() string { + return fmt.Sprintf("%s - (retry_after=%d, blacklisted=%v)", e.Err, e.RetryAfter, e.Blacklisted) +} + // FederationSenderInternalAPI is used to query information from the federation sender. type FederationSenderInternalAPI interface { + FederationClient + // PerformDirectoryLookup looks up a remote room ID from a room alias. PerformDirectoryLookup( ctx context.Context, diff --git a/federationsender/internal/api.go b/federationsender/internal/api.go index 647e3fcb..6b5f4c34 100644 --- a/federationsender/internal/api.go +++ b/federationsender/internal/api.go @@ -1,11 +1,16 @@ package internal import ( + "context" + "time" + + "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/statistics" "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/internal/config" - "github.com/matrix-org/dendrite/roomserver/api" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" ) @@ -14,7 +19,7 @@ type FederationSenderInternalAPI struct { db storage.Database cfg *config.FederationSender statistics *statistics.Statistics - rsAPI api.RoomserverInternalAPI + rsAPI roomserverAPI.RoomserverInternalAPI federation *gomatrixserverlib.FederationClient keyRing *gomatrixserverlib.KeyRing queues *queue.OutgoingQueues @@ -22,7 +27,7 @@ type FederationSenderInternalAPI struct { func NewFederationSenderInternalAPI( db storage.Database, cfg *config.FederationSender, - rsAPI api.RoomserverInternalAPI, + rsAPI roomserverAPI.RoomserverInternalAPI, federation *gomatrixserverlib.FederationClient, keyRing *gomatrixserverlib.KeyRing, statistics *statistics.Statistics, @@ -38,3 +43,96 @@ func NewFederationSenderInternalAPI( queues: queues, } } + +func (a *FederationSenderInternalAPI) isBlacklistedOrBackingOff(s gomatrixserverlib.ServerName) (*statistics.ServerStatistics, error) { + stats := a.statistics.ForServer(s) + until, blacklisted := stats.BackoffInfo() + if blacklisted { + return stats, &api.FederationClientError{ + Blacklisted: true, + } + } + now := time.Now() + if until != nil && now.Before(*until) { + return stats, &api.FederationClientError{ + RetryAfter: time.Until(*until), + } + } + + return stats, nil +} + +func failBlacklistableError(err error, stats *statistics.ServerStatistics) (until time.Time, blacklisted bool) { + if err == nil { + return + } + mxerr, ok := err.(gomatrix.HTTPError) + if !ok { + return stats.Failure() + } + if mxerr.Code >= 500 && mxerr.Code < 600 { + return stats.Failure() + } + return +} + +func (a *FederationSenderInternalAPI) doRequest( + s gomatrixserverlib.ServerName, request func() (interface{}, error), +) (interface{}, error) { + stats, err := a.isBlacklistedOrBackingOff(s) + if err != nil { + return nil, err + } + res, err := request() + if err != nil { + until, blacklisted := failBlacklistableError(err, stats) + now := time.Now() + var retryAfter time.Duration + if until.After(now) { + retryAfter = time.Until(until) + } + return res, &api.FederationClientError{ + Err: err.Error(), + Blacklisted: blacklisted, + RetryAfter: retryAfter, + } + } + stats.Success() + return res, nil +} + +func (a *FederationSenderInternalAPI) GetUserDevices( + ctx context.Context, s gomatrixserverlib.ServerName, userID string, +) (gomatrixserverlib.RespUserDevices, error) { + ires, err := a.doRequest(s, func() (interface{}, error) { + return a.federation.GetUserDevices(ctx, s, userID) + }) + if err != nil { + return gomatrixserverlib.RespUserDevices{}, err + } + return ires.(gomatrixserverlib.RespUserDevices), nil +} + +func (a *FederationSenderInternalAPI) ClaimKeys( + ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string, +) (gomatrixserverlib.RespClaimKeys, error) { + ires, err := a.doRequest(s, func() (interface{}, error) { + return a.federation.ClaimKeys(ctx, s, oneTimeKeys) + }) + if err != nil { + return gomatrixserverlib.RespClaimKeys{}, err + } + return ires.(gomatrixserverlib.RespClaimKeys), nil +} + +func (a *FederationSenderInternalAPI) QueryKeys( + ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string, +) (gomatrixserverlib.RespQueryKeys, error) { + ires, err := a.doRequest(s, func() (interface{}, error) { + return a.federation.QueryKeys(ctx, s, keys) + }) + if err != nil { + return gomatrixserverlib.RespQueryKeys{}, err + } + return ires.(gomatrixserverlib.RespQueryKeys), nil +} diff --git a/federationsender/inthttp/client.go b/federationsender/inthttp/client.go index 13c2c45a..79e220c3 100644 --- a/federationsender/inthttp/client.go +++ b/federationsender/inthttp/client.go @@ -8,6 +8,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/gomatrix" + "github.com/matrix-org/gomatrixserverlib" "github.com/opentracing/opentracing-go" ) @@ -21,6 +22,10 @@ const ( FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest" FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive" FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU" + + FederationSenderGetUserDevicesPath = "/federationsender/client/getUserDevices" + FederationSenderClaimKeysPath = "/federationsender/client/claimKeys" + FederationSenderQueryKeysPath = "/federationsender/client/queryKeys" ) // NewFederationSenderClient creates a FederationSenderInternalAPI implemented by talking to a HTTP POST API. @@ -133,3 +138,93 @@ func (h *httpFederationSenderInternalAPI) PerformBroadcastEDU( apiURL := h.federationSenderURL + FederationSenderPerformBroadcastEDUPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } + +type getUserDevices struct { + S gomatrixserverlib.ServerName + UserID string + Res *gomatrixserverlib.RespUserDevices + Err *api.FederationClientError +} + +func (h *httpFederationSenderInternalAPI) GetUserDevices( + ctx context.Context, s gomatrixserverlib.ServerName, userID string, +) (gomatrixserverlib.RespUserDevices, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "GetUserDevices") + defer span.Finish() + + var result gomatrixserverlib.RespUserDevices + request := getUserDevices{ + S: s, + UserID: userID, + } + var response getUserDevices + apiURL := h.federationSenderURL + FederationSenderGetUserDevicesPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) + if err != nil { + return result, err + } + if response.Err != nil { + return result, response.Err + } + return *response.Res, nil +} + +type claimKeys struct { + S gomatrixserverlib.ServerName + OneTimeKeys map[string]map[string]string + Res *gomatrixserverlib.RespClaimKeys + Err *api.FederationClientError +} + +func (h *httpFederationSenderInternalAPI) ClaimKeys( + ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string, +) (gomatrixserverlib.RespClaimKeys, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "ClaimKeys") + defer span.Finish() + + var result gomatrixserverlib.RespClaimKeys + request := claimKeys{ + S: s, + OneTimeKeys: oneTimeKeys, + } + var response claimKeys + apiURL := h.federationSenderURL + FederationSenderClaimKeysPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) + if err != nil { + return result, err + } + if response.Err != nil { + return result, response.Err + } + return *response.Res, nil +} + +type queryKeys struct { + S gomatrixserverlib.ServerName + Keys map[string][]string + Res *gomatrixserverlib.RespQueryKeys + Err *api.FederationClientError +} + +func (h *httpFederationSenderInternalAPI) QueryKeys( + ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string, +) (gomatrixserverlib.RespQueryKeys, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryKeys") + defer span.Finish() + + var result gomatrixserverlib.RespQueryKeys + request := queryKeys{ + S: s, + Keys: keys, + } + var response queryKeys + apiURL := h.federationSenderURL + FederationSenderQueryKeysPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) + if err != nil { + return result, err + } + if response.Err != nil { + return result, response.Err + } + return *response.Res, nil +} diff --git a/federationsender/inthttp/server.go b/federationsender/inthttp/server.go index f02cbd12..b1825576 100644 --- a/federationsender/inthttp/server.go +++ b/federationsender/inthttp/server.go @@ -109,4 +109,70 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle( + FederationSenderGetUserDevicesPath, + httputil.MakeInternalAPI("GetUserDevices", func(req *http.Request) util.JSONResponse { + var request getUserDevices + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + res, err := intAPI.GetUserDevices(req.Context(), request.S, request.UserID) + if err != nil { + ferr, ok := err.(*api.FederationClientError) + if ok { + request.Err = ferr + } else { + request.Err = &api.FederationClientError{ + Err: err.Error(), + } + } + } + request.Res = &res + return util.JSONResponse{Code: http.StatusOK, JSON: request} + }), + ) + internalAPIMux.Handle( + FederationSenderClaimKeysPath, + httputil.MakeInternalAPI("ClaimKeys", func(req *http.Request) util.JSONResponse { + var request claimKeys + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + res, err := intAPI.ClaimKeys(req.Context(), request.S, request.OneTimeKeys) + if err != nil { + ferr, ok := err.(*api.FederationClientError) + if ok { + request.Err = ferr + } else { + request.Err = &api.FederationClientError{ + Err: err.Error(), + } + } + } + request.Res = &res + return util.JSONResponse{Code: http.StatusOK, JSON: request} + }), + ) + internalAPIMux.Handle( + FederationSenderQueryKeysPath, + httputil.MakeInternalAPI("QueryKeys", func(req *http.Request) util.JSONResponse { + var request queryKeys + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + res, err := intAPI.QueryKeys(req.Context(), request.S, request.Keys) + if err != nil { + ferr, ok := err.(*api.FederationClientError) + if ok { + request.Err = ferr + } else { + request.Err = &api.FederationClientError{ + Err: err.Error(), + } + } + } + request.Res = &res + return util.JSONResponse{Code: http.StatusOK, JSON: request} + }), + ) } diff --git a/federationsender/statistics/statistics.go b/federationsender/statistics/statistics.go index a574ceff..03ef64e9 100644 --- a/federationsender/statistics/statistics.go +++ b/federationsender/statistics/statistics.go @@ -126,8 +126,19 @@ func (s *ServerStatistics) Failure() (time.Time, bool) { return until, false } +// BackoffInfo returns information about the current or previous backoff. +// Returns the last backoffUntil time and whether the server is currently blacklisted or not. +func (s *ServerStatistics) BackoffInfo() (*time.Time, bool) { + until, ok := s.backoffUntil.Load().(time.Time) + if ok { + return &until, s.blacklisted.Load() + } + return nil, s.blacklisted.Load() +} + // BackoffIfRequired will block for as long as the current // backoff requires, if needed. Otherwise it will do nothing. +// Returns the amount of time to backoff for and whether to give up or not. func (s *ServerStatistics) BackoffIfRequired(backingOff atomic.Bool, interrupt <-chan bool) (time.Duration, bool) { if started := s.backoffStarted.Load(); !started { return 0, false |