diff options
author | Till <2353100+S7evinK@users.noreply.github.com> | 2022-07-18 14:46:15 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-18 14:46:15 +0200 |
commit | a7e92f8cb99105c2a579f73fbc972b7fb0b35678 (patch) | |
tree | d9df6aa7b9aa2321f9833fd964c108f2481153ff /syncapi/storage/postgres | |
parent | c0c909d30642d03600fd48bd052a22040a67267b (diff) |
History visibility database changes (#2533)
* Add new history_visibility column
* Update SQL queries to include history_visibility
* Store the history visibilty calculated by the roomserver
* Update GMSL
* Update migrations
* Fix migration
* Update GMSL
* Fix `go.sum`
* Update GMSL to use sql.Scanner & sql.Valuer
* Re-order migration/table creation
* Update gomatrixserverlib
* Add history_visibility column to current_room_state
* Fix migrations
* Return error instead of Fatal log
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'syncapi/storage/postgres')
4 files changed, 117 insertions, 45 deletions
diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index c4667baf..aae2d8c3 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -51,6 +51,7 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state ( -- The serial ID of the output_room_events table when this event became -- part of the current state of the room. added_at BIGINT, + history_visibility SMALLINT NOT NULL DEFAULT 2, -- Clobber based on 3-uple of room_id, type and state_key CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key) ); @@ -63,8 +64,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_current_room_state_eventid_idx ON sync ` const upsertRoomStateSQL = "" + - "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at)" + - " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" + + "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at, history_visibility)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)" + " ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" + " DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, headered_event_json = $7, membership = $8, added_at = $9" @@ -100,11 +101,11 @@ const selectStateEventSQL = "" + "SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3" const selectEventsWithEventIDsSQL = "" + - // TODO: The session_id and transaction_id blanks are here because otherwise - // the rowsToStreamEvents expects there to be exactly six columns. We need to + // TODO: The session_id and transaction_id blanks are here because + // the rowsToStreamEvents expects there to be exactly seven columns. We need to // figure out if these really need to be in the DB, and if so, we need a // better permanent fix for this. - neilalexander, 2 Jan 2020 - "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" + + "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, history_visibility" + " FROM syncapi_current_room_state WHERE event_id = ANY($1)" const selectSharedUsersSQL = "" + @@ -336,6 +337,7 @@ func (s *currentRoomStateStatements) UpsertRoomState( headeredJSON, membership, addedAt, + event.Visibility, ) return err } diff --git a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go new file mode 100644 index 00000000..2b4b6528 --- /dev/null +++ b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go @@ -0,0 +1,50 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" +) + +func LoadAddHistoryVisibilityColumn(m *sqlutil.Migrations) { + m.AddMigration(UpAddHistoryVisibilityColumn, DownAddHistoryVisibilityColumn) +} + +func UpAddHistoryVisibilityColumn(tx *sql.Tx) error { + _, err := tx.Exec(` + ALTER TABLE syncapi_output_room_events ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2; + UPDATE syncapi_output_room_events SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted'); + ALTER TABLE syncapi_current_room_state ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2; + UPDATE syncapi_current_room_state SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted'); + `) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownAddHistoryVisibilityColumn(tx *sql.Tx) error { + _, err := tx.Exec(` + ALTER TABLE syncapi_output_room_events DROP COLUMN IF EXISTS history_visibility; + ALTER TABLE syncapi_current_room_state DROP COLUMN IF EXISTS history_visibility; + `) + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + return nil +} diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index d84d0cfa..d0474d4c 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -67,7 +67,9 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( -- events retrieved through backfilling that have a position in the stream -- that relates to the moment these were retrieved rather than the moment these -- were emitted. - exclude_from_sync BOOL DEFAULT FALSE + exclude_from_sync BOOL DEFAULT FALSE, + -- The history visibility before this event (1 - world_readable; 2 - shared; 3 - invited; 4 - joined) + history_visibility SMALLINT NOT NULL DEFAULT 2 ); CREATE INDEX IF NOT EXISTS syncapi_output_room_events_type_idx ON syncapi_output_room_events (type); @@ -78,16 +80,16 @@ CREATE INDEX IF NOT EXISTS syncapi_output_room_events_exclude_from_sync_idx ON s const insertEventSQL = "" + "INSERT INTO syncapi_output_room_events (" + - "room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" + - ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) " + + "room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" + + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " + "ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " + "RETURNING id" const selectEventsSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id = ANY($1)" const selectEventsWithFilterSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id = ANY($1)" + " AND ( $2::text[] IS NULL OR sender = ANY($2) )" + " AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" + " AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" + @@ -96,7 +98,7 @@ const selectEventsWithFilterSQL = "" + " LIMIT $7" const selectRecentEventsSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + @@ -105,7 +107,7 @@ const selectRecentEventsSQL = "" + " ORDER BY id DESC LIMIT $8" const selectRecentEventsForSyncSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + @@ -114,7 +116,7 @@ const selectRecentEventsForSyncSQL = "" + " ORDER BY id DESC LIMIT $8" const selectEarlyEventsSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + @@ -130,7 +132,7 @@ const updateEventJSONSQL = "" + // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). const selectStateInRangeSQL = "" + - "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + + "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" + " FROM syncapi_output_room_events" + " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + " AND room_id = ANY($3)" + @@ -146,10 +148,10 @@ const deleteEventsForRoomSQL = "" + "DELETE FROM syncapi_output_room_events WHERE room_id = $1" const selectContextEventSQL = "" + - "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2" + "SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2" const selectContextBeforeEventSQL = "" + - "SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" + + "SELECT headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" + @@ -157,7 +159,7 @@ const selectContextBeforeEventSQL = "" + " ORDER BY id DESC LIMIT $3" const selectContextAfterEventSQL = "" + - "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" + + "SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" + @@ -246,14 +248,15 @@ func (s *outputRoomEventsStatements) SelectStateInRange( for rows.Next() { var ( - eventID string - streamPos types.StreamPosition - eventBytes []byte - excludeFromSync bool - addIDs pq.StringArray - delIDs pq.StringArray + eventID string + streamPos types.StreamPosition + eventBytes []byte + excludeFromSync bool + addIDs pq.StringArray + delIDs pq.StringArray + historyVisibility gomatrixserverlib.HistoryVisibility ) - if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs, &historyVisibility); err != nil { return nil, nil, err } // Sanity check for deleted state and whine if we see it. We don't need to do anything @@ -283,6 +286,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( needSet[id] = true } stateNeeded[ev.RoomID()] = needSet + ev.Visibility = historyVisibility eventIDToEvent[eventID] = types.StreamEvent{ HeaderedEvent: &ev, @@ -314,7 +318,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID( func (s *outputRoomEventsStatements) InsertEvent( ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, - transactionID *api.TransactionID, excludeFromSync bool, + transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility, ) (streamPos types.StreamPosition, err error) { var txnID *string var sessionID *int64 @@ -351,6 +355,7 @@ func (s *outputRoomEventsStatements) InsertEvent( sessionID, txnID, excludeFromSync, + historyVisibility, ).Scan(&streamPos) return } @@ -504,13 +509,15 @@ func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID) var eventAsString string - if err = row.Scan(&id, &eventAsString); err != nil { + var historyVisibility gomatrixserverlib.HistoryVisibility + if err = row.Scan(&id, &eventAsString, &historyVisibility); err != nil { return 0, evt, err } if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil { return 0, evt, err } + evt.Visibility = historyVisibility return id, evt, nil } @@ -532,15 +539,17 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent( for rows.Next() { var ( - eventBytes []byte - evt *gomatrixserverlib.HeaderedEvent + eventBytes []byte + evt *gomatrixserverlib.HeaderedEvent + historyVisibility gomatrixserverlib.HistoryVisibility ) - if err = rows.Scan(&eventBytes); err != nil { + if err = rows.Scan(&eventBytes, &historyVisibility); err != nil { return evts, err } if err = json.Unmarshal(eventBytes, &evt); err != nil { return evts, err } + evt.Visibility = historyVisibility evts = append(evts, evt) } @@ -565,15 +574,17 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent( for rows.Next() { var ( - eventBytes []byte - evt *gomatrixserverlib.HeaderedEvent + eventBytes []byte + evt *gomatrixserverlib.HeaderedEvent + historyVisibility gomatrixserverlib.HistoryVisibility ) - if err = rows.Scan(&lastID, &eventBytes); err != nil { + if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil { return 0, evts, err } if err = json.Unmarshal(eventBytes, &evt); err != nil { return 0, evts, err } + evt.Visibility = historyVisibility evts = append(evts, evt) } @@ -584,15 +595,16 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { var result []types.StreamEvent for rows.Next() { var ( - eventID string - streamPos types.StreamPosition - eventBytes []byte - excludeFromSync bool - sessionID *int64 - txnID *string - transactionID *api.TransactionID + eventID string + streamPos types.StreamPosition + eventBytes []byte + excludeFromSync bool + sessionID *int64 + txnID *string + transactionID *api.TransactionID + historyVisibility gomatrixserverlib.HistoryVisibility ) - if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil { return nil, err } // TODO: Handle redacted events diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 9cfe7c07..c6121e41 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -42,15 +42,13 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil { return nil, err } - accountData, err := NewPostgresAccountDataTable(d.db) - if err != nil { + if _, err = d.db.Exec(outputRoomEventsSchema); err != nil { return nil, err } - events, err := NewPostgresEventsTable(d.db) - if err != nil { + if _, err = d.db.Exec(currentRoomStateSchema); err != nil { return nil, err } - currState, err := NewPostgresCurrentRoomStateTable(d.db) + accountData, err := NewPostgresAccountDataTable(d.db) if err != nil { return nil, err } @@ -101,9 +99,19 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) m := sqlutil.NewMigrations() deltas.LoadFixSequences(m) deltas.LoadRemoveSendToDeviceSentColumn(m) + deltas.LoadAddHistoryVisibilityColumn(m) if err = m.RunDeltas(d.db, dbProperties); err != nil { return nil, err } + // prepare statements after the migrations have run + events, err := NewPostgresEventsTable(d.db) + if err != nil { + return nil, err + } + currState, err := NewPostgresCurrentRoomStateTable(d.db) + if err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, Writer: d.writer, |