aboutsummaryrefslogtreecommitdiff
path: root/appservice
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-09-01 09:20:40 +0100
committerGitHub <noreply@github.com>2022-09-01 09:20:40 +0100
commitad6b902b8462adb568d799c69a74b60d69574d0c (patch)
tree9037eb130a47c25cb320116758baa6ee265e89b6 /appservice
parent175f65407a7f684753334022e66b8209f3db7396 (diff)
Refactor appservices component (#2687)
This PR refactors the app services component. It makes the following changes: * Each appservice now gets its own NATS JetStream consumer * The appservice database is now removed entirely, since we just use JetStream as a data source instead * The entire component is now much simpler and we deleted lots of lines of code 💅 The result is that it should be much lighter and hopefully much more performant.
Diffstat (limited to 'appservice')
-rw-r--r--appservice/README.md10
-rw-r--r--appservice/appservice.go55
-rw-r--r--appservice/consumers/roomserver.go318
-rw-r--r--appservice/query/query.go2
-rw-r--r--appservice/storage/interface.go30
-rw-r--r--appservice/storage/postgres/appservice_events_table.go256
-rw-r--r--appservice/storage/postgres/storage.go115
-rw-r--r--appservice/storage/postgres/txn_id_counter_table.go53
-rw-r--r--appservice/storage/sqlite3/appservice_events_table.go267
-rw-r--r--appservice/storage/sqlite3/storage.go114
-rw-r--r--appservice/storage/sqlite3/txn_id_counter_table.go82
-rw-r--r--appservice/storage/storage.go40
-rw-r--r--appservice/storage/storage_wasm.go34
-rw-r--r--appservice/types/types.go64
-rw-r--r--appservice/workers/transaction_scheduler.go236
15 files changed, 207 insertions, 1469 deletions
diff --git a/appservice/README.md b/appservice/README.md
deleted file mode 100644
index d7555744..00000000
--- a/appservice/README.md
+++ /dev/null
@@ -1,10 +0,0 @@
-# Application Service
-
-This component interfaces with external [Application
-Services](https://matrix.org/docs/spec/application_service/unstable.html).
-This includes any HTTP endpoints that application services call, as well as talking
-to any HTTP endpoints that application services provide themselves.
-
-## Consumers
-
-This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing application services. \ No newline at end of file
diff --git a/appservice/appservice.go b/appservice/appservice.go
index 8fe1b2fc..9000adb1 100644
--- a/appservice/appservice.go
+++ b/appservice/appservice.go
@@ -18,7 +18,6 @@ import (
"context"
"crypto/tls"
"net/http"
- "sync"
"time"
"github.com/gorilla/mux"
@@ -28,9 +27,6 @@ import (
"github.com/matrix-org/dendrite/appservice/consumers"
"github.com/matrix-org/dendrite/appservice/inthttp"
"github.com/matrix-org/dendrite/appservice/query"
- "github.com/matrix-org/dendrite/appservice/storage"
- "github.com/matrix-org/dendrite/appservice/types"
- "github.com/matrix-org/dendrite/appservice/workers"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
@@ -59,57 +55,40 @@ func NewInternalAPI(
Proxy: http.ProxyFromEnvironment,
},
}
- js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
+ // Create appserivce query API with an HTTP client that will be used for all
+ // outbound and inbound requests (inbound only for the internal API)
+ appserviceQueryAPI := &query.AppServiceQueryAPI{
+ HTTPClient: client,
+ Cfg: &base.Cfg.AppServiceAPI,
+ }
- // Create a connection to the appservice postgres DB
- appserviceDB, err := storage.NewDatabase(base, &base.Cfg.AppServiceAPI.Database)
- if err != nil {
- logrus.WithError(err).Panicf("failed to connect to appservice db")
+ if len(base.Cfg.Derived.ApplicationServices) == 0 {
+ return appserviceQueryAPI
}
// Wrap application services in a type that relates the application service and
// a sync.Cond object that can be used to notify workers when there are new
// events to be sent out.
- workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices))
- for i, appservice := range base.Cfg.Derived.ApplicationServices {
- m := sync.Mutex{}
- ws := types.ApplicationServiceWorkerState{
- AppService: appservice,
- Cond: sync.NewCond(&m),
- }
- workerStates[i] = ws
-
+ for _, appservice := range base.Cfg.Derived.ApplicationServices {
// Create bot account for this AS if it doesn't already exist
- if err = generateAppServiceAccount(userAPI, appservice); err != nil {
+ if err := generateAppServiceAccount(userAPI, appservice); err != nil {
logrus.WithFields(logrus.Fields{
"appservice": appservice.ID,
}).WithError(err).Panicf("failed to generate bot account for appservice")
}
}
- // Create appserivce query API with an HTTP client that will be used for all
- // outbound and inbound requests (inbound only for the internal API)
- appserviceQueryAPI := &query.AppServiceQueryAPI{
- HTTPClient: client,
- Cfg: base.Cfg,
- }
-
// Only consume if we actually have ASes to track, else we'll just chew cycles needlessly.
// We can't add ASes at runtime so this is safe to do.
- if len(workerStates) > 0 {
- consumer := consumers.NewOutputRoomEventConsumer(
- base.ProcessContext, base.Cfg, js, appserviceDB,
- rsAPI, workerStates,
- )
- if err := consumer.Start(); err != nil {
- logrus.WithError(err).Panicf("failed to start appservice roomserver consumer")
- }
+ js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
+ consumer := consumers.NewOutputRoomEventConsumer(
+ base.ProcessContext, &base.Cfg.AppServiceAPI,
+ client, js, rsAPI,
+ )
+ if err := consumer.Start(); err != nil {
+ logrus.WithError(err).Panicf("failed to start appservice roomserver consumer")
}
- // Create application service transaction workers
- if err := workers.SetupTransactionWorkers(client, appserviceDB, workerStates); err != nil {
- logrus.WithError(err).Panicf("failed to start app service transaction workers")
- }
return appserviceQueryAPI
}
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index 21b52bc3..a30944e7 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -15,14 +15,18 @@
package consumers
import (
+ "bytes"
"context"
"encoding/json"
+ "fmt"
+ "math"
+ "net/http"
+ "net/url"
+ "time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
- "github.com/matrix-org/dendrite/appservice/storage"
- "github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
@@ -33,178 +37,192 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
- ctx context.Context
- jetstream nats.JetStreamContext
- durable string
- topic string
- asDB storage.Database
- rsAPI api.AppserviceRoomserverAPI
- serverName string
- workerStates []types.ApplicationServiceWorkerState
+ ctx context.Context
+ cfg *config.AppServiceAPI
+ client *http.Client
+ jetstream nats.JetStreamContext
+ topic string
+ rsAPI api.AppserviceRoomserverAPI
+}
+
+type appserviceState struct {
+ *config.ApplicationService
+ backoff int
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call
// Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
process *process.ProcessContext,
- cfg *config.Dendrite,
+ cfg *config.AppServiceAPI,
+ client *http.Client,
js nats.JetStreamContext,
- appserviceDB storage.Database,
rsAPI api.AppserviceRoomserverAPI,
- workerStates []types.ApplicationServiceWorkerState,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
- ctx: process.Context(),
- jetstream: js,
- durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"),
- topic: cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent),
- asDB: appserviceDB,
- rsAPI: rsAPI,
- serverName: string(cfg.Global.ServerName),
- workerStates: workerStates,
+ ctx: process.Context(),
+ cfg: cfg,
+ client: client,
+ jetstream: js,
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
+ rsAPI: rsAPI,
}
}
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
- return jetstream.JetStreamConsumer(
- s.ctx, s.jetstream, s.topic, s.durable, 1,
- s.onMessage, nats.DeliverAll(), nats.ManualAck(),
- )
+ for _, as := range s.cfg.Derived.ApplicationServices {
+ appsvc := as
+ state := &appserviceState{
+ ApplicationService: &appsvc,
+ }
+ token := jetstream.Tokenise(as.ID)
+ if err := jetstream.JetStreamConsumer(
+ s.ctx, s.jetstream, s.topic,
+ s.cfg.Matrix.JetStream.Durable("Appservice_"+token),
+ 50, // maximum number of events to send in a single transaction
+ func(ctx context.Context, msgs []*nats.Msg) bool {
+ return s.onMessage(ctx, state, msgs)
+ },
+ nats.DeliverNew(), nats.ManualAck(),
+ ); err != nil {
+ return fmt.Errorf("failed to create %q consumer: %w", token, err)
+ }
+ }
+ return nil
}
// onMessage is called when the appservice component receives a new event from
// the room server output log.
-func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
- msg := msgs[0] // Guaranteed to exist if onMessage is called
- // Parse out the event JSON
- var output api.OutputEvent
- if err := json.Unmarshal(msg.Data, &output); err != nil {
- // If the message was invalid, log it and move on to the next message in the stream
- log.WithError(err).Errorf("roomserver output log: message parse failure")
- return true
- }
-
- log.WithFields(log.Fields{
- "type": output.Type,
- }).Debug("Got a message in OutputRoomEventConsumer")
-
- events := []*gomatrixserverlib.HeaderedEvent{}
- if output.Type == api.OutputTypeNewRoomEvent && output.NewRoomEvent != nil {
- newEventID := output.NewRoomEvent.Event.EventID()
- events = append(events, output.NewRoomEvent.Event)
- if len(output.NewRoomEvent.AddsStateEventIDs) > 0 {
- eventsReq := &api.QueryEventsByIDRequest{
- EventIDs: make([]string, 0, len(output.NewRoomEvent.AddsStateEventIDs)),
+func (s *OutputRoomEventConsumer) onMessage(
+ ctx context.Context, state *appserviceState, msgs []*nats.Msg,
+) bool {
+ log.WithField("appservice", state.ID).Tracef("Appservice worker received %d message(s) from roomserver", len(msgs))
+ events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(msgs))
+ for _, msg := range msgs {
+ // Parse out the event JSON
+ var output api.OutputEvent
+ if err := json.Unmarshal(msg.Data, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to parse message, ignoring")
+ continue
+ }
+ switch output.Type {
+ case api.OutputTypeNewRoomEvent:
+ if output.NewRoomEvent == nil || !s.appserviceIsInterestedInEvent(ctx, output.NewRoomEvent.Event, state.ApplicationService) {
+ continue
}
- eventsRes := &api.QueryEventsByIDResponse{}
- for _, eventID := range output.NewRoomEvent.AddsStateEventIDs {
- if eventID != newEventID {
- eventsReq.EventIDs = append(eventsReq.EventIDs, eventID)
+ events = append(events, output.NewRoomEvent.Event)
+ if len(output.NewRoomEvent.AddsStateEventIDs) > 0 {
+ newEventID := output.NewRoomEvent.Event.EventID()
+ eventsReq := &api.QueryEventsByIDRequest{
+ EventIDs: make([]string, 0, len(output.NewRoomEvent.AddsStateEventIDs)),
}
- }
- if len(eventsReq.EventIDs) > 0 {
- if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
- log.WithError(err).Errorf("s.rsAPI.QueryEventsByID failed")
- return false
+ eventsRes := &api.QueryEventsByIDResponse{}
+ for _, eventID := range output.NewRoomEvent.AddsStateEventIDs {
+ if eventID != newEventID {
+ eventsReq.EventIDs = append(eventsReq.EventIDs, eventID)
+ }
+ }
+ if len(eventsReq.EventIDs) > 0 {
+ if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
+ log.WithError(err).Errorf("s.rsAPI.QueryEventsByID failed")
+ return false
+ }
+ events = append(events, eventsRes.Events...)
}
- events = append(events, eventsRes.Events...)
}
+
+ case api.OutputTypeNewInviteEvent:
+ if output.NewInviteEvent == nil {
+ continue
+ }
+ events = append(events, output.NewInviteEvent.Event)
+
+ default:
+ continue
}
- } else if output.Type == api.OutputTypeNewInviteEvent && output.NewInviteEvent != nil {
- events = append(events, output.NewInviteEvent.Event)
- } else {
- log.WithFields(log.Fields{
- "type": output.Type,
- }).Debug("appservice OutputRoomEventConsumer ignoring event", string(msg.Data))
- return true
}
- // Send event to any relevant application services
- if err := s.filterRoomserverEvents(context.TODO(), events); err != nil {
- log.WithError(err).Errorf("roomserver output log: filter error")
+ // If there are no events selected for sending then we should
+ // ack the messages so that we don't get sent them again in the
+ // future.
+ if len(events) == 0 {
return true
}
- return true
+ // Send event to any relevant application services. If we hit
+ // an error here, return false, so that we negatively ack.
+ log.WithField("appservice", state.ID).Debugf("Appservice worker sending %d events(s) from roomserver", len(events))
+ return s.sendEvents(ctx, state, events) == nil
}
-// filterRoomserverEvents takes in events and decides whether any of them need
-// to be passed on to an external application service. It does this by checking
-// each namespace of each registered application service, and if there is a
-// match, adds the event to the queue for events to be sent to a particular
-// application service.
-func (s *OutputRoomEventConsumer) filterRoomserverEvents(
- ctx context.Context,
+// sendEvents passes events to the appservice by using the transactions
+// endpoint. It will block for the backoff period if necessary.
+func (s *OutputRoomEventConsumer) sendEvents(
+ ctx context.Context, state *appserviceState,
events []*gomatrixserverlib.HeaderedEvent,
) error {
- for _, ws := range s.workerStates {
- for _, event := range events {
- // Check if this event is interesting to this application service
- if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) {
- // Queue this event to be sent off to the application service
- if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil {
- log.WithError(err).Warn("failed to insert incoming event into appservices database")
- return err
- } else {
- // Tell our worker to send out new messages by updating remaining message
- // count and waking them up with a broadcast
- ws.NotifyNewEvents()
- }
- }
- }
+ // Create the transaction body.
+ transaction, err := json.Marshal(
+ gomatrixserverlib.ApplicationServiceTransaction{
+ Events: gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatAll),
+ },
+ )
+ if err != nil {
+ return err
}
- return nil
-}
+ // TODO: We should probably be more intelligent and pick something not
+ // in the control of the event. A NATS timestamp header or something maybe.
+ txnID := events[0].Event.OriginServerTS()
-// appserviceJoinedAtEvent returns a boolean depending on whether a given
-// appservice has membership at the time a given event was created.
-func (s *OutputRoomEventConsumer) appserviceJoinedAtEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool {
- // TODO: This is only checking the current room state, not the state at
- // the event in question. Pretty sure this is what Synapse does too, but
- // until we have a lighter way of checking the state before the event that
- // doesn't involve state res, then this is probably OK.
- membershipReq := &api.QueryMembershipsForRoomRequest{
- RoomID: event.RoomID(),
- JoinedOnly: true,
+ // Send the transaction to the appservice.
+ // https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid
+ address := fmt.Sprintf("%s/transactions/%d?access_token=%s", state.URL, txnID, url.QueryEscape(state.HSToken))
+ req, err := http.NewRequestWithContext(ctx, "PUT", address, bytes.NewBuffer(transaction))
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Content-Type", "application/json")
+ resp, err := s.client.Do(req)
+ if err != nil {
+ return state.backoffAndPause(err)
}
- membershipRes := &api.QueryMembershipsForRoomResponse{}
- // XXX: This could potentially race if the state for the event is not known yet
- // e.g. the event came over federation but we do not have the full state persisted.
- if err := s.rsAPI.QueryMembershipsForRoom(ctx, membershipReq, membershipRes); err == nil {
- for _, ev := range membershipRes.JoinEvents {
- var membership gomatrixserverlib.MemberContent
- if err = json.Unmarshal(ev.Content, &membership); err != nil || ev.StateKey == nil {
- continue
- }
- if appservice.IsInterestedInUserID(*ev.StateKey) {
- return true
- }
- }
- } else {
- log.WithFields(log.Fields{
- "room_id": event.RoomID(),
- }).WithError(err).Errorf("Unable to get membership for room")
+ // If the response was fine then we can clear any backoffs in place and
+ // report that everything was OK. Otherwise, back off for a while.
+ switch resp.StatusCode {
+ case http.StatusOK:
+ state.backoff = 0
+ default:
+ return state.backoffAndPause(fmt.Errorf("received HTTP status code %d from appservice", resp.StatusCode))
}
- return false
+ return nil
+}
+
+// backoff pauses the calling goroutine for a 2^some backoff exponent seconds
+func (s *appserviceState) backoffAndPause(err error) error {
+ if s.backoff < 6 {
+ s.backoff++
+ }
+ duration := time.Second * time.Duration(math.Pow(2, float64(s.backoff)))
+ log.WithField("appservice", s.ID).WithError(err).Errorf("Unable to send transaction to appservice, backing off for %s", duration.String())
+ time.Sleep(duration)
+ return err
}
// appserviceIsInterestedInEvent returns a boolean depending on whether a given
// event falls within one of a given application service's namespaces.
//
// TODO: This should be cached, see https://github.com/matrix-org/dendrite/issues/1682
-func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool {
- // No reason to queue events if they'll never be sent to the application
- // service
- if appservice.URL == "" {
+func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice *config.ApplicationService) bool {
+ switch {
+ case appservice.URL == "":
return false
- }
-
- // Check Room ID and Sender of the event
- if appservice.IsInterestedInUserID(event.Sender()) ||
- appservice.IsInterestedInRoomID(event.RoomID()) {
+ case appservice.IsInterestedInUserID(event.Sender()):
+ return true
+ case appservice.IsInterestedInRoomID(event.RoomID()):
return true
}
@@ -225,10 +243,52 @@ func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Cont
}
} else {
log.WithFields(log.Fields{
- "room_id": event.RoomID(),
+ "appservice": appservice.ID,
+ "room_id": event.RoomID(),
}).WithError(err).Errorf("Unable to get aliases for room")
}
// Check if any of the members in the room match the appservice
return s.appserviceJoinedAtEvent(ctx, event, appservice)
}
+
+// appserviceJoinedAtEvent returns a boolean depending on whether a given
+// appservice has membership at the time a given event was created.
+func (s *OutputRoomEventConsumer) appserviceJoinedAtEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice *config.ApplicationService) bool {
+ // TODO: This is only checking the current room state, not the state at
+ // the event in question. Pretty sure this is what Synapse does too, but
+ // until we have a lighter way of checking the state before the event that
+ // doesn't involve state res, then this is probably OK.
+ membershipReq := &api.QueryMembershipsForRoomRequest{
+ RoomID: event.RoomID(),
+ JoinedOnly: true,
+ }
+ membershipRes := &api.QueryMembershipsForRoomResponse{}
+
+ // XXX: This could potentially race if the state for the event is not known yet
+ // e.g. the event came over federation but we do not have the full state persisted.
+ if err := s.rsAPI.QueryMembershipsForRoom(ctx, membershipReq, membershipRes); err == nil {
+ for _, ev := range membershipRes.JoinEvents {
+ switch {
+ case ev.StateKey == nil:
+ continue
+ case ev.Type != gomatrixserverlib.MRoomMember:
+ continue
+ }
+ var membership gomatrixserverlib.MemberContent
+ err = json.Unmarshal(ev.Content, &membership)
+ switch {
+ case err != nil:
+ continue
+ case membership.Membership == gomatrixserverlib.Join:
+ return true
+ }
+ }
+ } else {
+ log.WithFields(log.Fields{
+ "appservice": appservice.ID,
+ "room_id": event.RoomID(),
+ }).WithError(err).Errorf("Unable to get membership for room")
+ }
+ return false
+}
diff --git a/appservice/query/query.go b/appservice/query/query.go
index dacd3caa..53b34cb1 100644
--- a/appservice/query/query.go
+++ b/appservice/query/query.go
@@ -33,7 +33,7 @@ const userIDExistsPath = "/users/"
// AppServiceQueryAPI is an implementation of api.AppServiceQueryAPI
type AppServiceQueryAPI struct {
HTTPClient *http.Client
- Cfg *config.Dendrite
+ Cfg *config.AppServiceAPI
}
// RoomAliasExists performs a request to '/room/{roomAlias}' on all known
diff --git a/appservice/storage/interface.go b/appservice/storage/interface.go
deleted file mode 100644
index 25d35af6..00000000
--- a/appservice/storage/interface.go
+++ /dev/null
@@ -1,30 +0,0 @@
-// Copyright 2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package storage
-
-import (
- "context"
-
- "github.com/matrix-org/gomatrixserverlib"
-)
-
-type Database interface {
- StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.HeaderedEvent) error
- GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error)
- CountEventsWithAppServiceID(ctx context.Context, appServiceID string) (int, error)
- UpdateTxnIDForEvents(ctx context.Context, appserviceID string, maxID, txnID int) error
- RemoveEventsBeforeAndIncludingID(ctx context.Context, appserviceID string, eventTableID int) error
- GetLatestTxnID(ctx context.Context) (int, error)
-}
diff --git a/appservice/storage/postgres/appservice_events_table.go b/appservice/storage/postgres/appservice_events_table.go
deleted file mode 100644
index a95be6b8..00000000
--- a/appservice/storage/postgres/appservice_events_table.go
+++ /dev/null
@@ -1,256 +0,0 @@
-// Copyright 2018 New Vector Ltd
-// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package postgres
-
-import (
- "context"
- "database/sql"
- "encoding/json"
- "time"
-
- "github.com/matrix-org/gomatrixserverlib"
- log "github.com/sirupsen/logrus"
-)
-
-const appserviceEventsSchema = `
--- Stores events to be sent to application services
-CREATE TABLE IF NOT EXISTS appservice_events (
- -- An auto-incrementing id unique to each event in the table
- id BIGSERIAL NOT NULL PRIMARY KEY,
- -- The ID of the application service the event will be sent to
- as_id TEXT NOT NULL,
- -- JSON representation of the event
- headered_event_json TEXT NOT NULL,
- -- The ID of the transaction that this event is a part of
- txn_id BIGINT NOT NULL
-);
-
-CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
-`
-
-const selectEventsByApplicationServiceIDSQL = "" +
- "SELECT id, headered_event_json, txn_id " +
- "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
-
-const countEventsByApplicationServiceIDSQL = "" +
- "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
-
-const insertEventSQL = "" +
- "INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " +
- "VALUES ($1, $2, $3)"
-
-const updateTxnIDForEventsSQL = "" +
- "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
-
-const deleteEventsBeforeAndIncludingIDSQL = "" +
- "DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2"
-
-const (
- // A transaction ID number that no transaction should ever have. Used for
- // checking again the default value.
- invalidTxnID = -2
-)
-
-type eventsStatements struct {
- selectEventsByApplicationServiceIDStmt *sql.Stmt
- countEventsByApplicationServiceIDStmt *sql.Stmt
- insertEventStmt *sql.Stmt
- updateTxnIDForEventsStmt *sql.Stmt
- deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
-}
-
-func (s *eventsStatements) prepare(db *sql.DB) (err error) {
- _, err = db.Exec(appserviceEventsSchema)
- if err != nil {
- return
- }
-
- if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil {
- return
- }
- if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil {
- return
- }
- if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
- return
- }
- if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil {
- return
- }
- if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil {
- return
- }
-
- return
-}
-
-// selectEventsByApplicationServiceID takes in an application service ID and
-// returns a slice of events that need to be sent to that application service,
-// as well as an int later used to remove these same events from the database
-// once successfully sent to an application service.
-func (s *eventsStatements) selectEventsByApplicationServiceID(
- ctx context.Context,
- applicationServiceID string,
- limit int,
-) (
- txnID, maxID int,
- events []gomatrixserverlib.HeaderedEvent,
- eventsRemaining bool,
- err error,
-) {
- defer func() {
- if err != nil {
- log.WithFields(log.Fields{
- "appservice": applicationServiceID,
- }).WithError(err).Fatalf("appservice unable to select new events to send")
- }
- }()
- // Retrieve events from the database. Unsuccessfully sent events first
- eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
- if err != nil {
- return
- }
- defer checkNamedErr(eventRows.Close, &err)
- events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit)
- if err != nil {
- return
- }
-
- return
-}
-
-// checkNamedErr calls fn and overwrite err if it was nil and fn returned non-nil
-func checkNamedErr(fn func() error, err *error) {
- if e := fn(); e != nil && *err == nil {
- *err = e
- }
-}
-
-func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.HeaderedEvent, maxID, txnID int, eventsRemaining bool, err error) {
- // Get current time for use in calculating event age
- nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
-
- // Iterate through each row and store event contents
- // If txn_id changes dramatically, we've switched from collecting old events to
- // new ones. Send back those events first.
- lastTxnID := invalidTxnID
- for eventsProcessed := 0; eventRows.Next(); {
- var event gomatrixserverlib.HeaderedEvent
- var eventJSON []byte
- var id int
- err = eventRows.Scan(
- &id,
- &eventJSON,
- &txnID,
- )
- if err != nil {
- return nil, 0, 0, false, err
- }
-
- // Unmarshal eventJSON
- if err = json.Unmarshal(eventJSON, &event); err != nil {
- return nil, 0, 0, false, err
- }
-
- // If txnID has changed on this event from the previous event, then we've
- // reached the end of a transaction's events. Return only those events.
- if lastTxnID > invalidTxnID && lastTxnID != txnID {
- return events, maxID, lastTxnID, true, nil
- }
- lastTxnID = txnID
-
- // Limit events that aren't part of an old transaction
- if txnID == -1 {
- // Return if we've hit the limit
- if eventsProcessed++; eventsProcessed > limit {
- return events, maxID, lastTxnID, true, nil
- }
- }
-
- if id > maxID {
- maxID = id
- }
-
- // Portion of the event that is unsigned due to rapid change
- // TODO: Consider removing age as not many app services use it
- if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil {
- return nil, 0, 0, false, err
- }
-
- events = append(events, event)
- }
-
- return
-}
-
-// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service
-// IDs into the db.
-func (s *eventsStatements) countEventsByApplicationServiceID(
- ctx context.Context,
- appServiceID string,
-) (int, error) {
- var count int
- err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
- if err != nil && err != sql.ErrNoRows {
- return 0, err
- }
-
- return count, nil
-}
-
-// insertEvent inserts an event mapped to its corresponding application service
-// IDs into the db.
-func (s *eventsStatements) insertEvent(
- ctx context.Context,
- appServiceID string,
- event *gomatrixserverlib.HeaderedEvent,
-) (err error) {
- // Convert event to JSON before inserting
- eventJSON, err := json.Marshal(event)
- if err != nil {
- return err
- }
-
- _, err = s.insertEventStmt.ExecContext(
- ctx,
- appServiceID,
- eventJSON,
- -1, // No transaction ID yet
- )
- return
-}
-
-// updateTxnIDForEvents sets the transactionID for a collection of events. Done
-// before sending them to an AppService. Referenced before sending to make sure
-// we aren't constructing multiple transactions with the same events.
-func (s *eventsStatements) updateTxnIDForEvents(
- ctx context.Context,
- appserviceID string,
- maxID, txnID int,
-) (err error) {
- _, err = s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID)
- return
-}
-
-// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database.
-func (s *eventsStatements) deleteEventsBeforeAndIncludingID(
- ctx context.Context,
- appserviceID string,
- eventTableID int,
-) (err error) {
- _, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID)
- return
-}
diff --git a/appservice/storage/postgres/storage.go b/appservice/storage/postgres/storage.go
deleted file mode 100644
index a4c04b2c..00000000
--- a/appservice/storage/postgres/storage.go
+++ /dev/null
@@ -1,115 +0,0 @@
-// Copyright 2018 New Vector Ltd
-// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package postgres
-
-import (
- "context"
- "database/sql"
-
- // Import postgres database driver
- _ "github.com/lib/pq"
- "github.com/matrix-org/dendrite/internal/sqlutil"
- "github.com/matrix-org/dendrite/setup/base"
- "github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/gomatrixserverlib"
-)
-
-// Database stores events intended to be later sent to application services
-type Database struct {
- events eventsStatements
- txnID txnStatements
- db *sql.DB
- writer sqlutil.Writer
-}
-
-// NewDatabase opens a new database
-func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*Database, error) {
- var result Database
- var err error
- if result.db, result.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil {
- return nil, err
- }
- if err = result.prepare(); err != nil {
- return nil, err
- }
- return &result, nil
-}
-
-func (d *Database) prepare() error {
- if err := d.events.prepare(d.db); err != nil {
- return err
- }
-
- return d.txnID.prepare(d.db)
-}
-
-// StoreEvent takes in a gomatrixserverlib.HeaderedEvent and stores it in the database
-// for a transaction worker to pull and later send to an application service.
-func (d *Database) StoreEvent(
- ctx context.Context,
- appServiceID string,
- event *gomatrixserverlib.HeaderedEvent,
-) error {
- return d.events.insertEvent(ctx, appServiceID, event)
-}
-
-// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
-// be sent to an application service given its ID.
-func (d *Database) GetEventsWithAppServiceID(
- ctx context.Context,
- appServiceID string,
- limit int,
-) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error) {
- return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
-}
-
-// CountEventsWithAppServiceID returns the number of events destined for an
-// application service given its ID.
-func (d *Database) CountEventsWithAppServiceID(
- ctx context.Context,
- appServiceID string,
-) (int, error) {
- return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
-}
-
-// UpdateTxnIDForEvents takes in an application service ID and a
-// and stores them in the DB, unless the pair already exists, in
-// which case it updates them.
-func (d *Database) UpdateTxnIDForEvents(
- ctx context.Context,
- appserviceID string,
- maxID, txnID int,
-) error {
- return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
-}
-
-// RemoveEventsBeforeAndIncludingID removes all events from the database that
-// are less than or equal to a given maximum ID. IDs here are implemented as a
-// serial, thus this should always delete events in chronological order.
-func (d *Database) RemoveEventsBeforeAndIncludingID(
- ctx context.Context,
- appserviceID string,
- eventTableID int,
-) error {
- return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
-}
-
-// GetLatestTxnID returns the latest available transaction id
-func (d *Database) GetLatestTxnID(
- ctx context.Context,
-) (int, error) {
- return d.txnID.selectTxnID(ctx)
-}
diff --git a/appservice/storage/postgres/txn_id_counter_table.go b/appservice/storage/postgres/txn_id_counter_table.go
deleted file mode 100644
index a96a0e36..00000000
--- a/appservice/storage/postgres/txn_id_counter_table.go
+++ /dev/null
@@ -1,53 +0,0 @@
-// Copyright 2018 New Vector Ltd
-// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package postgres
-
-import (
- "context"
- "database/sql"
-)
-
-const txnIDSchema = `
--- Keeps a count of the current transaction ID
-CREATE SEQUENCE IF NOT EXISTS txn_id_counter START 1;
-`
-
-const selectTxnIDSQL = "SELECT nextval('txn_id_counter')"
-
-type txnStatements struct {
- selectTxnIDStmt *sql.Stmt
-}
-
-func (s *txnStatements) prepare(db *sql.DB) (err error) {
- _, err = db.Exec(txnIDSchema)
- if err != nil {
- return
- }
-
- if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil {
- return
- }
-
- return
-}
-
-// selectTxnID selects the latest ascending transaction ID
-func (s *txnStatements) selectTxnID(
- ctx context.Context,
-) (txnID int, err error) {
- err = s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID)
- return
-}
diff --git a/appservice/storage/sqlite3/appservice_events_table.go b/appservice/storage/sqlite3/appservice_events_table.go
deleted file mode 100644
index 34b4859e..00000000
--- a/appservice/storage/sqlite3/appservice_events_table.go
+++ /dev/null
@@ -1,267 +0,0 @@
-// Copyright 2018 New Vector Ltd
-// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package sqlite3
-
-import (
- "context"
- "database/sql"
- "encoding/json"
- "time"
-
- "github.com/matrix-org/dendrite/internal/sqlutil"
- "github.com/matrix-org/gomatrixserverlib"
- log "github.com/sirupsen/logrus"
-)
-
-const appserviceEventsSchema = `
--- Stores events to be sent to application services
-CREATE TABLE IF NOT EXISTS appservice_events (
- -- An auto-incrementing id unique to each event in the table
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- -- The ID of the application service the event will be sent to
- as_id TEXT NOT NULL,
- -- JSON representation of the event
- headered_event_json TEXT NOT NULL,
- -- The ID of the transaction that this event is a part of
- txn_id INTEGER NOT NULL
-);
-
-CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
-`
-
-const selectEventsByApplicationServiceIDSQL = "" +
- "SELECT id, headered_event_json, txn_id " +
- "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
-
-const countEventsByApplicationServiceIDSQL = "" +
- "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
-
-const insertEventSQL = "" +
- "INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " +
- "VALUES ($1, $2, $3)"
-
-const updateTxnIDForEventsSQL = "" +
- "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
-
-const deleteEventsBeforeAndIncludingIDSQL = "" +
- "DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2"
-
-const (
- // A transaction ID number that no transaction should ever have. Used for
- // checking again the default value.
- invalidTxnID = -2
-)
-
-type eventsStatements struct {
- db *sql.DB
- writer sqlutil.Writer
- selectEventsByApplicationServiceIDStmt *sql.Stmt
- countEventsByApplicationServiceIDStmt *sql.Stmt
- insertEventStmt *sql.Stmt
- updateTxnIDForEventsStmt *sql.Stmt
- deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
-}
-
-func (s *eventsStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) {
- s.db = db
- s.writer = writer
- _, err = db.Exec(appserviceEventsSchema)
- if err != nil {
- return
- }
-
- if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil {
- return
- }
- if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil {
- return
- }
- if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
- return
- }
- if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil {
- return
- }
- if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil {
- return
- }
-
- return
-}
-
-// selectEventsByApplicationServiceID takes in an application service ID and
-// returns a slice of events that need to be sent to that application service,
-// as well as an int later used to remove these same events from the database
-// once successfully sent to an application service.
-func (s *eventsStatements) selectEventsByApplicationServiceID(
- ctx context.Context,
- applicationServiceID string,
- limit int,
-) (
- txnID, maxID int,
- events []gomatrixserverlib.HeaderedEvent,
- eventsRemaining bool,
- err error,
-) {
- defer func() {
- if err != nil {
- log.WithFields(log.Fields{
- "appservice": applicationServiceID,
- }).WithError(err).Fatalf("appservice unable to select new events to send")
- }
- }()
- // Retrieve events from the database. Unsuccessfully sent events first
- eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
- if err != nil {
- return
- }
- defer checkNamedErr(eventRows.Close, &err)
- events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit)
- if err != nil {
- return
- }
-
- return
-}
-
-// checkNamedErr calls fn and overwrite err if it was nil and fn returned non-nil
-func checkNamedErr(fn func() error, err *error) {
- if e := fn(); e != nil && *err == nil {
- *err = e
- }
-}
-
-func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.HeaderedEvent, maxID, txnID int, eventsRemaining bool, err error) {
- // Get current time for use in calculating event age
- nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
-
- // Iterate through each row and store event contents
- // If txn_id changes dramatically, we've switched from collecting old events to
- // new ones. Send back those events first.
- lastTxnID := invalidTxnID
- for eventsProcessed := 0; eventRows.Next(); {
- var event gomatrixserverlib.HeaderedEvent
- var eventJSON []byte
- var id int
- err = eventRows.Scan(
- &id,
- &eventJSON,
- &txnID,
- )
- if err != nil {
- return nil, 0, 0, false, err
- }
-
- // Unmarshal eventJSON
- if err = json.Unmarshal(eventJSON, &event); err != nil {
- return nil, 0, 0, false, err
- }
-
- // If txnID has changed on this event from the previous event, then we've
- // reached the end of a transaction's events. Return only those events.
- if lastTxnID > invalidTxnID && lastTxnID != txnID {
- return events, maxID, lastTxnID, true, nil
- }
- lastTxnID = txnID
-
- // Limit events that aren't part of an old transaction
- if txnID == -1 {
- // Return if we've hit the limit
- if eventsProcessed++; eventsProcessed > limit {
- return events, maxID, lastTxnID, true, nil
- }
- }
-
- if id > maxID {
- maxID = id
- }
-
- // Portion of the event that is unsigned due to rapid change
- // TODO: Consider removing age as not many app services use it
- if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil {
- return nil, 0, 0, false, err
- }
-
- events = append(events, event)
- }
-
- return
-}
-
-// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service
-// IDs into the db.
-func (s *eventsStatements) countEventsByApplicationServiceID(
- ctx context.Context,
- appServiceID string,
-) (int, error) {
- var count int
- err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
- if err != nil && err != sql.ErrNoRows {
- return 0, err
- }
-
- return count, nil
-}
-
-// insertEvent inserts an event mapped to its corresponding application service
-// IDs into the db.
-func (s *eventsStatements) insertEvent(
- ctx context.Context,
- appServiceID string,
- event *gomatrixserverlib.HeaderedEvent,
-) (err error) {
- // Convert event to JSON before inserting
- eventJSON, err := json.Marshal(event)
- if err != nil {
- return err
- }
-
- return s.writer.Do(s.db, nil, func(txn *sql.Tx) error {
- _, err := s.insertEventStmt.ExecContext(
- ctx,
- appServiceID,
- eventJSON,
- -1, // No transaction ID yet
- )
- return err
- })
-}
-
-// updateTxnIDForEvents sets the transactionID for a collection of events. Done
-// before sending them to an AppService. Referenced before sending to make sure
-// we aren't constructing multiple transactions with the same events.
-func (s *eventsStatements) updateTxnIDForEvents(
- ctx context.Context,
- appserviceID string,
- maxID, txnID int,
-) (err error) {
- return s.writer.Do(s.db, nil, func(txn *sql.Tx) error {
- _, err := s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID)
- return err
- })
-}
-
-// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database.
-func (s *eventsStatements) deleteEventsBeforeAndIncludingID(
- ctx context.Context,
- appserviceID string,
- eventTableID int,
-) (err error) {
- return s.writer.Do(s.db, nil, func(txn *sql.Tx) error {
- _, err := s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID)
- return err
- })
-}
diff --git a/appservice/storage/sqlite3/storage.go b/appservice/storage/sqlite3/storage.go
deleted file mode 100644
index ad62b362..00000000
--- a/appservice/storage/sqlite3/storage.go
+++ /dev/null
@@ -1,114 +0,0 @@
-// Copyright 2018 New Vector Ltd
-// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package sqlite3
-
-import (
- "context"
- "database/sql"
-
- // Import SQLite database driver
- "github.com/matrix-org/dendrite/internal/sqlutil"
- "github.com/matrix-org/dendrite/setup/base"
- "github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/gomatrixserverlib"
-)
-
-// Database stores events intended to be later sent to application services
-type Database struct {
- events eventsStatements
- txnID txnStatements
- db *sql.DB
- writer sqlutil.Writer
-}
-
-// NewDatabase opens a new database
-func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*Database, error) {
- var result Database
- var err error
- if result.db, result.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()); err != nil {
- return nil, err
- }
- if err = result.prepare(); err != nil {
- return nil, err
- }
- return &result, nil
-}
-
-func (d *Database) prepare() error {
- if err := d.events.prepare(d.db, d.writer); err != nil {
- return err
- }
-
- return d.txnID.prepare(d.db, d.writer)
-}
-
-// StoreEvent takes in a gomatrixserverlib.HeaderedEvent and stores it in the database
-// for a transaction worker to pull and later send to an application service.
-func (d *Database) StoreEvent(
- ctx context.Context,
- appServiceID string,
- event *gomatrixserverlib.HeaderedEvent,
-) error {
- return d.events.insertEvent(ctx, appServiceID, event)
-}
-
-// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
-// be sent to an application service given its ID.
-func (d *Database) GetEventsWithAppServiceID(
- ctx context.Context,
- appServiceID string,
- limit int,
-) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error) {
- return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
-}
-
-// CountEventsWithAppServiceID returns the number of events destined for an
-// application service given its ID.
-func (d *Database) CountEventsWithAppServiceID(
- ctx context.Context,
- appServiceID string,
-) (int, error) {
- return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
-}
-
-// UpdateTxnIDForEvents takes in an application service ID and a
-// and stores them in the DB, unless the pair already exists, in
-// which case it updates them.
-func (d *Database) UpdateTxnIDForEvents(
- ctx context.Context,
- appserviceID string,
- maxID, txnID int,
-) error {
- return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
-}
-
-// RemoveEventsBeforeAndIncludingID removes all events from the database that
-// are less than or equal to a given maximum ID. IDs here are implemented as a
-// serial, thus this should always delete events in chronological order.
-func (d *Database) RemoveEventsBeforeAndIncludingID(
- ctx context.Context,
- appserviceID string,
- eventTableID int,
-) error {
- return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
-}
-
-// GetLatestTxnID returns the latest available transaction id
-func (d *Database) GetLatestTxnID(
- ctx context.Context,
-) (int, error) {
- return d.txnID.selectTxnID(ctx)
-}
diff --git a/appservice/storage/sqlite3/txn_id_counter_table.go b/appservice/storage/sqlite3/txn_id_counter_table.go
deleted file mode 100644
index f2e902f9..00000000
--- a/appservice/storage/sqlite3/txn_id_counter_table.go
+++ /dev/null
@@ -1,82 +0,0 @@
-// Copyright 2018 New Vector Ltd
-// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package sqlite3
-
-import (
- "context"
- "database/sql"
-
- "github.com/matrix-org/dendrite/internal/sqlutil"
-)
-
-const txnIDSchema = `
--- Keeps a count of the current transaction ID
-CREATE TABLE IF NOT EXISTS appservice_counters (
- name TEXT PRIMARY KEY NOT NULL,
- last_id INTEGER DEFAULT 1
-);
-INSERT OR IGNORE INTO appservice_counters (name, last_id) VALUES('txn_id', 1);
-`
-
-const selectTxnIDSQL = `
- SELECT last_id FROM appservice_counters WHERE name='txn_id'
-`
-
-const updateTxnIDSQL = `
- UPDATE appservice_counters SET last_id=last_id+1 WHERE name='txn_id'
-`
-
-type txnStatements struct {
- db *sql.DB
- writer sqlutil.Writer
- selectTxnIDStmt *sql.Stmt
- updateTxnIDStmt *sql.Stmt
-}
-
-func (s *txnStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) {
- s.db = db
- s.writer = writer
- _, err = db.Exec(txnIDSchema)
- if err != nil {
- return
- }
-
- if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil {
- return
- }
-
- if s.updateTxnIDStmt, err = db.Prepare(updateTxnIDSQL); err != nil {
- return
- }
-
- return
-}
-
-// selectTxnID selects the latest ascending transaction ID
-func (s *txnStatements) selectTxnID(
- ctx context.Context,
-) (txnID int, err error) {
- err = s.writer.Do(s.db, nil, func(txn *sql.Tx) error {
- err := s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID)
- if err != nil {
- return err
- }
-
- _, err = s.updateTxnIDStmt.ExecContext(ctx)
- return err
- })
- return
-}
diff --git a/appservice/storage/storage.go b/appservice/storage/storage.go
deleted file mode 100644
index 89d5e0cc..00000000
--- a/appservice/storage/storage.go
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright 2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-//go:build !wasm
-// +build !wasm
-
-package storage
-
-import (
- "fmt"
-
- "github.com/matrix-org/dendrite/appservice/storage/postgres"
- "github.com/matrix-org/dendrite/appservice/storage/sqlite3"
- "github.com/matrix-org/dendrite/setup/base"
- "github.com/matrix-org/dendrite/setup/config"
-)
-
-// NewDatabase opens a new Postgres or Sqlite database (based on dataSourceName scheme)
-// and sets DB connection parameters
-func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) {
- switch {
- case dbProperties.ConnectionString.IsSQLite():
- return sqlite3.NewDatabase(base, dbProperties)
- case dbProperties.ConnectionString.IsPostgres():
- return postgres.NewDatabase(base, dbProperties)
- default:
- return nil, fmt.Errorf("unexpected database type")
- }
-}
diff --git a/appservice/storage/storage_wasm.go b/appservice/storage/storage_wasm.go
deleted file mode 100644
index 23025459..00000000
--- a/appservice/storage/storage_wasm.go
+++ /dev/null
@@ -1,34 +0,0 @@
-// Copyright 2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package storage
-
-import (
- "fmt"
-
- "github.com/matrix-org/dendrite/appservice/storage/sqlite3"
- "github.com/matrix-org/dendrite/setup/base"
- "github.com/matrix-org/dendrite/setup/config"
-)
-
-func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) {
- switch {
- case dbProperties.ConnectionString.IsSQLite():
- return sqlite3.NewDatabase(base, dbProperties)
- case dbProperties.ConnectionString.IsPostgres():
- return nil, fmt.Errorf("can't use Postgres implementation")
- default:
- return nil, fmt.Errorf("unexpected database type")
- }
-}
diff --git a/appservice/types/types.go b/appservice/types/types.go
deleted file mode 100644
index 098face6..00000000
--- a/appservice/types/types.go
+++ /dev/null
@@ -1,64 +0,0 @@
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package types
-
-import (
- "sync"
-
- "github.com/matrix-org/dendrite/setup/config"
-)
-
-const (
- // AppServiceDeviceID is the AS dummy device ID
- AppServiceDeviceID = "AS_Device"
-)
-
-// ApplicationServiceWorkerState is a type that couples an application service,
-// a lockable condition as well as some other state variables, allowing the
-// roomserver to notify appservice workers when there are events ready to send
-// externally to application services.
-type ApplicationServiceWorkerState struct {
- AppService config.ApplicationService
- Cond *sync.Cond
- // Events ready to be sent
- EventsReady bool
- // Backoff exponent (2^x secs). Max 6, aka 64s.
- Backoff int
-}
-
-// NotifyNewEvents wakes up all waiting goroutines, notifying that events remain
-// in the event queue for this application service worker.
-func (a *ApplicationServiceWorkerState) NotifyNewEvents() {
- a.Cond.L.Lock()
- a.EventsReady = true
- a.Cond.Broadcast()
- a.Cond.L.Unlock()
-}
-
-// FinishEventProcessing marks all events of this worker as being sent to the
-// application service.
-func (a *ApplicationServiceWorkerState) FinishEventProcessing() {
- a.Cond.L.Lock()
- a.EventsReady = false
- a.Cond.L.Unlock()
-}
-
-// WaitForNewEvents causes the calling goroutine to wait on the worker state's
-// condition for a broadcast or similar wakeup, if there are no events ready.
-func (a *ApplicationServiceWorkerState) WaitForNewEvents() {
- a.Cond.L.Lock()
- if !a.EventsReady {
- a.Cond.Wait()
- }
- a.Cond.L.Unlock()
-}
diff --git a/appservice/workers/transaction_scheduler.go b/appservice/workers/transaction_scheduler.go
deleted file mode 100644
index 4dab00bd..00000000
--- a/appservice/workers/transaction_scheduler.go
+++ /dev/null
@@ -1,236 +0,0 @@
-// Copyright 2018 Vector Creations Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package workers
-
-import (
- "bytes"
- "context"
- "encoding/json"
- "fmt"
- "math"
- "net/http"
- "net/url"
- "time"
-
- "github.com/matrix-org/dendrite/appservice/storage"
- "github.com/matrix-org/dendrite/appservice/types"
- "github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/gomatrixserverlib"
- log "github.com/sirupsen/logrus"
-)
-
-var (
- // Maximum size of events sent in each transaction.
- transactionBatchSize = 50
-)
-
-// SetupTransactionWorkers spawns a separate goroutine for each application
-// service. Each of these "workers" handle taking all events intended for their
-// app service, batch them up into a single transaction (up to a max transaction
-// size), then send that off to the AS's /transactions/{txnID} endpoint. It also
-// handles exponentially backing off in case the AS isn't currently available.
-func SetupTransactionWorkers(
- client *http.Client,
- appserviceDB storage.Database,
- workerStates []types.ApplicationServiceWorkerState,
-) error {
- // Create a worker that handles transmitting events to a single homeserver
- for _, workerState := range workerStates {
- // Don't create a worker if this AS doesn't want to receive events
- if workerState.AppService.URL != "" {
- go worker(client, appserviceDB, workerState)
- }
- }
- return nil
-}
-
-// worker is a goroutine that sends any queued events to the application service
-// it is given.
-func worker(client *http.Client, db storage.Database, ws types.ApplicationServiceWorkerState) {
- log.WithFields(log.Fields{
- "appservice": ws.AppService.ID,
- }).Info("Starting application service")
- ctx := context.Background()
-
- // Initial check for any leftover events to send from last time
- eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID)
- if err != nil {
- log.WithFields(log.Fields{
- "appservice": ws.AppService.ID,
- }).WithError(err).Fatal("appservice worker unable to read queued events from DB")
- return
- }
- if eventCount > 0 {
- ws.NotifyNewEvents()
- }
-
- // Loop forever and keep waiting for more events to send
- for {
- // Wait for more events if we've sent all the events in the database
- ws.WaitForNewEvents()
-
- // Batch events up into a transaction
- transactionJSON, txnID, maxEventID, eventsRemaining, err := createTransaction(ctx, db, ws.AppService.ID)
- if err != nil {
- log.WithFields(log.Fields{
- "appservice": ws.AppService.ID,
- }).WithError(err).Fatal("appservice worker unable to create transaction")
-
- return
- }
-
- // Send the events off to the application service
- // Backoff if the application service does not respond
- err = send(client, ws.AppService, txnID, transactionJSON)
- if err != nil {
- log.WithFields(log.Fields{
- "appservice": ws.AppService.ID,
- }).WithError(err).Error("unable to send event")
- // Backoff
- backoff(&ws, err)
- continue
- }
-
- // We sent successfully, hooray!
- ws.Backoff = 0
-
- // Transactions have a maximum event size, so there may still be some events
- // left over to send. Keep sending until none are left
- if !eventsRemaining {
- ws.FinishEventProcessing()
- }
-
- // Remove sent events from the DB
- err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID)
- if err != nil {
- log.WithFields(log.Fields{
- "appservice": ws.AppService.ID,
- }).WithError(err).Fatal("unable to remove appservice events from the database")
- return
- }
- }
-}
-
-// backoff pauses the calling goroutine for a 2^some backoff exponent seconds
-func backoff(ws *types.ApplicationServiceWorkerState, err error) {
- // Calculate how long to backoff for
- backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff)))
- backoffSeconds := time.Second * backoffDuration
-
- log.WithFields(log.Fields{
- "appservice": ws.AppService.ID,
- }).WithError(err).Warnf("unable to send transactions successfully, backing off for %ds",
- backoffDuration)
-
- ws.Backoff++
- if ws.Backoff > 6 {
- ws.Backoff = 6
- }
-
- // Backoff
- time.Sleep(backoffSeconds)
-}
-
-// createTransaction takes in a slice of AS events, stores them in an AS
-// transaction, and JSON-encodes the results.
-func createTransaction(
- ctx context.Context,
- db storage.Database,
- appserviceID string,
-) (
- transactionJSON []byte,
- txnID, maxID int,
- eventsRemaining bool,
- err error,
-) {
- // Retrieve the latest events from the DB (will return old events if they weren't successfully sent)
- txnID, maxID, events, eventsRemaining, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize)
- if err != nil {
- log.WithFields(log.Fields{
- "appservice": appserviceID,
- }).WithError(err).Fatalf("appservice worker unable to read queued events from DB")
-
- return
- }
-
- // Check if these events do not already have a transaction ID
- if txnID == -1 {
- // If not, grab next available ID from the DB
- txnID, err = db.GetLatestTxnID(ctx)
- if err != nil {
- return nil, 0, 0, false, err
- }
-
- // Mark new events with current transactionID
- if err = db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, txnID); err != nil {
- return nil, 0, 0, false, err
- }
- }
-
- var ev []*gomatrixserverlib.HeaderedEvent
- for i := range events {
- ev = append(ev, &events[i])
- }
-
- // Create a transaction and store the events inside
- transaction := gomatrixserverlib.ApplicationServiceTransaction{
- Events: gomatrixserverlib.HeaderedToClientEvents(ev, gomatrixserverlib.FormatAll),
- }
-
- transactionJSON, err = json.Marshal(transaction)
- if err != nil {
- return
- }
-
- return
-}
-
-// send sends events to an application service. Returns an error if an OK was not
-// received back from the application service or the request timed out.
-func send(
- client *http.Client,
- appservice config.ApplicationService,
- txnID int,
- transaction []byte,
-) (err error) {
- // PUT a transaction to our AS
- // https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid
- address := fmt.Sprintf("%s/transactions/%d?access_token=%s", appservice.URL, txnID, url.QueryEscape(appservice.HSToken))
- req, err := http.NewRequest("PUT", address, bytes.NewBuffer(transaction))
- if err != nil {
- return err
- }
- req.Header.Set("Content-Type", "application/json")
- resp, err := client.Do(req)
- if err != nil {
- return err
- }
- defer checkNamedErr(resp.Body.Close, &err)
-
- // Check the AS received the events correctly
- if resp.StatusCode != http.StatusOK {
- // TODO: Handle non-200 error codes from application services
- return fmt.Errorf("non-OK status code %d returned from AS", resp.StatusCode)
- }
-
- return nil
-}
-
-// checkNamedErr calls fn and overwrite err if it was nil and fn returned non-nil
-func checkNamedErr(fn func() error, err *error) {
- if e := fn(); e != nil && *err == nil {
- *err = e
- }
-}