aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAshley Nelson <fant@shley.email>2022-10-03 05:57:21 -0500
committerGitHub <noreply@github.com>2022-10-03 11:57:21 +0100
commitc1e16fd41e24eb1c139b4e4acbef1a309d7d573b (patch)
treee0b3ad587d2bd002db101a5b9ef6be51ca5e3296
parentb7f4bab358ea6de0932037242f71152bfc27c289 (diff)
Fix fragility of selectEventsWithEventIDsSQL queries (#2757)
This fixes a temporary workaround with the `selectEventsWithEventIDsSQL` queries where fields need to be artificially added to the queries so the row results match the format of the `syncapi_output_room_events` table. I made similar functions that accept row results from the `syncapi_current_room_state` table and convert them into StreamEvents without the fields that are specific to output room events. There is also a unit test in the first commit to ensure the resulting behavior doesn't change from the modified queries and functions. Fixes #601. ### Pull Request Checklist <!-- Please read docs/CONTRIBUTING.md before submitting your pull request --> * [x] I have added tests for PR _or_ I have justified why this PR doesn't need tests. * [x] Pull request includes a [sign off](https://github.com/matrix-org/dendrite/blob/main/docs/CONTRIBUTING.md#sign-off) Signed-off-by: `Ashley Nelson <fant@shley.email>` Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
-rw-r--r--syncapi/storage/postgres/current_room_state_table.go38
-rw-r--r--syncapi/storage/sqlite3/current_room_state_table.go38
-rw-r--r--syncapi/storage/tables/current_room_state_test.go88
3 files changed, 150 insertions, 14 deletions
diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go
index 4ffd2961..2ccf0be1 100644
--- a/syncapi/storage/postgres/current_room_state_table.go
+++ b/syncapi/storage/postgres/current_room_state_table.go
@@ -104,12 +104,7 @@ const selectStateEventSQL = "" +
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
const selectEventsWithEventIDsSQL = "" +
- // TODO: The session_id and transaction_id blanks are here because
- // the rowsToStreamEvents expects there to be exactly seven columns. We need to
- // figure out if these really need to be in the DB, and if so, we need a
- // better permanent fix for this. - neilalexander, 2 Jan 2020
- "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, history_visibility" +
- " FROM syncapi_current_room_state WHERE event_id = ANY($1)"
+ "SELECT event_id, added_at, headered_event_json, history_visibility FROM syncapi_current_room_state WHERE event_id = ANY($1)"
const selectSharedUsersSQL = "" +
"SELECT state_key FROM syncapi_current_room_state WHERE room_id = ANY(" +
@@ -365,7 +360,36 @@ func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectEventsWithEventIDs: rows.close() failed")
- return rowsToStreamEvents(rows)
+ return currentRoomStateRowsToStreamEvents(rows)
+}
+
+func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
+ var events []types.StreamEvent
+ for rows.Next() {
+ var (
+ eventID string
+ streamPos types.StreamPosition
+ eventBytes []byte
+ historyVisibility gomatrixserverlib.HistoryVisibility
+ )
+ if err := rows.Scan(&eventID, &streamPos, &eventBytes, &historyVisibility); err != nil {
+ return nil, err
+ }
+ // TODO: Handle redacted events
+ var ev gomatrixserverlib.HeaderedEvent
+ if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ return nil, err
+ }
+
+ ev.Visibility = historyVisibility
+
+ events = append(events, types.StreamEvent{
+ HeaderedEvent: &ev,
+ StreamPosition: streamPos,
+ })
+ }
+
+ return events, nil
}
func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go
index c4019fed..ff45e786 100644
--- a/syncapi/storage/sqlite3/current_room_state_table.go
+++ b/syncapi/storage/sqlite3/current_room_state_table.go
@@ -88,12 +88,7 @@ const selectStateEventSQL = "" +
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
const selectEventsWithEventIDsSQL = "" +
- // TODO: The session_id and transaction_id blanks are here because
- // the rowsToStreamEvents expects there to be exactly seven columns. We need to
- // figure out if these really need to be in the DB, and if so, we need a
- // better permanent fix for this. - neilalexander, 2 Jan 2020
- "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, history_visibility" +
- " FROM syncapi_current_room_state WHERE event_id IN ($1)"
+ "SELECT event_id, added_at, headered_event_json, history_visibility FROM syncapi_current_room_state WHERE event_id IN ($1)"
const selectSharedUsersSQL = "" +
"SELECT state_key FROM syncapi_current_room_state WHERE room_id IN(" +
@@ -378,7 +373,7 @@ func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
return nil, err
}
start = start + n
- events, err := rowsToStreamEvents(rows)
+ events, err := currentRoomStateRowsToStreamEvents(rows)
internal.CloseAndLogIfError(ctx, rows, "selectEventsWithEventIDs: rows.close() failed")
if err != nil {
return nil, err
@@ -388,6 +383,35 @@ func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
return res, nil
}
+func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
+ var events []types.StreamEvent
+ for rows.Next() {
+ var (
+ eventID string
+ streamPos types.StreamPosition
+ eventBytes []byte
+ historyVisibility gomatrixserverlib.HistoryVisibility
+ )
+ if err := rows.Scan(&eventID, &streamPos, &eventBytes, &historyVisibility); err != nil {
+ return nil, err
+ }
+ // TODO: Handle redacted events
+ var ev gomatrixserverlib.HeaderedEvent
+ if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ return nil, err
+ }
+
+ ev.Visibility = historyVisibility
+
+ events = append(events, types.StreamEvent{
+ HeaderedEvent: &ev,
+ StreamPosition: streamPos,
+ })
+ }
+
+ return events, nil
+}
+
func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
result := []*gomatrixserverlib.HeaderedEvent{}
for rows.Next() {
diff --git a/syncapi/storage/tables/current_room_state_test.go b/syncapi/storage/tables/current_room_state_test.go
new file mode 100644
index 00000000..23287c50
--- /dev/null
+++ b/syncapi/storage/tables/current_room_state_test.go
@@ -0,0 +1,88 @@
+package tables_test
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "testing"
+
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/syncapi/storage/postgres"
+ "github.com/matrix-org/dendrite/syncapi/storage/sqlite3"
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/dendrite/test"
+)
+
+func newCurrentRoomStateTable(t *testing.T, dbType test.DBType) (tables.CurrentRoomState, *sql.DB, func()) {
+ t.Helper()
+ connStr, close := test.PrepareDBConnectionString(t, dbType)
+ db, err := sqlutil.Open(&config.DatabaseOptions{
+ ConnectionString: config.DataSource(connStr),
+ }, sqlutil.NewExclusiveWriter())
+ if err != nil {
+ t.Fatalf("failed to open db: %s", err)
+ }
+
+ var tab tables.CurrentRoomState
+ switch dbType {
+ case test.DBTypePostgres:
+ tab, err = postgres.NewPostgresCurrentRoomStateTable(db)
+ case test.DBTypeSQLite:
+ var stream sqlite3.StreamIDStatements
+ if err = stream.Prepare(db); err != nil {
+ t.Fatalf("failed to prepare stream stmts: %s", err)
+ }
+ tab, err = sqlite3.NewSqliteCurrentRoomStateTable(db, &stream)
+ }
+ if err != nil {
+ t.Fatalf("failed to make new table: %s", err)
+ }
+ return tab, db, close
+}
+
+func TestCurrentRoomStateTable(t *testing.T) {
+ ctx := context.Background()
+ alice := test.NewUser(t)
+ room := test.NewRoom(t, alice)
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ tab, db, close := newCurrentRoomStateTable(t, dbType)
+ defer close()
+ events := room.CurrentState()
+ err := sqlutil.WithTransaction(db, func(txn *sql.Tx) error {
+ for i, ev := range events {
+ err := tab.UpsertRoomState(ctx, txn, ev, nil, types.StreamPosition(i))
+ if err != nil {
+ return fmt.Errorf("failed to UpsertRoomState: %w", err)
+ }
+ }
+ wantEventIDs := []string{
+ events[0].EventID(), events[1].EventID(), events[2].EventID(), events[3].EventID(),
+ }
+ gotEvents, err := tab.SelectEventsWithEventIDs(ctx, txn, wantEventIDs)
+ if err != nil {
+ return fmt.Errorf("failed to SelectEventsWithEventIDs: %w", err)
+ }
+ if len(gotEvents) != len(wantEventIDs) {
+ return fmt.Errorf("SelectEventsWithEventIDs\ngot %d, want %d results", len(gotEvents), len(wantEventIDs))
+ }
+ gotEventIDs := make(map[string]struct{}, len(gotEvents))
+ for _, event := range gotEvents {
+ if event.ExcludeFromSync {
+ return fmt.Errorf("SelectEventsWithEventIDs ExcludeFromSync should be false for current room state event %+v", event)
+ }
+ gotEventIDs[event.EventID()] = struct{}{}
+ }
+ for _, id := range wantEventIDs {
+ if _, ok := gotEventIDs[id]; !ok {
+ return fmt.Errorf("SelectEventsWithEventIDs\nexpected id %q not returned", id)
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ })
+}