diff options
author | ruben <code@rbn.im> | 2019-05-21 22:56:55 +0200 |
---|---|---|
committer | Brendan Abolivier <babolivier@matrix.org> | 2019-05-21 21:56:55 +0100 |
commit | 74827428bd3e11faab65f12204449c1b9469b0ae (patch) | |
tree | 0decafa542436a0667ed2d3e3cfd4df0f03de1e5 /appservice/workers | |
parent | 4d588f7008afe5600219ac0930c2eee2de5c447b (diff) |
use go module for dependencies (#594)
Diffstat (limited to 'appservice/workers')
-rw-r--r-- | appservice/workers/transaction_scheduler.go | 227 |
1 files changed, 227 insertions, 0 deletions
diff --git a/appservice/workers/transaction_scheduler.go b/appservice/workers/transaction_scheduler.go new file mode 100644 index 00000000..0330eb9e --- /dev/null +++ b/appservice/workers/transaction_scheduler.go @@ -0,0 +1,227 @@ +// Copyright 2018 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package workers + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "math" + "net/http" + "time" + + "github.com/matrix-org/dendrite/appservice/storage" + "github.com/matrix-org/dendrite/appservice/types" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" +) + +var ( + // Maximum size of events sent in each transaction. + transactionBatchSize = 50 + // Timeout for sending a single transaction to an application service. + transactionTimeout = time.Second * 60 +) + +// SetupTransactionWorkers spawns a separate goroutine for each application +// service. Each of these "workers" handle taking all events intended for their +// app service, batch them up into a single transaction (up to a max transaction +// size), then send that off to the AS's /transactions/{txnID} endpoint. It also +// handles exponentially backing off in case the AS isn't currently available. +func SetupTransactionWorkers( + appserviceDB *storage.Database, + workerStates []types.ApplicationServiceWorkerState, +) error { + // Create a worker that handles transmitting events to a single homeserver + for _, workerState := range workerStates { + // Don't create a worker if this AS doesn't want to receive events + if workerState.AppService.URL != "" { + go worker(appserviceDB, workerState) + } + } + return nil +} + +// worker is a goroutine that sends any queued events to the application service +// it is given. +func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).Info("starting application service") + ctx := context.Background() + + // Create a HTTP client for sending requests to app services + client := &http.Client{ + Timeout: transactionTimeout, + } + + // Initial check for any leftover events to send from last time + eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID) + if err != nil { + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Fatal("appservice worker unable to read queued events from DB") + return + } + if eventCount > 0 { + ws.NotifyNewEvents() + } + + // Loop forever and keep waiting for more events to send + for { + // Wait for more events if we've sent all the events in the database + ws.WaitForNewEvents() + + // Batch events up into a transaction + transactionJSON, txnID, maxEventID, eventsRemaining, err := createTransaction(ctx, db, ws.AppService.ID) + if err != nil { + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Fatal("appservice worker unable to create transaction") + + return + } + + // Send the events off to the application service + // Backoff if the application service does not respond + err = send(client, ws.AppService, txnID, transactionJSON) + if err != nil { + // Backoff + backoff(&ws, err) + continue + } + + // We sent successfully, hooray! + ws.Backoff = 0 + + // Transactions have a maximum event size, so there may still be some events + // left over to send. Keep sending until none are left + if !eventsRemaining { + ws.FinishEventProcessing() + } + + // Remove sent events from the DB + err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID) + if err != nil { + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Fatal("unable to remove appservice events from the database") + return + } + } +} + +// backoff pauses the calling goroutine for a 2^some backoff exponent seconds +func backoff(ws *types.ApplicationServiceWorkerState, err error) { + // Calculate how long to backoff for + backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff))) + backoffSeconds := time.Second * backoffDuration + + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Warnf("unable to send transactions successfully, backing off for %ds", + backoffDuration) + + ws.Backoff++ + if ws.Backoff > 6 { + ws.Backoff = 6 + } + + // Backoff + time.Sleep(backoffSeconds) +} + +// createTransaction takes in a slice of AS events, stores them in an AS +// transaction, and JSON-encodes the results. +func createTransaction( + ctx context.Context, + db *storage.Database, + appserviceID string, +) ( + transactionJSON []byte, + txnID, maxID int, + eventsRemaining bool, + err error, +) { + // Retrieve the latest events from the DB (will return old events if they weren't successfully sent) + txnID, maxID, events, eventsRemaining, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize) + if err != nil { + log.WithFields(log.Fields{ + "appservice": appserviceID, + }).WithError(err).Fatalf("appservice worker unable to read queued events from DB") + + return + } + + // Check if these events do not already have a transaction ID + if txnID == -1 { + // If not, grab next available ID from the DB + txnID, err = db.GetLatestTxnID(ctx) + if err != nil { + return nil, 0, 0, false, err + } + + // Mark new events with current transactionID + if err = db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, txnID); err != nil { + return nil, 0, 0, false, err + } + } + + // Create a transaction and store the events inside + transaction := gomatrixserverlib.ApplicationServiceTransaction{ + Events: events, + } + + transactionJSON, err = json.Marshal(transaction) + if err != nil { + return + } + + return +} + +// send sends events to an application service. Returns an error if an OK was not +// received back from the application service or the request timed out. +func send( + client *http.Client, + appservice config.ApplicationService, + txnID int, + transaction []byte, +) error { + // POST a transaction to our AS + address := fmt.Sprintf("%s/transactions/%d", appservice.URL, txnID) + resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction)) + if err != nil { + return err + } + defer func() { + err := resp.Body.Close() + if err != nil { + log.WithFields(log.Fields{ + "appservice": appservice.ID, + }).WithError(err).Error("unable to close response body from application service") + } + }() + + // Check the AS received the events correctly + if resp.StatusCode != http.StatusOK { + // TODO: Handle non-200 error codes from application services + return fmt.Errorf("non-OK status code %d returned from AS", resp.StatusCode) + } + + return nil +} |