aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-05-01 11:01:34 +0100
committerGitHub <noreply@github.com>2020-05-01 11:01:34 +0100
commitb28674435e3024bf0e4723a9cf53180607f2045e (patch)
treef1e7d1f717624bb8419250626109ac16c52af103 /syncapi/storage/postgres
parente15f6676ac3f76ec2ef679c2df300d6a8e7e668f (diff)
Correctly generate backpagination tokens for events which have the same depth (#996)
* Correctly generate backpagination tokens for events which have the same depth With tests. Unfortunately the code around here is hard to understand. There will be a subsequent PR which fixes this up now that we have a test harness in place. * Add postgres impl * More linting * Fix psql statement so it actually works
Diffstat (limited to 'syncapi/storage/postgres')
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go27
-rw-r--r--syncapi/storage/postgres/output_room_events_topology_table.go41
-rw-r--r--syncapi/storage/postgres/syncserver.go18
3 files changed, 57 insertions, 29 deletions
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 0b53dfa9..769823e9 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -94,6 +94,9 @@ const selectEarlyEventsSQL = "" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id ASC LIMIT $4"
+const selectStreamPositionForEventIDSQL = "" +
+ "SELECT id FROM syncapi_output_room_events WHERE event_id = $1"
+
const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events"
@@ -111,13 +114,14 @@ const selectStateInRangeSQL = "" +
" LIMIT $8"
type outputRoomEventsStatements struct {
- insertEventStmt *sql.Stmt
- selectEventsStmt *sql.Stmt
- selectMaxEventIDStmt *sql.Stmt
- selectRecentEventsStmt *sql.Stmt
- selectRecentEventsForSyncStmt *sql.Stmt
- selectEarlyEventsStmt *sql.Stmt
- selectStateInRangeStmt *sql.Stmt
+ insertEventStmt *sql.Stmt
+ selectEventsStmt *sql.Stmt
+ selectMaxEventIDStmt *sql.Stmt
+ selectRecentEventsStmt *sql.Stmt
+ selectRecentEventsForSyncStmt *sql.Stmt
+ selectEarlyEventsStmt *sql.Stmt
+ selectStateInRangeStmt *sql.Stmt
+ selectStreamPositionForEventIDStmt *sql.Stmt
}
func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
@@ -146,9 +150,18 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
return
}
+ if s.selectStreamPositionForEventIDStmt, err = db.Prepare(selectStreamPositionForEventIDSQL); err != nil {
+ return
+ }
return
}
+func (s *outputRoomEventsStatements) selectStreamPositionForEventID(ctx context.Context, eventID string) (types.StreamPosition, error) {
+ var id int64
+ err := s.selectStreamPositionForEventIDStmt.QueryRowContext(ctx, eventID).Scan(&id)
+ return types.StreamPosition(id), err
+}
+
// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.
diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go
index 280d4ec3..f77365c8 100644
--- a/syncapi/storage/postgres/output_room_events_topology_table.go
+++ b/syncapi/storage/postgres/output_room_events_topology_table.go
@@ -32,35 +32,44 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events_topology (
-- The place of the event in the room's topology. This can usually be determined
-- from the event's depth.
topological_position BIGINT NOT NULL,
+ stream_position BIGINT NOT NULL,
-- The 'room_id' key for the event.
room_id TEXT NOT NULL
);
-- The topological order will be used in events selection and ordering
-CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON syncapi_output_room_events_topology(topological_position, room_id);
+CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON syncapi_output_room_events_topology(topological_position, stream_position, room_id);
`
const insertEventInTopologySQL = "" +
- "INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id)" +
- " VALUES ($1, $2, $3)" +
- " ON CONFLICT (topological_position, room_id) DO UPDATE SET event_id = $1"
+ "INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id, stream_position)" +
+ " VALUES ($1, $2, $3, $4)" +
+ " ON CONFLICT (topological_position, stream_position, room_id) DO UPDATE SET event_id = $1"
const selectEventIDsInRangeASCSQL = "" +
"SELECT event_id FROM syncapi_output_room_events_topology" +
- " WHERE room_id = $1 AND topological_position > $2 AND topological_position <= $3" +
- " ORDER BY topological_position ASC LIMIT $4"
+ " WHERE room_id = $1 AND" +
+ "(topological_position > $2 AND topological_position < $3) OR" +
+ "(topological_position = $4 AND stream_position <= $5)" +
+ " ORDER BY topological_position ASC, stream_position ASC LIMIT $6"
const selectEventIDsInRangeDESCSQL = "" +
"SELECT event_id FROM syncapi_output_room_events_topology" +
- " WHERE room_id = $1 AND topological_position > $2 AND topological_position <= $3" +
- " ORDER BY topological_position DESC LIMIT $4"
+ " WHERE room_id = $1 AND" +
+ "(topological_position > $2 AND topological_position < $3) OR" +
+ "(topological_position = $4 AND stream_position <= $5)" +
+ " ORDER BY topological_position DESC, stream_position DESC LIMIT $6"
const selectPositionInTopologySQL = "" +
"SELECT topological_position FROM syncapi_output_room_events_topology" +
" WHERE event_id = $1"
+ // Select the max topological position for the room, then sort by stream position and take the highest,
+ // returning both topological and stream positions.
const selectMaxPositionInTopologySQL = "" +
- "SELECT MAX(topological_position) FROM syncapi_output_room_events_topology" +
- " WHERE room_id = $1"
+ "SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" +
+ " WHERE topological_position=(" +
+ "SELECT MAX(topological_position) FROM syncapi_output_room_events_topology WHERE room_id=$1" +
+ ") ORDER BY stream_position DESC LIMIT 1"
const selectEventIDsFromPositionSQL = "" +
"SELECT event_id FROM syncapi_output_room_events_topology" +
@@ -104,10 +113,10 @@ func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) {
// insertEventInTopology inserts the given event in the room's topology, based
// on the event's depth.
func (s *outputRoomEventsTopologyStatements) insertEventInTopology(
- ctx context.Context, event *gomatrixserverlib.HeaderedEvent,
+ ctx context.Context, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
) (err error) {
_, err = s.insertEventInTopologyStmt.ExecContext(
- ctx, event.EventID(), event.Depth(), event.RoomID(),
+ ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
)
return
}
@@ -116,7 +125,7 @@ func (s *outputRoomEventsTopologyStatements) insertEventInTopology(
// given range in a given room's topological order.
// Returns an empty slice if no events match the given range.
func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange(
- ctx context.Context, roomID string, fromPos, toPos types.StreamPosition,
+ ctx context.Context, roomID string, fromPos, toPos, toMicroPos types.StreamPosition,
limit int, chronologicalOrder bool,
) (eventIDs []string, err error) {
// Decide on the selection's order according to whether chronological order
@@ -129,7 +138,7 @@ func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange(
}
// Query the event IDs.
- rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
+ rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, toPos, toMicroPos, limit)
if err == sql.ErrNoRows {
// If no event matched the request, return an empty slice.
return []string{}, nil
@@ -161,8 +170,8 @@ func (s *outputRoomEventsTopologyStatements) selectPositionInTopology(
func (s *outputRoomEventsTopologyStatements) selectMaxPositionInTopology(
ctx context.Context, roomID string,
-) (pos types.StreamPosition, err error) {
- err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos)
+) (pos types.StreamPosition, spos types.StreamPosition, err error) {
+ err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
return
}
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
index ef970739..744ae7b8 100644
--- a/syncapi/storage/postgres/syncserver.go
+++ b/syncapi/storage/postgres/syncserver.go
@@ -159,7 +159,7 @@ func (d *SyncServerDatasource) WriteEvent(
}
pduPosition = pos
- if err = d.topology.insertEventInTopology(ctx, ev); err != nil {
+ if err = d.topology.insertEventInTopology(ctx, ev, pos); err != nil {
return err
}
@@ -240,12 +240,13 @@ func (d *SyncServerDatasource) GetEventsInRange(
if from.Type == types.PaginationTokenTypeTopology {
// Determine the backward and forward limit, i.e. the upper and lower
// limits to the selection in the room's topology, from the direction.
- var backwardLimit, forwardLimit types.StreamPosition
+ var backwardLimit, forwardLimit, forwardMicroLimit types.StreamPosition
if backwardOrdering {
// Backward ordering is antichronological (latest event to oldest
// one).
backwardLimit = to.PDUPosition
forwardLimit = from.PDUPosition
+ forwardMicroLimit = from.EDUTypingPosition
} else {
// Forward ordering is chronological (oldest event to latest one).
backwardLimit = from.PDUPosition
@@ -255,7 +256,7 @@ func (d *SyncServerDatasource) GetEventsInRange(
// Select the event IDs from the defined range.
var eIDs []string
eIDs, err = d.topology.selectEventIDsInRange(
- ctx, roomID, backwardLimit, forwardLimit, limit, !backwardOrdering,
+ ctx, roomID, backwardLimit, forwardLimit, forwardMicroLimit, limit, !backwardOrdering,
)
if err != nil {
return
@@ -301,7 +302,7 @@ func (d *SyncServerDatasource) BackwardExtremitiesForRoom(
func (d *SyncServerDatasource) MaxTopologicalPosition(
ctx context.Context, roomID string,
-) (types.StreamPosition, error) {
+) (depth types.StreamPosition, stream types.StreamPosition, err error) {
return d.topology.selectMaxPositionInTopology(ctx, roomID)
}
@@ -318,8 +319,13 @@ func (d *SyncServerDatasource) EventsAtTopologicalPosition(
func (d *SyncServerDatasource) EventPositionInTopology(
ctx context.Context, eventID string,
-) (types.StreamPosition, error) {
- return d.topology.selectPositionInTopology(ctx, eventID)
+) (depth types.StreamPosition, stream types.StreamPosition, err error) {
+ depth, err = d.topology.selectPositionInTopology(ctx, eventID)
+ if err != nil {
+ return
+ }
+ stream, err = d.events.selectStreamPositionForEventID(ctx, eventID)
+ return
}
func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) {