diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2021-03-05 10:40:27 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-03-05 10:40:27 +0000 |
commit | 1ad96e2e2df9dc1f5fa7d31522babd6a64ca517f (patch) | |
tree | 166e2051273fc89f50d6efac1c40a12cd7c077a7 /appservice | |
parent | 9557ccada4efe50d0f370019ad0b9f017fc7ebcf (diff) |
Tweak AS registration check and AS component HTTP clients (#1785)
* Tweak AS registration check
* Check appservice usernames using correct function
* Update sytest-whitelist
* Use gomatrixserverlib.Client since that allows us to disable TLS validation using the config
* Add appservice-specific client and ability to control TLS validation for appservices only
* Set timeout on appservice client
* Review comments
* Remove dead code
* Enforce LoginTypeApplicationService after all
* Check correct auth type field
Diffstat (limited to 'appservice')
-rw-r--r-- | appservice/appservice.go | 11 | ||||
-rw-r--r-- | appservice/query/query.go | 25 | ||||
-rw-r--r-- | appservice/workers/transaction_scheduler.go | 16 |
3 files changed, 13 insertions, 39 deletions
diff --git a/appservice/appservice.go b/appservice/appservice.go index d783c7eb..f608e8e7 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -16,9 +16,7 @@ package appservice import ( "context" - "net/http" "sync" - "time" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" @@ -48,6 +46,7 @@ func NewInternalAPI( userAPI userapi.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, ) appserviceAPI.AppServiceQueryAPI { + client := base.CreateAppserviceClient() consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka) // Create a connection to the appservice postgres DB @@ -79,10 +78,8 @@ func NewInternalAPI( // Create appserivce query API with an HTTP client that will be used for all // outbound and inbound requests (inbound only for the internal API) appserviceQueryAPI := &query.AppServiceQueryAPI{ - HTTPClient: &http.Client{ - Timeout: time.Second * 30, - }, - Cfg: base.Cfg, + HTTPClient: client, + Cfg: base.Cfg, } // Only consume if we actually have ASes to track, else we'll just chew cycles needlessly. @@ -98,7 +95,7 @@ func NewInternalAPI( } // Create application service transaction workers - if err := workers.SetupTransactionWorkers(appserviceDB, workerStates); err != nil { + if err := workers.SetupTransactionWorkers(client, appserviceDB, workerStates); err != nil { logrus.WithError(err).Panicf("failed to start app service transaction workers") } return appserviceQueryAPI diff --git a/appservice/query/query.go b/appservice/query/query.go index 7e5ac475..b4c33528 100644 --- a/appservice/query/query.go +++ b/appservice/query/query.go @@ -20,10 +20,10 @@ import ( "context" "net/http" "net/url" - "time" "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/gomatrixserverlib" opentracing "github.com/opentracing/opentracing-go" log "github.com/sirupsen/logrus" ) @@ -33,7 +33,7 @@ const userIDExistsPath = "/users/" // AppServiceQueryAPI is an implementation of api.AppServiceQueryAPI type AppServiceQueryAPI struct { - HTTPClient *http.Client + HTTPClient *gomatrixserverlib.Client Cfg *config.Dendrite } @@ -47,11 +47,6 @@ func (a *AppServiceQueryAPI) RoomAliasExists( span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceRoomAlias") defer span.Finish() - // Create an HTTP client if one does not already exist - if a.HTTPClient == nil { - a.HTTPClient = makeHTTPClient() - } - // Determine which application service should handle this request for _, appservice := range a.Cfg.Derived.ApplicationServices { if appservice.URL != "" && appservice.IsInterestedInRoomAlias(request.Alias) { @@ -68,7 +63,7 @@ func (a *AppServiceQueryAPI) RoomAliasExists( } req = req.WithContext(ctx) - resp, err := a.HTTPClient.Do(req) + resp, err := a.HTTPClient.DoHTTPRequest(ctx, req) if resp != nil { defer func() { err = resp.Body.Close() @@ -115,11 +110,6 @@ func (a *AppServiceQueryAPI) UserIDExists( span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceUserID") defer span.Finish() - // Create an HTTP client if one does not already exist - if a.HTTPClient == nil { - a.HTTPClient = makeHTTPClient() - } - // Determine which application service should handle this request for _, appservice := range a.Cfg.Derived.ApplicationServices { if appservice.URL != "" && appservice.IsInterestedInUserID(request.UserID) { @@ -134,7 +124,7 @@ func (a *AppServiceQueryAPI) UserIDExists( if err != nil { return err } - resp, err := a.HTTPClient.Do(req.WithContext(ctx)) + resp, err := a.HTTPClient.DoHTTPRequest(ctx, req) if resp != nil { defer func() { err = resp.Body.Close() @@ -169,10 +159,3 @@ func (a *AppServiceQueryAPI) UserIDExists( response.UserIDExists = false return nil } - -// makeHTTPClient creates an HTTP client with certain options that will be used for all query requests to application services -func makeHTTPClient() *http.Client { - return &http.Client{ - Timeout: time.Second * 30, - } -} diff --git a/appservice/workers/transaction_scheduler.go b/appservice/workers/transaction_scheduler.go index 45748c21..47d447c2 100644 --- a/appservice/workers/transaction_scheduler.go +++ b/appservice/workers/transaction_scheduler.go @@ -34,8 +34,6 @@ import ( var ( // Maximum size of events sent in each transaction. transactionBatchSize = 50 - // Timeout for sending a single transaction to an application service. - transactionTimeout = time.Second * 60 ) // SetupTransactionWorkers spawns a separate goroutine for each application @@ -44,6 +42,7 @@ var ( // size), then send that off to the AS's /transactions/{txnID} endpoint. It also // handles exponentially backing off in case the AS isn't currently available. func SetupTransactionWorkers( + client *gomatrixserverlib.Client, appserviceDB storage.Database, workerStates []types.ApplicationServiceWorkerState, ) error { @@ -51,7 +50,7 @@ func SetupTransactionWorkers( for _, workerState := range workerStates { // Don't create a worker if this AS doesn't want to receive events if workerState.AppService.URL != "" { - go worker(appserviceDB, workerState) + go worker(client, appserviceDB, workerState) } } return nil @@ -59,17 +58,12 @@ func SetupTransactionWorkers( // worker is a goroutine that sends any queued events to the application service // it is given. -func worker(db storage.Database, ws types.ApplicationServiceWorkerState) { +func worker(client *gomatrixserverlib.Client, db storage.Database, ws types.ApplicationServiceWorkerState) { log.WithFields(log.Fields{ "appservice": ws.AppService.ID, }).Info("Starting application service") ctx := context.Background() - // Create a HTTP client for sending requests to app services - client := &http.Client{ - Timeout: transactionTimeout, - } - // Initial check for any leftover events to send from last time eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID) if err != nil { @@ -206,7 +200,7 @@ func createTransaction( // send sends events to an application service. Returns an error if an OK was not // received back from the application service or the request timed out. func send( - client *http.Client, + client *gomatrixserverlib.Client, appservice config.ApplicationService, txnID int, transaction []byte, @@ -219,7 +213,7 @@ func send( return err } req.Header.Set("Content-Type", "application/json") - resp, err := client.Do(req) + resp, err := client.DoHTTPRequest(context.TODO(), req) if err != nil { return err } |