aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres/output_room_events_table.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/postgres/output_room_events_table.go')
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go39
1 files changed, 23 insertions, 16 deletions
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index e7cf1935..5f9a1d0c 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -44,8 +44,9 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
event_id TEXT NOT NULL CONSTRAINT syncapi_event_id_idx UNIQUE,
-- The 'room_id' key for the event.
room_id TEXT NOT NULL,
- -- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
- event_json TEXT NOT NULL,
+ -- The headered JSON for the event, containing potentially additional metadata such as
+ -- the room version. Stored as TEXT because this should be valid UTF-8.
+ headered_event_json TEXT NOT NULL,
-- The event type e.g 'm.room.member'.
type TEXT NOT NULL,
-- The 'sender' property of the event.
@@ -70,26 +71,26 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
const insertEventSQL = "" +
"INSERT INTO syncapi_output_room_events (" +
- "room_id, event_id, event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
+ "room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) " +
"ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = $11 " +
"RETURNING id"
const selectEventsSQL = "" +
- "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)"
+ "SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)"
const selectRecentEventsSQL = "" +
- "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
+ "SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id DESC LIMIT $4"
const selectRecentEventsForSyncSQL = "" +
- "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
+ "SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
" ORDER BY id DESC LIMIT $4"
const selectEarlyEventsSQL = "" +
- "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
+ "SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id ASC LIMIT $4"
@@ -98,7 +99,7 @@ const selectMaxEventIDSQL = "" +
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" +
- "SELECT id, event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
+ "SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
" FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
" AND ( $3::text[] IS NULL OR sender = ANY($3) )" +
@@ -203,8 +204,8 @@ func (s *outputRoomEventsStatements) selectStateInRange(
}
// TODO: Handle redacted events
- ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
- if err != nil {
+ var ev gomatrixserverlib.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, nil, err
}
needSet := stateNeeded[ev.RoomID()]
@@ -220,7 +221,7 @@ func (s *outputRoomEventsStatements) selectStateInRange(
stateNeeded[ev.RoomID()] = needSet
eventIDToEvent[ev.EventID()] = types.StreamEvent{
- Event: ev,
+ HeaderedEvent: ev,
StreamPosition: streamPos,
ExcludeFromSync: excludeFromSync,
}
@@ -248,7 +249,7 @@ func (s *outputRoomEventsStatements) selectMaxEventID(
// of the inserted event.
func (s *outputRoomEventsStatements) insertEvent(
ctx context.Context, txn *sql.Tx,
- event *gomatrixserverlib.Event, addState, removeState []string,
+ event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
transactionID *api.TransactionID, excludeFromSync bool,
) (streamPos types.StreamPosition, err error) {
var txnID *string
@@ -266,12 +267,18 @@ func (s *outputRoomEventsStatements) insertEvent(
_, containsURL = content["url"]
}
+ var headeredJSON []byte
+ headeredJSON, err = json.Marshal(event)
+ if err != nil {
+ return
+ }
+
stmt := common.TxStmt(txn, s.insertEventStmt)
err = stmt.QueryRowContext(
ctx,
event.RoomID(),
event.EventID(),
- event.JSON(),
+ headeredJSON,
event.Type(),
event.Sender(),
containsURL,
@@ -373,8 +380,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
return nil, err
}
// TODO: Handle redacted events
- ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
- if err != nil {
+ var ev gomatrixserverlib.HeaderedEvent
+ if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
@@ -386,7 +393,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
}
result = append(result, types.StreamEvent{
- Event: ev,
+ HeaderedEvent: ev,
StreamPosition: streamPos,
TransactionID: transactionID,
ExcludeFromSync: excludeFromSync,