diff options
author | ruben <code@rbn.im> | 2019-05-21 22:56:55 +0200 |
---|---|---|
committer | Brendan Abolivier <babolivier@matrix.org> | 2019-05-21 21:56:55 +0100 |
commit | 74827428bd3e11faab65f12204449c1b9469b0ae (patch) | |
tree | 0decafa542436a0667ed2d3e3cfd4df0f03de1e5 /appservice | |
parent | 4d588f7008afe5600219ac0930c2eee2de5c447b (diff) |
use go module for dependencies (#594)
Diffstat (limited to 'appservice')
-rw-r--r-- | appservice/README.md | 10 | ||||
-rw-r--r-- | appservice/api/query.go | 178 | ||||
-rw-r--r-- | appservice/appservice.go | 132 | ||||
-rw-r--r-- | appservice/consumers/roomserver.go | 210 | ||||
-rw-r--r-- | appservice/query/query.go | 214 | ||||
-rw-r--r-- | appservice/routing/routing.go | 61 | ||||
-rw-r--r-- | appservice/storage/appservice_events_table.go | 248 | ||||
-rw-r--r-- | appservice/storage/storage.go | 110 | ||||
-rw-r--r-- | appservice/storage/txn_id_counter_table.go | 52 | ||||
-rw-r--r-- | appservice/types/types.go | 64 | ||||
-rw-r--r-- | appservice/workers/transaction_scheduler.go | 227 |
11 files changed, 1506 insertions, 0 deletions
diff --git a/appservice/README.md b/appservice/README.md new file mode 100644 index 00000000..d7555744 --- /dev/null +++ b/appservice/README.md @@ -0,0 +1,10 @@ +# Application Service + +This component interfaces with external [Application +Services](https://matrix.org/docs/spec/application_service/unstable.html). +This includes any HTTP endpoints that application services call, as well as talking +to any HTTP endpoints that application services provide themselves. + +## Consumers + +This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing application services.
\ No newline at end of file diff --git a/appservice/api/query.go b/appservice/api/query.go new file mode 100644 index 00000000..9ec21448 --- /dev/null +++ b/appservice/api/query.go @@ -0,0 +1,178 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package api contains methods used by dendrite components in multi-process +// mode to send requests to the appservice component, typically in order to ask +// an application service for some information. +package api + +import ( + "context" + "database/sql" + "errors" + "net/http" + + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/gomatrixserverlib" + + commonHTTP "github.com/matrix-org/dendrite/common/http" + opentracing "github.com/opentracing/opentracing-go" +) + +// RoomAliasExistsRequest is a request to an application service +// about whether a room alias exists +type RoomAliasExistsRequest struct { + // Alias we want to lookup + Alias string `json:"alias"` +} + +// RoomAliasExistsResponse is a response from an application service +// about whether a room alias exists +type RoomAliasExistsResponse struct { + AliasExists bool `json:"exists"` +} + +// UserIDExistsRequest is a request to an application service about whether a +// user ID exists +type UserIDExistsRequest struct { + // UserID we want to lookup + UserID string `json:"user_id"` +} + +// UserIDExistsRequestAccessToken is a request to an application service +// about whether a user ID exists. Includes an access token +type UserIDExistsRequestAccessToken struct { + // UserID we want to lookup + UserID string `json:"user_id"` + AccessToken string `json:"access_token"` +} + +// UserIDExistsResponse is a response from an application service about +// whether a user ID exists +type UserIDExistsResponse struct { + UserIDExists bool `json:"exists"` +} + +// AppServiceQueryAPI is used to query user and room alias data from application +// services +type AppServiceQueryAPI interface { + // Check whether a room alias exists within any application service namespaces + RoomAliasExists( + ctx context.Context, + req *RoomAliasExistsRequest, + resp *RoomAliasExistsResponse, + ) error + // Check whether a user ID exists within any application service namespaces + UserIDExists( + ctx context.Context, + req *UserIDExistsRequest, + resp *UserIDExistsResponse, + ) error +} + +// AppServiceRoomAliasExistsPath is the HTTP path for the RoomAliasExists API +const AppServiceRoomAliasExistsPath = "/api/appservice/RoomAliasExists" + +// AppServiceUserIDExistsPath is the HTTP path for the UserIDExists API +const AppServiceUserIDExistsPath = "/api/appservice/UserIDExists" + +// httpAppServiceQueryAPI contains the URL to an appservice query API and a +// reference to a httpClient used to reach it +type httpAppServiceQueryAPI struct { + appserviceURL string + httpClient *http.Client +} + +// NewAppServiceQueryAPIHTTP creates a AppServiceQueryAPI implemented by talking +// to a HTTP POST API. +// If httpClient is nil then it uses http.DefaultClient +func NewAppServiceQueryAPIHTTP( + appserviceURL string, + httpClient *http.Client, +) AppServiceQueryAPI { + if httpClient == nil { + httpClient = http.DefaultClient + } + return &httpAppServiceQueryAPI{appserviceURL, httpClient} +} + +// RoomAliasExists implements AppServiceQueryAPI +func (h *httpAppServiceQueryAPI) RoomAliasExists( + ctx context.Context, + request *RoomAliasExistsRequest, + response *RoomAliasExistsResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "appserviceRoomAliasExists") + defer span.Finish() + + apiURL := h.appserviceURL + AppServiceRoomAliasExistsPath + return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + +// UserIDExists implements AppServiceQueryAPI +func (h *httpAppServiceQueryAPI) UserIDExists( + ctx context.Context, + request *UserIDExistsRequest, + response *UserIDExistsResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "appserviceUserIDExists") + defer span.Finish() + + apiURL := h.appserviceURL + AppServiceUserIDExistsPath + return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + +// RetreiveUserProfile is a wrapper that queries both the local database and +// application services for a given user's profile +func RetreiveUserProfile( + ctx context.Context, + userID string, + asAPI AppServiceQueryAPI, + accountDB *accounts.Database, +) (*authtypes.Profile, error) { + localpart, _, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return nil, err + } + + // Try to query the user from the local database + profile, err := accountDB.GetProfileByLocalpart(ctx, localpart) + if err != nil && err != sql.ErrNoRows { + return nil, err + } else if profile != nil { + return profile, nil + } + + // Query the appservice component for the existence of an AS user + userReq := UserIDExistsRequest{UserID: userID} + var userResp UserIDExistsResponse + if err = asAPI.UserIDExists(ctx, &userReq, &userResp); err != nil { + return nil, err + } + + // If no user exists, return + if !userResp.UserIDExists { + return nil, errors.New("no known profile for given user ID") + } + + // Try to query the user from the local database again + profile, err = accountDB.GetProfileByLocalpart(ctx, localpart) + if err != nil { + return nil, err + } + + // profile should not be nil at this point + return profile, nil +} diff --git a/appservice/appservice.go b/appservice/appservice.go new file mode 100644 index 00000000..8703959f --- /dev/null +++ b/appservice/appservice.go @@ -0,0 +1,132 @@ +// Copyright 2018 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package appservice + +import ( + "context" + "net/http" + "sync" + "time" + + appserviceAPI "github.com/matrix-org/dendrite/appservice/api" + "github.com/matrix-org/dendrite/appservice/consumers" + "github.com/matrix-org/dendrite/appservice/query" + "github.com/matrix-org/dendrite/appservice/routing" + "github.com/matrix-org/dendrite/appservice/storage" + "github.com/matrix-org/dendrite/appservice/types" + "github.com/matrix-org/dendrite/appservice/workers" + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" + "github.com/matrix-org/dendrite/common/basecomponent" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/common/transactions" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" +) + +// SetupAppServiceAPIComponent sets up and registers HTTP handlers for the AppServices +// component. +func SetupAppServiceAPIComponent( + base *basecomponent.BaseDendrite, + accountsDB *accounts.Database, + deviceDB *devices.Database, + federation *gomatrixserverlib.FederationClient, + roomserverAliasAPI roomserverAPI.RoomserverAliasAPI, + roomserverQueryAPI roomserverAPI.RoomserverQueryAPI, + transactionsCache *transactions.Cache, +) appserviceAPI.AppServiceQueryAPI { + // Create a connection to the appservice postgres DB + appserviceDB, err := storage.NewDatabase(string(base.Cfg.Database.AppService)) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to appservice db") + } + + // Wrap application services in a type that relates the application service and + // a sync.Cond object that can be used to notify workers when there are new + // events to be sent out. + workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices)) + for i, appservice := range base.Cfg.Derived.ApplicationServices { + m := sync.Mutex{} + ws := types.ApplicationServiceWorkerState{ + AppService: appservice, + Cond: sync.NewCond(&m), + } + workerStates[i] = ws + + // Create bot account for this AS if it doesn't already exist + if err = generateAppServiceAccount(accountsDB, deviceDB, appservice); err != nil { + logrus.WithFields(logrus.Fields{ + "appservice": appservice.ID, + }).WithError(err).Panicf("failed to generate bot account for appservice") + } + } + + // Create appserivce query API with an HTTP client that will be used for all + // outbound and inbound requests (inbound only for the internal API) + appserviceQueryAPI := query.AppServiceQueryAPI{ + HTTPClient: &http.Client{ + Timeout: time.Second * 30, + }, + Cfg: base.Cfg, + } + + appserviceQueryAPI.SetupHTTP(http.DefaultServeMux) + + consumer := consumers.NewOutputRoomEventConsumer( + base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB, + roomserverQueryAPI, roomserverAliasAPI, workerStates, + ) + if err := consumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start appservice roomserver consumer") + } + + // Create application service transaction workers + if err := workers.SetupTransactionWorkers(appserviceDB, workerStates); err != nil { + logrus.WithError(err).Panicf("failed to start app service transaction workers") + } + + // Set up HTTP Endpoints + routing.Setup( + base.APIMux, *base.Cfg, roomserverQueryAPI, roomserverAliasAPI, + accountsDB, federation, transactionsCache, + ) + + return &appserviceQueryAPI +} + +// generateAppServiceAccounts creates a dummy account based off the +// `sender_localpart` field of each application service if it doesn't +// exist already +func generateAppServiceAccount( + accountsDB *accounts.Database, + deviceDB *devices.Database, + as config.ApplicationService, +) error { + ctx := context.Background() + + // Create an account for the application service + acc, err := accountsDB.CreateAccount(ctx, as.SenderLocalpart, "", as.ID) + if err != nil { + return err + } else if acc == nil { + // This account already exists + return nil + } + + // Create a dummy device with a dummy token for the application service + _, err = deviceDB.CreateDevice(ctx, as.SenderLocalpart, nil, as.ASToken, &as.SenderLocalpart) + return err +} diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go new file mode 100644 index 00000000..dbdae532 --- /dev/null +++ b/appservice/consumers/roomserver.go @@ -0,0 +1,210 @@ +// Copyright 2018 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + + "github.com/matrix-org/dendrite/appservice/storage" + "github.com/matrix-org/dendrite/appservice/types" + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + + log "github.com/sirupsen/logrus" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// OutputRoomEventConsumer consumes events that originated in the room server. +type OutputRoomEventConsumer struct { + roomServerConsumer *common.ContinualConsumer + db *accounts.Database + asDB *storage.Database + query api.RoomserverQueryAPI + alias api.RoomserverAliasAPI + serverName string + workerStates []types.ApplicationServiceWorkerState +} + +// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call +// Start() to begin consuming from room servers. +func NewOutputRoomEventConsumer( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + store *accounts.Database, + appserviceDB *storage.Database, + queryAPI api.RoomserverQueryAPI, + aliasAPI api.RoomserverAliasAPI, + workerStates []types.ApplicationServiceWorkerState, +) *OutputRoomEventConsumer { + consumer := common.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputRoomEvent), + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &OutputRoomEventConsumer{ + roomServerConsumer: &consumer, + db: store, + asDB: appserviceDB, + query: queryAPI, + alias: aliasAPI, + serverName: string(cfg.Matrix.ServerName), + workerStates: workerStates, + } + consumer.ProcessMessage = s.onMessage + + return s +} + +// Start consuming from room servers +func (s *OutputRoomEventConsumer) Start() error { + return s.roomServerConsumer.Start() +} + +// onMessage is called when the appservice component receives a new event from +// the room server output log. +func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { + // Parse out the event JSON + var output api.OutputEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + + if output.Type != api.OutputTypeNewRoomEvent { + log.WithField("type", output.Type).Debug( + "roomserver output log: ignoring unknown output type", + ) + return nil + } + + ev := output.NewRoomEvent.Event + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_id": ev.RoomID(), + "type": ev.Type(), + }).Info("appservice received an event from roomserver") + + missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) + if err != nil { + return err + } + events := append(missingEvents, ev) + + // Send event to any relevant application services + return s.filterRoomserverEvents(context.TODO(), events) +} + +// lookupMissingStateEvents looks up the state events that are added by a new event, +// and returns any not already present. +func (s *OutputRoomEventConsumer) lookupMissingStateEvents( + addsStateEventIDs []string, event gomatrixserverlib.Event, +) ([]gomatrixserverlib.Event, error) { + // Fast path if there aren't any new state events. + if len(addsStateEventIDs) == 0 { + return []gomatrixserverlib.Event{}, nil + } + + // Fast path if the only state event added is the event itself. + if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { + return []gomatrixserverlib.Event{}, nil + } + + result := []gomatrixserverlib.Event{} + missing := []string{} + for _, id := range addsStateEventIDs { + if id != event.EventID() { + // If the event isn't the current one, add it to the list of events + // to retrieve from the roomserver + missing = append(missing, id) + } + } + + // Request the missing events from the roomserver + eventReq := api.QueryEventsByIDRequest{EventIDs: missing} + var eventResp api.QueryEventsByIDResponse + if err := s.query.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil { + return nil, err + } + + result = append(result, eventResp.Events...) + + return result, nil +} + +// filterRoomserverEvents takes in events and decides whether any of them need +// to be passed on to an external application service. It does this by checking +// each namespace of each registered application service, and if there is a +// match, adds the event to the queue for events to be sent to a particular +// application service. +func (s *OutputRoomEventConsumer) filterRoomserverEvents( + ctx context.Context, + events []gomatrixserverlib.Event, +) error { + for _, ws := range s.workerStates { + for _, event := range events { + // Check if this event is interesting to this application service + if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) { + // Queue this event to be sent off to the application service + if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil { + log.WithError(err).Warn("failed to insert incoming event into appservices database") + } else { + // Tell our worker to send out new messages by updating remaining message + // count and waking them up with a broadcast + ws.NotifyNewEvents() + } + } + } + } + + return nil +} + +// appserviceIsInterestedInEvent returns a boolean depending on whether a given +// event falls within one of a given application service's namespaces. +func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event gomatrixserverlib.Event, appservice config.ApplicationService) bool { + // No reason to queue events if they'll never be sent to the application + // service + if appservice.URL == "" { + return false + } + + // Check Room ID and Sender of the event + if appservice.IsInterestedInUserID(event.Sender()) || + appservice.IsInterestedInRoomID(event.RoomID()) { + return true + } + + // Check all known room aliases of the room the event came from + queryReq := api.GetAliasesForRoomIDRequest{RoomID: event.RoomID()} + var queryRes api.GetAliasesForRoomIDResponse + if err := s.alias.GetAliasesForRoomID(ctx, &queryReq, &queryRes); err == nil { + for _, alias := range queryRes.Aliases { + if appservice.IsInterestedInRoomAlias(alias) { + return true + } + } + } else { + log.WithFields(log.Fields{ + "room_id": event.RoomID(), + }).WithError(err).Errorf("Unable to get aliases for room") + } + + return false +} diff --git a/appservice/query/query.go b/appservice/query/query.go new file mode 100644 index 00000000..fde3ab09 --- /dev/null +++ b/appservice/query/query.go @@ -0,0 +1,214 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package query handles requests from other internal dendrite components when +// they interact with the AppServiceQueryAPI. +package query + +import ( + "context" + "encoding/json" + "net/http" + "net/url" + "time" + + "github.com/matrix-org/dendrite/appservice/api" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/util" + opentracing "github.com/opentracing/opentracing-go" + log "github.com/sirupsen/logrus" +) + +const roomAliasExistsPath = "/rooms/" +const userIDExistsPath = "/users/" + +// AppServiceQueryAPI is an implementation of api.AppServiceQueryAPI +type AppServiceQueryAPI struct { + HTTPClient *http.Client + Cfg *config.Dendrite +} + +// RoomAliasExists performs a request to '/room/{roomAlias}' on all known +// handling application services until one admits to owning the room +func (a *AppServiceQueryAPI) RoomAliasExists( + ctx context.Context, + request *api.RoomAliasExistsRequest, + response *api.RoomAliasExistsResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceRoomAlias") + defer span.Finish() + + // Create an HTTP client if one does not already exist + if a.HTTPClient == nil { + a.HTTPClient = makeHTTPClient() + } + + // Determine which application service should handle this request + for _, appservice := range a.Cfg.Derived.ApplicationServices { + if appservice.URL != "" && appservice.IsInterestedInRoomAlias(request.Alias) { + // The full path to the rooms API, includes hs token + URL, err := url.Parse(appservice.URL + roomAliasExistsPath) + URL.Path += request.Alias + apiURL := URL.String() + "?access_token=" + appservice.HSToken + + // Send a request to each application service. If one responds that it has + // created the room, immediately return. + req, err := http.NewRequest(http.MethodGet, apiURL, nil) + if err != nil { + return err + } + req = req.WithContext(ctx) + + resp, err := a.HTTPClient.Do(req) + if resp != nil { + defer func() { + err = resp.Body.Close() + if err != nil { + log.WithFields(log.Fields{ + "appservice_id": appservice.ID, + "status_code": resp.StatusCode, + }).WithError(err).Error("Unable to close application service response body") + } + }() + } + if err != nil { + log.WithError(err).Errorf("Issue querying room alias on application service %s", appservice.ID) + return err + } + switch resp.StatusCode { + case http.StatusOK: + // OK received from appservice. Room exists + response.AliasExists = true + return nil + case http.StatusNotFound: + // Room does not exist + default: + // Application service reported an error. Warn + log.WithFields(log.Fields{ + "appservice_id": appservice.ID, + "status_code": resp.StatusCode, + }).Warn("Application service responded with non-OK status code") + } + } + } + + response.AliasExists = false + return nil +} + +// UserIDExists performs a request to '/users/{userID}' on all known +// handling application services until one admits to owning the user ID +func (a *AppServiceQueryAPI) UserIDExists( + ctx context.Context, + request *api.UserIDExistsRequest, + response *api.UserIDExistsResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceUserID") + defer span.Finish() + + // Create an HTTP client if one does not already exist + if a.HTTPClient == nil { + a.HTTPClient = makeHTTPClient() + } + + // Determine which application service should handle this request + for _, appservice := range a.Cfg.Derived.ApplicationServices { + if appservice.URL != "" && appservice.IsInterestedInUserID(request.UserID) { + // The full path to the rooms API, includes hs token + URL, err := url.Parse(appservice.URL + userIDExistsPath) + URL.Path += request.UserID + apiURL := URL.String() + "?access_token=" + appservice.HSToken + + // Send a request to each application service. If one responds that it has + // created the user, immediately return. + req, err := http.NewRequest(http.MethodGet, apiURL, nil) + if err != nil { + return err + } + resp, err := a.HTTPClient.Do(req.WithContext(ctx)) + if resp != nil { + defer func() { + err = resp.Body.Close() + if err != nil { + log.WithFields(log.Fields{ + "appservice_id": appservice.ID, + "status_code": resp.StatusCode, + }).Error("Unable to close application service response body") + } + }() + } + if err != nil { + log.WithFields(log.Fields{ + "appservice_id": appservice.ID, + }).WithError(err).Error("issue querying user ID on application service") + return err + } + if resp.StatusCode == http.StatusOK { + // StatusOK received from appservice. User ID exists + response.UserIDExists = true + return nil + } + + // Log non OK + log.WithFields(log.Fields{ + "appservice_id": appservice.ID, + "status_code": resp.StatusCode, + }).Warn("application service responded with non-OK status code") + } + } + + response.UserIDExists = false + return nil +} + +// makeHTTPClient creates an HTTP client with certain options that will be used for all query requests to application services +func makeHTTPClient() *http.Client { + return &http.Client{ + Timeout: time.Second * 30, + } +} + +// SetupHTTP adds the AppServiceQueryPAI handlers to the http.ServeMux. This +// handles and muxes incoming api requests the to internal AppServiceQueryAPI. +func (a *AppServiceQueryAPI) SetupHTTP(servMux *http.ServeMux) { + servMux.Handle( + api.AppServiceRoomAliasExistsPath, + common.MakeInternalAPI("appserviceRoomAliasExists", func(req *http.Request) util.JSONResponse { + var request api.RoomAliasExistsRequest + var response api.RoomAliasExistsResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.ErrorResponse(err) + } + if err := a.RoomAliasExists(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) + servMux.Handle( + api.AppServiceUserIDExistsPath, + common.MakeInternalAPI("appserviceUserIDExists", func(req *http.Request) util.JSONResponse { + var request api.UserIDExistsRequest + var response api.UserIDExistsResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.ErrorResponse(err) + } + if err := a.UserIDExists(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) +} diff --git a/appservice/routing/routing.go b/appservice/routing/routing.go new file mode 100644 index 00000000..f0b8461d --- /dev/null +++ b/appservice/routing/routing.go @@ -0,0 +1,61 @@ +// Copyright 2018 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "net/http" + + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/common/transactions" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +const pathPrefixApp = "/_matrix/app/r0" + +// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client +// to clients which need to make outbound HTTP requests. +func Setup( + apiMux *mux.Router, cfg config.Dendrite, // nolint: unparam + queryAPI api.RoomserverQueryAPI, aliasAPI api.RoomserverAliasAPI, // nolint: unparam + accountDB *accounts.Database, // nolint: unparam + federation *gomatrixserverlib.FederationClient, // nolint: unparam + transactionsCache *transactions.Cache, // nolint: unparam +) { + appMux := apiMux.PathPrefix(pathPrefixApp).Subrouter() + + appMux.Handle("/alias", + common.MakeExternalAPI("alias", func(req *http.Request) util.JSONResponse { + // TODO: Implement + return util.JSONResponse{ + Code: http.StatusOK, + JSON: nil, + } + }), + ).Methods(http.MethodGet, http.MethodOptions) + appMux.Handle("/user", + common.MakeExternalAPI("user", func(req *http.Request) util.JSONResponse { + // TODO: Implement + return util.JSONResponse{ + Code: http.StatusOK, + JSON: nil, + } + }), + ).Methods(http.MethodGet, http.MethodOptions) +} diff --git a/appservice/storage/appservice_events_table.go b/appservice/storage/appservice_events_table.go new file mode 100644 index 00000000..285bbf48 --- /dev/null +++ b/appservice/storage/appservice_events_table.go @@ -0,0 +1,248 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" + "encoding/json" + "time" + + "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" +) + +const appserviceEventsSchema = ` +-- Stores events to be sent to application services +CREATE TABLE IF NOT EXISTS appservice_events ( + -- An auto-incrementing id unique to each event in the table + id BIGSERIAL NOT NULL PRIMARY KEY, + -- The ID of the application service the event will be sent to + as_id TEXT NOT NULL, + -- JSON representation of the event + event_json TEXT NOT NULL, + -- The ID of the transaction that this event is a part of + txn_id BIGINT NOT NULL +); + +CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id); +` + +const selectEventsByApplicationServiceIDSQL = "" + + "SELECT id, event_json, txn_id " + + "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC" + +const countEventsByApplicationServiceIDSQL = "" + + "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1" + +const insertEventSQL = "" + + "INSERT INTO appservice_events(as_id, event_json, txn_id) " + + "VALUES ($1, $2, $3)" + +const updateTxnIDForEventsSQL = "" + + "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3" + +const deleteEventsBeforeAndIncludingIDSQL = "" + + "DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2" + +const ( + // A transaction ID number that no transaction should ever have. Used for + // checking again the default value. + invalidTxnID = -2 +) + +type eventsStatements struct { + selectEventsByApplicationServiceIDStmt *sql.Stmt + countEventsByApplicationServiceIDStmt *sql.Stmt + insertEventStmt *sql.Stmt + updateTxnIDForEventsStmt *sql.Stmt + deleteEventsBeforeAndIncludingIDStmt *sql.Stmt +} + +func (s *eventsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(appserviceEventsSchema) + if err != nil { + return + } + + if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil { + return + } + if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil { + return + } + if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { + return + } + if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil { + return + } + if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil { + return + } + + return +} + +// selectEventsByApplicationServiceID takes in an application service ID and +// returns a slice of events that need to be sent to that application service, +// as well as an int later used to remove these same events from the database +// once successfully sent to an application service. +func (s *eventsStatements) selectEventsByApplicationServiceID( + ctx context.Context, + applicationServiceID string, + limit int, +) ( + txnID, maxID int, + events []gomatrixserverlib.Event, + eventsRemaining bool, + err error, +) { + // Retrieve events from the database. Unsuccessfully sent events first + eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID) + if err != nil { + return + } + defer func() { + err = eventRows.Close() + if err != nil { + log.WithFields(log.Fields{ + "appservice": applicationServiceID, + }).WithError(err).Fatalf("appservice unable to select new events to send") + } + }() + events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit) + if err != nil { + return + } + + return +} + +func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) { + // Get current time for use in calculating event age + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + + // Iterate through each row and store event contents + // If txn_id changes dramatically, we've switched from collecting old events to + // new ones. Send back those events first. + lastTxnID := invalidTxnID + for eventsProcessed := 0; eventRows.Next(); { + var event gomatrixserverlib.Event + var eventJSON []byte + var id int + err = eventRows.Scan( + &id, + &eventJSON, + &txnID, + ) + if err != nil { + return nil, 0, 0, false, err + } + + // Unmarshal eventJSON + if err = json.Unmarshal(eventJSON, &event); err != nil { + return nil, 0, 0, false, err + } + + // If txnID has changed on this event from the previous event, then we've + // reached the end of a transaction's events. Return only those events. + if lastTxnID > invalidTxnID && lastTxnID != txnID { + return events, maxID, lastTxnID, true, nil + } + lastTxnID = txnID + + // Limit events that aren't part of an old transaction + if txnID == -1 { + // Return if we've hit the limit + if eventsProcessed++; eventsProcessed > limit { + return events, maxID, lastTxnID, true, nil + } + } + + if id > maxID { + maxID = id + } + + // Portion of the event that is unsigned due to rapid change + // TODO: Consider removing age as not many app services use it + if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil { + return nil, 0, 0, false, err + } + + events = append(events, event) + } + + return +} + +// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service +// IDs into the db. +func (s *eventsStatements) countEventsByApplicationServiceID( + ctx context.Context, + appServiceID string, +) (int, error) { + var count int + err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count) + if err != nil && err != sql.ErrNoRows { + return 0, err + } + + return count, nil +} + +// insertEvent inserts an event mapped to its corresponding application service +// IDs into the db. +func (s *eventsStatements) insertEvent( + ctx context.Context, + appServiceID string, + event *gomatrixserverlib.Event, +) (err error) { + // Convert event to JSON before inserting + eventJSON, err := json.Marshal(event) + if err != nil { + return err + } + + _, err = s.insertEventStmt.ExecContext( + ctx, + appServiceID, + eventJSON, + -1, // No transaction ID yet + ) + return +} + +// updateTxnIDForEvents sets the transactionID for a collection of events. Done +// before sending them to an AppService. Referenced before sending to make sure +// we aren't constructing multiple transactions with the same events. +func (s *eventsStatements) updateTxnIDForEvents( + ctx context.Context, + appserviceID string, + maxID, txnID int, +) (err error) { + _, err = s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID) + return +} + +// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database. +func (s *eventsStatements) deleteEventsBeforeAndIncludingID( + ctx context.Context, + appserviceID string, + eventTableID int, +) (err error) { + _, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID) + return +} diff --git a/appservice/storage/storage.go b/appservice/storage/storage.go new file mode 100644 index 00000000..b68989fb --- /dev/null +++ b/appservice/storage/storage.go @@ -0,0 +1,110 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" + + // Import postgres database driver + _ "github.com/lib/pq" + "github.com/matrix-org/gomatrixserverlib" +) + +// Database stores events intended to be later sent to application services +type Database struct { + events eventsStatements + txnID txnStatements + db *sql.DB +} + +// NewDatabase opens a new database +func NewDatabase(dataSourceName string) (*Database, error) { + var result Database + var err error + if result.db, err = sql.Open("postgres", dataSourceName); err != nil { + return nil, err + } + if err = result.prepare(); err != nil { + return nil, err + } + return &result, nil +} + +func (d *Database) prepare() error { + if err := d.events.prepare(d.db); err != nil { + return err + } + + return d.txnID.prepare(d.db) +} + +// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database +// for a transaction worker to pull and later send to an application service. +func (d *Database) StoreEvent( + ctx context.Context, + appServiceID string, + event *gomatrixserverlib.Event, +) error { + return d.events.insertEvent(ctx, appServiceID, event) +} + +// GetEventsWithAppServiceID returns a slice of events and their IDs intended to +// be sent to an application service given its ID. +func (d *Database) GetEventsWithAppServiceID( + ctx context.Context, + appServiceID string, + limit int, +) (int, int, []gomatrixserverlib.Event, bool, error) { + return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) +} + +// CountEventsWithAppServiceID returns the number of events destined for an +// application service given its ID. +func (d *Database) CountEventsWithAppServiceID( + ctx context.Context, + appServiceID string, +) (int, error) { + return d.events.countEventsByApplicationServiceID(ctx, appServiceID) +} + +// UpdateTxnIDForEvents takes in an application service ID and a +// and stores them in the DB, unless the pair already exists, in +// which case it updates them. +func (d *Database) UpdateTxnIDForEvents( + ctx context.Context, + appserviceID string, + maxID, txnID int, +) error { + return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID) +} + +// RemoveEventsBeforeAndIncludingID removes all events from the database that +// are less than or equal to a given maximum ID. IDs here are implemented as a +// serial, thus this should always delete events in chronological order. +func (d *Database) RemoveEventsBeforeAndIncludingID( + ctx context.Context, + appserviceID string, + eventTableID int, +) error { + return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID) +} + +// GetLatestTxnID returns the latest available transaction id +func (d *Database) GetLatestTxnID( + ctx context.Context, +) (int, error) { + return d.txnID.selectTxnID(ctx) +} diff --git a/appservice/storage/txn_id_counter_table.go b/appservice/storage/txn_id_counter_table.go new file mode 100644 index 00000000..7b0fa378 --- /dev/null +++ b/appservice/storage/txn_id_counter_table.go @@ -0,0 +1,52 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" +) + +const txnIDSchema = ` +-- Keeps a count of the current transaction ID +CREATE SEQUENCE IF NOT EXISTS txn_id_counter START 1; +` + +const selectTxnIDSQL = "SELECT nextval('txn_id_counter')" + +type txnStatements struct { + selectTxnIDStmt *sql.Stmt +} + +func (s *txnStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(txnIDSchema) + if err != nil { + return + } + + if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil { + return + } + + return +} + +// selectTxnID selects the latest ascending transaction ID +func (s *txnStatements) selectTxnID( + ctx context.Context, +) (txnID int, err error) { + err = s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID) + return +} diff --git a/appservice/types/types.go b/appservice/types/types.go new file mode 100644 index 00000000..aac73155 --- /dev/null +++ b/appservice/types/types.go @@ -0,0 +1,64 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +import ( + "sync" + + "github.com/matrix-org/dendrite/common/config" +) + +const ( + // AppServiceDeviceID is the AS dummy device ID + AppServiceDeviceID = "AS_Device" +) + +// ApplicationServiceWorkerState is a type that couples an application service, +// a lockable condition as well as some other state variables, allowing the +// roomserver to notify appservice workers when there are events ready to send +// externally to application services. +type ApplicationServiceWorkerState struct { + AppService config.ApplicationService + Cond *sync.Cond + // Events ready to be sent + EventsReady bool + // Backoff exponent (2^x secs). Max 6, aka 64s. + Backoff int +} + +// NotifyNewEvents wakes up all waiting goroutines, notifying that events remain +// in the event queue for this application service worker. +func (a *ApplicationServiceWorkerState) NotifyNewEvents() { + a.Cond.L.Lock() + a.EventsReady = true + a.Cond.Broadcast() + a.Cond.L.Unlock() +} + +// FinishEventProcessing marks all events of this worker as being sent to the +// application service. +func (a *ApplicationServiceWorkerState) FinishEventProcessing() { + a.Cond.L.Lock() + a.EventsReady = false + a.Cond.L.Unlock() +} + +// WaitForNewEvents causes the calling goroutine to wait on the worker state's +// condition for a broadcast or similar wakeup, if there are no events ready. +func (a *ApplicationServiceWorkerState) WaitForNewEvents() { + a.Cond.L.Lock() + if !a.EventsReady { + a.Cond.Wait() + } + a.Cond.L.Unlock() +} diff --git a/appservice/workers/transaction_scheduler.go b/appservice/workers/transaction_scheduler.go new file mode 100644 index 00000000..0330eb9e --- /dev/null +++ b/appservice/workers/transaction_scheduler.go @@ -0,0 +1,227 @@ +// Copyright 2018 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package workers + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "math" + "net/http" + "time" + + "github.com/matrix-org/dendrite/appservice/storage" + "github.com/matrix-org/dendrite/appservice/types" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" +) + +var ( + // Maximum size of events sent in each transaction. + transactionBatchSize = 50 + // Timeout for sending a single transaction to an application service. + transactionTimeout = time.Second * 60 +) + +// SetupTransactionWorkers spawns a separate goroutine for each application +// service. Each of these "workers" handle taking all events intended for their +// app service, batch them up into a single transaction (up to a max transaction +// size), then send that off to the AS's /transactions/{txnID} endpoint. It also +// handles exponentially backing off in case the AS isn't currently available. +func SetupTransactionWorkers( + appserviceDB *storage.Database, + workerStates []types.ApplicationServiceWorkerState, +) error { + // Create a worker that handles transmitting events to a single homeserver + for _, workerState := range workerStates { + // Don't create a worker if this AS doesn't want to receive events + if workerState.AppService.URL != "" { + go worker(appserviceDB, workerState) + } + } + return nil +} + +// worker is a goroutine that sends any queued events to the application service +// it is given. +func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).Info("starting application service") + ctx := context.Background() + + // Create a HTTP client for sending requests to app services + client := &http.Client{ + Timeout: transactionTimeout, + } + + // Initial check for any leftover events to send from last time + eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID) + if err != nil { + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Fatal("appservice worker unable to read queued events from DB") + return + } + if eventCount > 0 { + ws.NotifyNewEvents() + } + + // Loop forever and keep waiting for more events to send + for { + // Wait for more events if we've sent all the events in the database + ws.WaitForNewEvents() + + // Batch events up into a transaction + transactionJSON, txnID, maxEventID, eventsRemaining, err := createTransaction(ctx, db, ws.AppService.ID) + if err != nil { + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Fatal("appservice worker unable to create transaction") + + return + } + + // Send the events off to the application service + // Backoff if the application service does not respond + err = send(client, ws.AppService, txnID, transactionJSON) + if err != nil { + // Backoff + backoff(&ws, err) + continue + } + + // We sent successfully, hooray! + ws.Backoff = 0 + + // Transactions have a maximum event size, so there may still be some events + // left over to send. Keep sending until none are left + if !eventsRemaining { + ws.FinishEventProcessing() + } + + // Remove sent events from the DB + err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID) + if err != nil { + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Fatal("unable to remove appservice events from the database") + return + } + } +} + +// backoff pauses the calling goroutine for a 2^some backoff exponent seconds +func backoff(ws *types.ApplicationServiceWorkerState, err error) { + // Calculate how long to backoff for + backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff))) + backoffSeconds := time.Second * backoffDuration + + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Warnf("unable to send transactions successfully, backing off for %ds", + backoffDuration) + + ws.Backoff++ + if ws.Backoff > 6 { + ws.Backoff = 6 + } + + // Backoff + time.Sleep(backoffSeconds) +} + +// createTransaction takes in a slice of AS events, stores them in an AS +// transaction, and JSON-encodes the results. +func createTransaction( + ctx context.Context, + db *storage.Database, + appserviceID string, +) ( + transactionJSON []byte, + txnID, maxID int, + eventsRemaining bool, + err error, +) { + // Retrieve the latest events from the DB (will return old events if they weren't successfully sent) + txnID, maxID, events, eventsRemaining, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize) + if err != nil { + log.WithFields(log.Fields{ + "appservice": appserviceID, + }).WithError(err).Fatalf("appservice worker unable to read queued events from DB") + + return + } + + // Check if these events do not already have a transaction ID + if txnID == -1 { + // If not, grab next available ID from the DB + txnID, err = db.GetLatestTxnID(ctx) + if err != nil { + return nil, 0, 0, false, err + } + + // Mark new events with current transactionID + if err = db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, txnID); err != nil { + return nil, 0, 0, false, err + } + } + + // Create a transaction and store the events inside + transaction := gomatrixserverlib.ApplicationServiceTransaction{ + Events: events, + } + + transactionJSON, err = json.Marshal(transaction) + if err != nil { + return + } + + return +} + +// send sends events to an application service. Returns an error if an OK was not +// received back from the application service or the request timed out. +func send( + client *http.Client, + appservice config.ApplicationService, + txnID int, + transaction []byte, +) error { + // POST a transaction to our AS + address := fmt.Sprintf("%s/transactions/%d", appservice.URL, txnID) + resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction)) + if err != nil { + return err + } + defer func() { + err := resp.Body.Close() + if err != nil { + log.WithFields(log.Fields{ + "appservice": appservice.ID, + }).WithError(err).Error("unable to close response body from application service") + } + }() + + // Check the AS received the events correctly + if resp.StatusCode != http.StatusOK { + // TODO: Handle non-200 error codes from application services + return fmt.Errorf("non-OK status code %d returned from AS", resp.StatusCode) + } + + return nil +} |