aboutsummaryrefslogtreecommitdiff
path: root/roomserver/storage/postgres
diff options
context:
space:
mode:
Diffstat (limited to 'roomserver/storage/postgres')
-rw-r--r--roomserver/storage/postgres/deltas/20201028212440_add_forgotten_column.go1
-rw-r--r--roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go223
-rw-r--r--roomserver/storage/postgres/event_json_table.go12
-rw-r--r--roomserver/storage/postgres/event_state_keys_table.go12
-rw-r--r--roomserver/storage/postgres/event_types_table.go11
-rw-r--r--roomserver/storage/postgres/events_table.go60
-rw-r--r--roomserver/storage/postgres/invite_table.go11
-rw-r--r--roomserver/storage/postgres/membership_table.go16
-rw-r--r--roomserver/storage/postgres/previous_events_table.go11
-rw-r--r--roomserver/storage/postgres/published_table.go12
-rw-r--r--roomserver/storage/postgres/redactions_table.go11
-rw-r--r--roomserver/storage/postgres/room_aliases_table.go12
-rw-r--r--roomserver/storage/postgres/rooms_table.go12
-rw-r--r--roomserver/storage/postgres/state_block_table.go214
-rw-r--r--roomserver/storage/postgres/state_snapshot_table.go52
-rw-r--r--roomserver/storage/postgres/storage.go94
-rw-r--r--roomserver/storage/postgres/transactions_table.go11
17 files changed, 504 insertions, 271 deletions
diff --git a/roomserver/storage/postgres/deltas/20201028212440_add_forgotten_column.go b/roomserver/storage/postgres/deltas/20201028212440_add_forgotten_column.go
index 733f0fa1..f3bd8632 100644
--- a/roomserver/storage/postgres/deltas/20201028212440_add_forgotten_column.go
+++ b/roomserver/storage/postgres/deltas/20201028212440_add_forgotten_column.go
@@ -24,6 +24,7 @@ import (
func LoadFromGoose() {
goose.AddMigration(UpAddForgottenColumn, DownAddForgottenColumn)
+ goose.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
}
func LoadAddForgottenColumn(m *sqlutil.Migrations) {
diff --git a/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go b/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go
new file mode 100644
index 00000000..84da9614
--- /dev/null
+++ b/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go
@@ -0,0 +1,223 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package deltas
+
+import (
+ "database/sql"
+ "fmt"
+
+ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/roomserver/types"
+ "github.com/matrix-org/util"
+ "github.com/sirupsen/logrus"
+)
+
+type stateSnapshotData struct {
+ StateSnapshotNID types.StateSnapshotNID
+ RoomNID types.RoomNID
+}
+
+type stateBlockData struct {
+ stateSnapshotData
+ StateBlockNID types.StateBlockNID
+ EventNIDs types.EventNIDs
+}
+
+func LoadStateBlocksRefactor(m *sqlutil.Migrations) {
+ m.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
+}
+
+// nolint:gocyclo
+func UpStateBlocksRefactor(tx *sql.Tx) error {
+ logrus.Warn("Performing state storage upgrade. Please wait, this may take some time!")
+ defer logrus.Warn("State storage upgrade complete")
+
+ var snapshotcount int
+ var maxsnapshotid int
+ var maxblockid int
+ if err := tx.QueryRow(`SELECT COUNT(DISTINCT state_snapshot_nid) FROM roomserver_state_snapshots;`).Scan(&snapshotcount); err != nil {
+ return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
+ }
+ if err := tx.QueryRow(`SELECT COALESCE(MAX(state_snapshot_nid),0) FROM roomserver_state_snapshots;`).Scan(&maxsnapshotid); err != nil {
+ return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
+ }
+ if err := tx.QueryRow(`SELECT COALESCE(MAX(state_block_nid),0) FROM roomserver_state_block;`).Scan(&maxblockid); err != nil {
+ return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
+ }
+ maxsnapshotid++
+ maxblockid++
+
+ if _, err := tx.Exec(`ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil {
+ return fmt.Errorf("tx.Exec: %w", err)
+ }
+ if _, err := tx.Exec(`ALTER TABLE roomserver_state_snapshots RENAME TO _roomserver_state_snapshots;`); err != nil {
+ return fmt.Errorf("tx.Exec: %w", err)
+ }
+ // We create new sequences starting with the maximum state snapshot and block NIDs.
+ // This means that all newly created snapshots and blocks by the migration will have
+ // NIDs higher than these values, so that when we come to update the references to
+ // these NIDs using UPDATE statements, we can guarantee we are only ever updating old
+ // values and not accidentally overwriting new ones.
+ if _, err := tx.Exec(fmt.Sprintf(`CREATE SEQUENCE roomserver_state_block_nid_sequence START WITH %d;`, maxblockid)); err != nil {
+ return fmt.Errorf("tx.Exec: %w", err)
+ }
+ if _, err := tx.Exec(fmt.Sprintf(`CREATE SEQUENCE roomserver_state_snapshot_nid_sequence START WITH %d;`, maxsnapshotid)); err != nil {
+ return fmt.Errorf("tx.Exec: %w", err)
+ }
+ _, err := tx.Exec(`
+ CREATE TABLE IF NOT EXISTS roomserver_state_block (
+ state_block_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_block_nid_sequence'),
+ state_block_hash BYTEA UNIQUE,
+ event_nids bigint[] NOT NULL
+ );
+ `)
+ if err != nil {
+ return fmt.Errorf("tx.Exec (create blocks table): %w", err)
+ }
+ _, err = tx.Exec(`
+ CREATE TABLE IF NOT EXISTS roomserver_state_snapshots (
+ state_snapshot_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_snapshot_nid_sequence'),
+ state_snapshot_hash BYTEA UNIQUE,
+ room_nid bigint NOT NULL,
+ state_block_nids bigint[] NOT NULL
+ );
+ `)
+ if err != nil {
+ return fmt.Errorf("tx.Exec (create snapshots table): %w", err)
+ }
+ logrus.Warn("New tables created...")
+
+ batchsize := 100
+ for batchoffset := 0; batchoffset < snapshotcount; batchoffset += batchsize {
+ var snapshotrows *sql.Rows
+ snapshotrows, err = tx.Query(`
+ SELECT
+ state_snapshot_nid,
+ room_nid,
+ state_block_nid,
+ ARRAY_AGG(event_nid) AS event_nids
+ FROM (
+ SELECT
+ _roomserver_state_snapshots.state_snapshot_nid,
+ _roomserver_state_snapshots.room_nid,
+ _roomserver_state_block.state_block_nid,
+ _roomserver_state_block.event_nid
+ FROM
+ _roomserver_state_snapshots
+ JOIN _roomserver_state_block ON _roomserver_state_block.state_block_nid = ANY (_roomserver_state_snapshots.state_block_nids)
+ WHERE
+ _roomserver_state_snapshots.state_snapshot_nid = ANY ( SELECT DISTINCT
+ _roomserver_state_snapshots.state_snapshot_nid
+ FROM
+ _roomserver_state_snapshots
+ LIMIT $1 OFFSET $2)) AS _roomserver_state_block
+ GROUP BY
+ state_snapshot_nid,
+ room_nid,
+ state_block_nid;
+ `, batchsize, batchoffset)
+ if err != nil {
+ return fmt.Errorf("tx.Query: %w", err)
+ }
+
+ logrus.Warnf("Rewriting snapshots %d-%d of %d...", batchoffset, batchoffset+batchsize, snapshotcount)
+ var snapshots []stateBlockData
+
+ for snapshotrows.Next() {
+ var snapshot stateBlockData
+ var eventsarray pq.Int64Array
+ if err = snapshotrows.Scan(&snapshot.StateSnapshotNID, &snapshot.RoomNID, &snapshot.StateBlockNID, &eventsarray); err != nil {
+ return fmt.Errorf("rows.Scan: %w", err)
+ }
+ for _, e := range eventsarray {
+ snapshot.EventNIDs = append(snapshot.EventNIDs, types.EventNID(e))
+ }
+ snapshot.EventNIDs = snapshot.EventNIDs[:util.SortAndUnique(snapshot.EventNIDs)]
+ snapshots = append(snapshots, snapshot)
+ }
+
+ if err = snapshotrows.Close(); err != nil {
+ return fmt.Errorf("snapshots.Close: %w", err)
+ }
+
+ newsnapshots := map[stateSnapshotData]types.StateBlockNIDs{}
+
+ for _, snapshot := range snapshots {
+ var eventsarray pq.Int64Array
+ for _, e := range snapshot.EventNIDs {
+ eventsarray = append(eventsarray, int64(e))
+ }
+
+ var blocknid types.StateBlockNID
+ err = tx.QueryRow(`
+ INSERT INTO roomserver_state_block (state_block_hash, event_nids)
+ VALUES ($1, $2)
+ ON CONFLICT (state_block_hash) DO UPDATE SET event_nids=$2
+ RETURNING state_block_nid
+ `, snapshot.EventNIDs.Hash(), eventsarray).Scan(&blocknid)
+ if err != nil {
+ return fmt.Errorf("tx.QueryRow.Scan (insert new block with %d events): %w", len(eventsarray), err)
+ }
+ index := stateSnapshotData{snapshot.StateSnapshotNID, snapshot.RoomNID}
+ newsnapshots[index] = append(newsnapshots[index], blocknid)
+ }
+
+ for snapshotdata, newblocks := range newsnapshots {
+ var newblocksarray pq.Int64Array
+ for _, b := range newblocks {
+ newblocksarray = append(newblocksarray, int64(b))
+ }
+
+ var newNID types.StateSnapshotNID
+ err = tx.QueryRow(`
+ INSERT INTO roomserver_state_snapshots (state_snapshot_hash, room_nid, state_block_nids)
+ VALUES ($1, $2, $3)
+ ON CONFLICT (state_snapshot_hash) DO UPDATE SET room_nid=$2
+ RETURNING state_snapshot_nid
+ `, newblocks.Hash(), snapshotdata.RoomNID, newblocksarray).Scan(&newNID)
+ if err != nil {
+ return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err)
+ }
+
+ if _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newNID, snapshotdata.StateSnapshotNID, maxsnapshotid); err != nil {
+ return fmt.Errorf("tx.Exec (update events): %w", err)
+ }
+
+ if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newNID, snapshotdata.StateSnapshotNID, maxsnapshotid); err != nil {
+ return fmt.Errorf("tx.Exec (update rooms): %w", err)
+ }
+ }
+ }
+
+ if _, err = tx.Exec(`
+ DROP TABLE _roomserver_state_snapshots;
+ DROP SEQUENCE roomserver_state_snapshot_nid_seq;
+ `); err != nil {
+ return fmt.Errorf("tx.Exec (delete old snapshot table): %w", err)
+ }
+ if _, err = tx.Exec(`
+ DROP TABLE _roomserver_state_block;
+ DROP SEQUENCE roomserver_state_block_nid_seq;
+ `); err != nil {
+ return fmt.Errorf("tx.Exec (delete old block table): %w", err)
+ }
+
+ return nil
+}
+
+func DownStateBlocksRefactor(tx *sql.Tx) error {
+ panic("Downgrading state storage is not supported")
+}
diff --git a/roomserver/storage/postgres/event_json_table.go b/roomserver/storage/postgres/event_json_table.go
index 8f11d1d8..e0976b12 100644
--- a/roomserver/storage/postgres/event_json_table.go
+++ b/roomserver/storage/postgres/event_json_table.go
@@ -59,12 +59,14 @@ type eventJSONStatements struct {
bulkSelectEventJSONStmt *sql.Stmt
}
-func NewPostgresEventJSONTable(db *sql.DB) (tables.EventJSON, error) {
- s := &eventJSONStatements{}
+func createEventJSONTable(db *sql.DB) error {
_, err := db.Exec(eventJSONSchema)
- if err != nil {
- return nil, err
- }
+ return err
+}
+
+func prepareEventJSONTable(db *sql.DB) (tables.EventJSON, error) {
+ s := &eventJSONStatements{}
+
return s, shared.StatementList{
{&s.insertEventJSONStmt, insertEventJSONSQL},
{&s.bulkSelectEventJSONStmt, bulkSelectEventJSONSQL},
diff --git a/roomserver/storage/postgres/event_state_keys_table.go b/roomserver/storage/postgres/event_state_keys_table.go
index 500ff20e..61682356 100644
--- a/roomserver/storage/postgres/event_state_keys_table.go
+++ b/roomserver/storage/postgres/event_state_keys_table.go
@@ -77,12 +77,14 @@ type eventStateKeyStatements struct {
bulkSelectEventStateKeyStmt *sql.Stmt
}
-func NewPostgresEventStateKeysTable(db *sql.DB) (tables.EventStateKeys, error) {
- s := &eventStateKeyStatements{}
+func createEventStateKeysTable(db *sql.DB) error {
_, err := db.Exec(eventStateKeysSchema)
- if err != nil {
- return nil, err
- }
+ return err
+}
+
+func prepareEventStateKeysTable(db *sql.DB) (tables.EventStateKeys, error) {
+ s := &eventStateKeyStatements{}
+
return s, shared.StatementList{
{&s.insertEventStateKeyNIDStmt, insertEventStateKeyNIDSQL},
{&s.selectEventStateKeyNIDStmt, selectEventStateKeyNIDSQL},
diff --git a/roomserver/storage/postgres/event_types_table.go b/roomserver/storage/postgres/event_types_table.go
index 02d6ad07..f4257850 100644
--- a/roomserver/storage/postgres/event_types_table.go
+++ b/roomserver/storage/postgres/event_types_table.go
@@ -100,12 +100,13 @@ type eventTypeStatements struct {
bulkSelectEventTypeNIDStmt *sql.Stmt
}
-func NewPostgresEventTypesTable(db *sql.DB) (tables.EventTypes, error) {
- s := &eventTypeStatements{}
+func createEventTypesTable(db *sql.DB) error {
_, err := db.Exec(eventTypesSchema)
- if err != nil {
- return nil, err
- }
+ return err
+}
+
+func prepareEventTypesTable(db *sql.DB) (tables.EventTypes, error) {
+ s := &eventTypeStatements{}
return s, shared.StatementList{
{&s.insertEventTypeNIDStmt, insertEventTypeNIDSQL},
diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go
index 0cf0bd22..88c82083 100644
--- a/roomserver/storage/postgres/events_table.go
+++ b/roomserver/storage/postgres/events_table.go
@@ -19,6 +19,7 @@ import (
"context"
"database/sql"
"fmt"
+ "sort"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
@@ -88,6 +89,16 @@ const bulkSelectStateEventByIDSQL = "" +
" WHERE event_id = ANY($1)" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
+// Bulk look up of events by event NID, optionally filtering by the event type
+// or event state key NIDs if provided. (The CARDINALITY check will return true
+// if the provided arrays are empty, ergo no filtering).
+const bulkSelectStateEventByNIDSQL = "" +
+ "SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
+ " WHERE event_nid = ANY($1)" +
+ " AND (CARDINALITY($2::bigint[]) = 0 OR event_type_nid = ANY($2))" +
+ " AND (CARDINALITY($3::bigint[]) = 0 OR event_state_key_nid = ANY($3))" +
+ " ORDER BY event_type_nid, event_state_key_nid ASC"
+
const bulkSelectStateAtEventByIDSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" +
" WHERE event_id = ANY($1)"
@@ -127,6 +138,7 @@ type eventStatements struct {
insertEventStmt *sql.Stmt
selectEventStmt *sql.Stmt
bulkSelectStateEventByIDStmt *sql.Stmt
+ bulkSelectStateEventByNIDStmt *sql.Stmt
bulkSelectStateAtEventByIDStmt *sql.Stmt
updateEventStateStmt *sql.Stmt
selectEventSentToOutputStmt *sql.Stmt
@@ -140,17 +152,19 @@ type eventStatements struct {
selectRoomNIDsForEventNIDsStmt *sql.Stmt
}
-func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
- s := &eventStatements{}
+func createEventsTable(db *sql.DB) error {
_, err := db.Exec(eventsSchema)
- if err != nil {
- return nil, err
- }
+ return err
+}
+
+func prepareEventsTable(db *sql.DB) (tables.Events, error) {
+ s := &eventStatements{}
return s, shared.StatementList{
{&s.insertEventStmt, insertEventSQL},
{&s.selectEventStmt, selectEventSQL},
{&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL},
+ {&s.bulkSelectStateEventByNIDStmt, bulkSelectStateEventByNIDSQL},
{&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL},
{&s.updateEventStateStmt, updateEventStateSQL},
{&s.updateEventSentToOutputStmt, updateEventSentToOutputSQL},
@@ -238,6 +252,42 @@ func (s *eventStatements) BulkSelectStateEventByID(
return results, nil
}
+// bulkSelectStateEventByNID lookups a list of state events by event NID.
+// If any of the requested events are missing from the database it returns a types.MissingEventError
+func (s *eventStatements) BulkSelectStateEventByNID(
+ ctx context.Context, eventNIDs []types.EventNID,
+ stateKeyTuples []types.StateKeyTuple,
+) ([]types.StateEntry, error) {
+ tuples := stateKeyTupleSorter(stateKeyTuples)
+ sort.Sort(tuples)
+ eventTypeNIDArray, eventStateKeyNIDArray := tuples.typesAndStateKeysAsArrays()
+ rows, err := s.bulkSelectStateEventByNIDStmt.QueryContext(ctx, eventNIDsAsArray(eventNIDs), eventTypeNIDArray, eventStateKeyNIDArray)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateEventByID: rows.close() failed")
+ // We know that we will only get as many results as event IDs
+ // because of the unique constraint on event IDs.
+ // So we can allocate an array of the correct size now.
+ // We might get fewer results than IDs so we adjust the length of the slice before returning it.
+ results := make([]types.StateEntry, len(eventNIDs))
+ i := 0
+ for ; rows.Next(); i++ {
+ result := &results[i]
+ if err = rows.Scan(
+ &result.EventTypeNID,
+ &result.EventStateKeyNID,
+ &result.EventNID,
+ ); err != nil {
+ return nil, err
+ }
+ }
+ if err = rows.Err(); err != nil {
+ return nil, err
+ }
+ return results[:i], nil
+}
+
// bulkSelectStateAtEventByID lookups the state at a list of events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError.
// If we do not have the state for any of the requested events it returns a types.MissingEventError.
diff --git a/roomserver/storage/postgres/invite_table.go b/roomserver/storage/postgres/invite_table.go
index bb719516..0a2183e2 100644
--- a/roomserver/storage/postgres/invite_table.go
+++ b/roomserver/storage/postgres/invite_table.go
@@ -82,12 +82,13 @@ type inviteStatements struct {
updateInviteRetiredStmt *sql.Stmt
}
-func NewPostgresInvitesTable(db *sql.DB) (tables.Invites, error) {
- s := &inviteStatements{}
+func createInvitesTable(db *sql.DB) error {
_, err := db.Exec(inviteSchema)
- if err != nil {
- return nil, err
- }
+ return err
+}
+
+func prepareInvitesTable(db *sql.DB) (tables.Invites, error) {
+ s := &inviteStatements{}
return s, shared.StatementList{
{&s.insertInviteEventStmt, insertInviteEventSQL},
diff --git a/roomserver/storage/postgres/membership_table.go b/roomserver/storage/postgres/membership_table.go
index e392a4fb..3466da6d 100644
--- a/roomserver/storage/postgres/membership_table.go
+++ b/roomserver/storage/postgres/membership_table.go
@@ -139,12 +139,13 @@ type membershipStatements struct {
updateMembershipForgetRoomStmt *sql.Stmt
}
-func NewPostgresMembershipTable(db *sql.DB) (tables.Membership, error) {
- s := &membershipStatements{}
+func createMembershipTable(db *sql.DB) error {
_, err := db.Exec(membershipSchema)
- if err != nil {
- return nil, err
- }
+ return err
+}
+
+func prepareMembershipTable(db *sql.DB) (tables.Membership, error) {
+ s := &membershipStatements{}
return s, shared.StatementList{
{&s.insertMembershipStmt, insertMembershipSQL},
@@ -162,11 +163,6 @@ func NewPostgresMembershipTable(db *sql.DB) (tables.Membership, error) {
}.Prepare(db)
}
-func (s *membershipStatements) execSchema(db *sql.DB) error {
- _, err := db.Exec(membershipSchema)
- return err
-}
-
func (s *membershipStatements) InsertMembership(
ctx context.Context,
txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
diff --git a/roomserver/storage/postgres/previous_events_table.go b/roomserver/storage/postgres/previous_events_table.go
index 1a4ba673..4a93c3d6 100644
--- a/roomserver/storage/postgres/previous_events_table.go
+++ b/roomserver/storage/postgres/previous_events_table.go
@@ -65,12 +65,13 @@ type previousEventStatements struct {
selectPreviousEventExistsStmt *sql.Stmt
}
-func NewPostgresPreviousEventsTable(db *sql.DB) (tables.PreviousEvents, error) {
- s := &previousEventStatements{}
+func createPrevEventsTable(db *sql.DB) error {
_, err := db.Exec(previousEventSchema)
- if err != nil {
- return nil, err
- }
+ return err
+}
+
+func preparePrevEventsTable(db *sql.DB) (tables.PreviousEvents, error) {
+ s := &previousEventStatements{}
return s, shared.StatementList{
{&s.insertPreviousEventStmt, insertPreviousEventSQL},
diff --git a/roomserver/storage/postgres/published_table.go b/roomserver/storage/postgres/published_table.go
index 440ae784..c180576e 100644
--- a/roomserver/storage/postgres/published_table.go
+++ b/roomserver/storage/postgres/published_table.go
@@ -50,12 +50,14 @@ type publishedStatements struct {
selectPublishedStmt *sql.Stmt
}
-func NewPostgresPublishedTable(db *sql.DB) (tables.Published, error) {
- s := &publishedStatements{}
+func createPublishedTable(db *sql.DB) error {
_, err := db.Exec(publishedSchema)
- if err != nil {
- return nil, err
- }
+ return err
+}
+
+func preparePublishedTable(db *sql.DB) (tables.Published, error) {
+ s := &publishedStatements{}
+
return s, shared.StatementList{
{&s.upsertPublishedStmt, upsertPublishedSQL},
{&s.selectAllPublishedStmt, selectAllPublishedSQL},
diff --git a/roomserver/storage/postgres/redactions_table.go b/roomserver/storage/postgres/redactions_table.go
index 42aba598..3741d5f6 100644
--- a/roomserver/storage/postgres/redactions_table.go
+++ b/roomserver/storage/postgres/redactions_table.go
@@ -60,12 +60,13 @@ type redactionStatements struct {
markRedactionValidatedStmt *sql.Stmt
}
-func NewPostgresRedactionsTable(db *sql.DB) (tables.Redactions, error) {
- s := &redactionStatements{}
+func createRedactionsTable(db *sql.DB) error {
_, err := db.Exec(redactionsSchema)
- if err != nil {
- return nil, err
- }
+ return err
+}
+
+func prepareRedactionsTable(db *sql.DB) (tables.Redactions, error) {
+ s := &redactionStatements{}
return s, shared.StatementList{
{&s.insertRedactionStmt, insertRedactionSQL},
diff --git a/roomserver/storage/postgres/room_aliases_table.go b/roomserver/storage/postgres/room_aliases_table.go
index b603a673..c808813e 100644
--- a/roomserver/storage/postgres/room_aliases_table.go
+++ b/roomserver/storage/postgres/room_aliases_table.go
@@ -62,12 +62,14 @@ type roomAliasesStatements struct {
deleteRoomAliasStmt *sql.Stmt
}
-func NewPostgresRoomAliasesTable(db *sql.DB) (tables.RoomAliases, error) {
- s := &roomAliasesStatements{}
+func createRoomAliasesTable(db *sql.DB) error {
_, err := db.Exec(roomAliasesSchema)
- if err != nil {
- return nil, err
- }
+ return err
+}
+
+func prepareRoomAliasesTable(db *sql.DB) (tables.RoomAliases, error) {
+ s := &roomAliasesStatements{}
+
return s, shared.StatementList{
{&s.insertRoomAliasStmt, insertRoomAliasSQL},
{&s.selectRoomIDFromAliasStmt, selectRoomIDFromAliasSQL},
diff --git a/roomserver/storage/postgres/rooms_table.go b/roomserver/storage/postgres/rooms_table.go
index 637680bd..f2b39fe5 100644
--- a/roomserver/storage/postgres/rooms_table.go
+++ b/roomserver/storage/postgres/rooms_table.go
@@ -96,12 +96,14 @@ type roomStatements struct {
bulkSelectRoomNIDsStmt *sql.Stmt
}
-func NewPostgresRoomsTable(db *sql.DB) (tables.Rooms, error) {
- s := &roomStatements{}
+func createRoomsTable(db *sql.DB) error {
_, err := db.Exec(roomsSchema)
- if err != nil {
- return nil, err
- }
+ return err
+}
+
+func prepareRoomsTable(db *sql.DB) (tables.Rooms, error) {
+ s := &roomStatements{}
+
return s, shared.StatementList{
{&s.insertRoomNIDStmt, insertRoomNIDSQL},
{&s.selectRoomNIDStmt, selectRoomNIDSQL},
diff --git a/roomserver/storage/postgres/state_block_table.go b/roomserver/storage/postgres/state_block_table.go
index d618686f..4523d18b 100644
--- a/roomserver/storage/postgres/state_block_table.go
+++ b/roomserver/storage/postgres/state_block_table.go
@@ -41,141 +41,88 @@ const stateDataSchema = `
-- which in turn makes it easier to merge state data blocks.
CREATE SEQUENCE IF NOT EXISTS roomserver_state_block_nid_seq;
CREATE TABLE IF NOT EXISTS roomserver_state_block (
- -- Local numeric ID for this state data.
- state_block_nid bigint NOT NULL,
- event_type_nid bigint NOT NULL,
- event_state_key_nid bigint NOT NULL,
- event_nid bigint NOT NULL,
- UNIQUE (state_block_nid, event_type_nid, event_state_key_nid)
+ -- The state snapshot NID that identifies this snapshot.
+ state_block_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_block_nid_seq'),
+ -- The hash of the state block, which is used to enforce uniqueness. The hash is
+ -- generated in Dendrite and passed through to the database, as a btree index over
+ -- this column is cheap and fits within the maximum index size.
+ state_block_hash BYTEA UNIQUE,
+ -- The event NIDs contained within the state block.
+ event_nids bigint[] NOT NULL
);
`
+// Insert a new state block. If we conflict on the hash column then
+// we must perform an update so that the RETURNING statement returns the
+// ID of the row that we conflicted with, so that we can then refer to
+// the original block.
const insertStateDataSQL = "" +
- "INSERT INTO roomserver_state_block (state_block_nid, event_type_nid, event_state_key_nid, event_nid)" +
- " VALUES ($1, $2, $3, $4)"
+ "INSERT INTO roomserver_state_block (state_block_hash, event_nids)" +
+ " VALUES ($1, $2)" +
+ " ON CONFLICT (state_block_hash) DO UPDATE SET event_nids=$2" +
+ " RETURNING state_block_nid"
-const selectNextStateBlockNIDSQL = "" +
- "SELECT nextval('roomserver_state_block_nid_seq')"
-
-// Bulk state lookup by numeric state block ID.
-// Sort by the state_block_nid, event_type_nid, event_state_key_nid
-// This means that all the entries for a given state_block_nid will appear
-// together in the list and those entries will sorted by event_type_nid
-// and event_state_key_nid. This property makes it easier to merge two
-// state data blocks together.
const bulkSelectStateBlockEntriesSQL = "" +
- "SELECT state_block_nid, event_type_nid, event_state_key_nid, event_nid" +
- " FROM roomserver_state_block WHERE state_block_nid = ANY($1)" +
- " ORDER BY state_block_nid, event_type_nid, event_state_key_nid"
-
-// Bulk state lookup by numeric state block ID.
-// Filters the rows in each block to the requested types and state keys.
-// We would like to restrict to particular type state key pairs but we are
-// restricted by the query language to pull the cross product of a list
-// of types and a list state_keys. So we have to filter the result in the
-// application to restrict it to the list of event types and state keys we
-// actually wanted.
-const bulkSelectFilteredStateBlockEntriesSQL = "" +
- "SELECT state_block_nid, event_type_nid, event_state_key_nid, event_nid" +
- " FROM roomserver_state_block WHERE state_block_nid = ANY($1)" +
- " AND event_type_nid = ANY($2) AND event_state_key_nid = ANY($3)" +
- " ORDER BY state_block_nid, event_type_nid, event_state_key_nid"
+ "SELECT state_block_nid, event_nids" +
+ " FROM roomserver_state_block WHERE state_block_nid = ANY($1)"
type stateBlockStatements struct {
- insertStateDataStmt *sql.Stmt
- selectNextStateBlockNIDStmt *sql.Stmt
- bulkSelectStateBlockEntriesStmt *sql.Stmt
- bulkSelectFilteredStateBlockEntriesStmt *sql.Stmt
+ insertStateDataStmt *sql.Stmt
+ bulkSelectStateBlockEntriesStmt *sql.Stmt
}
-func NewPostgresStateBlockTable(db *sql.DB) (tables.StateBlock, error) {
- s := &stateBlockStatements{}
+func createStateBlockTable(db *sql.DB) error {
_, err := db.Exec(stateDataSchema)
- if err != nil {
- return nil, err
- }
+ return err
+}
+
+func prepareStateBlockTable(db *sql.DB) (tables.StateBlock, error) {
+ s := &stateBlockStatements{}
return s, shared.StatementList{
{&s.insertStateDataStmt, insertStateDataSQL},
- {&s.selectNextStateBlockNIDStmt, selectNextStateBlockNIDSQL},
{&s.bulkSelectStateBlockEntriesStmt, bulkSelectStateBlockEntriesSQL},
- {&s.bulkSelectFilteredStateBlockEntriesStmt, bulkSelectFilteredStateBlockEntriesSQL},
}.Prepare(db)
}
func (s *stateBlockStatements) BulkInsertStateData(
ctx context.Context,
txn *sql.Tx,
- entries []types.StateEntry,
-) (types.StateBlockNID, error) {
- stateBlockNID, err := s.selectNextStateBlockNID(ctx)
- if err != nil {
- return 0, err
- }
- for _, entry := range entries {
- _, err := s.insertStateDataStmt.ExecContext(
- ctx,
- int64(stateBlockNID),
- int64(entry.EventTypeNID),
- int64(entry.EventStateKeyNID),
- int64(entry.EventNID),
- )
- if err != nil {
- return 0, err
- }
+ entries types.StateEntries,
+) (id types.StateBlockNID, err error) {
+ entries = entries[:util.SortAndUnique(entries)]
+ var nids types.EventNIDs
+ for _, e := range entries {
+ nids = append(nids, e.EventNID)
}
- return stateBlockNID, nil
-}
-
-func (s *stateBlockStatements) selectNextStateBlockNID(
- ctx context.Context,
-) (types.StateBlockNID, error) {
- var stateBlockNID int64
- err := s.selectNextStateBlockNIDStmt.QueryRowContext(ctx).Scan(&stateBlockNID)
- return types.StateBlockNID(stateBlockNID), err
+ err = s.insertStateDataStmt.QueryRowContext(
+ ctx, nids.Hash(), eventNIDsAsArray(nids),
+ ).Scan(&id)
+ return
}
func (s *stateBlockStatements) BulkSelectStateBlockEntries(
- ctx context.Context, stateBlockNIDs []types.StateBlockNID,
-) ([]types.StateEntryList, error) {
- nids := make([]int64, len(stateBlockNIDs))
- for i := range stateBlockNIDs {
- nids[i] = int64(stateBlockNIDs[i])
- }
- rows, err := s.bulkSelectStateBlockEntriesStmt.QueryContext(ctx, pq.Int64Array(nids))
+ ctx context.Context, stateBlockNIDs types.StateBlockNIDs,
+) ([][]types.EventNID, error) {
+ rows, err := s.bulkSelectStateBlockEntriesStmt.QueryContext(ctx, stateBlockNIDsAsArray(stateBlockNIDs))
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateBlockEntries: rows.close() failed")
- results := make([]types.StateEntryList, len(stateBlockNIDs))
- // current is a pointer to the StateEntryList to append the state entries to.
- var current *types.StateEntryList
+ results := make([][]types.EventNID, len(stateBlockNIDs))
i := 0
- for rows.Next() {
- var (
- stateBlockNID int64
- eventTypeNID int64
- eventStateKeyNID int64
- eventNID int64
- entry types.StateEntry
- )
- if err = rows.Scan(
- &stateBlockNID, &eventTypeNID, &eventStateKeyNID, &eventNID,
- ); err != nil {
+ for ; rows.Next(); i++ {
+ var stateBlockNID types.StateBlockNID
+ var result pq.Int64Array
+ if err = rows.Scan(&stateBlockNID, &result); err != nil {
return nil, err
}
- entry.EventTypeNID = types.EventTypeNID(eventTypeNID)
- entry.EventStateKeyNID = types.EventStateKeyNID(eventStateKeyNID)
- entry.EventNID = types.EventNID(eventNID)
- if current == nil || types.StateBlockNID(stateBlockNID) != current.StateBlockNID {
- // The state entry row is for a different state data block to the current one.
- // So we start appending to the next entry in the list.
- current = &results[i]
- current.StateBlockNID = types.StateBlockNID(stateBlockNID)
- i++
+ r := []types.EventNID{}
+ for _, e := range result {
+ r = append(r, types.EventNID(e))
}
- current.StateEntries = append(current.StateEntries, entry)
+ results[i] = r
}
if err = rows.Err(); err != nil {
return nil, err
@@ -186,71 +133,6 @@ func (s *stateBlockStatements) BulkSelectStateBlockEntries(
return results, err
}
-func (s *stateBlockStatements) BulkSelectFilteredStateBlockEntries(
- ctx context.Context,
- stateBlockNIDs []types.StateBlockNID,
- stateKeyTuples []types.StateKeyTuple,
-) ([]types.StateEntryList, error) {
- tuples := stateKeyTupleSorter(stateKeyTuples)
- // Sort the tuples so that we can run binary search against them as we filter the rows returned by the db.
- sort.Sort(tuples)
-
- eventTypeNIDArray, eventStateKeyNIDArray := tuples.typesAndStateKeysAsArrays()
- rows, err := s.bulkSelectFilteredStateBlockEntriesStmt.QueryContext(
- ctx,
- stateBlockNIDsAsArray(stateBlockNIDs),
- eventTypeNIDArray,
- eventStateKeyNIDArray,
- )
- if err != nil {
- return nil, err
- }
- defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectFilteredStateBlockEntries: rows.close() failed")
-
- var results []types.StateEntryList
- var current types.StateEntryList
- for rows.Next() {
- var (
- stateBlockNID int64
- eventTypeNID int64
- eventStateKeyNID int64
- eventNID int64
- entry types.StateEntry
- )
- if err := rows.Scan(
- &stateBlockNID, &eventTypeNID, &eventStateKeyNID, &eventNID,
- ); err != nil {
- return nil, err
- }
- entry.EventTypeNID = types.EventTypeNID(eventTypeNID)
- entry.EventStateKeyNID = types.EventStateKeyNID(eventStateKeyNID)
- entry.EventNID = types.EventNID(eventNID)
-
- // We can use binary search here because we sorted the tuples earlier
- if !tuples.contains(entry.StateKeyTuple) {
- // The select will return the cross product of types and state keys.
- // So we need to check if type of the entry is in the list.
- continue
- }
-
- if types.StateBlockNID(stateBlockNID) != current.StateBlockNID {
- // The state entry row is for a different state data block to the current one.
- // So we append the current entry to the results and start adding to a new one.
- // The first time through the loop current will be empty.
- if current.StateEntries != nil {
- results = append(results, current)
- }
- current = types.StateEntryList{StateBlockNID: types.StateBlockNID(stateBlockNID)}
- }
- current.StateEntries = append(current.StateEntries, entry)
- }
- // Add the last entry to the list if it is not empty.
- if current.StateEntries != nil {
- results = append(results, current)
- }
- return results, rows.Err()
-}
-
func stateBlockNIDsAsArray(stateBlockNIDs []types.StateBlockNID) pq.Int64Array {
nids := make([]int64, len(stateBlockNIDs))
for i := range stateBlockNIDs {
diff --git a/roomserver/storage/postgres/state_snapshot_table.go b/roomserver/storage/postgres/state_snapshot_table.go
index 63175955..15e14e2e 100644
--- a/roomserver/storage/postgres/state_snapshot_table.go
+++ b/roomserver/storage/postgres/state_snapshot_table.go
@@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
+ "github.com/matrix-org/util"
)
const stateSnapshotSchema = `
@@ -40,19 +41,29 @@ const stateSnapshotSchema = `
-- the full state under single state_block_nid.
CREATE SEQUENCE IF NOT EXISTS roomserver_state_snapshot_nid_seq;
CREATE TABLE IF NOT EXISTS roomserver_state_snapshots (
- -- Local numeric ID for the state.
- state_snapshot_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_snapshot_nid_seq'),
- -- Local numeric ID of the room this state is for.
- -- Unused in normal operation, but useful for background work or ad-hoc debugging.
- room_nid bigint NOT NULL,
- -- List of state_block_nids, stored sorted by state_block_nid.
- state_block_nids bigint[] NOT NULL
+ -- The state snapshot NID that identifies this snapshot.
+ state_snapshot_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_snapshot_nid_seq'),
+ -- The hash of the state snapshot, which is used to enforce uniqueness. The hash is
+ -- generated in Dendrite and passed through to the database, as a btree index over
+ -- this column is cheap and fits within the maximum index size.
+ state_snapshot_hash BYTEA UNIQUE,
+ -- The room NID that the snapshot belongs to.
+ room_nid bigint NOT NULL,
+ -- The state blocks contained within this snapshot.
+ state_block_nids bigint[] NOT NULL
);
`
+// Insert a new state snapshot. If we conflict on the hash column then
+// we must perform an update so that the RETURNING statement returns the
+// ID of the row that we conflicted with, so that we can then refer to
+// the original snapshot.
const insertStateSQL = "" +
- "INSERT INTO roomserver_state_snapshots (room_nid, state_block_nids)" +
- " VALUES ($1, $2)" +
+ "INSERT INTO roomserver_state_snapshots (state_snapshot_hash, room_nid, state_block_nids)" +
+ " VALUES ($1, $2, $3)" +
+ " ON CONFLICT (state_snapshot_hash) DO UPDATE SET room_nid=$2" +
+ // Performing an update, above, ensures that the RETURNING statement
+ // below will always return a valid state snapshot ID
" RETURNING state_snapshot_nid"
// Bulk state data NID lookup.
@@ -67,12 +78,13 @@ type stateSnapshotStatements struct {
bulkSelectStateBlockNIDsStmt *sql.Stmt
}
-func NewPostgresStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
- s := &stateSnapshotStatements{}
+func createStateSnapshotTable(db *sql.DB) error {
_, err := db.Exec(stateSnapshotSchema)
- if err != nil {
- return nil, err
- }
+ return err
+}
+
+func prepareStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
+ s := &stateSnapshotStatements{}
return s, shared.StatementList{
{&s.insertStateStmt, insertStateSQL},
@@ -81,13 +93,15 @@ func NewPostgresStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
}
func (s *stateSnapshotStatements) InsertState(
- ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID,
+ ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, nids types.StateBlockNIDs,
) (stateNID types.StateSnapshotNID, err error) {
- nids := make([]int64, len(stateBlockNIDs))
- for i := range stateBlockNIDs {
- nids[i] = int64(stateBlockNIDs[i])
+ nids = nids[:util.SortAndUnique(nids)]
+ var id int64
+ err = sqlutil.TxStmt(txn, s.insertStateStmt).QueryRowContext(ctx, nids.Hash(), int64(roomNID), stateBlockNIDsAsArray(nids)).Scan(&id)
+ if err != nil {
+ return 0, err
}
- err = sqlutil.TxStmt(txn, s.insertStateStmt).QueryRowContext(ctx, int64(roomNID), pq.Int64Array(nids)).Scan(&stateNID)
+ stateNID = types.StateSnapshotNID(id)
return
}
diff --git a/roomserver/storage/postgres/storage.go b/roomserver/storage/postgres/storage.go
index bb3f841d..863a1593 100644
--- a/roomserver/storage/postgres/storage.go
+++ b/roomserver/storage/postgres/storage.go
@@ -17,6 +17,7 @@ package postgres
import (
"database/sql"
+ "fmt"
// Import the postgres database driver.
_ "github.com/lib/pq"
@@ -39,20 +40,25 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches)
var db *sql.DB
var err error
if db, err = sqlutil.Open(dbProperties); err != nil {
- return nil, err
+ return nil, fmt.Errorf("sqlutil.Open: %w", err)
}
- // Create tables before executing migrations so we don't fail if the table is missing,
- // and THEN prepare statements so we don't fail due to referencing new columns
- ms := membershipStatements{}
- if err := ms.execSchema(db); err != nil {
+ // Create the tables.
+ if err := d.create(db); err != nil {
return nil, err
}
+
+ // Then execute the migrations. By this point the tables are created with the latest
+ // schemas.
m := sqlutil.NewMigrations()
deltas.LoadAddForgottenColumn(m)
+ deltas.LoadStateBlocksRefactor(m)
if err := m.RunDeltas(db, dbProperties); err != nil {
return nil, err
}
+
+ // Then prepare the statements. Now that the migrations have run, any columns referred
+ // to in the database code should now exist.
if err := d.prepare(db, cache); err != nil {
return nil, err
}
@@ -60,61 +66,107 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches)
return &d, nil
}
-// nolint: gocyclo
-func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) (err error) {
- eventStateKeys, err := NewPostgresEventStateKeysTable(db)
+func (d *Database) create(db *sql.DB) error {
+ if err := createEventStateKeysTable(db); err != nil {
+ return err
+ }
+ if err := createEventTypesTable(db); err != nil {
+ return err
+ }
+ if err := createEventJSONTable(db); err != nil {
+ return err
+ }
+ if err := createEventsTable(db); err != nil {
+ return err
+ }
+ if err := createRoomsTable(db); err != nil {
+ return err
+ }
+ if err := createTransactionsTable(db); err != nil {
+ return err
+ }
+ if err := createStateBlockTable(db); err != nil {
+ return err
+ }
+ if err := createStateSnapshotTable(db); err != nil {
+ return err
+ }
+ if err := createPrevEventsTable(db); err != nil {
+ return err
+ }
+ if err := createRoomAliasesTable(db); err != nil {
+ return err
+ }
+ if err := createInvitesTable(db); err != nil {
+ return err
+ }
+ if err := createMembershipTable(db); err != nil {
+ return err
+ }
+ if err := createPublishedTable(db); err != nil {
+ return err
+ }
+ if err := createRedactionsTable(db); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) error {
+ eventStateKeys, err := prepareEventStateKeysTable(db)
if err != nil {
return err
}
- eventTypes, err := NewPostgresEventTypesTable(db)
+ eventTypes, err := prepareEventTypesTable(db)
if err != nil {
return err
}
- eventJSON, err := NewPostgresEventJSONTable(db)
+ eventJSON, err := prepareEventJSONTable(db)
if err != nil {
return err
}
- events, err := NewPostgresEventsTable(db)
+ events, err := prepareEventsTable(db)
if err != nil {
return err
}
- rooms, err := NewPostgresRoomsTable(db)
+ rooms, err := prepareRoomsTable(db)
if err != nil {
return err
}
- transactions, err := NewPostgresTransactionsTable(db)
+ transactions, err := prepareTransactionsTable(db)
if err != nil {
return err
}
- stateBlock, err := NewPostgresStateBlockTable(db)
+ stateBlock, err := prepareStateBlockTable(db)
if err != nil {
return err
}
- stateSnapshot, err := NewPostgresStateSnapshotTable(db)
+ stateSnapshot, err := prepareStateSnapshotTable(db)
if err != nil {
return err
}
- roomAliases, err := NewPostgresRoomAliasesTable(db)
+ prevEvents, err := preparePrevEventsTable(db)
if err != nil {
return err
}
- prevEvents, err := NewPostgresPreviousEventsTable(db)
+ roomAliases, err := prepareRoomAliasesTable(db)
if err != nil {
return err
}
- invites, err := NewPostgresInvitesTable(db)
+ invites, err := prepareInvitesTable(db)
if err != nil {
return err
}
- membership, err := NewPostgresMembershipTable(db)
+ membership, err := prepareMembershipTable(db)
if err != nil {
return err
}
- published, err := NewPostgresPublishedTable(db)
+ published, err := preparePublishedTable(db)
if err != nil {
return err
}
- redactions, err := NewPostgresRedactionsTable(db)
+ redactions, err := prepareRedactionsTable(db)
if err != nil {
return err
}
diff --git a/roomserver/storage/postgres/transactions_table.go b/roomserver/storage/postgres/transactions_table.go
index 5e59ae16..cada0d8a 100644
--- a/roomserver/storage/postgres/transactions_table.go
+++ b/roomserver/storage/postgres/transactions_table.go
@@ -54,12 +54,13 @@ type transactionStatements struct {
selectTransactionEventIDStmt *sql.Stmt
}
-func NewPostgresTransactionsTable(db *sql.DB) (tables.Transactions, error) {
- s := &transactionStatements{}
+func createTransactionsTable(db *sql.DB) error {
_, err := db.Exec(transactionsSchema)
- if err != nil {
- return nil, err
- }
+ return err
+}
+
+func prepareTransactionsTable(db *sql.DB) (tables.Transactions, error) {
+ s := &transactionStatements{}
return s, shared.StatementList{
{&s.insertTransactionStmt, insertTransactionSQL},