Diffstat (limited to 'syncapi/storage/postgres/output_room_events_table.go')
-rw-r--r-- | syncapi/storage/postgres/output_room_events_table.go | 197
1 file changed, 132 insertions, 65 deletions
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index ca271593..be302d73 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -22,6 +22,7 @@ import (
 	"sort"
 
 	"github.com/matrix-org/dendrite/roomserver/api"
+	"github.com/matrix-org/dendrite/syncapi/types"
 	"github.com/matrix-org/gomatrix"
 
 	"github.com/lib/pq"
@@ -36,28 +37,35 @@ CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id;
 -- Stores output room events received from the roomserver.
 CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
-    -- An incrementing ID which denotes the position in the log that this event resides at.
-    -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments.
-    -- This isn't a problem for us since we just want to order by this field.
-    id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
-    -- The event ID for the event
-    event_id TEXT NOT NULL,
-    -- The 'room_id' key for the event.
-    room_id TEXT NOT NULL,
-    -- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
-    event_json TEXT NOT NULL,
-    -- The event type e.g 'm.room.member'.
-    type TEXT NOT NULL,
-    -- The 'sender' property of the event.
-    sender TEXT NOT NULL,
-    -- true if the event content contains a url key.
-    contains_url BOOL NOT NULL,
-    -- A list of event IDs which represent a delta of added/removed room state. This can be NULL
-    -- if there is no delta.
-    add_state_ids TEXT[],
-    remove_state_ids TEXT[],
-    session_id BIGINT, -- The client session that sent the event, if any
-    transaction_id TEXT -- The transaction id used to send the event, if any
+  -- An incrementing ID which denotes the position in the log that this event resides at.
+  -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments.
+  -- This isn't a problem for us since we just want to order by this field.
+  id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
+  -- The event ID for the event
+  event_id TEXT NOT NULL,
+  -- The 'room_id' key for the event.
+  room_id TEXT NOT NULL,
+  -- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
+  event_json TEXT NOT NULL,
+  -- The event type e.g 'm.room.member'.
+  type TEXT NOT NULL,
+  -- The 'sender' property of the event.
+  sender TEXT NOT NULL,
+  -- true if the event content contains a url key.
+  contains_url BOOL NOT NULL,
+  -- A list of event IDs which represent a delta of added/removed room state. This can be NULL
+  -- if there is no delta.
+  add_state_ids TEXT[],
+  remove_state_ids TEXT[],
+  -- The client session that sent the event, if any
+  session_id BIGINT,
+  -- The transaction id used to send the event, if any
+  transaction_id TEXT,
+  -- Should the event be excluded from responses to /sync requests. Useful for
+  -- events retrieved through backfilling that have a position in the stream
+  -- that relates to the moment these were retrieved rather than the moment these
+  -- were emitted.
+  exclude_from_sync BOOL DEFAULT FALSE
 );
 
 -- for event selection
 CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id);
@@ -65,23 +73,33 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_ev
 
 const insertEventSQL = "" +
 	"INSERT INTO syncapi_output_room_events (" +
-	"room_id, event_id, event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id" +
-	") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id"
+	"room_id, event_id, event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
+	") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING id"
 
 const selectEventsSQL = "" +
-	"SELECT id, event_json, session_id, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)"
+	"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)"
 
 const selectRecentEventsSQL = "" +
-	"SELECT id, event_json, session_id, transaction_id FROM syncapi_output_room_events" +
+	"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
 	" WHERE room_id = $1 AND id > $2 AND id <= $3" +
 	" ORDER BY id DESC LIMIT $4"
 
+const selectRecentEventsForSyncSQL = "" +
+	"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
+	" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
+	" ORDER BY id DESC LIMIT $4"
+
+const selectEarlyEventsSQL = "" +
+	"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
+	" WHERE room_id = $1 AND id > $2 AND id <= $3" +
+	" ORDER BY id ASC LIMIT $4"
+
 const selectMaxEventIDSQL = "" +
 	"SELECT MAX(id) FROM syncapi_output_room_events"
 
 // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
 const selectStateInRangeSQL = "" +
-	"SELECT id, event_json, add_state_ids, remove_state_ids" +
+	"SELECT id, event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
 	" FROM syncapi_output_room_events" +
 	" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
 	" AND ( $3::text[] IS NULL OR sender = ANY($3) )" +
@@ -93,11 +111,13 @@ const selectStateInRangeSQL = "" +
 	" LIMIT $8"
 
 type outputRoomEventsStatements struct {
-	insertEventStmt        *sql.Stmt
-	selectEventsStmt       *sql.Stmt
-	selectMaxEventIDStmt   *sql.Stmt
-	selectRecentEventsStmt *sql.Stmt
-	selectStateInRangeStmt *sql.Stmt
+	insertEventStmt               *sql.Stmt
+	selectEventsStmt              *sql.Stmt
+	selectMaxEventIDStmt          *sql.Stmt
+	selectRecentEventsStmt        *sql.Stmt
+	selectRecentEventsForSyncStmt *sql.Stmt
+	selectEarlyEventsStmt         *sql.Stmt
+	selectStateInRangeStmt        *sql.Stmt
 }
 
 func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
@@ -117,6 +137,12 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
 	if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
 		return
 	}
+	if s.selectRecentEventsForSyncStmt, err = db.Prepare(selectRecentEventsForSyncSQL); err != nil {
+		return
+	}
+	if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil {
+		return
+	}
 	if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
 		return
 	}
@@ -127,9 +153,9 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
 // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
 // two positions, only the most recent state is returned.
 func (s *outputRoomEventsStatements) selectStateInRange(
-	ctx context.Context, txn *sql.Tx, oldPos, newPos int64,
+	ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition,
 	stateFilterPart *gomatrix.FilterPart,
-) (map[string]map[string]bool, map[string]streamEvent, error) {
+) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
 	stmt := common.TxStmt(txn, s.selectStateInRangeStmt)
 	rows, err := stmt.QueryContext(
@@ -149,19 +175,20 @@ func (s *outputRoomEventsStatements) selectStateInRange(
 	// - For each room ID, build up an array of event IDs which represents cumulative adds/removes
 	// For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID
 	// if they aren't in the event ID cache. We don't handle state deletion yet.
-	eventIDToEvent := make(map[string]streamEvent)
+	eventIDToEvent := make(map[string]types.StreamEvent)
 
 	// RoomID => A set (map[string]bool) of state event IDs which are between the two positions
 	stateNeeded := make(map[string]map[string]bool)
 
 	for rows.Next() {
 		var (
-			streamPos  int64
-			eventBytes []byte
-			addIDs     pq.StringArray
-			delIDs     pq.StringArray
+			streamPos       types.StreamPosition
+			eventBytes      []byte
+			excludeFromSync bool
+			addIDs          pq.StringArray
+			delIDs          pq.StringArray
 		)
-		if err := rows.Scan(&streamPos, &eventBytes, &addIDs, &delIDs); err != nil {
+		if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil {
 			return nil, nil, err
 		}
 		// Sanity check for deleted state and whine if we see it. We don't need to do anything
@@ -192,9 +219,10 @@ func (s *outputRoomEventsStatements) selectStateInRange(
 		}
 
 		stateNeeded[ev.RoomID()] = needSet
-		eventIDToEvent[ev.EventID()] = streamEvent{
-			Event:          ev,
-			streamPosition: streamPos,
+		eventIDToEvent[ev.EventID()] = types.StreamEvent{
+			Event:           ev,
+			StreamPosition:  streamPos,
+			ExcludeFromSync: excludeFromSync,
 		}
 	}
 
@@ -221,8 +249,8 @@ func (s *outputRoomEventsStatements) selectMaxEventID(
 
 func (s *outputRoomEventsStatements) insertEvent(
 	ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string,
-	transactionID *api.TransactionID,
-) (streamPos int64, err error) {
+	transactionID *api.TransactionID, excludeFromSync bool,
+) (streamPos types.StreamPosition, err error) {
 	var txnID *string
 	var sessionID *int64
 	if transactionID != nil {
@@ -251,16 +279,53 @@ func (s *outputRoomEventsStatements) insertEvent(
 		pq.StringArray(removeState),
 		sessionID,
 		txnID,
+		excludeFromSync,
 	).Scan(&streamPos)
 	return
 }
 
-// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'.
+// selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'.
+// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude
+// from sync.
 func (s *outputRoomEventsStatements) selectRecentEvents(
 	ctx context.Context, txn *sql.Tx,
-	roomID string, fromPos, toPos int64, limit int,
-) ([]streamEvent, error) {
-	stmt := common.TxStmt(txn, s.selectRecentEventsStmt)
+	roomID string, fromPos, toPos types.StreamPosition, limit int,
+	chronologicalOrder bool, onlySyncEvents bool,
+) ([]types.StreamEvent, error) {
+	var stmt *sql.Stmt
+	if onlySyncEvents {
+		stmt = common.TxStmt(txn, s.selectRecentEventsForSyncStmt)
+	} else {
+		stmt = common.TxStmt(txn, s.selectRecentEventsStmt)
+	}
+
+	rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close() // nolint: errcheck
+	events, err := rowsToStreamEvents(rows)
+	if err != nil {
+		return nil, err
+	}
+	if chronologicalOrder {
+		// The events need to be returned from oldest to latest, which isn't
+		// necessary the way the SQL query returns them, so a sort is necessary to
+		// ensure the events are in the right order in the slice.
+		sort.SliceStable(events, func(i int, j int) bool {
+			return events[i].StreamPosition < events[j].StreamPosition
+		})
+	}
+	return events, nil
+}
+
+// selectEarlyEvents returns the earliest events in the given room, starting
+// from a given position, up to a maximum of 'limit'.
+func (s *outputRoomEventsStatements) selectEarlyEvents(
+	ctx context.Context, txn *sql.Tx,
+	roomID string, fromPos, toPos types.StreamPosition, limit int,
+) ([]types.StreamEvent, error) {
+	stmt := common.TxStmt(txn, s.selectEarlyEventsStmt)
 	rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
 	if err != nil {
 		return nil, err
@@ -274,16 +339,16 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
 	// necessarily the way the SQL query returns them, so a sort is necessary to
 	// ensure the events are in the right order in the slice.
 	sort.SliceStable(events, func(i int, j int) bool {
-		return events[i].streamPosition < events[j].streamPosition
+		return events[i].StreamPosition < events[j].StreamPosition
 	})
 	return events, nil
 }
 
-// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing
-// from the database.
+// selectEvents returns the events for the given event IDs. If an event is
+// missing from the database, it will be omitted.
 func (s *outputRoomEventsStatements) selectEvents(
 	ctx context.Context, txn *sql.Tx, eventIDs []string,
-) ([]streamEvent, error) {
+) ([]types.StreamEvent, error) {
 	stmt := common.TxStmt(txn, s.selectEventsStmt)
 	rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs))
 	if err != nil {
@@ -293,17 +358,18 @@ func (s *outputRoomEventsStatements) selectEvents(
 	return rowsToStreamEvents(rows)
 }
 
-func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) {
-	var result []streamEvent
+func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
+	var result []types.StreamEvent
 	for rows.Next() {
 		var (
-			streamPos     int64
-			eventBytes    []byte
-			sessionID     *int64
-			txnID         *string
-			transactionID *api.TransactionID
+			streamPos       types.StreamPosition
+			eventBytes      []byte
+			excludeFromSync bool
+			sessionID       *int64
+			txnID           *string
+			transactionID   *api.TransactionID
 		)
-		if err := rows.Scan(&streamPos, &eventBytes, &sessionID, &txnID); err != nil {
+		if err := rows.Scan(&streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil {
 			return nil, err
 		}
 		// TODO: Handle redacted events
@@ -319,10 +385,11 @@ func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) {
 			}
 		}
 
-		result = append(result, streamEvent{
-			Event:          ev,
-			streamPosition: streamPos,
-			transactionID:  transactionID,
+		result = append(result, types.StreamEvent{
+			Event:           ev,
+			StreamPosition:  streamPos,
+			TransactionID:   transactionID,
+			ExcludeFromSync: excludeFromSync,
 		})
 	}
 	return result, nil
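To make the intent of the new column concrete, the sketch below shows how the two sides of this change fit together from within the same package: a backfilled event is inserted with excludeFromSync set to true, and the /sync read path calls selectRecentEvents with onlySyncEvents set to true, which routes through the new selectRecentEventsForSyncSQL query and therefore never returns that event. This is only an illustrative sketch against the signatures introduced in this diff; the helper names (storeBackfilledEvent, recentTimelineForSync) and the surrounding wiring are hypothetical, not part of the change.

package postgres

import (
	"context"
	"database/sql"

	"github.com/matrix-org/dendrite/syncapi/types"
	"github.com/matrix-org/gomatrixserverlib"
)

// storeBackfilledEvent is a hypothetical helper: an event obtained through
// backfilling is inserted with excludeFromSync=true, so it is assigned a
// stream position but never surfaces in incremental /sync responses.
func storeBackfilledEvent(
	ctx context.Context, txn *sql.Tx, s *outputRoomEventsStatements,
	ev *gomatrixserverlib.Event,
) (types.StreamPosition, error) {
	// No state delta and no client transaction ID for a backfilled event.
	return s.insertEvent(ctx, txn, ev, nil, nil, nil, true)
}

// recentTimelineForSync is a hypothetical helper for the /sync read path:
// onlySyncEvents=true routes the query through selectRecentEventsForSyncSQL
// (which filters on exclude_from_sync = FALSE), and chronologicalOrder=true
// re-sorts the DESC-limited rows oldest-to-newest for the timeline.
func recentTimelineForSync(
	ctx context.Context, txn *sql.Tx, s *outputRoomEventsStatements,
	roomID string, from, to types.StreamPosition, limit int,
) ([]types.StreamEvent, error) {
	return s.selectRecentEvents(ctx, txn, roomID, from, to, limit, true, true)
}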