aboutsummaryrefslogtreecommitdiff
path: root/appservice/storage
diff options
context:
space:
mode:
Diffstat (limited to 'appservice/storage')
-rw-r--r--appservice/storage/appservice_events_table.go248
-rw-r--r--appservice/storage/storage.go110
-rw-r--r--appservice/storage/txn_id_counter_table.go52
3 files changed, 410 insertions, 0 deletions
diff --git a/appservice/storage/appservice_events_table.go b/appservice/storage/appservice_events_table.go
new file mode 100644
index 00000000..285bbf48
--- /dev/null
+++ b/appservice/storage/appservice_events_table.go
@@ -0,0 +1,248 @@
+// Copyright 2018 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+ "time"
+
+ "github.com/matrix-org/gomatrixserverlib"
+ log "github.com/sirupsen/logrus"
+)
+
+const appserviceEventsSchema = `
+-- Stores events to be sent to application services
+CREATE TABLE IF NOT EXISTS appservice_events (
+ -- An auto-incrementing id unique to each event in the table
+ id BIGSERIAL NOT NULL PRIMARY KEY,
+ -- The ID of the application service the event will be sent to
+ as_id TEXT NOT NULL,
+ -- JSON representation of the event
+ event_json TEXT NOT NULL,
+ -- The ID of the transaction that this event is a part of
+ txn_id BIGINT NOT NULL
+);
+
+CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
+`
+
+const selectEventsByApplicationServiceIDSQL = "" +
+ "SELECT id, event_json, txn_id " +
+ "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
+
+const countEventsByApplicationServiceIDSQL = "" +
+ "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
+
+const insertEventSQL = "" +
+ "INSERT INTO appservice_events(as_id, event_json, txn_id) " +
+ "VALUES ($1, $2, $3)"
+
+const updateTxnIDForEventsSQL = "" +
+ "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
+
+const deleteEventsBeforeAndIncludingIDSQL = "" +
+ "DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2"
+
+const (
+ // A transaction ID number that no transaction should ever have. Used for
+ // checking again the default value.
+ invalidTxnID = -2
+)
+
+type eventsStatements struct {
+ selectEventsByApplicationServiceIDStmt *sql.Stmt
+ countEventsByApplicationServiceIDStmt *sql.Stmt
+ insertEventStmt *sql.Stmt
+ updateTxnIDForEventsStmt *sql.Stmt
+ deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
+}
+
+func (s *eventsStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(appserviceEventsSchema)
+ if err != nil {
+ return
+ }
+
+ if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil {
+ return
+ }
+ if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil {
+ return
+ }
+ if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
+ return
+ }
+ if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil {
+ return
+ }
+ if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil {
+ return
+ }
+
+ return
+}
+
+// selectEventsByApplicationServiceID takes in an application service ID and
+// returns a slice of events that need to be sent to that application service,
+// as well as an int later used to remove these same events from the database
+// once successfully sent to an application service.
+func (s *eventsStatements) selectEventsByApplicationServiceID(
+ ctx context.Context,
+ applicationServiceID string,
+ limit int,
+) (
+ txnID, maxID int,
+ events []gomatrixserverlib.Event,
+ eventsRemaining bool,
+ err error,
+) {
+ // Retrieve events from the database. Unsuccessfully sent events first
+ eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
+ if err != nil {
+ return
+ }
+ defer func() {
+ err = eventRows.Close()
+ if err != nil {
+ log.WithFields(log.Fields{
+ "appservice": applicationServiceID,
+ }).WithError(err).Fatalf("appservice unable to select new events to send")
+ }
+ }()
+ events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit)
+ if err != nil {
+ return
+ }
+
+ return
+}
+
+func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) {
+ // Get current time for use in calculating event age
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+
+ // Iterate through each row and store event contents
+ // If txn_id changes dramatically, we've switched from collecting old events to
+ // new ones. Send back those events first.
+ lastTxnID := invalidTxnID
+ for eventsProcessed := 0; eventRows.Next(); {
+ var event gomatrixserverlib.Event
+ var eventJSON []byte
+ var id int
+ err = eventRows.Scan(
+ &id,
+ &eventJSON,
+ &txnID,
+ )
+ if err != nil {
+ return nil, 0, 0, false, err
+ }
+
+ // Unmarshal eventJSON
+ if err = json.Unmarshal(eventJSON, &event); err != nil {
+ return nil, 0, 0, false, err
+ }
+
+ // If txnID has changed on this event from the previous event, then we've
+ // reached the end of a transaction's events. Return only those events.
+ if lastTxnID > invalidTxnID && lastTxnID != txnID {
+ return events, maxID, lastTxnID, true, nil
+ }
+ lastTxnID = txnID
+
+ // Limit events that aren't part of an old transaction
+ if txnID == -1 {
+ // Return if we've hit the limit
+ if eventsProcessed++; eventsProcessed > limit {
+ return events, maxID, lastTxnID, true, nil
+ }
+ }
+
+ if id > maxID {
+ maxID = id
+ }
+
+ // Portion of the event that is unsigned due to rapid change
+ // TODO: Consider removing age as not many app services use it
+ if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil {
+ return nil, 0, 0, false, err
+ }
+
+ events = append(events, event)
+ }
+
+ return
+}
+
+// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service
+// IDs into the db.
+func (s *eventsStatements) countEventsByApplicationServiceID(
+ ctx context.Context,
+ appServiceID string,
+) (int, error) {
+ var count int
+ err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
+ if err != nil && err != sql.ErrNoRows {
+ return 0, err
+ }
+
+ return count, nil
+}
+
+// insertEvent inserts an event mapped to its corresponding application service
+// IDs into the db.
+func (s *eventsStatements) insertEvent(
+ ctx context.Context,
+ appServiceID string,
+ event *gomatrixserverlib.Event,
+) (err error) {
+ // Convert event to JSON before inserting
+ eventJSON, err := json.Marshal(event)
+ if err != nil {
+ return err
+ }
+
+ _, err = s.insertEventStmt.ExecContext(
+ ctx,
+ appServiceID,
+ eventJSON,
+ -1, // No transaction ID yet
+ )
+ return
+}
+
+// updateTxnIDForEvents sets the transactionID for a collection of events. Done
+// before sending them to an AppService. Referenced before sending to make sure
+// we aren't constructing multiple transactions with the same events.
+func (s *eventsStatements) updateTxnIDForEvents(
+ ctx context.Context,
+ appserviceID string,
+ maxID, txnID int,
+) (err error) {
+ _, err = s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID)
+ return
+}
+
+// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database.
+func (s *eventsStatements) deleteEventsBeforeAndIncludingID(
+ ctx context.Context,
+ appserviceID string,
+ eventTableID int,
+) (err error) {
+ _, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID)
+ return
+}
diff --git a/appservice/storage/storage.go b/appservice/storage/storage.go
new file mode 100644
index 00000000..b68989fb
--- /dev/null
+++ b/appservice/storage/storage.go
@@ -0,0 +1,110 @@
+// Copyright 2018 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "database/sql"
+
+ // Import postgres database driver
+ _ "github.com/lib/pq"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// Database stores events intended to be later sent to application services
+type Database struct {
+ events eventsStatements
+ txnID txnStatements
+ db *sql.DB
+}
+
+// NewDatabase opens a new database
+func NewDatabase(dataSourceName string) (*Database, error) {
+ var result Database
+ var err error
+ if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
+ return nil, err
+ }
+ if err = result.prepare(); err != nil {
+ return nil, err
+ }
+ return &result, nil
+}
+
+func (d *Database) prepare() error {
+ if err := d.events.prepare(d.db); err != nil {
+ return err
+ }
+
+ return d.txnID.prepare(d.db)
+}
+
+// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
+// for a transaction worker to pull and later send to an application service.
+func (d *Database) StoreEvent(
+ ctx context.Context,
+ appServiceID string,
+ event *gomatrixserverlib.Event,
+) error {
+ return d.events.insertEvent(ctx, appServiceID, event)
+}
+
+// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
+// be sent to an application service given its ID.
+func (d *Database) GetEventsWithAppServiceID(
+ ctx context.Context,
+ appServiceID string,
+ limit int,
+) (int, int, []gomatrixserverlib.Event, bool, error) {
+ return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
+}
+
+// CountEventsWithAppServiceID returns the number of events destined for an
+// application service given its ID.
+func (d *Database) CountEventsWithAppServiceID(
+ ctx context.Context,
+ appServiceID string,
+) (int, error) {
+ return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
+}
+
+// UpdateTxnIDForEvents takes in an application service ID and a
+// and stores them in the DB, unless the pair already exists, in
+// which case it updates them.
+func (d *Database) UpdateTxnIDForEvents(
+ ctx context.Context,
+ appserviceID string,
+ maxID, txnID int,
+) error {
+ return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
+}
+
+// RemoveEventsBeforeAndIncludingID removes all events from the database that
+// are less than or equal to a given maximum ID. IDs here are implemented as a
+// serial, thus this should always delete events in chronological order.
+func (d *Database) RemoveEventsBeforeAndIncludingID(
+ ctx context.Context,
+ appserviceID string,
+ eventTableID int,
+) error {
+ return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
+}
+
+// GetLatestTxnID returns the latest available transaction id
+func (d *Database) GetLatestTxnID(
+ ctx context.Context,
+) (int, error) {
+ return d.txnID.selectTxnID(ctx)
+}
diff --git a/appservice/storage/txn_id_counter_table.go b/appservice/storage/txn_id_counter_table.go
new file mode 100644
index 00000000..7b0fa378
--- /dev/null
+++ b/appservice/storage/txn_id_counter_table.go
@@ -0,0 +1,52 @@
+// Copyright 2018 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "database/sql"
+)
+
+const txnIDSchema = `
+-- Keeps a count of the current transaction ID
+CREATE SEQUENCE IF NOT EXISTS txn_id_counter START 1;
+`
+
+const selectTxnIDSQL = "SELECT nextval('txn_id_counter')"
+
+type txnStatements struct {
+ selectTxnIDStmt *sql.Stmt
+}
+
+func (s *txnStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(txnIDSchema)
+ if err != nil {
+ return
+ }
+
+ if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil {
+ return
+ }
+
+ return
+}
+
+// selectTxnID selects the latest ascending transaction ID
+func (s *txnStatements) selectTxnID(
+ ctx context.Context,
+) (txnID int, err error) {
+ err = s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID)
+ return
+}