aboutsummaryrefslogtreecommitdiff
path: root/appservice/workers
diff options
context:
space:
mode:
authorruben <code@rbn.im>2019-05-21 22:56:55 +0200
committerBrendan Abolivier <babolivier@matrix.org>2019-05-21 21:56:55 +0100
commit74827428bd3e11faab65f12204449c1b9469b0ae (patch)
tree0decafa542436a0667ed2d3e3cfd4df0f03de1e5 /appservice/workers
parent4d588f7008afe5600219ac0930c2eee2de5c447b (diff)
use go module for dependencies (#594)
Diffstat (limited to 'appservice/workers')
-rw-r--r--appservice/workers/transaction_scheduler.go227
1 files changed, 227 insertions, 0 deletions
diff --git a/appservice/workers/transaction_scheduler.go b/appservice/workers/transaction_scheduler.go
new file mode 100644
index 00000000..0330eb9e
--- /dev/null
+++ b/appservice/workers/transaction_scheduler.go
@@ -0,0 +1,227 @@
+// Copyright 2018 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package workers
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "math"
+ "net/http"
+ "time"
+
+ "github.com/matrix-org/dendrite/appservice/storage"
+ "github.com/matrix-org/dendrite/appservice/types"
+ "github.com/matrix-org/dendrite/common/config"
+ "github.com/matrix-org/gomatrixserverlib"
+ log "github.com/sirupsen/logrus"
+)
+
+var (
+ // Maximum size of events sent in each transaction.
+ transactionBatchSize = 50
+ // Timeout for sending a single transaction to an application service.
+ transactionTimeout = time.Second * 60
+)
+
+// SetupTransactionWorkers spawns a separate goroutine for each application
+// service. Each of these "workers" handle taking all events intended for their
+// app service, batch them up into a single transaction (up to a max transaction
+// size), then send that off to the AS's /transactions/{txnID} endpoint. It also
+// handles exponentially backing off in case the AS isn't currently available.
+func SetupTransactionWorkers(
+ appserviceDB *storage.Database,
+ workerStates []types.ApplicationServiceWorkerState,
+) error {
+ // Create a worker that handles transmitting events to a single homeserver
+ for _, workerState := range workerStates {
+ // Don't create a worker if this AS doesn't want to receive events
+ if workerState.AppService.URL != "" {
+ go worker(appserviceDB, workerState)
+ }
+ }
+ return nil
+}
+
+// worker is a goroutine that sends any queued events to the application service
+// it is given.
+func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
+ log.WithFields(log.Fields{
+ "appservice": ws.AppService.ID,
+ }).Info("starting application service")
+ ctx := context.Background()
+
+ // Create a HTTP client for sending requests to app services
+ client := &http.Client{
+ Timeout: transactionTimeout,
+ }
+
+ // Initial check for any leftover events to send from last time
+ eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": ws.AppService.ID,
+ }).WithError(err).Fatal("appservice worker unable to read queued events from DB")
+ return
+ }
+ if eventCount > 0 {
+ ws.NotifyNewEvents()
+ }
+
+ // Loop forever and keep waiting for more events to send
+ for {
+ // Wait for more events if we've sent all the events in the database
+ ws.WaitForNewEvents()
+
+ // Batch events up into a transaction
+ transactionJSON, txnID, maxEventID, eventsRemaining, err := createTransaction(ctx, db, ws.AppService.ID)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": ws.AppService.ID,
+ }).WithError(err).Fatal("appservice worker unable to create transaction")
+
+ return
+ }
+
+ // Send the events off to the application service
+ // Backoff if the application service does not respond
+ err = send(client, ws.AppService, txnID, transactionJSON)
+ if err != nil {
+ // Backoff
+ backoff(&ws, err)
+ continue
+ }
+
+ // We sent successfully, hooray!
+ ws.Backoff = 0
+
+ // Transactions have a maximum event size, so there may still be some events
+ // left over to send. Keep sending until none are left
+ if !eventsRemaining {
+ ws.FinishEventProcessing()
+ }
+
+ // Remove sent events from the DB
+ err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": ws.AppService.ID,
+ }).WithError(err).Fatal("unable to remove appservice events from the database")
+ return
+ }
+ }
+}
+
+// backoff pauses the calling goroutine for a 2^some backoff exponent seconds
+func backoff(ws *types.ApplicationServiceWorkerState, err error) {
+ // Calculate how long to backoff for
+ backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff)))
+ backoffSeconds := time.Second * backoffDuration
+
+ log.WithFields(log.Fields{
+ "appservice": ws.AppService.ID,
+ }).WithError(err).Warnf("unable to send transactions successfully, backing off for %ds",
+ backoffDuration)
+
+ ws.Backoff++
+ if ws.Backoff > 6 {
+ ws.Backoff = 6
+ }
+
+ // Backoff
+ time.Sleep(backoffSeconds)
+}
+
+// createTransaction takes in a slice of AS events, stores them in an AS
+// transaction, and JSON-encodes the results.
+func createTransaction(
+ ctx context.Context,
+ db *storage.Database,
+ appserviceID string,
+) (
+ transactionJSON []byte,
+ txnID, maxID int,
+ eventsRemaining bool,
+ err error,
+) {
+ // Retrieve the latest events from the DB (will return old events if they weren't successfully sent)
+ txnID, maxID, events, eventsRemaining, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": appserviceID,
+ }).WithError(err).Fatalf("appservice worker unable to read queued events from DB")
+
+ return
+ }
+
+ // Check if these events do not already have a transaction ID
+ if txnID == -1 {
+ // If not, grab next available ID from the DB
+ txnID, err = db.GetLatestTxnID(ctx)
+ if err != nil {
+ return nil, 0, 0, false, err
+ }
+
+ // Mark new events with current transactionID
+ if err = db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, txnID); err != nil {
+ return nil, 0, 0, false, err
+ }
+ }
+
+ // Create a transaction and store the events inside
+ transaction := gomatrixserverlib.ApplicationServiceTransaction{
+ Events: events,
+ }
+
+ transactionJSON, err = json.Marshal(transaction)
+ if err != nil {
+ return
+ }
+
+ return
+}
+
+// send sends events to an application service. Returns an error if an OK was not
+// received back from the application service or the request timed out.
+func send(
+ client *http.Client,
+ appservice config.ApplicationService,
+ txnID int,
+ transaction []byte,
+) error {
+ // POST a transaction to our AS
+ address := fmt.Sprintf("%s/transactions/%d", appservice.URL, txnID)
+ resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction))
+ if err != nil {
+ return err
+ }
+ defer func() {
+ err := resp.Body.Close()
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": appservice.ID,
+ }).WithError(err).Error("unable to close response body from application service")
+ }
+ }()
+
+ // Check the AS received the events correctly
+ if resp.StatusCode != http.StatusOK {
+ // TODO: Handle non-200 error codes from application services
+ return fmt.Errorf("non-OK status code %d returned from AS", resp.StatusCode)
+ }
+
+ return nil
+}