diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-02-14 14:12:33 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-14 14:12:33 +0000 |
commit | 3dabf4d4ed52459a4661db32c983018c2c73a431 (patch) | |
tree | e4052d1abe5ccbe12051116357272c9ae99c9a14 /appservice | |
parent | 409fec2a48f454600936eb674da0611566c5a286 (diff) |
More SQLite (#871)
* SQLite support for appservice
* SQLite support for mediaapi
* Copyright notices
* SQLite for public rooms API (although with some slight differences in behaviour)
* Lazy match aliases, add TODOs
Diffstat (limited to 'appservice')
-rw-r--r-- | appservice/consumers/roomserver.go | 4 | ||||
-rw-r--r-- | appservice/storage/postgres/appservice_events_table.go (renamed from appservice/storage/appservice_events_table.go) | 3 | ||||
-rw-r--r-- | appservice/storage/postgres/storage.go | 111 | ||||
-rw-r--r-- | appservice/storage/postgres/txn_id_counter_table.go (renamed from appservice/storage/txn_id_counter_table.go) | 3 | ||||
-rw-r--r-- | appservice/storage/sqlite3/appservice_events_table.go | 249 | ||||
-rw-r--r-- | appservice/storage/sqlite3/storage.go | 111 | ||||
-rw-r--r-- | appservice/storage/sqlite3/txn_id_counter_table.go | 60 | ||||
-rw-r--r-- | appservice/storage/storage.go | 106 | ||||
-rw-r--r-- | appservice/workers/transaction_scheduler.go | 6 |
9 files changed, 562 insertions, 91 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index b9a56795..6d3ea808 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -34,7 +34,7 @@ import ( type OutputRoomEventConsumer struct { roomServerConsumer *common.ContinualConsumer db accounts.Database - asDB *storage.Database + asDB storage.Database query api.RoomserverQueryAPI alias api.RoomserverAliasAPI serverName string @@ -47,7 +47,7 @@ func NewOutputRoomEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, store accounts.Database, - appserviceDB *storage.Database, + appserviceDB storage.Database, queryAPI api.RoomserverQueryAPI, aliasAPI api.RoomserverAliasAPI, workerStates []types.ApplicationServiceWorkerState, diff --git a/appservice/storage/appservice_events_table.go b/appservice/storage/postgres/appservice_events_table.go index 285bbf48..d72faeea 100644 --- a/appservice/storage/appservice_events_table.go +++ b/appservice/storage/postgres/appservice_events_table.go @@ -1,4 +1,5 @@ // Copyright 2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storage +package postgres import ( "context" diff --git a/appservice/storage/postgres/storage.go b/appservice/storage/postgres/storage.go new file mode 100644 index 00000000..c4756468 --- /dev/null +++ b/appservice/storage/postgres/storage.go @@ -0,0 +1,111 @@ +// Copyright 2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + // Import postgres database driver + _ "github.com/lib/pq" + "github.com/matrix-org/gomatrixserverlib" +) + +// Database stores events intended to be later sent to application services +type Database struct { + events eventsStatements + txnID txnStatements + db *sql.DB +} + +// NewDatabase opens a new database +func NewDatabase(dataSourceName string) (*Database, error) { + var result Database + var err error + if result.db, err = sql.Open("postgres", dataSourceName); err != nil { + return nil, err + } + if err = result.prepare(); err != nil { + return nil, err + } + return &result, nil +} + +func (d *Database) prepare() error { + if err := d.events.prepare(d.db); err != nil { + return err + } + + return d.txnID.prepare(d.db) +} + +// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database +// for a transaction worker to pull and later send to an application service. +func (d *Database) StoreEvent( + ctx context.Context, + appServiceID string, + event *gomatrixserverlib.Event, +) error { + return d.events.insertEvent(ctx, appServiceID, event) +} + +// GetEventsWithAppServiceID returns a slice of events and their IDs intended to +// be sent to an application service given its ID. +func (d *Database) GetEventsWithAppServiceID( + ctx context.Context, + appServiceID string, + limit int, +) (int, int, []gomatrixserverlib.Event, bool, error) { + return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) +} + +// CountEventsWithAppServiceID returns the number of events destined for an +// application service given its ID. +func (d *Database) CountEventsWithAppServiceID( + ctx context.Context, + appServiceID string, +) (int, error) { + return d.events.countEventsByApplicationServiceID(ctx, appServiceID) +} + +// UpdateTxnIDForEvents takes in an application service ID and a +// and stores them in the DB, unless the pair already exists, in +// which case it updates them. +func (d *Database) UpdateTxnIDForEvents( + ctx context.Context, + appserviceID string, + maxID, txnID int, +) error { + return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID) +} + +// RemoveEventsBeforeAndIncludingID removes all events from the database that +// are less than or equal to a given maximum ID. IDs here are implemented as a +// serial, thus this should always delete events in chronological order. +func (d *Database) RemoveEventsBeforeAndIncludingID( + ctx context.Context, + appserviceID string, + eventTableID int, +) error { + return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID) +} + +// GetLatestTxnID returns the latest available transaction id +func (d *Database) GetLatestTxnID( + ctx context.Context, +) (int, error) { + return d.txnID.selectTxnID(ctx) +} diff --git a/appservice/storage/txn_id_counter_table.go b/appservice/storage/postgres/txn_id_counter_table.go index 7b0fa378..a96a0e36 100644 --- a/appservice/storage/txn_id_counter_table.go +++ b/appservice/storage/postgres/txn_id_counter_table.go @@ -1,4 +1,5 @@ // Copyright 2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storage +package postgres import ( "context" diff --git a/appservice/storage/sqlite3/appservice_events_table.go b/appservice/storage/sqlite3/appservice_events_table.go new file mode 100644 index 00000000..846f09f7 --- /dev/null +++ b/appservice/storage/sqlite3/appservice_events_table.go @@ -0,0 +1,249 @@ +// Copyright 2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + "encoding/json" + "time" + + "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" +) + +const appserviceEventsSchema = ` +-- Stores events to be sent to application services +CREATE TABLE IF NOT EXISTS appservice_events ( + -- An auto-incrementing id unique to each event in the table + id INTEGER PRIMARY KEY AUTOINCREMENT, + -- The ID of the application service the event will be sent to + as_id TEXT NOT NULL, + -- JSON representation of the event + event_json TEXT NOT NULL, + -- The ID of the transaction that this event is a part of + txn_id INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id); +` + +const selectEventsByApplicationServiceIDSQL = "" + + "SELECT id, event_json, txn_id " + + "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC" + +const countEventsByApplicationServiceIDSQL = "" + + "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1" + +const insertEventSQL = "" + + "INSERT INTO appservice_events(as_id, event_json, txn_id) " + + "VALUES ($1, $2, $3)" + +const updateTxnIDForEventsSQL = "" + + "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3" + +const deleteEventsBeforeAndIncludingIDSQL = "" + + "DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2" + +const ( + // A transaction ID number that no transaction should ever have. Used for + // checking again the default value. + invalidTxnID = -2 +) + +type eventsStatements struct { + selectEventsByApplicationServiceIDStmt *sql.Stmt + countEventsByApplicationServiceIDStmt *sql.Stmt + insertEventStmt *sql.Stmt + updateTxnIDForEventsStmt *sql.Stmt + deleteEventsBeforeAndIncludingIDStmt *sql.Stmt +} + +func (s *eventsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(appserviceEventsSchema) + if err != nil { + return + } + + if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil { + return + } + if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil { + return + } + if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { + return + } + if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil { + return + } + if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil { + return + } + + return +} + +// selectEventsByApplicationServiceID takes in an application service ID and +// returns a slice of events that need to be sent to that application service, +// as well as an int later used to remove these same events from the database +// once successfully sent to an application service. +func (s *eventsStatements) selectEventsByApplicationServiceID( + ctx context.Context, + applicationServiceID string, + limit int, +) ( + txnID, maxID int, + events []gomatrixserverlib.Event, + eventsRemaining bool, + err error, +) { + // Retrieve events from the database. Unsuccessfully sent events first + eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID) + if err != nil { + return + } + defer func() { + err = eventRows.Close() + if err != nil { + log.WithFields(log.Fields{ + "appservice": applicationServiceID, + }).WithError(err).Fatalf("appservice unable to select new events to send") + } + }() + events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit) + if err != nil { + return + } + + return +} + +func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) { + // Get current time for use in calculating event age + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + + // Iterate through each row and store event contents + // If txn_id changes dramatically, we've switched from collecting old events to + // new ones. Send back those events first. + lastTxnID := invalidTxnID + for eventsProcessed := 0; eventRows.Next(); { + var event gomatrixserverlib.Event + var eventJSON []byte + var id int + err = eventRows.Scan( + &id, + &eventJSON, + &txnID, + ) + if err != nil { + return nil, 0, 0, false, err + } + + // Unmarshal eventJSON + if err = json.Unmarshal(eventJSON, &event); err != nil { + return nil, 0, 0, false, err + } + + // If txnID has changed on this event from the previous event, then we've + // reached the end of a transaction's events. Return only those events. + if lastTxnID > invalidTxnID && lastTxnID != txnID { + return events, maxID, lastTxnID, true, nil + } + lastTxnID = txnID + + // Limit events that aren't part of an old transaction + if txnID == -1 { + // Return if we've hit the limit + if eventsProcessed++; eventsProcessed > limit { + return events, maxID, lastTxnID, true, nil + } + } + + if id > maxID { + maxID = id + } + + // Portion of the event that is unsigned due to rapid change + // TODO: Consider removing age as not many app services use it + if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil { + return nil, 0, 0, false, err + } + + events = append(events, event) + } + + return +} + +// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service +// IDs into the db. +func (s *eventsStatements) countEventsByApplicationServiceID( + ctx context.Context, + appServiceID string, +) (int, error) { + var count int + err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count) + if err != nil && err != sql.ErrNoRows { + return 0, err + } + + return count, nil +} + +// insertEvent inserts an event mapped to its corresponding application service +// IDs into the db. +func (s *eventsStatements) insertEvent( + ctx context.Context, + appServiceID string, + event *gomatrixserverlib.Event, +) (err error) { + // Convert event to JSON before inserting + eventJSON, err := json.Marshal(event) + if err != nil { + return err + } + + _, err = s.insertEventStmt.ExecContext( + ctx, + appServiceID, + eventJSON, + -1, // No transaction ID yet + ) + return +} + +// updateTxnIDForEvents sets the transactionID for a collection of events. Done +// before sending them to an AppService. Referenced before sending to make sure +// we aren't constructing multiple transactions with the same events. +func (s *eventsStatements) updateTxnIDForEvents( + ctx context.Context, + appserviceID string, + maxID, txnID int, +) (err error) { + _, err = s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID) + return +} + +// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database. +func (s *eventsStatements) deleteEventsBeforeAndIncludingID( + ctx context.Context, + appserviceID string, + eventTableID int, +) (err error) { + _, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID) + return +} diff --git a/appservice/storage/sqlite3/storage.go b/appservice/storage/sqlite3/storage.go new file mode 100644 index 00000000..56ab55f2 --- /dev/null +++ b/appservice/storage/sqlite3/storage.go @@ -0,0 +1,111 @@ +// Copyright 2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + // Import SQLite database driver + "github.com/matrix-org/gomatrixserverlib" + _ "github.com/mattn/go-sqlite3" +) + +// Database stores events intended to be later sent to application services +type Database struct { + events eventsStatements + txnID txnStatements + db *sql.DB +} + +// NewDatabase opens a new database +func NewDatabase(dataSourceName string) (*Database, error) { + var result Database + var err error + if result.db, err = sql.Open("sqlite3", dataSourceName); err != nil { + return nil, err + } + if err = result.prepare(); err != nil { + return nil, err + } + return &result, nil +} + +func (d *Database) prepare() error { + if err := d.events.prepare(d.db); err != nil { + return err + } + + return d.txnID.prepare(d.db) +} + +// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database +// for a transaction worker to pull and later send to an application service. +func (d *Database) StoreEvent( + ctx context.Context, + appServiceID string, + event *gomatrixserverlib.Event, +) error { + return d.events.insertEvent(ctx, appServiceID, event) +} + +// GetEventsWithAppServiceID returns a slice of events and their IDs intended to +// be sent to an application service given its ID. +func (d *Database) GetEventsWithAppServiceID( + ctx context.Context, + appServiceID string, + limit int, +) (int, int, []gomatrixserverlib.Event, bool, error) { + return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) +} + +// CountEventsWithAppServiceID returns the number of events destined for an +// application service given its ID. +func (d *Database) CountEventsWithAppServiceID( + ctx context.Context, + appServiceID string, +) (int, error) { + return d.events.countEventsByApplicationServiceID(ctx, appServiceID) +} + +// UpdateTxnIDForEvents takes in an application service ID and a +// and stores them in the DB, unless the pair already exists, in +// which case it updates them. +func (d *Database) UpdateTxnIDForEvents( + ctx context.Context, + appserviceID string, + maxID, txnID int, +) error { + return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID) +} + +// RemoveEventsBeforeAndIncludingID removes all events from the database that +// are less than or equal to a given maximum ID. IDs here are implemented as a +// serial, thus this should always delete events in chronological order. +func (d *Database) RemoveEventsBeforeAndIncludingID( + ctx context.Context, + appserviceID string, + eventTableID int, +) error { + return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID) +} + +// GetLatestTxnID returns the latest available transaction id +func (d *Database) GetLatestTxnID( + ctx context.Context, +) (int, error) { + return d.txnID.selectTxnID(ctx) +} diff --git a/appservice/storage/sqlite3/txn_id_counter_table.go b/appservice/storage/sqlite3/txn_id_counter_table.go new file mode 100644 index 00000000..b1ee6076 --- /dev/null +++ b/appservice/storage/sqlite3/txn_id_counter_table.go @@ -0,0 +1,60 @@ +// Copyright 2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" +) + +const txnIDSchema = ` +-- Keeps a count of the current transaction ID +CREATE TABLE IF NOT EXISTS appservice_counters ( + name TEXT PRIMARY KEY NOT NULL, + last_id INTEGER DEFAULT 1 +); +INSERT OR IGNORE INTO appservice_counters (name, last_id) VALUES('txn_id', 1); +` + +const selectTxnIDSQL = ` + SELECT last_id FROM appservice_counters WHERE name='txn_id'; + UPDATE appservice_counters SET last_id=last_id+1 WHERE name='txn_id'; +` + +type txnStatements struct { + selectTxnIDStmt *sql.Stmt +} + +func (s *txnStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(txnIDSchema) + if err != nil { + return + } + + if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil { + return + } + + return +} + +// selectTxnID selects the latest ascending transaction ID +func (s *txnStatements) selectTxnID( + ctx context.Context, +) (txnID int, err error) { + err = s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID) + return +} diff --git a/appservice/storage/storage.go b/appservice/storage/storage.go index b68989fb..ce3776bc 100644 --- a/appservice/storage/storage.go +++ b/appservice/storage/storage.go @@ -1,4 +1,4 @@ -// Copyright 2018 New Vector Ltd +// Copyright 2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,95 +16,33 @@ package storage import ( "context" - "database/sql" + "net/url" - // Import postgres database driver - _ "github.com/lib/pq" + "github.com/matrix-org/dendrite/appservice/storage/postgres" + "github.com/matrix-org/dendrite/appservice/storage/sqlite3" "github.com/matrix-org/gomatrixserverlib" ) -// Database stores events intended to be later sent to application services -type Database struct { - events eventsStatements - txnID txnStatements - db *sql.DB +type Database interface { + StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.Event) error + GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.Event, bool, error) + CountEventsWithAppServiceID(ctx context.Context, appServiceID string) (int, error) + UpdateTxnIDForEvents(ctx context.Context, appserviceID string, maxID, txnID int) error + RemoveEventsBeforeAndIncludingID(ctx context.Context, appserviceID string, eventTableID int) error + GetLatestTxnID(ctx context.Context) (int, error) } -// NewDatabase opens a new database -func NewDatabase(dataSourceName string) (*Database, error) { - var result Database - var err error - if result.db, err = sql.Open("postgres", dataSourceName); err != nil { - return nil, err +func NewDatabase(dataSourceName string) (Database, error) { + uri, err := url.Parse(dataSourceName) + if err != nil { + return postgres.NewDatabase(dataSourceName) } - if err = result.prepare(); err != nil { - return nil, err + switch uri.Scheme { + case "postgres": + return postgres.NewDatabase(dataSourceName) + case "file": + return sqlite3.NewDatabase(dataSourceName) + default: + return postgres.NewDatabase(dataSourceName) } - return &result, nil -} - -func (d *Database) prepare() error { - if err := d.events.prepare(d.db); err != nil { - return err - } - - return d.txnID.prepare(d.db) -} - -// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database -// for a transaction worker to pull and later send to an application service. -func (d *Database) StoreEvent( - ctx context.Context, - appServiceID string, - event *gomatrixserverlib.Event, -) error { - return d.events.insertEvent(ctx, appServiceID, event) -} - -// GetEventsWithAppServiceID returns a slice of events and their IDs intended to -// be sent to an application service given its ID. -func (d *Database) GetEventsWithAppServiceID( - ctx context.Context, - appServiceID string, - limit int, -) (int, int, []gomatrixserverlib.Event, bool, error) { - return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) -} - -// CountEventsWithAppServiceID returns the number of events destined for an -// application service given its ID. -func (d *Database) CountEventsWithAppServiceID( - ctx context.Context, - appServiceID string, -) (int, error) { - return d.events.countEventsByApplicationServiceID(ctx, appServiceID) -} - -// UpdateTxnIDForEvents takes in an application service ID and a -// and stores them in the DB, unless the pair already exists, in -// which case it updates them. -func (d *Database) UpdateTxnIDForEvents( - ctx context.Context, - appserviceID string, - maxID, txnID int, -) error { - return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID) -} - -// RemoveEventsBeforeAndIncludingID removes all events from the database that -// are less than or equal to a given maximum ID. IDs here are implemented as a -// serial, thus this should always delete events in chronological order. -func (d *Database) RemoveEventsBeforeAndIncludingID( - ctx context.Context, - appserviceID string, - eventTableID int, -) error { - return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID) -} - -// GetLatestTxnID returns the latest available transaction id -func (d *Database) GetLatestTxnID( - ctx context.Context, -) (int, error) { - return d.txnID.selectTxnID(ctx) } diff --git a/appservice/workers/transaction_scheduler.go b/appservice/workers/transaction_scheduler.go index 0330eb9e..faa1e4a9 100644 --- a/appservice/workers/transaction_scheduler.go +++ b/appservice/workers/transaction_scheduler.go @@ -43,7 +43,7 @@ var ( // size), then send that off to the AS's /transactions/{txnID} endpoint. It also // handles exponentially backing off in case the AS isn't currently available. func SetupTransactionWorkers( - appserviceDB *storage.Database, + appserviceDB storage.Database, workerStates []types.ApplicationServiceWorkerState, ) error { // Create a worker that handles transmitting events to a single homeserver @@ -58,7 +58,7 @@ func SetupTransactionWorkers( // worker is a goroutine that sends any queued events to the application service // it is given. -func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { +func worker(db storage.Database, ws types.ApplicationServiceWorkerState) { log.WithFields(log.Fields{ "appservice": ws.AppService.ID, }).Info("starting application service") @@ -149,7 +149,7 @@ func backoff(ws *types.ApplicationServiceWorkerState, err error) { // transaction, and JSON-encodes the results. func createTransaction( ctx context.Context, - db *storage.Database, + db storage.Database, appserviceID string, ) ( transactionJSON []byte, |