aboutsummaryrefslogtreecommitdiff
path: root/appservice
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-02-14 14:12:33 +0000
committerGitHub <noreply@github.com>2020-02-14 14:12:33 +0000
commit3dabf4d4ed52459a4661db32c983018c2c73a431 (patch)
treee4052d1abe5ccbe12051116357272c9ae99c9a14 /appservice
parent409fec2a48f454600936eb674da0611566c5a286 (diff)
More SQLite (#871)
* SQLite support for appservice * SQLite support for mediaapi * Copyright notices * SQLite for public rooms API (although with some slight differences in behaviour) * Lazy match aliases, add TODOs
Diffstat (limited to 'appservice')
-rw-r--r--appservice/consumers/roomserver.go4
-rw-r--r--appservice/storage/postgres/appservice_events_table.go (renamed from appservice/storage/appservice_events_table.go)3
-rw-r--r--appservice/storage/postgres/storage.go111
-rw-r--r--appservice/storage/postgres/txn_id_counter_table.go (renamed from appservice/storage/txn_id_counter_table.go)3
-rw-r--r--appservice/storage/sqlite3/appservice_events_table.go249
-rw-r--r--appservice/storage/sqlite3/storage.go111
-rw-r--r--appservice/storage/sqlite3/txn_id_counter_table.go60
-rw-r--r--appservice/storage/storage.go106
-rw-r--r--appservice/workers/transaction_scheduler.go6
9 files changed, 562 insertions, 91 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index b9a56795..6d3ea808 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -34,7 +34,7 @@ import (
type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer
db accounts.Database
- asDB *storage.Database
+ asDB storage.Database
query api.RoomserverQueryAPI
alias api.RoomserverAliasAPI
serverName string
@@ -47,7 +47,7 @@ func NewOutputRoomEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
store accounts.Database,
- appserviceDB *storage.Database,
+ appserviceDB storage.Database,
queryAPI api.RoomserverQueryAPI,
aliasAPI api.RoomserverAliasAPI,
workerStates []types.ApplicationServiceWorkerState,
diff --git a/appservice/storage/appservice_events_table.go b/appservice/storage/postgres/appservice_events_table.go
index 285bbf48..d72faeea 100644
--- a/appservice/storage/appservice_events_table.go
+++ b/appservice/storage/postgres/appservice_events_table.go
@@ -1,4 +1,5 @@
// Copyright 2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package storage
+package postgres
import (
"context"
diff --git a/appservice/storage/postgres/storage.go b/appservice/storage/postgres/storage.go
new file mode 100644
index 00000000..c4756468
--- /dev/null
+++ b/appservice/storage/postgres/storage.go
@@ -0,0 +1,111 @@
+// Copyright 2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+
+ // Import postgres database driver
+ _ "github.com/lib/pq"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// Database stores events intended to be later sent to application services
+type Database struct {
+ events eventsStatements
+ txnID txnStatements
+ db *sql.DB
+}
+
+// NewDatabase opens a new database
+func NewDatabase(dataSourceName string) (*Database, error) {
+ var result Database
+ var err error
+ if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
+ return nil, err
+ }
+ if err = result.prepare(); err != nil {
+ return nil, err
+ }
+ return &result, nil
+}
+
+func (d *Database) prepare() error {
+ if err := d.events.prepare(d.db); err != nil {
+ return err
+ }
+
+ return d.txnID.prepare(d.db)
+}
+
+// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
+// for a transaction worker to pull and later send to an application service.
+func (d *Database) StoreEvent(
+ ctx context.Context,
+ appServiceID string,
+ event *gomatrixserverlib.Event,
+) error {
+ return d.events.insertEvent(ctx, appServiceID, event)
+}
+
+// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
+// be sent to an application service given its ID.
+func (d *Database) GetEventsWithAppServiceID(
+ ctx context.Context,
+ appServiceID string,
+ limit int,
+) (int, int, []gomatrixserverlib.Event, bool, error) {
+ return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
+}
+
+// CountEventsWithAppServiceID returns the number of events destined for an
+// application service given its ID.
+func (d *Database) CountEventsWithAppServiceID(
+ ctx context.Context,
+ appServiceID string,
+) (int, error) {
+ return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
+}
+
+// UpdateTxnIDForEvents takes in an application service ID and a
+// and stores them in the DB, unless the pair already exists, in
+// which case it updates them.
+func (d *Database) UpdateTxnIDForEvents(
+ ctx context.Context,
+ appserviceID string,
+ maxID, txnID int,
+) error {
+ return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
+}
+
+// RemoveEventsBeforeAndIncludingID removes all events from the database that
+// are less than or equal to a given maximum ID. IDs here are implemented as a
+// serial, thus this should always delete events in chronological order.
+func (d *Database) RemoveEventsBeforeAndIncludingID(
+ ctx context.Context,
+ appserviceID string,
+ eventTableID int,
+) error {
+ return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
+}
+
+// GetLatestTxnID returns the latest available transaction id
+func (d *Database) GetLatestTxnID(
+ ctx context.Context,
+) (int, error) {
+ return d.txnID.selectTxnID(ctx)
+}
diff --git a/appservice/storage/txn_id_counter_table.go b/appservice/storage/postgres/txn_id_counter_table.go
index 7b0fa378..a96a0e36 100644
--- a/appservice/storage/txn_id_counter_table.go
+++ b/appservice/storage/postgres/txn_id_counter_table.go
@@ -1,4 +1,5 @@
// Copyright 2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package storage
+package postgres
import (
"context"
diff --git a/appservice/storage/sqlite3/appservice_events_table.go b/appservice/storage/sqlite3/appservice_events_table.go
new file mode 100644
index 00000000..846f09f7
--- /dev/null
+++ b/appservice/storage/sqlite3/appservice_events_table.go
@@ -0,0 +1,249 @@
+// Copyright 2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+ "time"
+
+ "github.com/matrix-org/gomatrixserverlib"
+ log "github.com/sirupsen/logrus"
+)
+
+const appserviceEventsSchema = `
+-- Stores events to be sent to application services
+CREATE TABLE IF NOT EXISTS appservice_events (
+ -- An auto-incrementing id unique to each event in the table
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ -- The ID of the application service the event will be sent to
+ as_id TEXT NOT NULL,
+ -- JSON representation of the event
+ event_json TEXT NOT NULL,
+ -- The ID of the transaction that this event is a part of
+ txn_id INTEGER NOT NULL
+);
+
+CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
+`
+
+const selectEventsByApplicationServiceIDSQL = "" +
+ "SELECT id, event_json, txn_id " +
+ "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
+
+const countEventsByApplicationServiceIDSQL = "" +
+ "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
+
+const insertEventSQL = "" +
+ "INSERT INTO appservice_events(as_id, event_json, txn_id) " +
+ "VALUES ($1, $2, $3)"
+
+const updateTxnIDForEventsSQL = "" +
+ "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
+
+const deleteEventsBeforeAndIncludingIDSQL = "" +
+ "DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2"
+
+const (
+ // A transaction ID number that no transaction should ever have. Used for
+ // checking again the default value.
+ invalidTxnID = -2
+)
+
+type eventsStatements struct {
+ selectEventsByApplicationServiceIDStmt *sql.Stmt
+ countEventsByApplicationServiceIDStmt *sql.Stmt
+ insertEventStmt *sql.Stmt
+ updateTxnIDForEventsStmt *sql.Stmt
+ deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
+}
+
+func (s *eventsStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(appserviceEventsSchema)
+ if err != nil {
+ return
+ }
+
+ if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil {
+ return
+ }
+ if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil {
+ return
+ }
+ if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
+ return
+ }
+ if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil {
+ return
+ }
+ if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil {
+ return
+ }
+
+ return
+}
+
+// selectEventsByApplicationServiceID takes in an application service ID and
+// returns a slice of events that need to be sent to that application service,
+// as well as an int later used to remove these same events from the database
+// once successfully sent to an application service.
+func (s *eventsStatements) selectEventsByApplicationServiceID(
+ ctx context.Context,
+ applicationServiceID string,
+ limit int,
+) (
+ txnID, maxID int,
+ events []gomatrixserverlib.Event,
+ eventsRemaining bool,
+ err error,
+) {
+ // Retrieve events from the database. Unsuccessfully sent events first
+ eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
+ if err != nil {
+ return
+ }
+ defer func() {
+ err = eventRows.Close()
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": applicationServiceID,
+ }).WithError(err).Fatalf("appservice unable to select new events to send")
+ }
+ }()
+ events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit)
+ if err != nil {
+ return
+ }
+
+ return
+}
+
+func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) {
+ // Get current time for use in calculating event age
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+
+ // Iterate through each row and store event contents
+ // If txn_id changes dramatically, we've switched from collecting old events to
+ // new ones. Send back those events first.
+ lastTxnID := invalidTxnID
+ for eventsProcessed := 0; eventRows.Next(); {
+ var event gomatrixserverlib.Event
+ var eventJSON []byte
+ var id int
+ err = eventRows.Scan(
+ &id,
+ &eventJSON,
+ &txnID,
+ )
+ if err != nil {
+ return nil, 0, 0, false, err
+ }
+
+ // Unmarshal eventJSON
+ if err = json.Unmarshal(eventJSON, &event); err != nil {
+ return nil, 0, 0, false, err
+ }
+
+ // If txnID has changed on this event from the previous event, then we've
+ // reached the end of a transaction's events. Return only those events.
+ if lastTxnID > invalidTxnID && lastTxnID != txnID {
+ return events, maxID, lastTxnID, true, nil
+ }
+ lastTxnID = txnID
+
+ // Limit events that aren't part of an old transaction
+ if txnID == -1 {
+ // Return if we've hit the limit
+ if eventsProcessed++; eventsProcessed > limit {
+ return events, maxID, lastTxnID, true, nil
+ }
+ }
+
+ if id > maxID {
+ maxID = id
+ }
+
+ // Portion of the event that is unsigned due to rapid change
+ // TODO: Consider removing age as not many app services use it
+ if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil {
+ return nil, 0, 0, false, err
+ }
+
+ events = append(events, event)
+ }
+
+ return
+}
+
+// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service
+// IDs into the db.
+func (s *eventsStatements) countEventsByApplicationServiceID(
+ ctx context.Context,
+ appServiceID string,
+) (int, error) {
+ var count int
+ err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
+ if err != nil && err != sql.ErrNoRows {
+ return 0, err
+ }
+
+ return count, nil
+}
+
+// insertEvent inserts an event mapped to its corresponding application service
+// IDs into the db.
+func (s *eventsStatements) insertEvent(
+ ctx context.Context,
+ appServiceID string,
+ event *gomatrixserverlib.Event,
+) (err error) {
+ // Convert event to JSON before inserting
+ eventJSON, err := json.Marshal(event)
+ if err != nil {
+ return err
+ }
+
+ _, err = s.insertEventStmt.ExecContext(
+ ctx,
+ appServiceID,
+ eventJSON,
+ -1, // No transaction ID yet
+ )
+ return
+}
+
+// updateTxnIDForEvents sets the transactionID for a collection of events. Done
+// before sending them to an AppService. Referenced before sending to make sure
+// we aren't constructing multiple transactions with the same events.
+func (s *eventsStatements) updateTxnIDForEvents(
+ ctx context.Context,
+ appserviceID string,
+ maxID, txnID int,
+) (err error) {
+ _, err = s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID)
+ return
+}
+
+// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database.
+func (s *eventsStatements) deleteEventsBeforeAndIncludingID(
+ ctx context.Context,
+ appserviceID string,
+ eventTableID int,
+) (err error) {
+ _, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID)
+ return
+}
diff --git a/appservice/storage/sqlite3/storage.go b/appservice/storage/sqlite3/storage.go
new file mode 100644
index 00000000..56ab55f2
--- /dev/null
+++ b/appservice/storage/sqlite3/storage.go
@@ -0,0 +1,111 @@
+// Copyright 2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+
+ // Import SQLite database driver
+ "github.com/matrix-org/gomatrixserverlib"
+ _ "github.com/mattn/go-sqlite3"
+)
+
+// Database stores events intended to be later sent to application services
+type Database struct {
+ events eventsStatements
+ txnID txnStatements
+ db *sql.DB
+}
+
+// NewDatabase opens a new database
+func NewDatabase(dataSourceName string) (*Database, error) {
+ var result Database
+ var err error
+ if result.db, err = sql.Open("sqlite3", dataSourceName); err != nil {
+ return nil, err
+ }
+ if err = result.prepare(); err != nil {
+ return nil, err
+ }
+ return &result, nil
+}
+
+func (d *Database) prepare() error {
+ if err := d.events.prepare(d.db); err != nil {
+ return err
+ }
+
+ return d.txnID.prepare(d.db)
+}
+
+// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
+// for a transaction worker to pull and later send to an application service.
+func (d *Database) StoreEvent(
+ ctx context.Context,
+ appServiceID string,
+ event *gomatrixserverlib.Event,
+) error {
+ return d.events.insertEvent(ctx, appServiceID, event)
+}
+
+// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
+// be sent to an application service given its ID.
+func (d *Database) GetEventsWithAppServiceID(
+ ctx context.Context,
+ appServiceID string,
+ limit int,
+) (int, int, []gomatrixserverlib.Event, bool, error) {
+ return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
+}
+
+// CountEventsWithAppServiceID returns the number of events destined for an
+// application service given its ID.
+func (d *Database) CountEventsWithAppServiceID(
+ ctx context.Context,
+ appServiceID string,
+) (int, error) {
+ return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
+}
+
+// UpdateTxnIDForEvents takes in an application service ID and a
+// and stores them in the DB, unless the pair already exists, in
+// which case it updates them.
+func (d *Database) UpdateTxnIDForEvents(
+ ctx context.Context,
+ appserviceID string,
+ maxID, txnID int,
+) error {
+ return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
+}
+
+// RemoveEventsBeforeAndIncludingID removes all events from the database that
+// are less than or equal to a given maximum ID. IDs here are implemented as a
+// serial, thus this should always delete events in chronological order.
+func (d *Database) RemoveEventsBeforeAndIncludingID(
+ ctx context.Context,
+ appserviceID string,
+ eventTableID int,
+) error {
+ return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
+}
+
+// GetLatestTxnID returns the latest available transaction id
+func (d *Database) GetLatestTxnID(
+ ctx context.Context,
+) (int, error) {
+ return d.txnID.selectTxnID(ctx)
+}
diff --git a/appservice/storage/sqlite3/txn_id_counter_table.go b/appservice/storage/sqlite3/txn_id_counter_table.go
new file mode 100644
index 00000000..b1ee6076
--- /dev/null
+++ b/appservice/storage/sqlite3/txn_id_counter_table.go
@@ -0,0 +1,60 @@
+// Copyright 2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+)
+
+const txnIDSchema = `
+-- Keeps a count of the current transaction ID
+CREATE TABLE IF NOT EXISTS appservice_counters (
+ name TEXT PRIMARY KEY NOT NULL,
+ last_id INTEGER DEFAULT 1
+);
+INSERT OR IGNORE INTO appservice_counters (name, last_id) VALUES('txn_id', 1);
+`
+
+const selectTxnIDSQL = `
+ SELECT last_id FROM appservice_counters WHERE name='txn_id';
+ UPDATE appservice_counters SET last_id=last_id+1 WHERE name='txn_id';
+`
+
+type txnStatements struct {
+ selectTxnIDStmt *sql.Stmt
+}
+
+func (s *txnStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(txnIDSchema)
+ if err != nil {
+ return
+ }
+
+ if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil {
+ return
+ }
+
+ return
+}
+
+// selectTxnID selects the latest ascending transaction ID
+func (s *txnStatements) selectTxnID(
+ ctx context.Context,
+) (txnID int, err error) {
+ err = s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID)
+ return
+}
diff --git a/appservice/storage/storage.go b/appservice/storage/storage.go
index b68989fb..ce3776bc 100644
--- a/appservice/storage/storage.go
+++ b/appservice/storage/storage.go
@@ -1,4 +1,4 @@
-// Copyright 2018 New Vector Ltd
+// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -16,95 +16,33 @@ package storage
import (
"context"
- "database/sql"
+ "net/url"
- // Import postgres database driver
- _ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/appservice/storage/postgres"
+ "github.com/matrix-org/dendrite/appservice/storage/sqlite3"
"github.com/matrix-org/gomatrixserverlib"
)
-// Database stores events intended to be later sent to application services
-type Database struct {
- events eventsStatements
- txnID txnStatements
- db *sql.DB
+type Database interface {
+ StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.Event) error
+ GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.Event, bool, error)
+ CountEventsWithAppServiceID(ctx context.Context, appServiceID string) (int, error)
+ UpdateTxnIDForEvents(ctx context.Context, appserviceID string, maxID, txnID int) error
+ RemoveEventsBeforeAndIncludingID(ctx context.Context, appserviceID string, eventTableID int) error
+ GetLatestTxnID(ctx context.Context) (int, error)
}
-// NewDatabase opens a new database
-func NewDatabase(dataSourceName string) (*Database, error) {
- var result Database
- var err error
- if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
- return nil, err
+func NewDatabase(dataSourceName string) (Database, error) {
+ uri, err := url.Parse(dataSourceName)
+ if err != nil {
+ return postgres.NewDatabase(dataSourceName)
}
- if err = result.prepare(); err != nil {
- return nil, err
+ switch uri.Scheme {
+ case "postgres":
+ return postgres.NewDatabase(dataSourceName)
+ case "file":
+ return sqlite3.NewDatabase(dataSourceName)
+ default:
+ return postgres.NewDatabase(dataSourceName)
}
- return &result, nil
-}
-
-func (d *Database) prepare() error {
- if err := d.events.prepare(d.db); err != nil {
- return err
- }
-
- return d.txnID.prepare(d.db)
-}
-
-// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
-// for a transaction worker to pull and later send to an application service.
-func (d *Database) StoreEvent(
- ctx context.Context,
- appServiceID string,
- event *gomatrixserverlib.Event,
-) error {
- return d.events.insertEvent(ctx, appServiceID, event)
-}
-
-// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
-// be sent to an application service given its ID.
-func (d *Database) GetEventsWithAppServiceID(
- ctx context.Context,
- appServiceID string,
- limit int,
-) (int, int, []gomatrixserverlib.Event, bool, error) {
- return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
-}
-
-// CountEventsWithAppServiceID returns the number of events destined for an
-// application service given its ID.
-func (d *Database) CountEventsWithAppServiceID(
- ctx context.Context,
- appServiceID string,
-) (int, error) {
- return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
-}
-
-// UpdateTxnIDForEvents takes in an application service ID and a
-// and stores them in the DB, unless the pair already exists, in
-// which case it updates them.
-func (d *Database) UpdateTxnIDForEvents(
- ctx context.Context,
- appserviceID string,
- maxID, txnID int,
-) error {
- return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
-}
-
-// RemoveEventsBeforeAndIncludingID removes all events from the database that
-// are less than or equal to a given maximum ID. IDs here are implemented as a
-// serial, thus this should always delete events in chronological order.
-func (d *Database) RemoveEventsBeforeAndIncludingID(
- ctx context.Context,
- appserviceID string,
- eventTableID int,
-) error {
- return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
-}
-
-// GetLatestTxnID returns the latest available transaction id
-func (d *Database) GetLatestTxnID(
- ctx context.Context,
-) (int, error) {
- return d.txnID.selectTxnID(ctx)
}
diff --git a/appservice/workers/transaction_scheduler.go b/appservice/workers/transaction_scheduler.go
index 0330eb9e..faa1e4a9 100644
--- a/appservice/workers/transaction_scheduler.go
+++ b/appservice/workers/transaction_scheduler.go
@@ -43,7 +43,7 @@ var (
// size), then send that off to the AS's /transactions/{txnID} endpoint. It also
// handles exponentially backing off in case the AS isn't currently available.
func SetupTransactionWorkers(
- appserviceDB *storage.Database,
+ appserviceDB storage.Database,
workerStates []types.ApplicationServiceWorkerState,
) error {
// Create a worker that handles transmitting events to a single homeserver
@@ -58,7 +58,7 @@ func SetupTransactionWorkers(
// worker is a goroutine that sends any queued events to the application service
// it is given.
-func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
+func worker(db storage.Database, ws types.ApplicationServiceWorkerState) {
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).Info("starting application service")
@@ -149,7 +149,7 @@ func backoff(ws *types.ApplicationServiceWorkerState, err error) {
// transaction, and JSON-encodes the results.
func createTransaction(
ctx context.Context,
- db *storage.Database,
+ db storage.Database,
appserviceID string,
) (
transactionJSON []byte,