diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2021-04-26 13:25:57 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-26 13:25:57 +0100 |
commit | 5ce1fe80dea8b8cfca8712e8d584deb995bbddcc (patch) | |
tree | 1307a1edf73abf68cebd4601efec1e467dac964c /roomserver/storage/postgres/deltas | |
parent | d6e9b7b307ff0d7541046ec33890d49239c7a6ca (diff) |
State storage refactor (#1839)
* Hash-deduplicated state storage (and migrations) for PostgreSQL and SQLite
* Refactor roomserver database setup for migrations
* Fix conflict statements
* Update migration names
* Set a boundary for old to new block/snapshot IDs so we don't rewrite them more than once accidentally
* Create sequence if not exists
* Fix boundary queries
* Fix boundary queries
* Use Query
* Break out queries a bit
* More sequence tweaks
* Query parameters are not playing the game
* Injection escaping may not work for CREATE SEQUENCE after all
* Fix snapshot sequence name
* Use boundaried IDs in SQLite too
* Use IFNULL for SQLite
* Use COALESCE in PostgreSQL
* Review comments @Kegsay
Diffstat (limited to 'roomserver/storage/postgres/deltas')
-rw-r--r-- | roomserver/storage/postgres/deltas/20201028212440_add_forgotten_column.go | 1 | ||||
-rw-r--r-- | roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go | 223 |
2 files changed, 224 insertions, 0 deletions
diff --git a/roomserver/storage/postgres/deltas/20201028212440_add_forgotten_column.go b/roomserver/storage/postgres/deltas/20201028212440_add_forgotten_column.go index 733f0fa1..f3bd8632 100644 --- a/roomserver/storage/postgres/deltas/20201028212440_add_forgotten_column.go +++ b/roomserver/storage/postgres/deltas/20201028212440_add_forgotten_column.go @@ -24,6 +24,7 @@ import ( func LoadFromGoose() { goose.AddMigration(UpAddForgottenColumn, DownAddForgottenColumn) + goose.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor) } func LoadAddForgottenColumn(m *sqlutil.Migrations) { diff --git a/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go b/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go new file mode 100644 index 00000000..84da9614 --- /dev/null +++ b/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go @@ -0,0 +1,223 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/lib/pq" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/util" + "github.com/sirupsen/logrus" +) + +type stateSnapshotData struct { + StateSnapshotNID types.StateSnapshotNID + RoomNID types.RoomNID +} + +type stateBlockData struct { + stateSnapshotData + StateBlockNID types.StateBlockNID + EventNIDs types.EventNIDs +} + +func LoadStateBlocksRefactor(m *sqlutil.Migrations) { + m.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor) +} + +// nolint:gocyclo +func UpStateBlocksRefactor(tx *sql.Tx) error { + logrus.Warn("Performing state storage upgrade. Please wait, this may take some time!") + defer logrus.Warn("State storage upgrade complete") + + var snapshotcount int + var maxsnapshotid int + var maxblockid int + if err := tx.QueryRow(`SELECT COUNT(DISTINCT state_snapshot_nid) FROM roomserver_state_snapshots;`).Scan(&snapshotcount); err != nil { + return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err) + } + if err := tx.QueryRow(`SELECT COALESCE(MAX(state_snapshot_nid),0) FROM roomserver_state_snapshots;`).Scan(&maxsnapshotid); err != nil { + return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err) + } + if err := tx.QueryRow(`SELECT COALESCE(MAX(state_block_nid),0) FROM roomserver_state_block;`).Scan(&maxblockid); err != nil { + return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err) + } + maxsnapshotid++ + maxblockid++ + + if _, err := tx.Exec(`ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil { + return fmt.Errorf("tx.Exec: %w", err) + } + if _, err := tx.Exec(`ALTER TABLE roomserver_state_snapshots RENAME TO _roomserver_state_snapshots;`); err != nil { + return fmt.Errorf("tx.Exec: %w", err) + } + // We create new sequences starting with the maximum state snapshot and block NIDs. 
+ // This means that all newly created snapshots and blocks by the migration will have + // NIDs higher than these values, so that when we come to update the references to + // these NIDs using UPDATE statements, we can guarantee we are only ever updating old + // values and not accidentally overwriting new ones. + if _, err := tx.Exec(fmt.Sprintf(`CREATE SEQUENCE roomserver_state_block_nid_sequence START WITH %d;`, maxblockid)); err != nil { + return fmt.Errorf("tx.Exec: %w", err) + } + if _, err := tx.Exec(fmt.Sprintf(`CREATE SEQUENCE roomserver_state_snapshot_nid_sequence START WITH %d;`, maxsnapshotid)); err != nil { + return fmt.Errorf("tx.Exec: %w", err) + } + _, err := tx.Exec(` + CREATE TABLE IF NOT EXISTS roomserver_state_block ( + state_block_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_block_nid_sequence'), + state_block_hash BYTEA UNIQUE, + event_nids bigint[] NOT NULL + ); + `) + if err != nil { + return fmt.Errorf("tx.Exec (create blocks table): %w", err) + } + _, err = tx.Exec(` + CREATE TABLE IF NOT EXISTS roomserver_state_snapshots ( + state_snapshot_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_snapshot_nid_sequence'), + state_snapshot_hash BYTEA UNIQUE, + room_nid bigint NOT NULL, + state_block_nids bigint[] NOT NULL + ); + `) + if err != nil { + return fmt.Errorf("tx.Exec (create snapshots table): %w", err) + } + logrus.Warn("New tables created...") + + batchsize := 100 + for batchoffset := 0; batchoffset < snapshotcount; batchoffset += batchsize { + var snapshotrows *sql.Rows + snapshotrows, err = tx.Query(` + SELECT + state_snapshot_nid, + room_nid, + state_block_nid, + ARRAY_AGG(event_nid) AS event_nids + FROM ( + SELECT + _roomserver_state_snapshots.state_snapshot_nid, + _roomserver_state_snapshots.room_nid, + _roomserver_state_block.state_block_nid, + _roomserver_state_block.event_nid + FROM + _roomserver_state_snapshots + JOIN _roomserver_state_block ON _roomserver_state_block.state_block_nid = ANY 
(_roomserver_state_snapshots.state_block_nids) + WHERE + _roomserver_state_snapshots.state_snapshot_nid = ANY ( SELECT DISTINCT + _roomserver_state_snapshots.state_snapshot_nid + FROM + _roomserver_state_snapshots + LIMIT $1 OFFSET $2)) AS _roomserver_state_block + GROUP BY + state_snapshot_nid, + room_nid, + state_block_nid; + `, batchsize, batchoffset) + if err != nil { + return fmt.Errorf("tx.Query: %w", err) + } + + logrus.Warnf("Rewriting snapshots %d-%d of %d...", batchoffset, batchoffset+batchsize, snapshotcount) + var snapshots []stateBlockData + + for snapshotrows.Next() { + var snapshot stateBlockData + var eventsarray pq.Int64Array + if err = snapshotrows.Scan(&snapshot.StateSnapshotNID, &snapshot.RoomNID, &snapshot.StateBlockNID, &eventsarray); err != nil { + return fmt.Errorf("rows.Scan: %w", err) + } + for _, e := range eventsarray { + snapshot.EventNIDs = append(snapshot.EventNIDs, types.EventNID(e)) + } + snapshot.EventNIDs = snapshot.EventNIDs[:util.SortAndUnique(snapshot.EventNIDs)] + snapshots = append(snapshots, snapshot) + } + + if err = snapshotrows.Close(); err != nil { + return fmt.Errorf("snapshots.Close: %w", err) + } + + newsnapshots := map[stateSnapshotData]types.StateBlockNIDs{} + + for _, snapshot := range snapshots { + var eventsarray pq.Int64Array + for _, e := range snapshot.EventNIDs { + eventsarray = append(eventsarray, int64(e)) + } + + var blocknid types.StateBlockNID + err = tx.QueryRow(` + INSERT INTO roomserver_state_block (state_block_hash, event_nids) + VALUES ($1, $2) + ON CONFLICT (state_block_hash) DO UPDATE SET event_nids=$2 + RETURNING state_block_nid + `, snapshot.EventNIDs.Hash(), eventsarray).Scan(&blocknid) + if err != nil { + return fmt.Errorf("tx.QueryRow.Scan (insert new block with %d events): %w", len(eventsarray), err) + } + index := stateSnapshotData{snapshot.StateSnapshotNID, snapshot.RoomNID} + newsnapshots[index] = append(newsnapshots[index], blocknid) + } + + for snapshotdata, newblocks := range 
newsnapshots { + var newblocksarray pq.Int64Array + for _, b := range newblocks { + newblocksarray = append(newblocksarray, int64(b)) + } + + var newNID types.StateSnapshotNID + err = tx.QueryRow(` + INSERT INTO roomserver_state_snapshots (state_snapshot_hash, room_nid, state_block_nids) + VALUES ($1, $2, $3) + ON CONFLICT (state_snapshot_hash) DO UPDATE SET room_nid=$2 + RETURNING state_snapshot_nid + `, newblocks.Hash(), snapshotdata.RoomNID, newblocksarray).Scan(&newNID) + if err != nil { + return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err) + } + + if _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newNID, snapshotdata.StateSnapshotNID, maxsnapshotid); err != nil { + return fmt.Errorf("tx.Exec (update events): %w", err) + } + + if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newNID, snapshotdata.StateSnapshotNID, maxsnapshotid); err != nil { + return fmt.Errorf("tx.Exec (update rooms): %w", err) + } + } + } + + if _, err = tx.Exec(` + DROP TABLE _roomserver_state_snapshots; + DROP SEQUENCE roomserver_state_snapshot_nid_seq; + `); err != nil { + return fmt.Errorf("tx.Exec (delete old snapshot table): %w", err) + } + if _, err = tx.Exec(` + DROP TABLE _roomserver_state_block; + DROP SEQUENCE roomserver_state_block_nid_seq; + `); err != nil { + return fmt.Errorf("tx.Exec (delete old block table): %w", err) + } + + return nil +} + +func DownStateBlocksRefactor(tx *sql.Tx) error { + panic("Downgrading state storage is not supported") +} |