author     Till <2353100+S7evinK@users.noreply.github.com>  2023-05-30 18:05:48 +0200
committer  GitHub <noreply@github.com>                      2023-05-30 18:05:48 +0200
commit     61341aca500ec4d87e5b6d4c3f965c3836d6e6d6 (patch)
tree       eb74ce46fd0812279dab34eb809603a9213ff59c
parent     3dcca4017cb919fb249784d9cf9b83ea60a77f15 (diff)
Add tests for the `UpDropEventReferenceSHAPrevEvents` migration (#3087)
... as it could fail if there are duplicate events in `roomserver_previous_events`. This fixes the migration by combining the `event_nids` where possible (i.e. when the duplicate rows belong to the same room), as suggested by @kegsay in https://github.com/matrix-org/dendrite/pull/3083#discussion_r1195508963
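
For context, a minimal standalone sketch of the combining step described above (a hypothetical combineEventNIDs helper for illustration only, not part of the patch; it mirrors the nids sort helper and the util.SortAndUnique call used in the migration below):

package main

import (
	"fmt"

	"github.com/matrix-org/util"
)

// nids mirrors the sort.Interface helper added by the patch.
type nids []int64

func (s nids) Len() int           { return len(s) }
func (s nids) Less(i, j int) bool { return s[i] < s[j] }
func (s nids) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }

// combineEventNIDs merges the event_nids of every row sharing one
// previous_event_id and drops duplicates. The real migration additionally
// verifies that the merged NIDs all belong to a single room before deleting
// the duplicate rows and re-inserting one combined row.
func combineEventNIDs(rows [][]int64) []int64 {
	var combined []int64
	for _, eventNIDs := range rows {
		combined = append(combined, eventNIDs...)
	}
	// util.SortAndUnique sorts in place and returns the number of unique
	// elements, so reslicing discards the duplicates.
	return combined[:util.SortAndUnique(nids(combined))]
}

func main() {
	// The duplicate rows inserted by the new tests: {1,2} and {1,2,3}.
	fmt.Println(combineEventNIDs([][]int64{{1, 2}, {1, 2, 3}})) // [1 2 3]
}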
-rw-r--r--  roomserver/storage/postgres/deltas/20230516154000_drop_reference_sha.go      | 86
-rw-r--r--  roomserver/storage/postgres/deltas/20230516154000_drop_reference_sha_test.go | 60
-rw-r--r--  roomserver/storage/sqlite3/deltas/20230516154000_drop_reference_sha.go       | 78
-rw-r--r--  roomserver/storage/sqlite3/deltas/20230516154000_drop_reference_sha_test.go  | 59
4 files changed, 271 insertions, 12 deletions
diff --git a/roomserver/storage/postgres/deltas/20230516154000_drop_reference_sha.go b/roomserver/storage/postgres/deltas/20230516154000_drop_reference_sha.go
index c1957771..1b1dd44d 100644
--- a/roomserver/storage/postgres/deltas/20230516154000_drop_reference_sha.go
+++ b/roomserver/storage/postgres/deltas/20230516154000_drop_reference_sha.go
@@ -18,19 +18,14 @@ import (
"context"
"database/sql"
"fmt"
+
+ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/util"
)
func UpDropEventReferenceSHAEvents(ctx context.Context, tx *sql.Tx) error {
- var count int
- err := tx.QueryRowContext(ctx, `SELECT count(*) FROM roomserver_events GROUP BY event_id HAVING count(event_id) > 1`).
- Scan(&count)
- if err != nil && err != sql.ErrNoRows {
- return fmt.Errorf("failed to query duplicate event ids")
- }
- if count > 0 {
- return fmt.Errorf("unable to drop column, as there are duplicate event ids")
- }
- _, err = tx.ExecContext(ctx, `ALTER TABLE roomserver_events DROP COLUMN IF EXISTS reference_sha256;`)
+ _, err := tx.ExecContext(ctx, `ALTER TABLE roomserver_events DROP COLUMN IF EXISTS reference_sha256;`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
@@ -46,9 +41,80 @@ func UpDropEventReferenceSHAPrevEvents(ctx context.Context, tx *sql.Tx) error {
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
+
+ // figure out if there are duplicates
+ dupeRows, err := tx.QueryContext(ctx, `SELECT previous_event_id FROM roomserver_previous_events GROUP BY previous_event_id HAVING count(previous_event_id) > 1`)
+ if err != nil {
+ return fmt.Errorf("failed to query duplicate event ids")
+ }
+ defer internal.CloseAndLogIfError(ctx, dupeRows, "failed to close rows")
+
+ var prevEvents []string
+ var prevEventID string
+ for dupeRows.Next() {
+ if err = dupeRows.Scan(&prevEventID); err != nil {
+ return err
+ }
+ prevEvents = append(prevEvents, prevEventID)
+ }
+ if dupeRows.Err() != nil {
+ return dupeRows.Err()
+ }
+
+ // if we found duplicates, check if we can combine them, e.g. they are in the same room
+ for _, dupeID := range prevEvents {
+ var dupeNIDsRows *sql.Rows
+ dupeNIDsRows, err = tx.QueryContext(ctx, `SELECT event_nids FROM roomserver_previous_events WHERE previous_event_id = $1`, dupeID)
+ if err != nil {
+ return fmt.Errorf("failed to query duplicate event ids")
+ }
+ defer internal.CloseAndLogIfError(ctx, dupeNIDsRows, "failed to close rows")
+ var dupeNIDs []int64
+ for dupeNIDsRows.Next() {
+ var nids pq.Int64Array
+ if err = dupeNIDsRows.Scan(&nids); err != nil {
+ return err
+ }
+ dupeNIDs = append(dupeNIDs, nids...)
+ }
+
+ if dupeNIDsRows.Err() != nil {
+ return dupeNIDsRows.Err()
+ }
+ // dedupe NIDs
+ dupeNIDs = dupeNIDs[:util.SortAndUnique(nids(dupeNIDs))]
+ // now that we have all NIDs, check which room they belong to
+ var roomCount int
+ err = tx.QueryRowContext(ctx, `SELECT count(distinct room_nid) FROM roomserver_events WHERE event_nid = ANY($1)`, pq.Array(dupeNIDs)).Scan(&roomCount)
+ if err != nil {
+ return err
+ }
+ // if the events are from different rooms, that's bad and we can't continue
+ if roomCount > 1 {
+ return fmt.Errorf("detected events (%v) referenced for different rooms (%v)", dupeNIDs, roomCount)
+ }
+ // otherwise delete the dupes
+ _, err = tx.ExecContext(ctx, "DELETE FROM roomserver_previous_events WHERE previous_event_id = $1", dupeID)
+ if err != nil {
+ return fmt.Errorf("unable to delete duplicates: %w", err)
+ }
+
+ // insert combined values
+ _, err = tx.ExecContext(ctx, "INSERT INTO roomserver_previous_events (previous_event_id, event_nids) VALUES ($1, $2)", dupeID, pq.Array(dupeNIDs))
+ if err != nil {
+ return fmt.Errorf("unable to insert new event NIDs: %w", err)
+ }
+ }
+
_, err = tx.ExecContext(ctx, `ALTER TABLE roomserver_previous_events ADD CONSTRAINT roomserver_previous_event_id_unique UNIQUE (previous_event_id);`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
+
+type nids []int64
+
+func (s nids) Len() int { return len(s) }
+func (s nids) Less(i, j int) bool { return s[i] < s[j] }
+func (s nids) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
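
A side note on the pq usage above: pq.Int64Array implements sql.Scanner, so the Postgres BIGINT[] column can be scanned straight into a Go slice, and pq.Array performs the reverse conversion when a slice is passed back as a query parameter, as in the ANY($1) check and the final insert. A hypothetical helper for illustration only (not part of the patch; it assumes the same imports as the migration file above):

func queryEventNIDs(ctx context.Context, tx *sql.Tx, prevEventID string) ([]int64, error) {
	// Scan the BIGINT[] column into pq.Int64Array, an alias for []int64
	// with Scanner/Valuer implementations.
	var nidArr pq.Int64Array
	err := tx.QueryRowContext(ctx,
		`SELECT event_nids FROM roomserver_previous_events WHERE previous_event_id = $1`,
		prevEventID,
	).Scan(&nidArr)
	return []int64(nidArr), err
}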
diff --git a/roomserver/storage/postgres/deltas/20230516154000_drop_reference_sha_test.go b/roomserver/storage/postgres/deltas/20230516154000_drop_reference_sha_test.go
new file mode 100644
index 00000000..c79daac5
--- /dev/null
+++ b/roomserver/storage/postgres/deltas/20230516154000_drop_reference_sha_test.go
@@ -0,0 +1,60 @@
+package deltas
+
+import (
+ "testing"
+
+ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/test"
+ "github.com/matrix-org/dendrite/test/testrig"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestUpDropEventReferenceSHAPrevEvents(t *testing.T) {
+
+ cfg, ctx, close := testrig.CreateConfig(t, test.DBTypePostgres)
+ defer close()
+
+ db, err := sqlutil.Open(&cfg.Global.DatabaseOptions, sqlutil.NewDummyWriter())
+ assert.Nil(t, err)
+ assert.NotNil(t, db)
+ defer db.Close()
+
+ // create the table in the old layout
+ _, err = db.ExecContext(ctx.Context(), `
+CREATE TABLE IF NOT EXISTS roomserver_previous_events (
+ previous_event_id TEXT NOT NULL,
+ event_nids BIGINT[] NOT NULL,
+ previous_reference_sha256 BYTEA NOT NULL,
+ CONSTRAINT roomserver_previous_event_id_unique UNIQUE (previous_event_id, previous_reference_sha256)
+);`)
+ assert.Nil(t, err)
+
+ // create the events table as well, slimmed down with one eventNID
+ _, err = db.ExecContext(ctx.Context(), `
+CREATE SEQUENCE IF NOT EXISTS roomserver_event_nid_seq;
+CREATE TABLE IF NOT EXISTS roomserver_events (
+ event_nid BIGINT PRIMARY KEY DEFAULT nextval('roomserver_event_nid_seq'),
+ room_nid BIGINT NOT NULL
+);
+
+INSERT INTO roomserver_events (event_nid, room_nid) VALUES (1, 1)
+`)
+ assert.Nil(t, err)
+
+ // insert duplicate prev events with different event_nids
+ stmt, err := db.PrepareContext(ctx.Context(), `INSERT INTO roomserver_previous_events (previous_event_id, event_nids, previous_reference_sha256) VALUES ($1, $2, $3)`)
+ assert.Nil(t, err)
+ assert.NotNil(t, stmt)
+ _, err = stmt.ExecContext(ctx.Context(), "1", pq.Array([]int64{1, 2}), "a")
+ assert.Nil(t, err)
+ _, err = stmt.ExecContext(ctx.Context(), "1", pq.Array([]int64{1, 2, 3}), "b")
+ assert.Nil(t, err)
+ // execute the migration
+ txn, err := db.Begin()
+ assert.Nil(t, err)
+ assert.NotNil(t, txn)
+ defer txn.Rollback()
+ err = UpDropEventReferenceSHAPrevEvents(ctx.Context(), txn)
+ assert.NoError(t, err)
+}
diff --git a/roomserver/storage/sqlite3/deltas/20230516154000_drop_reference_sha.go b/roomserver/storage/sqlite3/deltas/20230516154000_drop_reference_sha.go
index 452d72ac..515bccc3 100644
--- a/roomserver/storage/sqlite3/deltas/20230516154000_drop_reference_sha.go
+++ b/roomserver/storage/sqlite3/deltas/20230516154000_drop_reference_sha.go
@@ -18,6 +18,10 @@ import (
"context"
"database/sql"
"fmt"
+
+ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/util"
)
func UpDropEventReferenceSHA(ctx context.Context, tx *sql.Tx) error {
@@ -52,8 +56,72 @@ func UpDropEventReferenceSHAPrevEvents(ctx context.Context, tx *sql.Tx) error {
return fmt.Errorf("tx.ExecContext: %w", err)
}
+ // figure out if there are duplicates
+ dupeRows, err := tx.QueryContext(ctx, `SELECT previous_event_id FROM _roomserver_previous_events GROUP BY previous_event_id HAVING count(previous_event_id) > 1`)
+ if err != nil {
+ return fmt.Errorf("failed to query duplicate event ids")
+ }
+ defer internal.CloseAndLogIfError(ctx, dupeRows, "failed to close rows")
+
+ var prevEvents []string
+ var prevEventID string
+ for dupeRows.Next() {
+ if err = dupeRows.Scan(&prevEventID); err != nil {
+ return err
+ }
+ prevEvents = append(prevEvents, prevEventID)
+ }
+ if dupeRows.Err() != nil {
+ return dupeRows.Err()
+ }
+
+ // if we found duplicates, check if we can combine them, e.g. they are in the same room
+ for _, dupeID := range prevEvents {
+ var dupeNIDsRows *sql.Rows
+ dupeNIDsRows, err = tx.QueryContext(ctx, `SELECT event_nids FROM _roomserver_previous_events WHERE previous_event_id = $1`, dupeID)
+ if err != nil {
+ return fmt.Errorf("failed to query duplicate event ids")
+ }
+ defer internal.CloseAndLogIfError(ctx, dupeNIDsRows, "failed to close rows")
+ var dupeNIDs []int64
+ for dupeNIDsRows.Next() {
+ var nids pq.Int64Array
+ if err = dupeNIDsRows.Scan(&nids); err != nil {
+ return err
+ }
+ dupeNIDs = append(dupeNIDs, nids...)
+ }
+
+ if dupeNIDsRows.Err() != nil {
+ return dupeNIDsRows.Err()
+ }
+ // dedupe NIDs
+ dupeNIDs = dupeNIDs[:util.SortAndUnique(nids(dupeNIDs))]
+ // now that we have all NIDs, check which room they belong to
+ var roomCount int
+ err = tx.QueryRowContext(ctx, `SELECT count(distinct room_nid) FROM roomserver_events WHERE event_nid IN ($1)`, pq.Array(dupeNIDs)).Scan(&roomCount)
+ if err != nil {
+ return err
+ }
+ // if the events are from different rooms, that's bad and we can't continue
+ if roomCount > 1 {
+ return fmt.Errorf("detected events (%v) referenced for different rooms (%v)", dupeNIDs, roomCount)
+ }
+ // otherwise delete the dupes
+ _, err = tx.ExecContext(ctx, "DELETE FROM _roomserver_previous_events WHERE previous_event_id = $1", dupeID)
+ if err != nil {
+ return fmt.Errorf("unable to delete duplicates: %w", err)
+ }
+
+ // insert combined values
+ _, err = tx.ExecContext(ctx, "INSERT INTO _roomserver_previous_events (previous_event_id, event_nids) VALUES ($1, $2)", dupeID, pq.Array(dupeNIDs))
+ if err != nil {
+ return fmt.Errorf("unable to insert new event NIDs: %w", err)
+ }
+ }
+
// move data
- if _, err := tx.ExecContext(ctx, `
+ if _, err = tx.ExecContext(ctx, `
INSERT
INTO roomserver_previous_events (
previous_event_id, event_nids
@@ -64,9 +132,15 @@ INSERT
return fmt.Errorf("tx.ExecContext: %w", err)
}
// drop old table
- _, err := tx.ExecContext(ctx, `DROP TABLE _roomserver_previous_events;`)
+ _, err = tx.ExecContext(ctx, `DROP TABLE _roomserver_previous_events;`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
+
+type nids []int64
+
+func (s nids) Len() int { return len(s) }
+func (s nids) Less(i, j int) bool { return s[i] < s[j] }
+func (s nids) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
diff --git a/roomserver/storage/sqlite3/deltas/20230516154000_drop_reference_sha_test.go b/roomserver/storage/sqlite3/deltas/20230516154000_drop_reference_sha_test.go
new file mode 100644
index 00000000..547d9703
--- /dev/null
+++ b/roomserver/storage/sqlite3/deltas/20230516154000_drop_reference_sha_test.go
@@ -0,0 +1,59 @@
+package deltas
+
+import (
+ "testing"
+
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/test"
+ "github.com/matrix-org/dendrite/test/testrig"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestUpDropEventReferenceSHAPrevEvents(t *testing.T) {
+
+ cfg, ctx, close := testrig.CreateConfig(t, test.DBTypeSQLite)
+ defer close()
+
+ db, err := sqlutil.Open(&cfg.RoomServer.Database, sqlutil.NewExclusiveWriter())
+ assert.Nil(t, err)
+ assert.NotNil(t, db)
+ defer db.Close()
+
+ // create the table in the old layout
+ _, err = db.ExecContext(ctx.Context(), `
+ CREATE TABLE IF NOT EXISTS roomserver_previous_events (
+ previous_event_id TEXT NOT NULL,
+ previous_reference_sha256 BLOB,
+ event_nids TEXT NOT NULL,
+ UNIQUE (previous_event_id, previous_reference_sha256)
+ );`)
+ assert.Nil(t, err)
+
+ // create the events table as well, slimmed down with one eventNID
+ _, err = db.ExecContext(ctx.Context(), `
+ CREATE TABLE IF NOT EXISTS roomserver_events (
+ event_nid INTEGER PRIMARY KEY AUTOINCREMENT,
+ room_nid INTEGER NOT NULL
+);
+
+INSERT INTO roomserver_events (event_nid, room_nid) VALUES (1, 1)
+`)
+ assert.Nil(t, err)
+
+ // insert duplicate prev events with different event_nids
+ stmt, err := db.PrepareContext(ctx.Context(), `INSERT INTO roomserver_previous_events (previous_event_id, event_nids, previous_reference_sha256) VALUES ($1, $2, $3)`)
+ assert.Nil(t, err)
+ assert.NotNil(t, stmt)
+ _, err = stmt.ExecContext(ctx.Context(), "1", "{1,2}", "a")
+ assert.Nil(t, err)
+ _, err = stmt.ExecContext(ctx.Context(), "1", "{1,2,3}", "b")
+ assert.Nil(t, err)
+
+ // execute the migration
+ txn, err := db.Begin()
+ assert.Nil(t, err)
+ assert.NotNil(t, txn)
+ err = UpDropEventReferenceSHAPrevEvents(ctx.Context(), txn)
+ defer txn.Rollback()
+ assert.NoError(t, err)
+}
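
A usage note: both new tests can be run with the standard Go tooling, e.g. as below. The SQLite variant is self-contained, while the Postgres variant assumes a locally reachable PostgreSQL instance that the test harness can create databases on.

go test -run TestUpDropEventReferenceSHAPrevEvents ./roomserver/storage/sqlite3/deltas/
go test -run TestUpDropEventReferenceSHAPrevEvents ./roomserver/storage/postgres/deltas/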