aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-05-14 17:30:16 +0100
committerGitHub <noreply@github.com>2020-05-14 17:30:16 +0100
commit7ca230e931faceb0b8a9e60314258d6cd59f33d4 (patch)
treee1a6ba92d289e322111a39d230ea7accc7e61c85 /syncapi/storage
parent3cb04e80042104d14ccfa162b3e40a5b08819eac (diff)
Cleanup syncapi topology logic (#1035)
* Cleanup syncapi topology logic * Variable renaming * comments
Diffstat (limited to 'syncapi/storage')
-rw-r--r--syncapi/storage/interface.go7
-rw-r--r--syncapi/storage/postgres/output_room_events_topology_table.go37
-rw-r--r--syncapi/storage/shared/syncserver.go68
-rw-r--r--syncapi/storage/sqlite3/output_room_events_table.go5
-rw-r--r--syncapi/storage/sqlite3/output_room_events_topology_table.go40
-rw-r--r--syncapi/storage/storage_test.go13
-rw-r--r--syncapi/storage/tables/interface.go32
7 files changed, 60 insertions, 142 deletions
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index 63af1136..22716789 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -93,15 +93,12 @@ type Database interface {
// GetEventsInTopologicalRange retrieves all of the events on a given ordering using the given extremities and limit.
GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error)
// EventPositionInTopology returns the depth and stream position of the given event.
- EventPositionInTopology(ctx context.Context, eventID string) (depth types.StreamPosition, stream types.StreamPosition, err error)
- // EventsAtTopologicalPosition returns all of the events matching a given
- // position in the topology of a given room.
- EventsAtTopologicalPosition(ctx context.Context, roomID string, pos types.StreamPosition) ([]types.StreamEvent, error)
+ EventPositionInTopology(ctx context.Context, eventID string) (types.TopologyToken, error)
// BackwardExtremitiesForRoom returns the event IDs of all of the backward
// extremities we know of for a given room.
BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities []string, err error)
// MaxTopologicalPosition returns the highest topological position for a given room.
- MaxTopologicalPosition(ctx context.Context, roomID string) (depth types.StreamPosition, stream types.StreamPosition, err error)
+ MaxTopologicalPosition(ctx context.Context, roomID string) (types.TopologyToken, error)
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
// matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event.
diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go
index 48ebcf25..8a9cc49c 100644
--- a/syncapi/storage/postgres/output_room_events_topology_table.go
+++ b/syncapi/storage/postgres/output_room_events_topology_table.go
@@ -72,17 +72,12 @@ const selectMaxPositionInTopologySQL = "" +
"SELECT MAX(topological_position) FROM syncapi_output_room_events_topology WHERE room_id=$1" +
") ORDER BY stream_position DESC LIMIT 1"
-const selectEventIDsFromPositionSQL = "" +
- "SELECT event_id FROM syncapi_output_room_events_topology" +
- " WHERE room_id = $1 AND topological_position = $2"
-
type outputRoomEventsTopologyStatements struct {
insertEventInTopologyStmt *sql.Stmt
selectEventIDsInRangeASCStmt *sql.Stmt
selectEventIDsInRangeDESCStmt *sql.Stmt
selectPositionInTopologyStmt *sql.Stmt
selectMaxPositionInTopologyStmt *sql.Stmt
- selectEventIDsFromPositionStmt *sql.Stmt
}
func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
@@ -106,9 +101,6 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
return nil, err
}
- if s.selectEventIDsFromPositionStmt, err = db.Prepare(selectEventIDsFromPositionSQL); err != nil {
- return nil, err
- }
return s, nil
}
@@ -127,7 +119,7 @@ func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
// given range in a given room's topological order.
// Returns an empty slice if no events match the given range.
func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
- ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos, toMicroPos types.StreamPosition,
+ ctx context.Context, txn *sql.Tx, roomID string, minDepth, maxDepth, maxStreamPos types.StreamPosition,
limit int, chronologicalOrder bool,
) (eventIDs []string, err error) {
// Decide on the selection's order according to whether chronological order
@@ -140,7 +132,7 @@ func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
}
// Query the event IDs.
- rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, toPos, toMicroPos, limit)
+ rows, err := stmt.QueryContext(ctx, roomID, minDepth, maxDepth, maxDepth, maxStreamPos, limit)
if err == sql.ErrNoRows {
// If no event matched the request, return an empty slice.
return []string{}, nil
@@ -176,28 +168,3 @@ func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
return
}
-
-// SelectEventIDsFromPosition returns the IDs of all events that have a given
-// position in the topology of a given room.
-func (s *outputRoomEventsTopologyStatements) SelectEventIDsFromPosition(
- ctx context.Context, txn *sql.Tx, roomID string, pos types.StreamPosition,
-) (eventIDs []string, err error) {
- // Query the event IDs.
- rows, err := s.selectEventIDsFromPositionStmt.QueryContext(ctx, roomID, pos)
- if err == sql.ErrNoRows {
- // If no event matched the request, return an empty slice.
- return []string{}, nil
- } else if err != nil {
- return
- }
- defer common.CloseAndLogIfError(ctx, rows, "selectEventIDsFromPosition: rows.close() failed")
- // Return the IDs.
- var eventID string
- for rows.Next() {
- if err = rows.Scan(&eventID); err != nil {
- return
- }
- eventIDs = append(eventIDs, eventID)
- }
- return eventIDs, rows.Err()
-}
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index 96e9ff61..e03a6b9f 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -319,25 +319,25 @@ func (d *Database) GetEventsInTopologicalRange(
roomID string, limit int,
backwardOrdering bool,
) (events []types.StreamEvent, err error) {
- // Determine the backward and forward limit, i.e. the upper and lower
- // limits to the selection in the room's topology, from the direction.
- var backwardLimit, forwardLimit, forwardMicroLimit types.StreamPosition
+ var minDepth, maxDepth, maxStreamPosForMaxDepth types.StreamPosition
if backwardOrdering {
- // Backward ordering is antichronological (latest event to oldest
- // one).
- backwardLimit = to.Depth()
- forwardLimit = from.Depth()
- forwardMicroLimit = from.PDUPosition()
+ // Backward ordering means the 'from' token has a higher depth than the 'to' token
+ minDepth = to.Depth()
+ maxDepth = from.Depth()
+ // for cases where we have say 5 events with the same depth, the TopologyToken needs to
+ // know which of the 5 the client has seen. This is done by using the PDU position.
+ // Events with the same maxDepth but less than this PDU position will be returned.
+ maxStreamPosForMaxDepth = from.PDUPosition()
} else {
- // Forward ordering is chronological (oldest event to latest one).
- backwardLimit = from.Depth()
- forwardLimit = to.Depth()
+ // Forward ordering means the 'from' token has a lower depth than the 'to' token.
+ minDepth = from.Depth()
+ maxDepth = to.Depth()
}
// Select the event IDs from the defined range.
var eIDs []string
eIDs, err = d.Topology.SelectEventIDsInRange(
- ctx, nil, roomID, backwardLimit, forwardLimit, forwardMicroLimit, limit, !backwardOrdering,
+ ctx, nil, roomID, minDepth, maxDepth, maxStreamPosForMaxDepth, limit, !backwardOrdering,
)
if err != nil {
return
@@ -350,7 +350,7 @@ func (d *Database) GetEventsInTopologicalRange(
func (d *Database) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) {
err = common.WithTransaction(d.DB, func(txn *sql.Tx) error {
- pos, err := d.SyncPositionTx(ctx, txn)
+ pos, err := d.syncPositionTx(ctx, txn)
if err != nil {
return err
}
@@ -368,29 +368,25 @@ func (d *Database) BackwardExtremitiesForRoom(
func (d *Database) MaxTopologicalPosition(
ctx context.Context, roomID string,
-) (depth types.StreamPosition, stream types.StreamPosition, err error) {
- return d.Topology.SelectMaxPositionInTopology(ctx, nil, roomID)
-}
-
-func (d *Database) EventsAtTopologicalPosition(
- ctx context.Context, roomID string, pos types.StreamPosition,
-) ([]types.StreamEvent, error) {
- eIDs, err := d.Topology.SelectEventIDsFromPosition(ctx, nil, roomID, pos)
+) (types.TopologyToken, error) {
+ depth, streamPos, err := d.Topology.SelectMaxPositionInTopology(ctx, nil, roomID)
if err != nil {
- return nil, err
+ return types.NewTopologyToken(0, 0), err
}
-
- return d.OutputEvents.SelectEvents(ctx, nil, eIDs)
+ return types.NewTopologyToken(depth, streamPos), nil
}
func (d *Database) EventPositionInTopology(
ctx context.Context, eventID string,
-) (depth types.StreamPosition, stream types.StreamPosition, err error) {
- return d.Topology.SelectPositionInTopology(ctx, nil, eventID)
+) (types.TopologyToken, error) {
+ depth, stream, err := d.Topology.SelectPositionInTopology(ctx, nil, eventID)
+ if err != nil {
+ return types.NewTopologyToken(0, 0), err
+ }
+ return types.NewTopologyToken(depth, stream), nil
}
-// TODO FIXME TEMPORARY PUBLIC
-func (d *Database) SyncPositionTx(
+func (d *Database) syncPositionTx(
ctx context.Context, txn *sql.Tx,
) (sp types.StreamingToken, err error) {
@@ -466,7 +462,7 @@ func (d *Database) addPDUDeltaToResponse(
}
// TODO: This should be done in getStateDeltas
- if err = d.AddInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil {
+ if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil {
return nil, err
}
@@ -510,8 +506,7 @@ func (d *Database) addTypingDeltaToResponse(
// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if
// the positions of that type are not equal in fromPos and toPos.
-// TODO FIXME TEMPORARY PUBLIC
-func (d *Database) AddEDUDeltaToResponse(
+func (d *Database) addEDUDeltaToResponse(
fromPos, toPos types.StreamingToken,
joinedRoomIDs []string,
res *types.Response,
@@ -551,7 +546,7 @@ func (d *Database) IncrementalSync(
return nil, err
}
- err = d.AddEDUDeltaToResponse(
+ err = d.addEDUDeltaToResponse(
fromPos, toPos, joinedRoomIDs, res,
)
if err != nil {
@@ -590,7 +585,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
}()
// Get the current sync position which we will base the sync response on.
- toPos, err = d.SyncPositionTx(ctx, txn)
+ toPos, err = d.syncPositionTx(ctx, txn)
if err != nil {
return
}
@@ -649,7 +644,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
res.Rooms.Join[roomID] = *jr
}
- if err = d.AddInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil {
+ if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil {
return
}
@@ -668,7 +663,7 @@ func (d *Database) CompleteSync(
}
// Use a zero value SyncPosition for fromPos so all EDU states are added.
- err = d.AddEDUDeltaToResponse(
+ err = d.addEDUDeltaToResponse(
types.NewStreamToken(0, 0), toPos, joinedRoomIDs, res,
)
if err != nil {
@@ -688,8 +683,7 @@ var txReadOnlySnapshot = sql.TxOptions{
ReadOnly: true,
}
-// TODO FIXME temporary public
-func (d *Database) AddInvitesToResponse(
+func (d *Database) addInvitesToResponse(
ctx context.Context, txn *sql.Tx,
userID string,
fromPos, toPos types.StreamPosition,
diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go
index d3e88a54..c1647a14 100644
--- a/syncapi/storage/sqlite3/output_room_events_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_table.go
@@ -306,9 +306,6 @@ func (s *outputRoomEventsStatements) InsertEvent(
return
}
-// selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'.
-// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude
-// from sync.
func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx,
roomID string, fromPos, toPos types.StreamPosition, limit int,
@@ -341,8 +338,6 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
return events, nil
}
-// selectEarlyEvents returns the earliest events in the given room, starting
-// from a given position, up to a maximum of 'limit'.
func (s *outputRoomEventsStatements) SelectEarlyEvents(
ctx context.Context, txn *sql.Tx,
roomID string, fromPos, toPos types.StreamPosition, limit int,
diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go
index 4469f5b7..f25ac623 100644
--- a/syncapi/storage/sqlite3/output_room_events_topology_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go
@@ -65,17 +65,12 @@ const selectMaxPositionInTopologySQL = "" +
"SELECT MAX(topological_position), stream_position FROM syncapi_output_room_events_topology" +
" WHERE room_id = $1 ORDER BY stream_position DESC"
-const selectEventIDsFromPositionSQL = "" +
- "SELECT event_id FROM syncapi_output_room_events_topology" +
- " WHERE room_id = $1 AND topological_position = $2"
-
type outputRoomEventsTopologyStatements struct {
insertEventInTopologyStmt *sql.Stmt
selectEventIDsInRangeASCStmt *sql.Stmt
selectEventIDsInRangeDESCStmt *sql.Stmt
selectPositionInTopologyStmt *sql.Stmt
selectMaxPositionInTopologyStmt *sql.Stmt
- selectEventIDsFromPositionStmt *sql.Stmt
}
func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
@@ -99,9 +94,6 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
return nil, err
}
- if s.selectEventIDsFromPositionStmt, err = db.Prepare(selectEventIDsFromPositionSQL); err != nil {
- return nil, err
- }
return s, nil
}
@@ -117,12 +109,9 @@ func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
return
}
-// selectEventIDsInRange selects the IDs of events which positions are within a
-// given range in a given room's topological order.
-// Returns an empty slice if no events match the given range.
func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
ctx context.Context, txn *sql.Tx, roomID string,
- fromPos, toPos, toMicroPos types.StreamPosition,
+ minDepth, maxDepth, maxStreamPos types.StreamPosition,
limit int, chronologicalOrder bool,
) (eventIDs []string, err error) {
// Decide on the selection's order according to whether chronological order
@@ -135,7 +124,7 @@ func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
}
// Query the event IDs.
- rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, toPos, toMicroPos, limit)
+ rows, err := stmt.QueryContext(ctx, roomID, minDepth, maxDepth, maxDepth, maxStreamPos, limit)
if err == sql.ErrNoRows {
// If no event matched the request, return an empty slice.
return []string{}, nil
@@ -172,28 +161,3 @@ func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
err = stmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
return
}
-
-// selectEventIDsFromPosition returns the IDs of all events that have a given
-// position in the topology of a given room.
-func (s *outputRoomEventsTopologyStatements) SelectEventIDsFromPosition(
- ctx context.Context, txn *sql.Tx, roomID string, pos types.StreamPosition,
-) (eventIDs []string, err error) {
- // Query the event IDs.
- stmt := common.TxStmt(txn, s.selectEventIDsFromPositionStmt)
- rows, err := stmt.QueryContext(ctx, roomID, pos)
- if err == sql.ErrNoRows {
- // If no event matched the request, return an empty slice.
- return []string{}, nil
- } else if err != nil {
- return
- }
- // Return the IDs.
- var eventID string
- for rows.Next() {
- if err = rows.Scan(&eventID); err != nil {
- return
- }
- eventIDs = append(eventIDs, eventID)
- }
- return
-}
diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go
index bffcfd05..69ac5e6f 100644
--- a/syncapi/storage/storage_test.go
+++ b/syncapi/storage/storage_test.go
@@ -292,11 +292,10 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) {
db := MustCreateDatabase(t)
events, _ := SimpleRoom(t, testRoomID, testUserIDA, testUserIDB)
MustWriteEvents(t, db, events)
- latest, latestStream, err := db.MaxTopologicalPosition(ctx, testRoomID)
+ from, err := db.MaxTopologicalPosition(ctx, testRoomID)
if err != nil {
t.Fatalf("failed to get MaxTopologicalPosition: %s", err)
}
- from := types.NewTopologyToken(latest, latestStream)
// head towards the beginning of time
to := types.NewTopologyToken(0, 0)
@@ -358,16 +357,14 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
Depth: depth + 1,
}))
MustWriteEvents(t, db, events)
- latestPos, latestStreamPos, err := db.EventPositionInTopology(ctx, events[len(events)-1].EventID())
+ fromLatest, err := db.EventPositionInTopology(ctx, events[len(events)-1].EventID())
if err != nil {
t.Fatalf("failed to get EventPositionInTopology: %s", err)
}
- topoPos, streamPos, err := db.EventPositionInTopology(ctx, events[len(events)-3].EventID()) // Message 2
+ fromFork, err := db.EventPositionInTopology(ctx, events[len(events)-3].EventID()) // Message 2
if err != nil {
t.Fatalf("failed to get EventPositionInTopology for event: %s", err)
}
- fromLatest := types.NewTopologyToken(latestPos, latestStreamPos)
- fromFork := types.NewTopologyToken(topoPos, streamPos)
// head towards the beginning of time
to := types.NewTopologyToken(0, 0)
@@ -507,12 +504,10 @@ func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatr
}
func topologyTokenBefore(t *testing.T, db storage.Database, eventID string) *types.TopologyToken {
- pos, spos, err := db.EventPositionInTopology(ctx, eventID)
+ tok, err := db.EventPositionInTopology(ctx, eventID)
if err != nil {
t.Fatalf("failed to get EventPositionInTopology: %s", err)
}
-
- tok := types.NewTopologyToken(pos, spos)
tok.Decrement()
return &tok
}
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 8f0b8b89..1e5351b5 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -11,13 +11,15 @@ import (
type AccountData interface {
InsertAccountData(ctx context.Context, txn *sql.Tx, userID, roomID, dataType string) (pos types.StreamPosition, err error)
- SelectAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, err error)
+ // SelectAccountDataInRange returns a map of room ID to a list of `dataType`. The range is exclusive of `lowPos` and inclusive of `hiPos`.
+ SelectAccountDataInRange(ctx context.Context, userID string, lowPos, hiPos types.StreamPosition, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, err error)
SelectMaxAccountDataID(ctx context.Context, txn *sql.Tx) (id int64, err error)
}
type Invites interface {
InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error)
DeleteInviteEvent(ctx context.Context, inviteEventID string) error
+ // SelectInviteEventsInRange returns a map of room ID to invite events. The range is exclusive of `startPos` and inclusive of `endPos`.
SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition) (map[string]gomatrixserverlib.HeaderedEvent, error)
SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error)
}
@@ -26,26 +28,30 @@ type Events interface {
SelectStateInRange(ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition, stateFilter *gomatrixserverlib.StateFilter) (map[string]map[string]bool, map[string]types.StreamEvent, error)
SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error)
InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error)
+ // SelectRecentEvents returns events between the two stream positions: exclusive of `fromPos` and inclusive of `toPos`.
+ // If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
+ // Returns up to `limit` events.
SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, error)
+ // SelectEarlyEvents returns the earliest events in the given room, exclusive of `fromPos` and inclusive of `toPos`.
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]types.StreamEvent, error)
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
}
+// Topology keeps track of the depths and stream positions for all events.
+// These positions are used as types.TopologyToken when backfilling events locally.
type Topology interface {
- // InsertEventInTopology inserts the given event in the room's topology, based
- // on the event's depth.
+ // InsertEventInTopology inserts the given event in the room's topology, based on the event's depth.
+ // `pos` is the stream position of this event in the events table, and is used to order events which have the same depth.
InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (err error)
- // SelectEventIDsInRange selects the IDs of events which positions are within a
- // given range in a given room's topological order.
+ // SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order.
+ // Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`.
+ // `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned.
// Returns an empty slice if no events match the given range.
- SelectEventIDsInRange(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos, toMicroPos types.StreamPosition, limit int, chronologicalOrder bool) (eventIDs []string, err error)
- // SelectPositionInTopology returns the position of a given event in the
- // topology of the room it belongs to.
- SelectPositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (pos, spos types.StreamPosition, err error)
- SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
- // SelectEventIDsFromPosition returns the IDs of all events that have a given
- // position in the topology of a given room.
- SelectEventIDsFromPosition(ctx context.Context, txn *sql.Tx, roomID string, pos types.StreamPosition) (eventIDs []string, err error)
+ SelectEventIDsInRange(ctx context.Context, txn *sql.Tx, roomID string, minDepth, maxDepth, maxStreamPos types.StreamPosition, limit int, chronologicalOrder bool) (eventIDs []string, err error)
+ // SelectPositionInTopology returns the depth and stream position of a given event in the topology of the room it belongs to.
+ SelectPositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (depth, spos types.StreamPosition, err error)
+ // SelectMaxPositionInTopology returns the event which has the highest depth, and if there are multiple, the event with the highest stream position.
+ SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (depth types.StreamPosition, spos types.StreamPosition, err error)
}
type CurrentRoomState interface {