diff options
author | kegsay <kegan@matrix.org> | 2021-06-29 11:25:17 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-06-29 11:25:17 +0100 |
commit | c849e74dfc9aabfd0d98db1310230aa362f6df2a (patch) | |
tree | b608db8d29cf2121edb6dc32c993045dc6b35e17 /roomserver/storage/sqlite3 | |
parent | e2b6a90d90a5f4bfd2658110ae8c2edf5777efb3 (diff) |
db migration: fix #1844 and add additional assertions (#1889)
* db migration: fix #1844 and add additional assertions
- Migration scripts will now check to see if there are any unconverted
snapshot IDs and fail the migration if there are any. This should
prevent people from getting a corrupt database in the event the root
cause is still unknown.
- Add an ORDER BY clause when doing batch queries in the postgres
migration. LIMIT and OFFSET without ORDER BY are undefined and must
not be relied upon to produce a deterministic ordering (e.g row order).
See https://www.postgresql.org/docs/current/queries-limit.html
* Linting
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'roomserver/storage/sqlite3')
-rw-r--r-- | roomserver/storage/sqlite3/deltas/2021041615092700_state_blocks_refactor.go | 23 |
1 files changed, 22 insertions, 1 deletions
diff --git a/roomserver/storage/sqlite3/deltas/2021041615092700_state_blocks_refactor.go b/roomserver/storage/sqlite3/deltas/2021041615092700_state_blocks_refactor.go index 3b93b3fa..42edbbc6 100644 --- a/roomserver/storage/sqlite3/deltas/2021041615092700_state_blocks_refactor.go +++ b/roomserver/storage/sqlite3/deltas/2021041615092700_state_blocks_refactor.go @@ -31,6 +31,7 @@ func LoadStateBlocksRefactor(m *sqlutil.Migrations) { m.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor) } +// nolint:gocyclo func UpStateBlocksRefactor(tx *sql.Tx) error { logrus.Warn("Performing state storage upgrade. Please wait, this may take some time!") defer logrus.Warn("State storage upgrade complete") @@ -45,6 +46,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error { } maxsnapshotid++ maxblockid++ + oldMaxSnapshotID := maxsnapshotid if _, err := tx.Exec(`ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil { return fmt.Errorf("tx.Exec: %w", err) @@ -133,6 +135,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error { if jerr != nil { return fmt.Errorf("json.Marshal (new blocks): %w", jerr) } + var newsnapshot types.StateSnapshotNID err = tx.QueryRow(` INSERT INTO roomserver_state_snapshots (state_snapshot_nid, state_snapshot_hash, room_nid, state_block_nids) @@ -144,7 +147,8 @@ func UpStateBlocksRefactor(tx *sql.Tx) error { return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err) } maxsnapshotid++ - if _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid); err != nil { + _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid) + if err != nil { return fmt.Errorf("tx.Exec (update events): %w", err) } if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid); err != nil { @@ -153,6 +157,23 @@ func UpStateBlocksRefactor(tx *sql.Tx) error { } } + // By this point we should have no more state_snapshot_nids below oldMaxSnapshotID in either roomserver_rooms or roomserver_events + // If we do, this is a problem if Dendrite tries to load the snapshot as it will not exist + // in roomserver_state_snapshots + var count int64 + if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_events WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID).Scan(&count); err != nil { + return fmt.Errorf("assertion query failed: %s", err) + } + if count > 0 { + return fmt.Errorf("%d events exist in roomserver_events which have not been converted to a new state_snapshot_nid; this is a bug, please report", count) + } + if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID).Scan(&count); err != nil { + return fmt.Errorf("assertion query failed: %s", err) + } + if count > 0 { + return fmt.Errorf("%d rooms exist in roomserver_rooms which have not been converted to a new state_snapshot_nid; this is a bug, please report", count) + } + if _, err = tx.Exec(`DROP TABLE _roomserver_state_snapshots;`); err != nil { return fmt.Errorf("tx.Exec (delete old snapshot table): %w", err) } |