diff options
Diffstat (limited to 'syncapi/storage/postgres')
3 files changed, 159 insertions, 39 deletions
diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index 3caafa14..0d607b7c 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -275,6 +275,15 @@ func (s *currentRoomStateStatements) SelectCurrentState( ) ([]*gomatrixserverlib.HeaderedEvent, error) { stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt) senders, notSenders := getSendersStateFilterFilter(stateFilter) + // We're going to query members later, so remove them from this request + if stateFilter.LazyLoadMembers && !stateFilter.IncludeRedundantMembers { + notTypes := &[]string{gomatrixserverlib.MRoomMember} + if stateFilter.NotTypes != nil { + *stateFilter.NotTypes = append(*stateFilter.NotTypes, gomatrixserverlib.MRoomMember) + } else { + stateFilter.NotTypes = notTypes + } + } rows, err := stmt.QueryContext(ctx, roomID, pq.StringArray(senders), pq.StringArray(notSenders), diff --git a/syncapi/storage/postgres/deltas/20230201152200_rename_index.go b/syncapi/storage/postgres/deltas/20230201152200_rename_index.go new file mode 100644 index 00000000..5a0ec505 --- /dev/null +++ b/syncapi/storage/postgres/deltas/20230201152200_rename_index.go @@ -0,0 +1,29 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "context" + "database/sql" + "fmt" +) + +func UpRenameOutputRoomEventsIndex(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, `ALTER TABLE syncapi_output_room_events RENAME CONSTRAINT syncapi_event_id_idx TO syncapi_output_room_event_id_idx;`) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 0075fc8d..59fb99aa 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -19,18 +19,17 @@ import ( "context" "database/sql" "encoding/json" + "fmt" "sort" + "github.com/lib/pq" "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" - - "github.com/lib/pq" "github.com/matrix-org/gomatrixserverlib" - - "github.com/matrix-org/dendrite/internal/sqlutil" ) const outputRoomEventsSchema = ` @@ -44,7 +43,7 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( -- This isn't a problem for us since we just want to order by this field. id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), -- The event ID for the event - event_id TEXT NOT NULL CONSTRAINT syncapi_event_id_idx UNIQUE, + event_id TEXT NOT NULL CONSTRAINT syncapi_output_room_event_id_idx UNIQUE, -- The 'room_id' key for the event. room_id TEXT NOT NULL, -- The headered JSON for the event, containing potentially additional metadata such as @@ -79,13 +78,16 @@ CREATE INDEX IF NOT EXISTS syncapi_output_room_events_room_id_idx ON syncapi_out CREATE INDEX IF NOT EXISTS syncapi_output_room_events_exclude_from_sync_idx ON syncapi_output_room_events (exclude_from_sync); CREATE INDEX IF NOT EXISTS syncapi_output_room_events_add_state_ids_idx ON syncapi_output_room_events ((add_state_ids IS NOT NULL)); CREATE INDEX IF NOT EXISTS syncapi_output_room_events_remove_state_ids_idx ON syncapi_output_room_events ((remove_state_ids IS NOT NULL)); +CREATE INDEX IF NOT EXISTS syncapi_output_room_events_recent_events_idx ON syncapi_output_room_events (room_id, exclude_from_sync, id, sender, type); + + ` const insertEventSQL = "" + "INSERT INTO syncapi_output_room_events (" + "room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " + - "ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " + + "ON CONFLICT ON CONSTRAINT syncapi_output_room_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " + "RETURNING id" const selectEventsSQL = "" + @@ -109,14 +111,29 @@ const selectRecentEventsSQL = "" + " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + " ORDER BY id DESC LIMIT $8" -const selectRecentEventsForSyncSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + - " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" + - " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + - " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + - " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" + - " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + - " ORDER BY id DESC LIMIT $8" +// selectRecentEventsForSyncSQL contains an optimization to get the recent events for a list of rooms, using a LATERAL JOIN +// The sub select inside LATERAL () is executed for all room_ids it gets as a parameter $1 +const selectRecentEventsForSyncSQL = ` +WITH room_ids AS ( + SELECT unnest($1::text[]) AS room_id +) +SELECT x.* +FROM room_ids, + LATERAL ( + SELECT room_id, event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility + FROM syncapi_output_room_events recent_events + WHERE + recent_events.room_id = room_ids.room_id + AND recent_events.exclude_from_sync = FALSE + AND id > $2 AND id <= $3 + AND ( $4::text[] IS NULL OR sender = ANY($4) ) + AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) ) + AND ( $6::text[] IS NULL OR type LIKE ANY($6) ) + AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) ) + ORDER BY recent_events.id DESC + LIMIT $8 + ) AS x +` const selectEarlyEventsSQL = "" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + @@ -207,12 +224,30 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { return nil, err } + migrationName := "syncapi: rename dupe index (output_room_events)" + + var cName string + err = db.QueryRowContext(context.Background(), "select constraint_name from information_schema.table_constraints where table_name = 'syncapi_output_room_events' AND constraint_name = 'syncapi_event_id_idx'").Scan(&cName) + switch err { + case sql.ErrNoRows: // migration was already executed, as the index was renamed + if err = sqlutil.InsertMigration(context.Background(), db, migrationName); err != nil { + return nil, fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err) + } + case nil: + default: + return nil, err + } + m := sqlutil.NewMigrator(db) m.AddMigrations( sqlutil.Migration{ Version: "syncapi: add history visibility column (output_room_events)", Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents, }, + sqlutil.Migration{ + Version: migrationName, + Up: deltas.UpRenameOutputRoomEventsIndex, + }, ) err = m.Up(context.Background()) if err != nil { @@ -398,9 +433,9 @@ func (s *outputRoomEventsStatements) InsertEvent( // from sync. func (s *outputRoomEventsStatements) SelectRecentEvents( ctx context.Context, txn *sql.Tx, - roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, + roomIDs []string, ra types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool, -) ([]types.StreamEvent, bool, error) { +) (map[string]types.RecentEvents, error) { var stmt *sql.Stmt if onlySyncEvents { stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt) @@ -408,8 +443,9 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt) } senders, notSenders := getSendersRoomEventFilter(eventFilter) + rows, err := stmt.QueryContext( - ctx, roomID, r.Low(), r.High(), + ctx, pq.StringArray(roomIDs), ra.Low(), ra.High(), pq.StringArray(senders), pq.StringArray(notSenders), pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)), @@ -417,34 +453,80 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( eventFilter.Limit+1, ) if err != nil { - return nil, false, err + return nil, err } defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed") - events, err := rowsToStreamEvents(rows) - if err != nil { - return nil, false, err - } - if chronologicalOrder { - // The events need to be returned from oldest to latest, which isn't - // necessary the way the SQL query returns them, so a sort is necessary to - // ensure the events are in the right order in the slice. - sort.SliceStable(events, func(i int, j int) bool { - return events[i].StreamPosition < events[j].StreamPosition + + result := make(map[string]types.RecentEvents) + + for rows.Next() { + var ( + roomID string + eventID string + streamPos types.StreamPosition + eventBytes []byte + excludeFromSync bool + sessionID *int64 + txnID *string + transactionID *api.TransactionID + historyVisibility gomatrixserverlib.HistoryVisibility + ) + if err := rows.Scan(&roomID, &eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil { + return nil, err + } + // TODO: Handle redacted events + var ev gomatrixserverlib.HeaderedEvent + if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { + return nil, err + } + + if sessionID != nil && txnID != nil { + transactionID = &api.TransactionID{ + SessionID: *sessionID, + TransactionID: *txnID, + } + } + + r := result[roomID] + + ev.Visibility = historyVisibility + r.Events = append(r.Events, types.StreamEvent{ + HeaderedEvent: &ev, + StreamPosition: streamPos, + TransactionID: transactionID, + ExcludeFromSync: excludeFromSync, }) + + result[roomID] = r } - // we queried for 1 more than the limit, so if we returned one more mark limited=true - limited := false - if len(events) > eventFilter.Limit { - limited = true - // re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last. - if chronologicalOrder { - events = events[1:] - } else { - events = events[:len(events)-1] + + if chronologicalOrder { + for roomID, evs := range result { + // The events need to be returned from oldest to latest, which isn't + // necessary the way the SQL query returns them, so a sort is necessary to + // ensure the events are in the right order in the slice. + sort.SliceStable(evs.Events, func(i int, j int) bool { + return evs.Events[i].StreamPosition < evs.Events[j].StreamPosition + }) + + if len(evs.Events) > eventFilter.Limit { + evs.Limited = true + evs.Events = evs.Events[1:] + } + + result[roomID] = evs } - } + } else { + for roomID, evs := range result { + if len(evs.Events) > eventFilter.Limit { + evs.Limited = true + evs.Events = evs.Events[:len(evs.Events)-1] + } - return events, limited, nil + result[roomID] = evs + } + } + return result, rows.Err() } // selectEarlyEvents returns the earliest events in the given room, starting |