diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-12-10 18:57:10 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-10 18:57:10 +0000 |
commit | 9c03b0a4fa38971dfe83bd135aefb3c482a18380 (patch) | |
tree | ba8e55b75c056adac1fc7c42fb6febb2b8565d36 /syncapi/storage/shared/syncserver.go | |
parent | bad81c028f090af0e1005076829db67d1a749a14 (diff) |
Refactor sync tokens (#1628)
* Refactor sync tokens
* Comment out broken notifier test
* Update types, sytest-whitelist
* More robust token checking
* Remove New functions for streaming tokens
* Export Logs in StreamingToken
* Fix tests
Diffstat (limited to 'syncapi/storage/shared/syncserver.go')
-rw-r--r-- | syncapi/storage/shared/syncserver.go | 57 |
1 files changed, 32 insertions, 25 deletions
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 9df04943..c0ae3d7a 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -78,8 +78,8 @@ func (d *Database) GetEventsInStreamingRange( backwardOrdering bool, ) (events []types.StreamEvent, err error) { r := types.Range{ - From: from.PDUPosition(), - To: to.PDUPosition(), + From: from.PDUPosition, + To: to.PDUPosition, Backwards: backwardOrdering, } if backwardOrdering { @@ -391,16 +391,16 @@ func (d *Database) GetEventsInTopologicalRange( var minDepth, maxDepth, maxStreamPosForMaxDepth types.StreamPosition if backwardOrdering { // Backward ordering means the 'from' token has a higher depth than the 'to' token - minDepth = to.Depth() - maxDepth = from.Depth() + minDepth = to.Depth + maxDepth = from.Depth // for cases where we have say 5 events with the same depth, the TopologyToken needs to // know which of the 5 the client has seen. This is done by using the PDU position. // Events with the same maxDepth but less than this PDU position will be returned. - maxStreamPosForMaxDepth = from.PDUPosition() + maxStreamPosForMaxDepth = from.PDUPosition } else { // Forward ordering means the 'from' token has a lower depth than the 'to' token. - minDepth = from.Depth() - maxDepth = to.Depth() + minDepth = from.Depth + maxDepth = to.Depth } // Select the event IDs from the defined range. @@ -440,9 +440,9 @@ func (d *Database) MaxTopologicalPosition( ) (types.TopologyToken, error) { depth, streamPos, err := d.Topology.SelectMaxPositionInTopology(ctx, nil, roomID) if err != nil { - return types.NewTopologyToken(0, 0), err + return types.TopologyToken{}, err } - return types.NewTopologyToken(depth, streamPos), nil + return types.TopologyToken{Depth: depth, PDUPosition: streamPos}, nil } func (d *Database) EventPositionInTopology( @@ -450,9 +450,9 @@ func (d *Database) EventPositionInTopology( ) (types.TopologyToken, error) { depth, stream, err := d.Topology.SelectPositionInTopology(ctx, nil, eventID) if err != nil { - return types.NewTopologyToken(0, 0), err + return types.TopologyToken{}, err } - return types.NewTopologyToken(depth, stream), nil + return types.TopologyToken{Depth: depth, PDUPosition: stream}, nil } func (d *Database) syncPositionTx( @@ -483,7 +483,11 @@ func (d *Database) syncPositionTx( if maxPeekID > maxEventID { maxEventID = maxPeekID } - sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.EDUCache.GetLatestSyncPosition()), nil) + // TODO: complete these positions + sp = types.StreamingToken{ + PDUPosition: types.StreamPosition(maxEventID), + TypingPosition: types.StreamPosition(d.EDUCache.GetLatestSyncPosition()), + } return } @@ -555,7 +559,7 @@ func (d *Database) addTypingDeltaToResponse( for _, roomID := range joinedRoomIDs { var jr types.JoinResponse if typingUsers, updated := d.EDUCache.GetTypingUsersIfUpdatedAfter( - roomID, int64(since.EDUPosition()), + roomID, int64(since.TypingPosition), ); updated { ev := gomatrixserverlib.ClientEvent{ Type: gomatrixserverlib.MTyping, @@ -584,7 +588,7 @@ func (d *Database) addReceiptDeltaToResponse( joinedRoomIDs []string, res *types.Response, ) error { - receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.EDUPosition()) + receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.ReceiptPosition) if err != nil { return fmt.Errorf("unable to select receipts for rooms: %w", err) } @@ -639,7 +643,7 @@ func (d *Database) addEDUDeltaToResponse( joinedRoomIDs []string, res *types.Response, ) error { - if fromPos.EDUPosition() != toPos.EDUPosition() { + if fromPos.TypingPosition != toPos.TypingPosition { // add typing deltas if err := d.addTypingDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil { return fmt.Errorf("unable to apply typing delta to response: %w", err) @@ -647,8 +651,8 @@ func (d *Database) addEDUDeltaToResponse( } // Check on initial sync and if EDUPositions differ - if (fromPos.EDUPosition() == 0 && toPos.EDUPosition() == 0) || - fromPos.EDUPosition() != toPos.EDUPosition() { + if (fromPos.ReceiptPosition == 0 && toPos.ReceiptPosition == 0) || + fromPos.ReceiptPosition != toPos.ReceiptPosition { if err := d.addReceiptDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil { return fmt.Errorf("unable to apply receipts to response: %w", err) } @@ -687,10 +691,10 @@ func (d *Database) IncrementalSync( var joinedRoomIDs []string var err error - if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState { + if fromPos.PDUPosition != toPos.PDUPosition || wantFullState { r := types.Range{ - From: fromPos.PDUPosition(), - To: toPos.PDUPosition(), + From: fromPos.PDUPosition, + To: toPos.PDUPosition, } joinedRoomIDs, err = d.addPDUDeltaToResponse( ctx, device, r, numRecentEventsPerRoom, wantFullState, res, @@ -772,7 +776,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync( } r := types.Range{ From: 0, - To: toPos.PDUPosition(), + To: toPos.PDUPosition, } res.NextBatch = toPos.String() @@ -882,7 +886,10 @@ func (d *Database) getJoinResponseForCompleteSync( if err != nil { return } - prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos) + prevBatch := types.TopologyToken{ + Depth: backwardTopologyPos, + PDUPosition: backwardStreamPos, + } prevBatch.Decrement() prevBatchStr = prevBatch.String() } @@ -915,7 +922,7 @@ func (d *Database) CompleteSync( // Use a zero value SyncPosition for fromPos so all EDU states are added. err = d.addEDUDeltaToResponse( - types.NewStreamToken(0, 0, nil), toPos, joinedRoomIDs, res, + types.StreamingToken{}, toPos, joinedRoomIDs, res, ) if err != nil { return nil, fmt.Errorf("d.addEDUDeltaToResponse: %w", err) @@ -965,7 +972,7 @@ func (d *Database) getBackwardTopologyPos( ctx context.Context, txn *sql.Tx, events []types.StreamEvent, ) (types.TopologyToken, error) { - zeroToken := types.NewTopologyToken(0, 0) + zeroToken := types.TopologyToken{} if len(events) == 0 { return zeroToken, nil } @@ -973,7 +980,7 @@ func (d *Database) getBackwardTopologyPos( if err != nil { return zeroToken, err } - tok := types.NewTopologyToken(pos, spos) + tok := types.TopologyToken{Depth: pos, PDUPosition: spos} tok.Decrement() return tok, nil } |