aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-05-13 12:14:50 +0100
committerGitHub <noreply@github.com>2020-05-13 12:14:50 +0100
commit5e9dce1c0c66736937eeddd5c33c92700d9a65a7 (patch)
treee0c40667cb17714d160fba2fbc64470eedf9ca7b /syncapi/storage/postgres
parent31e6a7f1932c11d9b5b682ad06a5b8db9d74a44f (diff)
syncapi: Rename and split out tokens (#1025)
* syncapi: Rename and split out tokens Previously we used the badly named `PaginationToken` which was used for both `/sync` and `/messages` requests. This quickly became confusing because named fields like `PDUPosition` meant different things depending on the token type. Instead, we now have two token types: `TopologyToken` and `StreamingToken`, both of which have fields which make more sense for their specific situations. Updated the codebase to use one or the other. `PaginationToken` still lives on as `syncToken`, an unexported type which both tokens rely on. This allows us to guarantee that the specific mappings of positions to a string remain solely under the control of the `types` package. This enables us to move high-level conceptual things like "decrement this topological token" to function calls e.g `TopologicalToken.Decrement()`. Currently broken because `/messages` seemingly used both stream and topological tokens, though I need to confirm this. * final tweaks/hacks * spurious logging * Review comments and linting
Diffstat (limited to 'syncapi/storage/postgres')
-rw-r--r--syncapi/storage/postgres/syncserver.go127
1 files changed, 61 insertions, 66 deletions
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
index 1845ac38..d45bc09e 100644
--- a/syncapi/storage/postgres/syncserver.go
+++ b/syncapi/storage/postgres/syncserver.go
@@ -228,69 +228,68 @@ func (d *SyncServerDatasource) GetStateEventsForRoom(
return
}
-func (d *SyncServerDatasource) GetEventsInRange(
+func (d *SyncServerDatasource) GetEventsInTopologicalRange(
ctx context.Context,
- from, to *types.PaginationToken,
+ from, to *types.TopologyToken,
roomID string, limit int,
backwardOrdering bool,
) (events []types.StreamEvent, err error) {
- // If the pagination token's type is types.PaginationTokenTypeTopology, the
- // events must be retrieved from the rooms' topology table rather than the
- // table contaning the syncapi server's whole stream of events.
- if from.Type == types.PaginationTokenTypeTopology {
- // Determine the backward and forward limit, i.e. the upper and lower
- // limits to the selection in the room's topology, from the direction.
- var backwardLimit, forwardLimit, forwardMicroLimit types.StreamPosition
- if backwardOrdering {
- // Backward ordering is antichronological (latest event to oldest
- // one).
- backwardLimit = to.PDUPosition
- forwardLimit = from.PDUPosition
- forwardMicroLimit = from.EDUTypingPosition
- } else {
- // Forward ordering is chronological (oldest event to latest one).
- backwardLimit = from.PDUPosition
- forwardLimit = to.PDUPosition
- }
-
- // Select the event IDs from the defined range.
- var eIDs []string
- eIDs, err = d.topology.selectEventIDsInRange(
- ctx, roomID, backwardLimit, forwardLimit, forwardMicroLimit, limit, !backwardOrdering,
- )
- if err != nil {
- return
- }
+ // Determine the backward and forward limit, i.e. the upper and lower
+ // limits to the selection in the room's topology, from the direction.
+ var backwardLimit, forwardLimit, forwardMicroLimit types.StreamPosition
+ if backwardOrdering {
+ // Backward ordering is antichronological (latest event to oldest
+ // one).
+ backwardLimit = to.Depth()
+ forwardLimit = from.Depth()
+ forwardMicroLimit = from.PDUPosition()
+ } else {
+ // Forward ordering is chronological (oldest event to latest one).
+ backwardLimit = from.Depth()
+ forwardLimit = to.Depth()
+ }
- // Retrieve the events' contents using their IDs.
- events, err = d.events.selectEvents(ctx, nil, eIDs)
+ // Select the event IDs from the defined range.
+ var eIDs []string
+ eIDs, err = d.topology.selectEventIDsInRange(
+ ctx, roomID, backwardLimit, forwardLimit, forwardMicroLimit, limit, !backwardOrdering,
+ )
+ if err != nil {
return
}
- // If the pagination token's type is types.PaginationTokenTypeStream, the
- // events must be retrieved from the table contaning the syncapi server's
- // whole stream of events.
+ // Retrieve the events' contents using their IDs.
+ events, err = d.events.selectEvents(ctx, nil, eIDs)
+ return
+}
+// GetEventsInStreamingRange retrieves all of the events on a given ordering using the
+// given extremities and limit.
+func (d *SyncServerDatasource) GetEventsInStreamingRange(
+ ctx context.Context,
+ from, to *types.StreamingToken,
+ roomID string, limit int,
+ backwardOrdering bool,
+) (events []types.StreamEvent, err error) {
if backwardOrdering {
// When using backward ordering, we want the most recent events first.
if events, err = d.events.selectRecentEvents(
- ctx, nil, roomID, to.PDUPosition, from.PDUPosition, limit, false, false,
+ ctx, nil, roomID, to.PDUPosition(), from.PDUPosition(), limit, false, false,
); err != nil {
return
}
} else {
// When using forward ordering, we want the least recent events first.
if events, err = d.events.selectEarlyEvents(
- ctx, nil, roomID, from.PDUPosition, to.PDUPosition, limit,
+ ctx, nil, roomID, from.PDUPosition(), to.PDUPosition(), limit,
); err != nil {
return
}
}
-
- return
+ return events, err
}
-func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.PaginationToken, error) {
+func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.StreamingToken, error) {
return d.syncPositionTx(ctx, nil)
}
@@ -353,7 +352,7 @@ func (d *SyncServerDatasource) syncStreamPositionTx(
func (d *SyncServerDatasource) syncPositionTx(
ctx context.Context, txn *sql.Tx,
-) (sp types.PaginationToken, err error) {
+) (sp types.StreamingToken, err error) {
maxEventID, err := d.events.selectMaxEventID(ctx, txn)
if err != nil {
@@ -373,8 +372,7 @@ func (d *SyncServerDatasource) syncPositionTx(
if maxInviteID > maxEventID {
maxEventID = maxInviteID
}
- sp.PDUPosition = types.StreamPosition(maxEventID)
- sp.EDUTypingPosition = types.StreamPosition(d.eduCache.GetLatestSyncPosition())
+ sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.eduCache.GetLatestSyncPosition()))
return
}
@@ -439,7 +437,7 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
// addTypingDeltaToResponse adds all typing notifications to a sync response
// since the specified position.
func (d *SyncServerDatasource) addTypingDeltaToResponse(
- since types.PaginationToken,
+ since types.StreamingToken,
joinedRoomIDs []string,
res *types.Response,
) error {
@@ -448,7 +446,7 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse(
var err error
for _, roomID := range joinedRoomIDs {
if typingUsers, updated := d.eduCache.GetTypingUsersIfUpdatedAfter(
- roomID, int64(since.EDUTypingPosition),
+ roomID, int64(since.EDUPosition()),
); updated {
ev := gomatrixserverlib.ClientEvent{
Type: gomatrixserverlib.MTyping,
@@ -473,12 +471,12 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse(
// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if
// the positions of that type are not equal in fromPos and toPos.
func (d *SyncServerDatasource) addEDUDeltaToResponse(
- fromPos, toPos types.PaginationToken,
+ fromPos, toPos types.StreamingToken,
joinedRoomIDs []string,
res *types.Response,
) (err error) {
- if fromPos.EDUTypingPosition != toPos.EDUTypingPosition {
+ if fromPos.EDUPosition() != toPos.EDUPosition() {
err = d.addTypingDeltaToResponse(
fromPos, joinedRoomIDs, res,
)
@@ -490,7 +488,7 @@ func (d *SyncServerDatasource) addEDUDeltaToResponse(
func (d *SyncServerDatasource) IncrementalSync(
ctx context.Context,
device authtypes.Device,
- fromPos, toPos types.PaginationToken,
+ fromPos, toPos types.StreamingToken,
numRecentEventsPerRoom int,
wantFullState bool,
) (*types.Response, error) {
@@ -499,9 +497,9 @@ func (d *SyncServerDatasource) IncrementalSync(
var joinedRoomIDs []string
var err error
- if fromPos.PDUPosition != toPos.PDUPosition || wantFullState {
+ if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState {
joinedRoomIDs, err = d.addPDUDeltaToResponse(
- ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, wantFullState, res,
+ ctx, device, fromPos.PDUPosition(), toPos.PDUPosition(), numRecentEventsPerRoom, wantFullState, res,
)
} else {
joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(
@@ -530,7 +528,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
numRecentEventsPerRoom int,
) (
res *types.Response,
- toPos types.PaginationToken,
+ toPos types.StreamingToken,
joinedRoomIDs []string,
err error,
) {
@@ -577,7 +575,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []types.StreamEvent
recentStreamEvents, err = d.events.selectRecentEvents(
- ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition,
+ ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(),
numRecentEventsPerRoom, true, true,
)
if err != nil {
@@ -588,27 +586,25 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
// oldest event in the room's topology.
var backwardTopologyPos, backwardStreamPos types.StreamPosition
backwardTopologyPos, backwardStreamPos, err = d.topology.selectPositionInTopology(ctx, recentStreamEvents[0].EventID())
- if backwardTopologyPos-1 <= 0 {
- backwardTopologyPos = types.StreamPosition(1)
- } else {
- backwardTopologyPos--
+ if err != nil {
+ return
}
+ prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos)
+ prevBatch.Decrement()
// We don't include a device here as we don't need to send down
// transaction IDs for complete syncs
recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr := types.NewJoinResponse()
- jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
- types.PaginationTokenTypeTopology, backwardTopologyPos, backwardStreamPos,
- ).String()
+ jr.Timeline.PrevBatch = prevBatch.String()
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[roomID] = *jr
}
- if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition, res); err != nil {
+ if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil {
return
}
@@ -628,7 +624,7 @@ func (d *SyncServerDatasource) CompleteSync(
// Use a zero value SyncPosition for fromPos so all EDU states are added.
err = d.addEDUDeltaToResponse(
- types.PaginationToken{}, toPos, joinedRoomIDs, res,
+ types.NewStreamToken(0, 0), toPos, joinedRoomIDs, res,
)
if err != nil {
return nil, err
@@ -757,14 +753,15 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
backwardTopologyPos, backwardStreamPos := d.getBackwardTopologyPos(ctx, recentStreamEvents)
+ prevBatch := types.NewTopologyToken(
+ backwardTopologyPos, backwardStreamPos,
+ )
switch delta.membership {
case gomatrixserverlib.Join:
jr := types.NewJoinResponse()
- jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
- types.PaginationTokenTypeTopology, backwardTopologyPos, backwardStreamPos,
- ).String()
+ jr.Timeline.PrevBatch = prevBatch.String()
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
@@ -775,9 +772,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
// TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room.
lr := types.NewLeaveResponse()
- lr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
- types.PaginationTokenTypeTopology, backwardTopologyPos, backwardStreamPos,
- ).String()
+ lr.Timeline.PrevBatch = prevBatch.String()
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)