aboutsummaryrefslogtreecommitdiff
path: root/appservice
diff options
context:
space:
mode:
authorruben <code@rbn.im>2019-05-21 22:56:55 +0200
committerBrendan Abolivier <babolivier@matrix.org>2019-05-21 21:56:55 +0100
commit74827428bd3e11faab65f12204449c1b9469b0ae (patch)
tree0decafa542436a0667ed2d3e3cfd4df0f03de1e5 /appservice
parent4d588f7008afe5600219ac0930c2eee2de5c447b (diff)
use go module for dependencies (#594)
Diffstat (limited to 'appservice')
-rw-r--r--appservice/README.md10
-rw-r--r--appservice/api/query.go178
-rw-r--r--appservice/appservice.go132
-rw-r--r--appservice/consumers/roomserver.go210
-rw-r--r--appservice/query/query.go214
-rw-r--r--appservice/routing/routing.go61
-rw-r--r--appservice/storage/appservice_events_table.go248
-rw-r--r--appservice/storage/storage.go110
-rw-r--r--appservice/storage/txn_id_counter_table.go52
-rw-r--r--appservice/types/types.go64
-rw-r--r--appservice/workers/transaction_scheduler.go227
11 files changed, 1506 insertions, 0 deletions
diff --git a/appservice/README.md b/appservice/README.md
new file mode 100644
index 00000000..d7555744
--- /dev/null
+++ b/appservice/README.md
@@ -0,0 +1,10 @@
+# Application Service
+
+This component interfaces with external [Application
+Services](https://matrix.org/docs/spec/application_service/unstable.html).
+This includes any HTTP endpoints that application services call, as well as talking
+to any HTTP endpoints that application services provide themselves.
+
+## Consumers
+
+This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing application services. \ No newline at end of file
diff --git a/appservice/api/query.go b/appservice/api/query.go
new file mode 100644
index 00000000..9ec21448
--- /dev/null
+++ b/appservice/api/query.go
@@ -0,0 +1,178 @@
+// Copyright 2018 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package api contains methods used by dendrite components in multi-process
+// mode to send requests to the appservice component, typically in order to ask
+// an application service for some information.
+package api
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "net/http"
+
+ "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
+ "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
+ "github.com/matrix-org/gomatrixserverlib"
+
+ commonHTTP "github.com/matrix-org/dendrite/common/http"
+ opentracing "github.com/opentracing/opentracing-go"
+)
+
+// RoomAliasExistsRequest is a request to an application service
+// about whether a room alias exists
+type RoomAliasExistsRequest struct {
+ // Alias we want to lookup
+ Alias string `json:"alias"`
+}
+
+// RoomAliasExistsResponse is a response from an application service
+// about whether a room alias exists
+type RoomAliasExistsResponse struct {
+ AliasExists bool `json:"exists"`
+}
+
+// UserIDExistsRequest is a request to an application service about whether a
+// user ID exists
+type UserIDExistsRequest struct {
+ // UserID we want to lookup
+ UserID string `json:"user_id"`
+}
+
+// UserIDExistsRequestAccessToken is a request to an application service
+// about whether a user ID exists. Includes an access token
+type UserIDExistsRequestAccessToken struct {
+ // UserID we want to lookup
+ UserID string `json:"user_id"`
+ AccessToken string `json:"access_token"`
+}
+
+// UserIDExistsResponse is a response from an application service about
+// whether a user ID exists
+type UserIDExistsResponse struct {
+ UserIDExists bool `json:"exists"`
+}
+
+// AppServiceQueryAPI is used to query user and room alias data from application
+// services
+type AppServiceQueryAPI interface {
+ // Check whether a room alias exists within any application service namespaces
+ RoomAliasExists(
+ ctx context.Context,
+ req *RoomAliasExistsRequest,
+ resp *RoomAliasExistsResponse,
+ ) error
+ // Check whether a user ID exists within any application service namespaces
+ UserIDExists(
+ ctx context.Context,
+ req *UserIDExistsRequest,
+ resp *UserIDExistsResponse,
+ ) error
+}
+
+// AppServiceRoomAliasExistsPath is the HTTP path for the RoomAliasExists API
+const AppServiceRoomAliasExistsPath = "/api/appservice/RoomAliasExists"
+
+// AppServiceUserIDExistsPath is the HTTP path for the UserIDExists API
+const AppServiceUserIDExistsPath = "/api/appservice/UserIDExists"
+
+// httpAppServiceQueryAPI contains the URL to an appservice query API and a
+// reference to a httpClient used to reach it
+type httpAppServiceQueryAPI struct {
+ appserviceURL string
+ httpClient *http.Client
+}
+
+// NewAppServiceQueryAPIHTTP creates a AppServiceQueryAPI implemented by talking
+// to a HTTP POST API.
+// If httpClient is nil then it uses http.DefaultClient
+func NewAppServiceQueryAPIHTTP(
+ appserviceURL string,
+ httpClient *http.Client,
+) AppServiceQueryAPI {
+ if httpClient == nil {
+ httpClient = http.DefaultClient
+ }
+ return &httpAppServiceQueryAPI{appserviceURL, httpClient}
+}
+
+// RoomAliasExists implements AppServiceQueryAPI
+func (h *httpAppServiceQueryAPI) RoomAliasExists(
+ ctx context.Context,
+ request *RoomAliasExistsRequest,
+ response *RoomAliasExistsResponse,
+) error {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "appserviceRoomAliasExists")
+ defer span.Finish()
+
+ apiURL := h.appserviceURL + AppServiceRoomAliasExistsPath
+ return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+}
+
+// UserIDExists implements AppServiceQueryAPI
+func (h *httpAppServiceQueryAPI) UserIDExists(
+ ctx context.Context,
+ request *UserIDExistsRequest,
+ response *UserIDExistsResponse,
+) error {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "appserviceUserIDExists")
+ defer span.Finish()
+
+ apiURL := h.appserviceURL + AppServiceUserIDExistsPath
+ return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+}
+
+// RetreiveUserProfile is a wrapper that queries both the local database and
+// application services for a given user's profile
+func RetreiveUserProfile(
+ ctx context.Context,
+ userID string,
+ asAPI AppServiceQueryAPI,
+ accountDB *accounts.Database,
+) (*authtypes.Profile, error) {
+ localpart, _, err := gomatrixserverlib.SplitID('@', userID)
+ if err != nil {
+ return nil, err
+ }
+
+ // Try to query the user from the local database
+ profile, err := accountDB.GetProfileByLocalpart(ctx, localpart)
+ if err != nil && err != sql.ErrNoRows {
+ return nil, err
+ } else if profile != nil {
+ return profile, nil
+ }
+
+ // Query the appservice component for the existence of an AS user
+ userReq := UserIDExistsRequest{UserID: userID}
+ var userResp UserIDExistsResponse
+ if err = asAPI.UserIDExists(ctx, &userReq, &userResp); err != nil {
+ return nil, err
+ }
+
+ // If no user exists, return
+ if !userResp.UserIDExists {
+ return nil, errors.New("no known profile for given user ID")
+ }
+
+ // Try to query the user from the local database again
+ profile, err = accountDB.GetProfileByLocalpart(ctx, localpart)
+ if err != nil {
+ return nil, err
+ }
+
+ // profile should not be nil at this point
+ return profile, nil
+}
diff --git a/appservice/appservice.go b/appservice/appservice.go
new file mode 100644
index 00000000..8703959f
--- /dev/null
+++ b/appservice/appservice.go
@@ -0,0 +1,132 @@
+// Copyright 2018 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package appservice
+
+import (
+ "context"
+ "net/http"
+ "sync"
+ "time"
+
+ appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
+ "github.com/matrix-org/dendrite/appservice/consumers"
+ "github.com/matrix-org/dendrite/appservice/query"
+ "github.com/matrix-org/dendrite/appservice/routing"
+ "github.com/matrix-org/dendrite/appservice/storage"
+ "github.com/matrix-org/dendrite/appservice/types"
+ "github.com/matrix-org/dendrite/appservice/workers"
+ "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
+ "github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
+ "github.com/matrix-org/dendrite/common/basecomponent"
+ "github.com/matrix-org/dendrite/common/config"
+ "github.com/matrix-org/dendrite/common/transactions"
+ roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/sirupsen/logrus"
+)
+
+// SetupAppServiceAPIComponent sets up and registers HTTP handlers for the AppServices
+// component.
+func SetupAppServiceAPIComponent(
+ base *basecomponent.BaseDendrite,
+ accountsDB *accounts.Database,
+ deviceDB *devices.Database,
+ federation *gomatrixserverlib.FederationClient,
+ roomserverAliasAPI roomserverAPI.RoomserverAliasAPI,
+ roomserverQueryAPI roomserverAPI.RoomserverQueryAPI,
+ transactionsCache *transactions.Cache,
+) appserviceAPI.AppServiceQueryAPI {
+ // Create a connection to the appservice postgres DB
+ appserviceDB, err := storage.NewDatabase(string(base.Cfg.Database.AppService))
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to connect to appservice db")
+ }
+
+ // Wrap application services in a type that relates the application service and
+ // a sync.Cond object that can be used to notify workers when there are new
+ // events to be sent out.
+ workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices))
+ for i, appservice := range base.Cfg.Derived.ApplicationServices {
+ m := sync.Mutex{}
+ ws := types.ApplicationServiceWorkerState{
+ AppService: appservice,
+ Cond: sync.NewCond(&m),
+ }
+ workerStates[i] = ws
+
+ // Create bot account for this AS if it doesn't already exist
+ if err = generateAppServiceAccount(accountsDB, deviceDB, appservice); err != nil {
+ logrus.WithFields(logrus.Fields{
+ "appservice": appservice.ID,
+ }).WithError(err).Panicf("failed to generate bot account for appservice")
+ }
+ }
+
+ // Create appserivce query API with an HTTP client that will be used for all
+ // outbound and inbound requests (inbound only for the internal API)
+ appserviceQueryAPI := query.AppServiceQueryAPI{
+ HTTPClient: &http.Client{
+ Timeout: time.Second * 30,
+ },
+ Cfg: base.Cfg,
+ }
+
+ appserviceQueryAPI.SetupHTTP(http.DefaultServeMux)
+
+ consumer := consumers.NewOutputRoomEventConsumer(
+ base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB,
+ roomserverQueryAPI, roomserverAliasAPI, workerStates,
+ )
+ if err := consumer.Start(); err != nil {
+ logrus.WithError(err).Panicf("failed to start appservice roomserver consumer")
+ }
+
+ // Create application service transaction workers
+ if err := workers.SetupTransactionWorkers(appserviceDB, workerStates); err != nil {
+ logrus.WithError(err).Panicf("failed to start app service transaction workers")
+ }
+
+ // Set up HTTP Endpoints
+ routing.Setup(
+ base.APIMux, *base.Cfg, roomserverQueryAPI, roomserverAliasAPI,
+ accountsDB, federation, transactionsCache,
+ )
+
+ return &appserviceQueryAPI
+}
+
+// generateAppServiceAccounts creates a dummy account based off the
+// `sender_localpart` field of each application service if it doesn't
+// exist already
+func generateAppServiceAccount(
+ accountsDB *accounts.Database,
+ deviceDB *devices.Database,
+ as config.ApplicationService,
+) error {
+ ctx := context.Background()
+
+ // Create an account for the application service
+ acc, err := accountsDB.CreateAccount(ctx, as.SenderLocalpart, "", as.ID)
+ if err != nil {
+ return err
+ } else if acc == nil {
+ // This account already exists
+ return nil
+ }
+
+ // Create a dummy device with a dummy token for the application service
+ _, err = deviceDB.CreateDevice(ctx, as.SenderLocalpart, nil, as.ASToken, &as.SenderLocalpart)
+ return err
+}
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
new file mode 100644
index 00000000..dbdae532
--- /dev/null
+++ b/appservice/consumers/roomserver.go
@@ -0,0 +1,210 @@
+// Copyright 2018 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/appservice/storage"
+ "github.com/matrix-org/dendrite/appservice/types"
+ "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/common/config"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/gomatrixserverlib"
+
+ log "github.com/sirupsen/logrus"
+ sarama "gopkg.in/Shopify/sarama.v1"
+)
+
+// OutputRoomEventConsumer consumes events that originated in the room server.
+type OutputRoomEventConsumer struct {
+ roomServerConsumer *common.ContinualConsumer
+ db *accounts.Database
+ asDB *storage.Database
+ query api.RoomserverQueryAPI
+ alias api.RoomserverAliasAPI
+ serverName string
+ workerStates []types.ApplicationServiceWorkerState
+}
+
+// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call
+// Start() to begin consuming from room servers.
+func NewOutputRoomEventConsumer(
+ cfg *config.Dendrite,
+ kafkaConsumer sarama.Consumer,
+ store *accounts.Database,
+ appserviceDB *storage.Database,
+ queryAPI api.RoomserverQueryAPI,
+ aliasAPI api.RoomserverAliasAPI,
+ workerStates []types.ApplicationServiceWorkerState,
+) *OutputRoomEventConsumer {
+ consumer := common.ContinualConsumer{
+ Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
+ Consumer: kafkaConsumer,
+ PartitionStore: store,
+ }
+ s := &OutputRoomEventConsumer{
+ roomServerConsumer: &consumer,
+ db: store,
+ asDB: appserviceDB,
+ query: queryAPI,
+ alias: aliasAPI,
+ serverName: string(cfg.Matrix.ServerName),
+ workerStates: workerStates,
+ }
+ consumer.ProcessMessage = s.onMessage
+
+ return s
+}
+
+// Start consuming from room servers
+func (s *OutputRoomEventConsumer) Start() error {
+ return s.roomServerConsumer.Start()
+}
+
+// onMessage is called when the appservice component receives a new event from
+// the room server output log.
+func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
+ // Parse out the event JSON
+ var output api.OutputEvent
+ if err := json.Unmarshal(msg.Value, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("roomserver output log: message parse failure")
+ return nil
+ }
+
+ if output.Type != api.OutputTypeNewRoomEvent {
+ log.WithField("type", output.Type).Debug(
+ "roomserver output log: ignoring unknown output type",
+ )
+ return nil
+ }
+
+ ev := output.NewRoomEvent.Event
+ log.WithFields(log.Fields{
+ "event_id": ev.EventID(),
+ "room_id": ev.RoomID(),
+ "type": ev.Type(),
+ }).Info("appservice received an event from roomserver")
+
+ missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev)
+ if err != nil {
+ return err
+ }
+ events := append(missingEvents, ev)
+
+ // Send event to any relevant application services
+ return s.filterRoomserverEvents(context.TODO(), events)
+}
+
+// lookupMissingStateEvents looks up the state events that are added by a new event,
+// and returns any not already present.
+func (s *OutputRoomEventConsumer) lookupMissingStateEvents(
+ addsStateEventIDs []string, event gomatrixserverlib.Event,
+) ([]gomatrixserverlib.Event, error) {
+ // Fast path if there aren't any new state events.
+ if len(addsStateEventIDs) == 0 {
+ return []gomatrixserverlib.Event{}, nil
+ }
+
+ // Fast path if the only state event added is the event itself.
+ if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
+ return []gomatrixserverlib.Event{}, nil
+ }
+
+ result := []gomatrixserverlib.Event{}
+ missing := []string{}
+ for _, id := range addsStateEventIDs {
+ if id != event.EventID() {
+ // If the event isn't the current one, add it to the list of events
+ // to retrieve from the roomserver
+ missing = append(missing, id)
+ }
+ }
+
+ // Request the missing events from the roomserver
+ eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
+ var eventResp api.QueryEventsByIDResponse
+ if err := s.query.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
+ return nil, err
+ }
+
+ result = append(result, eventResp.Events...)
+
+ return result, nil
+}
+
+// filterRoomserverEvents takes in events and decides whether any of them need
+// to be passed on to an external application service. It does this by checking
+// each namespace of each registered application service, and if there is a
+// match, adds the event to the queue for events to be sent to a particular
+// application service.
+func (s *OutputRoomEventConsumer) filterRoomserverEvents(
+ ctx context.Context,
+ events []gomatrixserverlib.Event,
+) error {
+ for _, ws := range s.workerStates {
+ for _, event := range events {
+ // Check if this event is interesting to this application service
+ if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) {
+ // Queue this event to be sent off to the application service
+ if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil {
+ log.WithError(err).Warn("failed to insert incoming event into appservices database")
+ } else {
+ // Tell our worker to send out new messages by updating remaining message
+ // count and waking them up with a broadcast
+ ws.NotifyNewEvents()
+ }
+ }
+ }
+ }
+
+ return nil
+}
+
+// appserviceIsInterestedInEvent returns a boolean depending on whether a given
+// event falls within one of a given application service's namespaces.
+func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event gomatrixserverlib.Event, appservice config.ApplicationService) bool {
+ // No reason to queue events if they'll never be sent to the application
+ // service
+ if appservice.URL == "" {
+ return false
+ }
+
+ // Check Room ID and Sender of the event
+ if appservice.IsInterestedInUserID(event.Sender()) ||
+ appservice.IsInterestedInRoomID(event.RoomID()) {
+ return true
+ }
+
+ // Check all known room aliases of the room the event came from
+ queryReq := api.GetAliasesForRoomIDRequest{RoomID: event.RoomID()}
+ var queryRes api.GetAliasesForRoomIDResponse
+ if err := s.alias.GetAliasesForRoomID(ctx, &queryReq, &queryRes); err == nil {
+ for _, alias := range queryRes.Aliases {
+ if appservice.IsInterestedInRoomAlias(alias) {
+ return true
+ }
+ }
+ } else {
+ log.WithFields(log.Fields{
+ "room_id": event.RoomID(),
+ }).WithError(err).Errorf("Unable to get aliases for room")
+ }
+
+ return false
+}
diff --git a/appservice/query/query.go b/appservice/query/query.go
new file mode 100644
index 00000000..fde3ab09
--- /dev/null
+++ b/appservice/query/query.go
@@ -0,0 +1,214 @@
+// Copyright 2018 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package query handles requests from other internal dendrite components when
+// they interact with the AppServiceQueryAPI.
+package query
+
+import (
+ "context"
+ "encoding/json"
+ "net/http"
+ "net/url"
+ "time"
+
+ "github.com/matrix-org/dendrite/appservice/api"
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/common/config"
+ "github.com/matrix-org/util"
+ opentracing "github.com/opentracing/opentracing-go"
+ log "github.com/sirupsen/logrus"
+)
+
+const roomAliasExistsPath = "/rooms/"
+const userIDExistsPath = "/users/"
+
+// AppServiceQueryAPI is an implementation of api.AppServiceQueryAPI
+type AppServiceQueryAPI struct {
+ HTTPClient *http.Client
+ Cfg *config.Dendrite
+}
+
+// RoomAliasExists performs a request to '/room/{roomAlias}' on all known
+// handling application services until one admits to owning the room
+func (a *AppServiceQueryAPI) RoomAliasExists(
+ ctx context.Context,
+ request *api.RoomAliasExistsRequest,
+ response *api.RoomAliasExistsResponse,
+) error {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceRoomAlias")
+ defer span.Finish()
+
+ // Create an HTTP client if one does not already exist
+ if a.HTTPClient == nil {
+ a.HTTPClient = makeHTTPClient()
+ }
+
+ // Determine which application service should handle this request
+ for _, appservice := range a.Cfg.Derived.ApplicationServices {
+ if appservice.URL != "" && appservice.IsInterestedInRoomAlias(request.Alias) {
+ // The full path to the rooms API, includes hs token
+ URL, err := url.Parse(appservice.URL + roomAliasExistsPath)
+ URL.Path += request.Alias
+ apiURL := URL.String() + "?access_token=" + appservice.HSToken
+
+ // Send a request to each application service. If one responds that it has
+ // created the room, immediately return.
+ req, err := http.NewRequest(http.MethodGet, apiURL, nil)
+ if err != nil {
+ return err
+ }
+ req = req.WithContext(ctx)
+
+ resp, err := a.HTTPClient.Do(req)
+ if resp != nil {
+ defer func() {
+ err = resp.Body.Close()
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice_id": appservice.ID,
+ "status_code": resp.StatusCode,
+ }).WithError(err).Error("Unable to close application service response body")
+ }
+ }()
+ }
+ if err != nil {
+ log.WithError(err).Errorf("Issue querying room alias on application service %s", appservice.ID)
+ return err
+ }
+ switch resp.StatusCode {
+ case http.StatusOK:
+ // OK received from appservice. Room exists
+ response.AliasExists = true
+ return nil
+ case http.StatusNotFound:
+ // Room does not exist
+ default:
+ // Application service reported an error. Warn
+ log.WithFields(log.Fields{
+ "appservice_id": appservice.ID,
+ "status_code": resp.StatusCode,
+ }).Warn("Application service responded with non-OK status code")
+ }
+ }
+ }
+
+ response.AliasExists = false
+ return nil
+}
+
+// UserIDExists performs a request to '/users/{userID}' on all known
+// handling application services until one admits to owning the user ID
+func (a *AppServiceQueryAPI) UserIDExists(
+ ctx context.Context,
+ request *api.UserIDExistsRequest,
+ response *api.UserIDExistsResponse,
+) error {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceUserID")
+ defer span.Finish()
+
+ // Create an HTTP client if one does not already exist
+ if a.HTTPClient == nil {
+ a.HTTPClient = makeHTTPClient()
+ }
+
+ // Determine which application service should handle this request
+ for _, appservice := range a.Cfg.Derived.ApplicationServices {
+ if appservice.URL != "" && appservice.IsInterestedInUserID(request.UserID) {
+ // The full path to the rooms API, includes hs token
+ URL, err := url.Parse(appservice.URL + userIDExistsPath)
+ URL.Path += request.UserID
+ apiURL := URL.String() + "?access_token=" + appservice.HSToken
+
+ // Send a request to each application service. If one responds that it has
+ // created the user, immediately return.
+ req, err := http.NewRequest(http.MethodGet, apiURL, nil)
+ if err != nil {
+ return err
+ }
+ resp, err := a.HTTPClient.Do(req.WithContext(ctx))
+ if resp != nil {
+ defer func() {
+ err = resp.Body.Close()
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice_id": appservice.ID,
+ "status_code": resp.StatusCode,
+ }).Error("Unable to close application service response body")
+ }
+ }()
+ }
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice_id": appservice.ID,
+ }).WithError(err).Error("issue querying user ID on application service")
+ return err
+ }
+ if resp.StatusCode == http.StatusOK {
+ // StatusOK received from appservice. User ID exists
+ response.UserIDExists = true
+ return nil
+ }
+
+ // Log non OK
+ log.WithFields(log.Fields{
+ "appservice_id": appservice.ID,
+ "status_code": resp.StatusCode,
+ }).Warn("application service responded with non-OK status code")
+ }
+ }
+
+ response.UserIDExists = false
+ return nil
+}
+
+// makeHTTPClient creates an HTTP client with certain options that will be used for all query requests to application services
+func makeHTTPClient() *http.Client {
+ return &http.Client{
+ Timeout: time.Second * 30,
+ }
+}
+
+// SetupHTTP adds the AppServiceQueryPAI handlers to the http.ServeMux. This
+// handles and muxes incoming api requests the to internal AppServiceQueryAPI.
+func (a *AppServiceQueryAPI) SetupHTTP(servMux *http.ServeMux) {
+ servMux.Handle(
+ api.AppServiceRoomAliasExistsPath,
+ common.MakeInternalAPI("appserviceRoomAliasExists", func(req *http.Request) util.JSONResponse {
+ var request api.RoomAliasExistsRequest
+ var response api.RoomAliasExistsResponse
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.ErrorResponse(err)
+ }
+ if err := a.RoomAliasExists(req.Context(), &request, &response); err != nil {
+ return util.ErrorResponse(err)
+ }
+ return util.JSONResponse{Code: http.StatusOK, JSON: &response}
+ }),
+ )
+ servMux.Handle(
+ api.AppServiceUserIDExistsPath,
+ common.MakeInternalAPI("appserviceUserIDExists", func(req *http.Request) util.JSONResponse {
+ var request api.UserIDExistsRequest
+ var response api.UserIDExistsResponse
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.ErrorResponse(err)
+ }
+ if err := a.UserIDExists(req.Context(), &request, &response); err != nil {
+ return util.ErrorResponse(err)
+ }
+ return util.JSONResponse{Code: http.StatusOK, JSON: &response}
+ }),
+ )
+}
diff --git a/appservice/routing/routing.go b/appservice/routing/routing.go
new file mode 100644
index 00000000..f0b8461d
--- /dev/null
+++ b/appservice/routing/routing.go
@@ -0,0 +1,61 @@
+// Copyright 2018 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package routing
+
+import (
+ "net/http"
+
+ "github.com/gorilla/mux"
+ "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/common/config"
+ "github.com/matrix-org/dendrite/common/transactions"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+)
+
+const pathPrefixApp = "/_matrix/app/r0"
+
+// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
+// to clients which need to make outbound HTTP requests.
+func Setup(
+ apiMux *mux.Router, cfg config.Dendrite, // nolint: unparam
+ queryAPI api.RoomserverQueryAPI, aliasAPI api.RoomserverAliasAPI, // nolint: unparam
+ accountDB *accounts.Database, // nolint: unparam
+ federation *gomatrixserverlib.FederationClient, // nolint: unparam
+ transactionsCache *transactions.Cache, // nolint: unparam
+) {
+ appMux := apiMux.PathPrefix(pathPrefixApp).Subrouter()
+
+ appMux.Handle("/alias",
+ common.MakeExternalAPI("alias", func(req *http.Request) util.JSONResponse {
+ // TODO: Implement
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: nil,
+ }
+ }),
+ ).Methods(http.MethodGet, http.MethodOptions)
+ appMux.Handle("/user",
+ common.MakeExternalAPI("user", func(req *http.Request) util.JSONResponse {
+ // TODO: Implement
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: nil,
+ }
+ }),
+ ).Methods(http.MethodGet, http.MethodOptions)
+}
diff --git a/appservice/storage/appservice_events_table.go b/appservice/storage/appservice_events_table.go
new file mode 100644
index 00000000..285bbf48
--- /dev/null
+++ b/appservice/storage/appservice_events_table.go
@@ -0,0 +1,248 @@
+// Copyright 2018 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+ "time"
+
+ "github.com/matrix-org/gomatrixserverlib"
+ log "github.com/sirupsen/logrus"
+)
+
+const appserviceEventsSchema = `
+-- Stores events to be sent to application services
+CREATE TABLE IF NOT EXISTS appservice_events (
+ -- An auto-incrementing id unique to each event in the table
+ id BIGSERIAL NOT NULL PRIMARY KEY,
+ -- The ID of the application service the event will be sent to
+ as_id TEXT NOT NULL,
+ -- JSON representation of the event
+ event_json TEXT NOT NULL,
+ -- The ID of the transaction that this event is a part of
+ txn_id BIGINT NOT NULL
+);
+
+CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
+`
+
+const selectEventsByApplicationServiceIDSQL = "" +
+ "SELECT id, event_json, txn_id " +
+ "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
+
+const countEventsByApplicationServiceIDSQL = "" +
+ "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
+
+const insertEventSQL = "" +
+ "INSERT INTO appservice_events(as_id, event_json, txn_id) " +
+ "VALUES ($1, $2, $3)"
+
+const updateTxnIDForEventsSQL = "" +
+ "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
+
+const deleteEventsBeforeAndIncludingIDSQL = "" +
+ "DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2"
+
+const (
+ // A transaction ID number that no transaction should ever have. Used for
+ // checking again the default value.
+ invalidTxnID = -2
+)
+
+type eventsStatements struct {
+ selectEventsByApplicationServiceIDStmt *sql.Stmt
+ countEventsByApplicationServiceIDStmt *sql.Stmt
+ insertEventStmt *sql.Stmt
+ updateTxnIDForEventsStmt *sql.Stmt
+ deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
+}
+
+func (s *eventsStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(appserviceEventsSchema)
+ if err != nil {
+ return
+ }
+
+ if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil {
+ return
+ }
+ if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil {
+ return
+ }
+ if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
+ return
+ }
+ if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil {
+ return
+ }
+ if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil {
+ return
+ }
+
+ return
+}
+
+// selectEventsByApplicationServiceID takes in an application service ID and
+// returns a slice of events that need to be sent to that application service,
+// as well as an int later used to remove these same events from the database
+// once successfully sent to an application service.
+func (s *eventsStatements) selectEventsByApplicationServiceID(
+ ctx context.Context,
+ applicationServiceID string,
+ limit int,
+) (
+ txnID, maxID int,
+ events []gomatrixserverlib.Event,
+ eventsRemaining bool,
+ err error,
+) {
+ // Retrieve events from the database. Unsuccessfully sent events first
+ eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
+ if err != nil {
+ return
+ }
+ defer func() {
+ err = eventRows.Close()
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": applicationServiceID,
+ }).WithError(err).Fatalf("appservice unable to select new events to send")
+ }
+ }()
+ events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit)
+ if err != nil {
+ return
+ }
+
+ return
+}
+
+func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) {
+ // Get current time for use in calculating event age
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+
+ // Iterate through each row and store event contents
+ // If txn_id changes dramatically, we've switched from collecting old events to
+ // new ones. Send back those events first.
+ lastTxnID := invalidTxnID
+ for eventsProcessed := 0; eventRows.Next(); {
+ var event gomatrixserverlib.Event
+ var eventJSON []byte
+ var id int
+ err = eventRows.Scan(
+ &id,
+ &eventJSON,
+ &txnID,
+ )
+ if err != nil {
+ return nil, 0, 0, false, err
+ }
+
+ // Unmarshal eventJSON
+ if err = json.Unmarshal(eventJSON, &event); err != nil {
+ return nil, 0, 0, false, err
+ }
+
+ // If txnID has changed on this event from the previous event, then we've
+ // reached the end of a transaction's events. Return only those events.
+ if lastTxnID > invalidTxnID && lastTxnID != txnID {
+ return events, maxID, lastTxnID, true, nil
+ }
+ lastTxnID = txnID
+
+ // Limit events that aren't part of an old transaction
+ if txnID == -1 {
+ // Return if we've hit the limit
+ if eventsProcessed++; eventsProcessed > limit {
+ return events, maxID, lastTxnID, true, nil
+ }
+ }
+
+ if id > maxID {
+ maxID = id
+ }
+
+ // Portion of the event that is unsigned due to rapid change
+ // TODO: Consider removing age as not many app services use it
+ if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil {
+ return nil, 0, 0, false, err
+ }
+
+ events = append(events, event)
+ }
+
+ return
+}
+
+// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service
+// IDs into the db.
+func (s *eventsStatements) countEventsByApplicationServiceID(
+ ctx context.Context,
+ appServiceID string,
+) (int, error) {
+ var count int
+ err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
+ if err != nil && err != sql.ErrNoRows {
+ return 0, err
+ }
+
+ return count, nil
+}
+
+// insertEvent inserts an event mapped to its corresponding application service
+// IDs into the db.
+func (s *eventsStatements) insertEvent(
+ ctx context.Context,
+ appServiceID string,
+ event *gomatrixserverlib.Event,
+) (err error) {
+ // Convert event to JSON before inserting
+ eventJSON, err := json.Marshal(event)
+ if err != nil {
+ return err
+ }
+
+ _, err = s.insertEventStmt.ExecContext(
+ ctx,
+ appServiceID,
+ eventJSON,
+ -1, // No transaction ID yet
+ )
+ return
+}
+
+// updateTxnIDForEvents sets the transactionID for a collection of events. Done
+// before sending them to an AppService. Referenced before sending to make sure
+// we aren't constructing multiple transactions with the same events.
+func (s *eventsStatements) updateTxnIDForEvents(
+ ctx context.Context,
+ appserviceID string,
+ maxID, txnID int,
+) (err error) {
+ _, err = s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID)
+ return
+}
+
+// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database.
+func (s *eventsStatements) deleteEventsBeforeAndIncludingID(
+ ctx context.Context,
+ appserviceID string,
+ eventTableID int,
+) (err error) {
+ _, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID)
+ return
+}
diff --git a/appservice/storage/storage.go b/appservice/storage/storage.go
new file mode 100644
index 00000000..b68989fb
--- /dev/null
+++ b/appservice/storage/storage.go
@@ -0,0 +1,110 @@
+// Copyright 2018 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "database/sql"
+
+ // Import postgres database driver
+ _ "github.com/lib/pq"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// Database stores events intended to be later sent to application services
+type Database struct {
+ events eventsStatements
+ txnID txnStatements
+ db *sql.DB
+}
+
+// NewDatabase opens a new database
+func NewDatabase(dataSourceName string) (*Database, error) {
+ var result Database
+ var err error
+ if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
+ return nil, err
+ }
+ if err = result.prepare(); err != nil {
+ return nil, err
+ }
+ return &result, nil
+}
+
+func (d *Database) prepare() error {
+ if err := d.events.prepare(d.db); err != nil {
+ return err
+ }
+
+ return d.txnID.prepare(d.db)
+}
+
+// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
+// for a transaction worker to pull and later send to an application service.
+func (d *Database) StoreEvent(
+ ctx context.Context,
+ appServiceID string,
+ event *gomatrixserverlib.Event,
+) error {
+ return d.events.insertEvent(ctx, appServiceID, event)
+}
+
+// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
+// be sent to an application service given its ID.
+func (d *Database) GetEventsWithAppServiceID(
+ ctx context.Context,
+ appServiceID string,
+ limit int,
+) (int, int, []gomatrixserverlib.Event, bool, error) {
+ return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
+}
+
+// CountEventsWithAppServiceID returns the number of events destined for an
+// application service given its ID.
+func (d *Database) CountEventsWithAppServiceID(
+ ctx context.Context,
+ appServiceID string,
+) (int, error) {
+ return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
+}
+
+// UpdateTxnIDForEvents takes in an application service ID and a
+// and stores them in the DB, unless the pair already exists, in
+// which case it updates them.
+func (d *Database) UpdateTxnIDForEvents(
+ ctx context.Context,
+ appserviceID string,
+ maxID, txnID int,
+) error {
+ return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
+}
+
+// RemoveEventsBeforeAndIncludingID removes all events from the database that
+// are less than or equal to a given maximum ID. IDs here are implemented as a
+// serial, thus this should always delete events in chronological order.
+func (d *Database) RemoveEventsBeforeAndIncludingID(
+ ctx context.Context,
+ appserviceID string,
+ eventTableID int,
+) error {
+ return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
+}
+
+// GetLatestTxnID returns the latest available transaction id
+func (d *Database) GetLatestTxnID(
+ ctx context.Context,
+) (int, error) {
+ return d.txnID.selectTxnID(ctx)
+}
diff --git a/appservice/storage/txn_id_counter_table.go b/appservice/storage/txn_id_counter_table.go
new file mode 100644
index 00000000..7b0fa378
--- /dev/null
+++ b/appservice/storage/txn_id_counter_table.go
@@ -0,0 +1,52 @@
+// Copyright 2018 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "database/sql"
+)
+
+const txnIDSchema = `
+-- Keeps a count of the current transaction ID
+CREATE SEQUENCE IF NOT EXISTS txn_id_counter START 1;
+`
+
+const selectTxnIDSQL = "SELECT nextval('txn_id_counter')"
+
+type txnStatements struct {
+ selectTxnIDStmt *sql.Stmt
+}
+
+func (s *txnStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(txnIDSchema)
+ if err != nil {
+ return
+ }
+
+ if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil {
+ return
+ }
+
+ return
+}
+
+// selectTxnID selects the latest ascending transaction ID
+func (s *txnStatements) selectTxnID(
+ ctx context.Context,
+) (txnID int, err error) {
+ err = s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID)
+ return
+}
diff --git a/appservice/types/types.go b/appservice/types/types.go
new file mode 100644
index 00000000..aac73155
--- /dev/null
+++ b/appservice/types/types.go
@@ -0,0 +1,64 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package types
+
+import (
+ "sync"
+
+ "github.com/matrix-org/dendrite/common/config"
+)
+
+const (
+ // AppServiceDeviceID is the AS dummy device ID
+ AppServiceDeviceID = "AS_Device"
+)
+
+// ApplicationServiceWorkerState is a type that couples an application service,
+// a lockable condition as well as some other state variables, allowing the
+// roomserver to notify appservice workers when there are events ready to send
+// externally to application services.
+type ApplicationServiceWorkerState struct {
+ AppService config.ApplicationService
+ Cond *sync.Cond
+ // Events ready to be sent
+ EventsReady bool
+ // Backoff exponent (2^x secs). Max 6, aka 64s.
+ Backoff int
+}
+
+// NotifyNewEvents wakes up all waiting goroutines, notifying that events remain
+// in the event queue for this application service worker.
+func (a *ApplicationServiceWorkerState) NotifyNewEvents() {
+ a.Cond.L.Lock()
+ a.EventsReady = true
+ a.Cond.Broadcast()
+ a.Cond.L.Unlock()
+}
+
+// FinishEventProcessing marks all events of this worker as being sent to the
+// application service.
+func (a *ApplicationServiceWorkerState) FinishEventProcessing() {
+ a.Cond.L.Lock()
+ a.EventsReady = false
+ a.Cond.L.Unlock()
+}
+
+// WaitForNewEvents causes the calling goroutine to wait on the worker state's
+// condition for a broadcast or similar wakeup, if there are no events ready.
+func (a *ApplicationServiceWorkerState) WaitForNewEvents() {
+ a.Cond.L.Lock()
+ if !a.EventsReady {
+ a.Cond.Wait()
+ }
+ a.Cond.L.Unlock()
+}
diff --git a/appservice/workers/transaction_scheduler.go b/appservice/workers/transaction_scheduler.go
new file mode 100644
index 00000000..0330eb9e
--- /dev/null
+++ b/appservice/workers/transaction_scheduler.go
@@ -0,0 +1,227 @@
+// Copyright 2018 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package workers
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "math"
+ "net/http"
+ "time"
+
+ "github.com/matrix-org/dendrite/appservice/storage"
+ "github.com/matrix-org/dendrite/appservice/types"
+ "github.com/matrix-org/dendrite/common/config"
+ "github.com/matrix-org/gomatrixserverlib"
+ log "github.com/sirupsen/logrus"
+)
+
+var (
+ // Maximum size of events sent in each transaction.
+ transactionBatchSize = 50
+ // Timeout for sending a single transaction to an application service.
+ transactionTimeout = time.Second * 60
+)
+
+// SetupTransactionWorkers spawns a separate goroutine for each application
+// service. Each of these "workers" handle taking all events intended for their
+// app service, batch them up into a single transaction (up to a max transaction
+// size), then send that off to the AS's /transactions/{txnID} endpoint. It also
+// handles exponentially backing off in case the AS isn't currently available.
+func SetupTransactionWorkers(
+ appserviceDB *storage.Database,
+ workerStates []types.ApplicationServiceWorkerState,
+) error {
+ // Create a worker that handles transmitting events to a single homeserver
+ for _, workerState := range workerStates {
+ // Don't create a worker if this AS doesn't want to receive events
+ if workerState.AppService.URL != "" {
+ go worker(appserviceDB, workerState)
+ }
+ }
+ return nil
+}
+
+// worker is a goroutine that sends any queued events to the application service
+// it is given.
+func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
+ log.WithFields(log.Fields{
+ "appservice": ws.AppService.ID,
+ }).Info("starting application service")
+ ctx := context.Background()
+
+ // Create a HTTP client for sending requests to app services
+ client := &http.Client{
+ Timeout: transactionTimeout,
+ }
+
+ // Initial check for any leftover events to send from last time
+ eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": ws.AppService.ID,
+ }).WithError(err).Fatal("appservice worker unable to read queued events from DB")
+ return
+ }
+ if eventCount > 0 {
+ ws.NotifyNewEvents()
+ }
+
+ // Loop forever and keep waiting for more events to send
+ for {
+ // Wait for more events if we've sent all the events in the database
+ ws.WaitForNewEvents()
+
+ // Batch events up into a transaction
+ transactionJSON, txnID, maxEventID, eventsRemaining, err := createTransaction(ctx, db, ws.AppService.ID)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": ws.AppService.ID,
+ }).WithError(err).Fatal("appservice worker unable to create transaction")
+
+ return
+ }
+
+ // Send the events off to the application service
+ // Backoff if the application service does not respond
+ err = send(client, ws.AppService, txnID, transactionJSON)
+ if err != nil {
+ // Backoff
+ backoff(&ws, err)
+ continue
+ }
+
+ // We sent successfully, hooray!
+ ws.Backoff = 0
+
+ // Transactions have a maximum event size, so there may still be some events
+ // left over to send. Keep sending until none are left
+ if !eventsRemaining {
+ ws.FinishEventProcessing()
+ }
+
+ // Remove sent events from the DB
+ err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": ws.AppService.ID,
+ }).WithError(err).Fatal("unable to remove appservice events from the database")
+ return
+ }
+ }
+}
+
+// backoff pauses the calling goroutine for a 2^some backoff exponent seconds
+func backoff(ws *types.ApplicationServiceWorkerState, err error) {
+ // Calculate how long to backoff for
+ backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff)))
+ backoffSeconds := time.Second * backoffDuration
+
+ log.WithFields(log.Fields{
+ "appservice": ws.AppService.ID,
+ }).WithError(err).Warnf("unable to send transactions successfully, backing off for %ds",
+ backoffDuration)
+
+ ws.Backoff++
+ if ws.Backoff > 6 {
+ ws.Backoff = 6
+ }
+
+ // Backoff
+ time.Sleep(backoffSeconds)
+}
+
+// createTransaction takes in a slice of AS events, stores them in an AS
+// transaction, and JSON-encodes the results.
+func createTransaction(
+ ctx context.Context,
+ db *storage.Database,
+ appserviceID string,
+) (
+ transactionJSON []byte,
+ txnID, maxID int,
+ eventsRemaining bool,
+ err error,
+) {
+ // Retrieve the latest events from the DB (will return old events if they weren't successfully sent)
+ txnID, maxID, events, eventsRemaining, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": appserviceID,
+ }).WithError(err).Fatalf("appservice worker unable to read queued events from DB")
+
+ return
+ }
+
+ // Check if these events do not already have a transaction ID
+ if txnID == -1 {
+ // If not, grab next available ID from the DB
+ txnID, err = db.GetLatestTxnID(ctx)
+ if err != nil {
+ return nil, 0, 0, false, err
+ }
+
+ // Mark new events with current transactionID
+ if err = db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, txnID); err != nil {
+ return nil, 0, 0, false, err
+ }
+ }
+
+ // Create a transaction and store the events inside
+ transaction := gomatrixserverlib.ApplicationServiceTransaction{
+ Events: events,
+ }
+
+ transactionJSON, err = json.Marshal(transaction)
+ if err != nil {
+ return
+ }
+
+ return
+}
+
+// send sends events to an application service. Returns an error if an OK was not
+// received back from the application service or the request timed out.
+func send(
+ client *http.Client,
+ appservice config.ApplicationService,
+ txnID int,
+ transaction []byte,
+) error {
+ // POST a transaction to our AS
+ address := fmt.Sprintf("%s/transactions/%d", appservice.URL, txnID)
+ resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction))
+ if err != nil {
+ return err
+ }
+ defer func() {
+ err := resp.Body.Close()
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": appservice.ID,
+ }).WithError(err).Error("unable to close response body from application service")
+ }
+ }()
+
+ // Check the AS received the events correctly
+ if resp.StatusCode != http.StatusOK {
+ // TODO: Handle non-200 error codes from application services
+ return fmt.Errorf("non-OK status code %d returned from AS", resp.StatusCode)
+ }
+
+ return nil
+}