aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go25
-rw-r--r--roomserver/storage/sqlite3/deltas/2021041615092700_state_blocks_refactor.go23
2 files changed, 45 insertions, 3 deletions
diff --git a/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go b/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go
index 84da9614..d87ae052 100644
--- a/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go
+++ b/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go
@@ -119,11 +119,15 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
_roomserver_state_snapshots
JOIN _roomserver_state_block ON _roomserver_state_block.state_block_nid = ANY (_roomserver_state_snapshots.state_block_nids)
WHERE
- _roomserver_state_snapshots.state_snapshot_nid = ANY ( SELECT DISTINCT
+ _roomserver_state_snapshots.state_snapshot_nid = ANY (
+ SELECT
_roomserver_state_snapshots.state_snapshot_nid
FROM
_roomserver_state_snapshots
- LIMIT $1 OFFSET $2)) AS _roomserver_state_block
+ ORDER BY _roomserver_state_snapshots.state_snapshot_nid ASC
+ LIMIT $1 OFFSET $2
+ )
+ ) AS _roomserver_state_block
GROUP BY
state_snapshot_nid,
room_nid,
@@ -202,6 +206,23 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
}
}
+ // By this point we should have no more state_snapshot_nids below maxsnapshotid in either roomserver_rooms or roomserver_events
+ // If we do, this is a problem if Dendrite tries to load the snapshot as it will not exist
+ // in roomserver_state_snapshots
+ var count int64
+ if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_events WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil {
+ return fmt.Errorf("assertion query failed: %s", err)
+ }
+ if count > 0 {
+ return fmt.Errorf("%d events exist in roomserver_events which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
+ }
+ if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil {
+ return fmt.Errorf("assertion query failed: %s", err)
+ }
+ if count > 0 {
+ return fmt.Errorf("%d rooms exist in roomserver_rooms which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
+ }
+
if _, err = tx.Exec(`
DROP TABLE _roomserver_state_snapshots;
DROP SEQUENCE roomserver_state_snapshot_nid_seq;
diff --git a/roomserver/storage/sqlite3/deltas/2021041615092700_state_blocks_refactor.go b/roomserver/storage/sqlite3/deltas/2021041615092700_state_blocks_refactor.go
index 3b93b3fa..42edbbc6 100644
--- a/roomserver/storage/sqlite3/deltas/2021041615092700_state_blocks_refactor.go
+++ b/roomserver/storage/sqlite3/deltas/2021041615092700_state_blocks_refactor.go
@@ -31,6 +31,7 @@ func LoadStateBlocksRefactor(m *sqlutil.Migrations) {
m.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
}
+// nolint:gocyclo
func UpStateBlocksRefactor(tx *sql.Tx) error {
logrus.Warn("Performing state storage upgrade. Please wait, this may take some time!")
defer logrus.Warn("State storage upgrade complete")
@@ -45,6 +46,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
}
maxsnapshotid++
maxblockid++
+ oldMaxSnapshotID := maxsnapshotid
if _, err := tx.Exec(`ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil {
return fmt.Errorf("tx.Exec: %w", err)
@@ -133,6 +135,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
if jerr != nil {
return fmt.Errorf("json.Marshal (new blocks): %w", jerr)
}
+
var newsnapshot types.StateSnapshotNID
err = tx.QueryRow(`
INSERT INTO roomserver_state_snapshots (state_snapshot_nid, state_snapshot_hash, room_nid, state_block_nids)
@@ -144,7 +147,8 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err)
}
maxsnapshotid++
- if _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid); err != nil {
+ _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid)
+ if err != nil {
return fmt.Errorf("tx.Exec (update events): %w", err)
}
if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid); err != nil {
@@ -153,6 +157,23 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
}
}
+ // By this point we should have no more state_snapshot_nids below oldMaxSnapshotID in either roomserver_rooms or roomserver_events
+ // If we do, this is a problem if Dendrite tries to load the snapshot as it will not exist
+ // in roomserver_state_snapshots
+ var count int64
+ if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_events WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID).Scan(&count); err != nil {
+ return fmt.Errorf("assertion query failed: %s", err)
+ }
+ if count > 0 {
+ return fmt.Errorf("%d events exist in roomserver_events which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
+ }
+ if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID).Scan(&count); err != nil {
+ return fmt.Errorf("assertion query failed: %s", err)
+ }
+ if count > 0 {
+ return fmt.Errorf("%d rooms exist in roomserver_rooms which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
+ }
+
if _, err = tx.Exec(`DROP TABLE _roomserver_state_snapshots;`); err != nil {
return fmt.Errorf("tx.Exec (delete old snapshot table): %w", err)
}