diff options
Diffstat (limited to 'syncapi/storage/postgres/output_room_events_table.go')
-rw-r--r-- | syncapi/storage/postgres/output_room_events_table.go | 39 |
1 files changed, 23 insertions, 16 deletions
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index e7cf1935..5f9a1d0c 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -44,8 +44,9 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( event_id TEXT NOT NULL CONSTRAINT syncapi_event_id_idx UNIQUE, -- The 'room_id' key for the event. room_id TEXT NOT NULL, - -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. - event_json TEXT NOT NULL, + -- The headered JSON for the event, containing potentially additional metadata such as + -- the room version. Stored as TEXT because this should be valid UTF-8. + headered_event_json TEXT NOT NULL, -- The event type e.g 'm.room.member'. type TEXT NOT NULL, -- The 'sender' property of the event. @@ -70,26 +71,26 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( const insertEventSQL = "" + "INSERT INTO syncapi_output_room_events (" + - "room_id, event_id, event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" + + "room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) " + "ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = $11 " + "RETURNING id" const selectEventsSQL = "" + - "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)" + "SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)" const selectRecentEventsSQL = "" + - "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id DESC LIMIT $4" const selectRecentEventsForSyncSQL = "" + - "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" + " ORDER BY id DESC LIMIT $4" const selectEarlyEventsSQL = "" + - "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id ASC LIMIT $4" @@ -98,7 +99,7 @@ const selectMaxEventIDSQL = "" + // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). const selectStateInRangeSQL = "" + - "SELECT id, event_json, exclude_from_sync, add_state_ids, remove_state_ids" + + "SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + " FROM syncapi_output_room_events" + " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + " AND ( $3::text[] IS NULL OR sender = ANY($3) )" + @@ -203,8 +204,8 @@ func (s *outputRoomEventsStatements) selectStateInRange( } // TODO: Handle redacted events - ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) - if err != nil { + var ev gomatrixserverlib.HeaderedEvent + if err := json.Unmarshal(eventBytes, &ev); err != nil { return nil, nil, err } needSet := stateNeeded[ev.RoomID()] @@ -220,7 +221,7 @@ func (s *outputRoomEventsStatements) selectStateInRange( stateNeeded[ev.RoomID()] = needSet eventIDToEvent[ev.EventID()] = types.StreamEvent{ - Event: ev, + HeaderedEvent: ev, StreamPosition: streamPos, ExcludeFromSync: excludeFromSync, } @@ -248,7 +249,7 @@ func (s *outputRoomEventsStatements) selectMaxEventID( // of the inserted event. func (s *outputRoomEventsStatements) insertEvent( ctx context.Context, txn *sql.Tx, - event *gomatrixserverlib.Event, addState, removeState []string, + event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool, ) (streamPos types.StreamPosition, err error) { var txnID *string @@ -266,12 +267,18 @@ func (s *outputRoomEventsStatements) insertEvent( _, containsURL = content["url"] } + var headeredJSON []byte + headeredJSON, err = json.Marshal(event) + if err != nil { + return + } + stmt := common.TxStmt(txn, s.insertEventStmt) err = stmt.QueryRowContext( ctx, event.RoomID(), event.EventID(), - event.JSON(), + headeredJSON, event.Type(), event.Sender(), containsURL, @@ -373,8 +380,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { return nil, err } // TODO: Handle redacted events - ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) - if err != nil { + var ev gomatrixserverlib.HeaderedEvent + if err := json.Unmarshal(eventBytes, &ev); err != nil { return nil, err } @@ -386,7 +393,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { } result = append(result, types.StreamEvent{ - Event: ev, + HeaderedEvent: ev, StreamPosition: streamPos, TransactionID: transactionID, ExcludeFromSync: excludeFromSync, |