aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/shared/syncserver.go
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-12-10 18:57:10 +0000
committerGitHub <noreply@github.com>2020-12-10 18:57:10 +0000
commit9c03b0a4fa38971dfe83bd135aefb3c482a18380 (patch)
treeba8e55b75c056adac1fc7c42fb6febb2b8565d36 /syncapi/storage/shared/syncserver.go
parentbad81c028f090af0e1005076829db67d1a749a14 (diff)
Refactor sync tokens (#1628)
* Refactor sync tokens * Comment out broken notifier test * Update types, sytest-whitelist * More robust token checking * Remove New functions for streaming tokens * Export Logs in StreamingToken * Fix tests
Diffstat (limited to 'syncapi/storage/shared/syncserver.go')
-rw-r--r--syncapi/storage/shared/syncserver.go57
1 files changed, 32 insertions, 25 deletions
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index 9df04943..c0ae3d7a 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -78,8 +78,8 @@ func (d *Database) GetEventsInStreamingRange(
backwardOrdering bool,
) (events []types.StreamEvent, err error) {
r := types.Range{
- From: from.PDUPosition(),
- To: to.PDUPosition(),
+ From: from.PDUPosition,
+ To: to.PDUPosition,
Backwards: backwardOrdering,
}
if backwardOrdering {
@@ -391,16 +391,16 @@ func (d *Database) GetEventsInTopologicalRange(
var minDepth, maxDepth, maxStreamPosForMaxDepth types.StreamPosition
if backwardOrdering {
// Backward ordering means the 'from' token has a higher depth than the 'to' token
- minDepth = to.Depth()
- maxDepth = from.Depth()
+ minDepth = to.Depth
+ maxDepth = from.Depth
// for cases where we have say 5 events with the same depth, the TopologyToken needs to
// know which of the 5 the client has seen. This is done by using the PDU position.
// Events with the same maxDepth but less than this PDU position will be returned.
- maxStreamPosForMaxDepth = from.PDUPosition()
+ maxStreamPosForMaxDepth = from.PDUPosition
} else {
// Forward ordering means the 'from' token has a lower depth than the 'to' token.
- minDepth = from.Depth()
- maxDepth = to.Depth()
+ minDepth = from.Depth
+ maxDepth = to.Depth
}
// Select the event IDs from the defined range.
@@ -440,9 +440,9 @@ func (d *Database) MaxTopologicalPosition(
) (types.TopologyToken, error) {
depth, streamPos, err := d.Topology.SelectMaxPositionInTopology(ctx, nil, roomID)
if err != nil {
- return types.NewTopologyToken(0, 0), err
+ return types.TopologyToken{}, err
}
- return types.NewTopologyToken(depth, streamPos), nil
+ return types.TopologyToken{Depth: depth, PDUPosition: streamPos}, nil
}
func (d *Database) EventPositionInTopology(
@@ -450,9 +450,9 @@ func (d *Database) EventPositionInTopology(
) (types.TopologyToken, error) {
depth, stream, err := d.Topology.SelectPositionInTopology(ctx, nil, eventID)
if err != nil {
- return types.NewTopologyToken(0, 0), err
+ return types.TopologyToken{}, err
}
- return types.NewTopologyToken(depth, stream), nil
+ return types.TopologyToken{Depth: depth, PDUPosition: stream}, nil
}
func (d *Database) syncPositionTx(
@@ -483,7 +483,11 @@ func (d *Database) syncPositionTx(
if maxPeekID > maxEventID {
maxEventID = maxPeekID
}
- sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.EDUCache.GetLatestSyncPosition()), nil)
+ // TODO: complete these positions
+ sp = types.StreamingToken{
+ PDUPosition: types.StreamPosition(maxEventID),
+ TypingPosition: types.StreamPosition(d.EDUCache.GetLatestSyncPosition()),
+ }
return
}
@@ -555,7 +559,7 @@ func (d *Database) addTypingDeltaToResponse(
for _, roomID := range joinedRoomIDs {
var jr types.JoinResponse
if typingUsers, updated := d.EDUCache.GetTypingUsersIfUpdatedAfter(
- roomID, int64(since.EDUPosition()),
+ roomID, int64(since.TypingPosition),
); updated {
ev := gomatrixserverlib.ClientEvent{
Type: gomatrixserverlib.MTyping,
@@ -584,7 +588,7 @@ func (d *Database) addReceiptDeltaToResponse(
joinedRoomIDs []string,
res *types.Response,
) error {
- receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.EDUPosition())
+ receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.ReceiptPosition)
if err != nil {
return fmt.Errorf("unable to select receipts for rooms: %w", err)
}
@@ -639,7 +643,7 @@ func (d *Database) addEDUDeltaToResponse(
joinedRoomIDs []string,
res *types.Response,
) error {
- if fromPos.EDUPosition() != toPos.EDUPosition() {
+ if fromPos.TypingPosition != toPos.TypingPosition {
// add typing deltas
if err := d.addTypingDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil {
return fmt.Errorf("unable to apply typing delta to response: %w", err)
@@ -647,8 +651,8 @@ func (d *Database) addEDUDeltaToResponse(
}
// Check on initial sync and if EDUPositions differ
- if (fromPos.EDUPosition() == 0 && toPos.EDUPosition() == 0) ||
- fromPos.EDUPosition() != toPos.EDUPosition() {
+ if (fromPos.ReceiptPosition == 0 && toPos.ReceiptPosition == 0) ||
+ fromPos.ReceiptPosition != toPos.ReceiptPosition {
if err := d.addReceiptDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil {
return fmt.Errorf("unable to apply receipts to response: %w", err)
}
@@ -687,10 +691,10 @@ func (d *Database) IncrementalSync(
var joinedRoomIDs []string
var err error
- if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState {
+ if fromPos.PDUPosition != toPos.PDUPosition || wantFullState {
r := types.Range{
- From: fromPos.PDUPosition(),
- To: toPos.PDUPosition(),
+ From: fromPos.PDUPosition,
+ To: toPos.PDUPosition,
}
joinedRoomIDs, err = d.addPDUDeltaToResponse(
ctx, device, r, numRecentEventsPerRoom, wantFullState, res,
@@ -772,7 +776,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
}
r := types.Range{
From: 0,
- To: toPos.PDUPosition(),
+ To: toPos.PDUPosition,
}
res.NextBatch = toPos.String()
@@ -882,7 +886,10 @@ func (d *Database) getJoinResponseForCompleteSync(
if err != nil {
return
}
- prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos)
+ prevBatch := types.TopologyToken{
+ Depth: backwardTopologyPos,
+ PDUPosition: backwardStreamPos,
+ }
prevBatch.Decrement()
prevBatchStr = prevBatch.String()
}
@@ -915,7 +922,7 @@ func (d *Database) CompleteSync(
// Use a zero value SyncPosition for fromPos so all EDU states are added.
err = d.addEDUDeltaToResponse(
- types.NewStreamToken(0, 0, nil), toPos, joinedRoomIDs, res,
+ types.StreamingToken{}, toPos, joinedRoomIDs, res,
)
if err != nil {
return nil, fmt.Errorf("d.addEDUDeltaToResponse: %w", err)
@@ -965,7 +972,7 @@ func (d *Database) getBackwardTopologyPos(
ctx context.Context, txn *sql.Tx,
events []types.StreamEvent,
) (types.TopologyToken, error) {
- zeroToken := types.NewTopologyToken(0, 0)
+ zeroToken := types.TopologyToken{}
if len(events) == 0 {
return zeroToken, nil
}
@@ -973,7 +980,7 @@ func (d *Database) getBackwardTopologyPos(
if err != nil {
return zeroToken, err
}
- tok := types.NewTopologyToken(pos, spos)
+ tok := types.TopologyToken{Depth: pos, PDUPosition: spos}
tok.Decrement()
return tok, nil
}