diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-12-09 18:07:17 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-09 18:07:17 +0000 |
commit | bad81c028f090af0e1005076829db67d1a749a14 (patch) | |
tree | 0e0be1720fec64c831fa4b86b09f9b11071106b2 /syncapi | |
parent | 851c02659a5c15896aa8d292e38e82c2169fa80a (diff) |
Don't recalculate event ID so often in sync (#1624)
* Don't bail so quickly in fetchMissingStateEvents
* Don't recalculate event IDs so often in sync API
* Add comments
* Fix comments
* Update to matrix-org/gomatrixserverlib@eb6a890
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/storage/postgres/current_room_state_table.go | 11 | ||||
-rw-r--r-- | syncapi/storage/postgres/output_room_events_table.go | 13 | ||||
-rw-r--r-- | syncapi/storage/shared/syncserver.go | 22 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/current_room_state_table.go | 11 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/output_room_events_table.go | 13 |
5 files changed, 42 insertions, 28 deletions
diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index 12327278..554163e5 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -76,7 +76,7 @@ const selectRoomIDsWithMembershipSQL = "" + "SELECT DISTINCT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2" const selectCurrentStateSQL = "" + - "SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1" + + "SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1" + " AND ( $2::text[] IS NULL OR sender = ANY($2) )" + " AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" + " AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" + @@ -92,10 +92,10 @@ const selectStateEventSQL = "" + const selectEventsWithEventIDsSQL = "" + // TODO: The session_id and transaction_id blanks are here because otherwise - // the rowsToStreamEvents expects there to be exactly five columns. We need to + // the rowsToStreamEvents expects there to be exactly six columns. We need to // figure out if these really need to be in the DB, and if so, we need a // better permanent fix for this. - neilalexander, 2 Jan 2020 - "SELECT added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" + + "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" + " FROM syncapi_current_room_state WHERE event_id = ANY($1)" type currentRoomStateStatements struct { @@ -278,13 +278,14 @@ func (s *currentRoomStateStatements) SelectEventsWithEventIDs( func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) { result := []*gomatrixserverlib.HeaderedEvent{} for rows.Next() { + var eventID string var eventBytes []byte - if err := rows.Scan(&eventBytes); err != nil { + if err := rows.Scan(&eventID, &eventBytes); err != nil { return nil, err } // TODO: Handle redacted events var ev gomatrixserverlib.HeaderedEvent - if err := json.Unmarshal(eventBytes, &ev); err != nil { + if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { return nil, err } result = append(result, &ev) diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index ce4b6335..f4bbebd2 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -79,20 +79,20 @@ const insertEventSQL = "" + "RETURNING id" const selectEventsSQL = "" + - "SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)" const selectRecentEventsSQL = "" + - "SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id DESC LIMIT $4" const selectRecentEventsForSyncSQL = "" + - "SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" + " ORDER BY id DESC LIMIT $4" const selectEarlyEventsSQL = "" + - "SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id ASC LIMIT $4" @@ -413,6 +413,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { var result []types.StreamEvent for rows.Next() { var ( + eventID string streamPos types.StreamPosition eventBytes []byte excludeFromSync bool @@ -420,12 +421,12 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { txnID *string transactionID *api.TransactionID ) - if err := rows.Scan(&streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil { return nil, err } // TODO: Handle redacted events var ev gomatrixserverlib.HeaderedEvent - if err := json.Unmarshal(eventBytes, &ev); err != nil { + if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { return nil, err } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 6c35a765..9df04943 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -31,6 +31,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) @@ -514,25 +515,28 @@ func (d *Database) addPDUDeltaToResponse( deltas, joinedRoomIDs, err = d.getStateDeltas( ctx, &device, txn, r, device.UserID, &stateFilter, ) + if err != nil { + return nil, fmt.Errorf("d.getStateDeltas: %w", err) + } } else { deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync( ctx, &device, txn, r, device.UserID, &stateFilter, ) - } - if err != nil { - return nil, err + if err != nil { + return nil, fmt.Errorf("d.getStateDeltasForFullStateSync: %w", err) + } } for _, delta := range deltas { err = d.addRoomDeltaToResponse(ctx, &device, txn, r, delta, numRecentEventsPerRoom, res) if err != nil { - return nil, err + return nil, fmt.Errorf("d.addRoomDeltaToResponse: %w", err) } } // TODO: This should be done in getStateDeltas if err = d.addInvitesToResponse(ctx, txn, device.UserID, r, res); err != nil { - return nil, err + return nil, fmt.Errorf("d.addInvitesToResponse: %w", err) } succeeded = true @@ -1126,7 +1130,13 @@ func (d *Database) fetchMissingStateEvents( return nil, err } if len(stateEvents) != len(missing) { - return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing)) + logrus.WithContext(ctx).Warnf("Failed to map all event IDs to events (got %d, wanted %d)", len(stateEvents), len(missing)) + + // TODO: Why is this happening? It's probably the roomserver. Uncomment + // this error again when we work out what it is and fix it, otherwise we + // just end up returning lots of 500s to the client and that breaks + // pretty much everything, rather than just sending what we have. + //return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing)) } events = append(events, stateEvents...) return events, nil diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index 357d4282..f16a6612 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -64,7 +64,7 @@ const selectRoomIDsWithMembershipSQL = "" + "SELECT DISTINCT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2" const selectCurrentStateSQL = "" + - "SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1" + + "SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1" + " AND ( $2 IS NULL OR sender IN ($2) )" + " AND ( $3 IS NULL OR NOT(sender IN ($3)) )" + " AND ( $4 IS NULL OR type IN ($4) )" + @@ -80,10 +80,10 @@ const selectStateEventSQL = "" + const selectEventsWithEventIDsSQL = "" + // TODO: The session_id and transaction_id blanks are here because otherwise - // the rowsToStreamEvents expects there to be exactly five columns. We need to + // the rowsToStreamEvents expects there to be exactly six columns. We need to // figure out if these really need to be in the DB, and if so, we need a // better permanent fix for this. - neilalexander, 2 Jan 2020 - "SELECT added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" + + "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" + " FROM syncapi_current_room_state WHERE event_id IN ($1)" type currentRoomStateStatements struct { @@ -289,13 +289,14 @@ func (s *currentRoomStateStatements) SelectEventsWithEventIDs( func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) { result := []*gomatrixserverlib.HeaderedEvent{} for rows.Next() { + var eventID string var eventBytes []byte - if err := rows.Scan(&eventBytes); err != nil { + if err := rows.Scan(&eventID, &eventBytes); err != nil { return nil, err } // TODO: Handle redacted events var ev gomatrixserverlib.HeaderedEvent - if err := json.Unmarshal(eventBytes, &ev); err != nil { + if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { return nil, err } result = append(result, &ev) diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 06407582..edbd36fb 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -56,20 +56,20 @@ const insertEventSQL = "" + "ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = $13" const selectEventsSQL = "" + - "SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = $1" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = $1" const selectRecentEventsSQL = "" + - "SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id DESC LIMIT $4" const selectRecentEventsForSyncSQL = "" + - "SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" + " ORDER BY id DESC LIMIT $4" const selectEarlyEventsSQL = "" + - "SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id ASC LIMIT $4" @@ -428,6 +428,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { var result []types.StreamEvent for rows.Next() { var ( + eventID string streamPos types.StreamPosition eventBytes []byte excludeFromSync bool @@ -435,12 +436,12 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { txnID *string transactionID *api.TransactionID ) - if err := rows.Scan(&streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil { return nil, err } // TODO: Handle redacted events var ev gomatrixserverlib.HeaderedEvent - if err := json.Unmarshal(eventBytes, &ev); err != nil { + if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { return nil, err } |