aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2023-02-07 14:31:23 +0100
committerGitHub <noreply@github.com>2023-02-07 14:31:23 +0100
commiteb29a315507f0075c2c6a495ac59c64a7f45f9fc (patch)
tree3ed37d947dbacdf95c77dc27e4193c671e4968dc /syncapi/storage/postgres
parentcf254ba0445e2509f77f41dbec69f632b126b847 (diff)
Optimize `/sync` and history visibility (#2961)
Should fix the following issues or make a lot less worse when using Postgres: The main issue behind #2911: The client gives up after a certain time, causing a cascade of context errors, because the response couldn't be built up fast enough. This mostly happens on accounts with many rooms, due to the inefficient way we're getting recent events and current state For #2777: The queries for getting the membership events for history visibility were being executed for each room (I think 185?), resulting in a whooping 2k queries for membership events. (Getting the statesnapshot -> block nids -> actual wanted membership event) Both should now be better by: - Using a LATERAL join to get all recent events for all joined rooms in one go (TODO: maybe do the same for room summary and current state etc) - If we're lazy loading on initial syncs, we're now not getting the whole current state, just to drop the majority of it because we're lazy loading members - we add a filter to exclude membership events on the first call to `CurrentState`. - Using an optimized query to get the membership events needed to calculate history visibility --------- Co-authored-by: kegsay <kegan@matrix.org>
Diffstat (limited to 'syncapi/storage/postgres')
-rw-r--r--syncapi/storage/postgres/current_room_state_table.go9
-rw-r--r--syncapi/storage/postgres/deltas/20230201152200_rename_index.go29
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go160
3 files changed, 159 insertions, 39 deletions
diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go
index 3caafa14..0d607b7c 100644
--- a/syncapi/storage/postgres/current_room_state_table.go
+++ b/syncapi/storage/postgres/current_room_state_table.go
@@ -275,6 +275,15 @@ func (s *currentRoomStateStatements) SelectCurrentState(
) ([]*gomatrixserverlib.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt)
senders, notSenders := getSendersStateFilterFilter(stateFilter)
+ // We're going to query members later, so remove them from this request
+ if stateFilter.LazyLoadMembers && !stateFilter.IncludeRedundantMembers {
+ notTypes := &[]string{gomatrixserverlib.MRoomMember}
+ if stateFilter.NotTypes != nil {
+ *stateFilter.NotTypes = append(*stateFilter.NotTypes, gomatrixserverlib.MRoomMember)
+ } else {
+ stateFilter.NotTypes = notTypes
+ }
+ }
rows, err := stmt.QueryContext(ctx, roomID,
pq.StringArray(senders),
pq.StringArray(notSenders),
diff --git a/syncapi/storage/postgres/deltas/20230201152200_rename_index.go b/syncapi/storage/postgres/deltas/20230201152200_rename_index.go
new file mode 100644
index 00000000..5a0ec505
--- /dev/null
+++ b/syncapi/storage/postgres/deltas/20230201152200_rename_index.go
@@ -0,0 +1,29 @@
+// Copyright 2023 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package deltas
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+)
+
+func UpRenameOutputRoomEventsIndex(ctx context.Context, tx *sql.Tx) error {
+ _, err := tx.ExecContext(ctx, `ALTER TABLE syncapi_output_room_events RENAME CONSTRAINT syncapi_event_id_idx TO syncapi_output_room_event_id_idx;`)
+ if err != nil {
+ return fmt.Errorf("failed to execute upgrade: %w", err)
+ }
+ return nil
+}
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 0075fc8d..59fb99aa 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -19,18 +19,17 @@ import (
"context"
"database/sql"
"encoding/json"
+ "fmt"
"sort"
+ "github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
-
- "github.com/lib/pq"
"github.com/matrix-org/gomatrixserverlib"
-
- "github.com/matrix-org/dendrite/internal/sqlutil"
)
const outputRoomEventsSchema = `
@@ -44,7 +43,7 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
-- This isn't a problem for us since we just want to order by this field.
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
-- The event ID for the event
- event_id TEXT NOT NULL CONSTRAINT syncapi_event_id_idx UNIQUE,
+ event_id TEXT NOT NULL CONSTRAINT syncapi_output_room_event_id_idx UNIQUE,
-- The 'room_id' key for the event.
room_id TEXT NOT NULL,
-- The headered JSON for the event, containing potentially additional metadata such as
@@ -79,13 +78,16 @@ CREATE INDEX IF NOT EXISTS syncapi_output_room_events_room_id_idx ON syncapi_out
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_exclude_from_sync_idx ON syncapi_output_room_events (exclude_from_sync);
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_add_state_ids_idx ON syncapi_output_room_events ((add_state_ids IS NOT NULL));
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_remove_state_ids_idx ON syncapi_output_room_events ((remove_state_ids IS NOT NULL));
+CREATE INDEX IF NOT EXISTS syncapi_output_room_events_recent_events_idx ON syncapi_output_room_events (room_id, exclude_from_sync, id, sender, type);
+
+
`
const insertEventSQL = "" +
"INSERT INTO syncapi_output_room_events (" +
"room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" +
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " +
- "ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " +
+ "ON CONFLICT ON CONSTRAINT syncapi_output_room_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " +
"RETURNING id"
const selectEventsSQL = "" +
@@ -109,14 +111,29 @@ const selectRecentEventsSQL = "" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id DESC LIMIT $8"
-const selectRecentEventsForSyncSQL = "" +
- "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
- " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
- " AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
- " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
- " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
- " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
- " ORDER BY id DESC LIMIT $8"
+// selectRecentEventsForSyncSQL contains an optimization to get the recent events for a list of rooms, using a LATERAL JOIN
+// The sub select inside LATERAL () is executed for all room_ids it gets as a parameter $1
+const selectRecentEventsForSyncSQL = `
+WITH room_ids AS (
+ SELECT unnest($1::text[]) AS room_id
+)
+SELECT x.*
+FROM room_ids,
+ LATERAL (
+ SELECT room_id, event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility
+ FROM syncapi_output_room_events recent_events
+ WHERE
+ recent_events.room_id = room_ids.room_id
+ AND recent_events.exclude_from_sync = FALSE
+ AND id > $2 AND id <= $3
+ AND ( $4::text[] IS NULL OR sender = ANY($4) )
+ AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )
+ AND ( $6::text[] IS NULL OR type LIKE ANY($6) )
+ AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )
+ ORDER BY recent_events.id DESC
+ LIMIT $8
+ ) AS x
+`
const selectEarlyEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
@@ -207,12 +224,30 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
return nil, err
}
+ migrationName := "syncapi: rename dupe index (output_room_events)"
+
+ var cName string
+ err = db.QueryRowContext(context.Background(), "select constraint_name from information_schema.table_constraints where table_name = 'syncapi_output_room_events' AND constraint_name = 'syncapi_event_id_idx'").Scan(&cName)
+ switch err {
+ case sql.ErrNoRows: // migration was already executed, as the index was renamed
+ if err = sqlutil.InsertMigration(context.Background(), db, migrationName); err != nil {
+ return nil, fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
+ }
+ case nil:
+ default:
+ return nil, err
+ }
+
m := sqlutil.NewMigrator(db)
m.AddMigrations(
sqlutil.Migration{
Version: "syncapi: add history visibility column (output_room_events)",
Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents,
},
+ sqlutil.Migration{
+ Version: migrationName,
+ Up: deltas.UpRenameOutputRoomEventsIndex,
+ },
)
err = m.Up(context.Background())
if err != nil {
@@ -398,9 +433,9 @@ func (s *outputRoomEventsStatements) InsertEvent(
// from sync.
func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
+ roomIDs []string, ra types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
chronologicalOrder bool, onlySyncEvents bool,
-) ([]types.StreamEvent, bool, error) {
+) (map[string]types.RecentEvents, error) {
var stmt *sql.Stmt
if onlySyncEvents {
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt)
@@ -408,8 +443,9 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt)
}
senders, notSenders := getSendersRoomEventFilter(eventFilter)
+
rows, err := stmt.QueryContext(
- ctx, roomID, r.Low(), r.High(),
+ ctx, pq.StringArray(roomIDs), ra.Low(), ra.High(),
pq.StringArray(senders),
pq.StringArray(notSenders),
pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)),
@@ -417,34 +453,80 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
eventFilter.Limit+1,
)
if err != nil {
- return nil, false, err
+ return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed")
- events, err := rowsToStreamEvents(rows)
- if err != nil {
- return nil, false, err
- }
- if chronologicalOrder {
- // The events need to be returned from oldest to latest, which isn't
- // necessary the way the SQL query returns them, so a sort is necessary to
- // ensure the events are in the right order in the slice.
- sort.SliceStable(events, func(i int, j int) bool {
- return events[i].StreamPosition < events[j].StreamPosition
+
+ result := make(map[string]types.RecentEvents)
+
+ for rows.Next() {
+ var (
+ roomID string
+ eventID string
+ streamPos types.StreamPosition
+ eventBytes []byte
+ excludeFromSync bool
+ sessionID *int64
+ txnID *string
+ transactionID *api.TransactionID
+ historyVisibility gomatrixserverlib.HistoryVisibility
+ )
+ if err := rows.Scan(&roomID, &eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil {
+ return nil, err
+ }
+ // TODO: Handle redacted events
+ var ev gomatrixserverlib.HeaderedEvent
+ if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ return nil, err
+ }
+
+ if sessionID != nil && txnID != nil {
+ transactionID = &api.TransactionID{
+ SessionID: *sessionID,
+ TransactionID: *txnID,
+ }
+ }
+
+ r := result[roomID]
+
+ ev.Visibility = historyVisibility
+ r.Events = append(r.Events, types.StreamEvent{
+ HeaderedEvent: &ev,
+ StreamPosition: streamPos,
+ TransactionID: transactionID,
+ ExcludeFromSync: excludeFromSync,
})
+
+ result[roomID] = r
}
- // we queried for 1 more than the limit, so if we returned one more mark limited=true
- limited := false
- if len(events) > eventFilter.Limit {
- limited = true
- // re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
- if chronologicalOrder {
- events = events[1:]
- } else {
- events = events[:len(events)-1]
+
+ if chronologicalOrder {
+ for roomID, evs := range result {
+ // The events need to be returned from oldest to latest, which isn't
+ // necessary the way the SQL query returns them, so a sort is necessary to
+ // ensure the events are in the right order in the slice.
+ sort.SliceStable(evs.Events, func(i int, j int) bool {
+ return evs.Events[i].StreamPosition < evs.Events[j].StreamPosition
+ })
+
+ if len(evs.Events) > eventFilter.Limit {
+ evs.Limited = true
+ evs.Events = evs.Events[1:]
+ }
+
+ result[roomID] = evs
}
- }
+ } else {
+ for roomID, evs := range result {
+ if len(evs.Events) > eventFilter.Limit {
+ evs.Limited = true
+ evs.Events = evs.Events[:len(evs.Events)-1]
+ }
- return events, limited, nil
+ result[roomID] = evs
+ }
+ }
+ return result, rows.Err()
}
// selectEarlyEvents returns the earliest events in the given room, starting