aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres/output_room_events_table.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/postgres/output_room_events_table.go')
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go197
1 files changed, 132 insertions, 65 deletions
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index ca271593..be302d73 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -22,6 +22,7 @@ import (
"sort"
"github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrix"
"github.com/lib/pq"
@@ -36,28 +37,35 @@ CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id;
-- Stores output room events received from the roomserver.
CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
- -- An incrementing ID which denotes the position in the log that this event resides at.
- -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments.
- -- This isn't a problem for us since we just want to order by this field.
- id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
- -- The event ID for the event
- event_id TEXT NOT NULL,
- -- The 'room_id' key for the event.
- room_id TEXT NOT NULL,
- -- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
- event_json TEXT NOT NULL,
- -- The event type e.g 'm.room.member'.
- type TEXT NOT NULL,
- -- The 'sender' property of the event.
- sender TEXT NOT NULL,
- -- true if the event content contains a url key.
- contains_url BOOL NOT NULL,
- -- A list of event IDs which represent a delta of added/removed room state. This can be NULL
- -- if there is no delta.
- add_state_ids TEXT[],
- remove_state_ids TEXT[],
- session_id BIGINT, -- The client session that sent the event, if any
- transaction_id TEXT -- The transaction id used to send the event, if any
+ -- An incrementing ID which denotes the position in the log that this event resides at.
+ -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments.
+ -- This isn't a problem for us since we just want to order by this field.
+ id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
+ -- The event ID for the event
+ event_id TEXT NOT NULL,
+ -- The 'room_id' key for the event.
+ room_id TEXT NOT NULL,
+ -- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
+ event_json TEXT NOT NULL,
+ -- The event type e.g 'm.room.member'.
+ type TEXT NOT NULL,
+ -- The 'sender' property of the event.
+ sender TEXT NOT NULL,
+ -- true if the event content contains a url key.
+ contains_url BOOL NOT NULL,
+ -- A list of event IDs which represent a delta of added/removed room state. This can be NULL
+ -- if there is no delta.
+ add_state_ids TEXT[],
+ remove_state_ids TEXT[],
+ -- The client session that sent the event, if any
+ session_id BIGINT,
+ -- The transaction id used to send the event, if any
+ transaction_id TEXT,
+ -- Should the event be excluded from responses to /sync requests. Useful for
+ -- events retrieved through backfilling that have a position in the stream
+ -- that relates to the moment these were retrieved rather than the moment these
+ -- were emitted.
+ exclude_from_sync BOOL DEFAULT FALSE
);
-- for event selection
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id);
@@ -65,23 +73,33 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_ev
const insertEventSQL = "" +
"INSERT INTO syncapi_output_room_events (" +
- "room_id, event_id, event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id" +
- ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id"
+ "room_id, event_id, event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
+ ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING id"
const selectEventsSQL = "" +
- "SELECT id, event_json, session_id, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)"
+ "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)"
const selectRecentEventsSQL = "" +
- "SELECT id, event_json, session_id, transaction_id FROM syncapi_output_room_events" +
+ "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id DESC LIMIT $4"
+const selectRecentEventsForSyncSQL = "" +
+ "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
+ " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
+ " ORDER BY id DESC LIMIT $4"
+
+const selectEarlyEventsSQL = "" +
+ "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
+ " WHERE room_id = $1 AND id > $2 AND id <= $3" +
+ " ORDER BY id ASC LIMIT $4"
+
const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events"
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" +
- "SELECT id, event_json, add_state_ids, remove_state_ids" +
+ "SELECT id, event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
" FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
" AND ( $3::text[] IS NULL OR sender = ANY($3) )" +
@@ -93,11 +111,13 @@ const selectStateInRangeSQL = "" +
" LIMIT $8"
type outputRoomEventsStatements struct {
- insertEventStmt *sql.Stmt
- selectEventsStmt *sql.Stmt
- selectMaxEventIDStmt *sql.Stmt
- selectRecentEventsStmt *sql.Stmt
- selectStateInRangeStmt *sql.Stmt
+ insertEventStmt *sql.Stmt
+ selectEventsStmt *sql.Stmt
+ selectMaxEventIDStmt *sql.Stmt
+ selectRecentEventsStmt *sql.Stmt
+ selectRecentEventsForSyncStmt *sql.Stmt
+ selectEarlyEventsStmt *sql.Stmt
+ selectStateInRangeStmt *sql.Stmt
}
func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
@@ -117,6 +137,12 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
return
}
+ if s.selectRecentEventsForSyncStmt, err = db.Prepare(selectRecentEventsForSyncSQL); err != nil {
+ return
+ }
+ if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil {
+ return
+ }
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
return
}
@@ -127,9 +153,9 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.
func (s *outputRoomEventsStatements) selectStateInRange(
- ctx context.Context, txn *sql.Tx, oldPos, newPos int64,
+ ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition,
stateFilterPart *gomatrix.FilterPart,
-) (map[string]map[string]bool, map[string]streamEvent, error) {
+) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectStateInRangeStmt)
rows, err := stmt.QueryContext(
@@ -149,19 +175,20 @@ func (s *outputRoomEventsStatements) selectStateInRange(
// - For each room ID, build up an array of event IDs which represents cumulative adds/removes
// For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID
// if they aren't in the event ID cache. We don't handle state deletion yet.
- eventIDToEvent := make(map[string]streamEvent)
+ eventIDToEvent := make(map[string]types.StreamEvent)
// RoomID => A set (map[string]bool) of state event IDs which are between the two positions
stateNeeded := make(map[string]map[string]bool)
for rows.Next() {
var (
- streamPos int64
- eventBytes []byte
- addIDs pq.StringArray
- delIDs pq.StringArray
+ streamPos types.StreamPosition
+ eventBytes []byte
+ excludeFromSync bool
+ addIDs pq.StringArray
+ delIDs pq.StringArray
)
- if err := rows.Scan(&streamPos, &eventBytes, &addIDs, &delIDs); err != nil {
+ if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil {
return nil, nil, err
}
// Sanity check for deleted state and whine if we see it. We don't need to do anything
@@ -192,9 +219,10 @@ func (s *outputRoomEventsStatements) selectStateInRange(
}
stateNeeded[ev.RoomID()] = needSet
- eventIDToEvent[ev.EventID()] = streamEvent{
- Event: ev,
- streamPosition: streamPos,
+ eventIDToEvent[ev.EventID()] = types.StreamEvent{
+ Event: ev,
+ StreamPosition: streamPos,
+ ExcludeFromSync: excludeFromSync,
}
}
@@ -221,8 +249,8 @@ func (s *outputRoomEventsStatements) selectMaxEventID(
func (s *outputRoomEventsStatements) insertEvent(
ctx context.Context, txn *sql.Tx,
event *gomatrixserverlib.Event, addState, removeState []string,
- transactionID *api.TransactionID,
-) (streamPos int64, err error) {
+ transactionID *api.TransactionID, excludeFromSync bool,
+) (streamPos types.StreamPosition, err error) {
var txnID *string
var sessionID *int64
if transactionID != nil {
@@ -251,16 +279,53 @@ func (s *outputRoomEventsStatements) insertEvent(
pq.StringArray(removeState),
sessionID,
txnID,
+ excludeFromSync,
).Scan(&streamPos)
return
}
-// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'.
+// selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'.
+// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude
+// from sync.
func (s *outputRoomEventsStatements) selectRecentEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, fromPos, toPos int64, limit int,
-) ([]streamEvent, error) {
- stmt := common.TxStmt(txn, s.selectRecentEventsStmt)
+ roomID string, fromPos, toPos types.StreamPosition, limit int,
+ chronologicalOrder bool, onlySyncEvents bool,
+) ([]types.StreamEvent, error) {
+ var stmt *sql.Stmt
+ if onlySyncEvents {
+ stmt = common.TxStmt(txn, s.selectRecentEventsForSyncStmt)
+ } else {
+ stmt = common.TxStmt(txn, s.selectRecentEventsStmt)
+ }
+
+ rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+ events, err := rowsToStreamEvents(rows)
+ if err != nil {
+ return nil, err
+ }
+ if chronologicalOrder {
+ // The events need to be returned from oldest to latest, which isn't
+ // necessary the way the SQL query returns them, so a sort is necessary to
+ // ensure the events are in the right order in the slice.
+ sort.SliceStable(events, func(i int, j int) bool {
+ return events[i].StreamPosition < events[j].StreamPosition
+ })
+ }
+ return events, nil
+}
+
+// selectEarlyEvents returns the earliest events in the given room, starting
+// from a given position, up to a maximum of 'limit'.
+func (s *outputRoomEventsStatements) selectEarlyEvents(
+ ctx context.Context, txn *sql.Tx,
+ roomID string, fromPos, toPos types.StreamPosition, limit int,
+) ([]types.StreamEvent, error) {
+ stmt := common.TxStmt(txn, s.selectEarlyEventsStmt)
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
if err != nil {
return nil, err
@@ -274,16 +339,16 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
// necessarily the way the SQL query returns them, so a sort is necessary to
// ensure the events are in the right order in the slice.
sort.SliceStable(events, func(i int, j int) bool {
- return events[i].streamPosition < events[j].streamPosition
+ return events[i].StreamPosition < events[j].StreamPosition
})
return events, nil
}
-// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing
-// from the database.
+// selectEvents returns the events for the given event IDs. If an event is
+// missing from the database, it will be omitted.
func (s *outputRoomEventsStatements) selectEvents(
ctx context.Context, txn *sql.Tx, eventIDs []string,
-) ([]streamEvent, error) {
+) ([]types.StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectEventsStmt)
rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs))
if err != nil {
@@ -293,17 +358,18 @@ func (s *outputRoomEventsStatements) selectEvents(
return rowsToStreamEvents(rows)
}
-func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) {
- var result []streamEvent
+func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
+ var result []types.StreamEvent
for rows.Next() {
var (
- streamPos int64
- eventBytes []byte
- sessionID *int64
- txnID *string
- transactionID *api.TransactionID
+ streamPos types.StreamPosition
+ eventBytes []byte
+ excludeFromSync bool
+ sessionID *int64
+ txnID *string
+ transactionID *api.TransactionID
)
- if err := rows.Scan(&streamPos, &eventBytes, &sessionID, &txnID); err != nil {
+ if err := rows.Scan(&streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil {
return nil, err
}
// TODO: Handle redacted events
@@ -319,10 +385,11 @@ func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) {
}
}
- result = append(result, streamEvent{
- Event: ev,
- streamPosition: streamPos,
- transactionID: transactionID,
+ result = append(result, types.StreamEvent{
+ Event: ev,
+ StreamPosition: streamPos,
+ TransactionID: transactionID,
+ ExcludeFromSync: excludeFromSync,
})
}
return result, nil