diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-12-10 18:57:10 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-10 18:57:10 +0000 |
commit | 9c03b0a4fa38971dfe83bd135aefb3c482a18380 (patch) | |
tree | ba8e55b75c056adac1fc7c42fb6febb2b8565d36 /syncapi/storage | |
parent | bad81c028f090af0e1005076829db67d1a749a14 (diff) |
Refactor sync tokens (#1628)
* Refactor sync tokens
* Comment out broken notifier test
* Update types, sytest-whitelist
* More robust token checking
* Remove New functions for streaming tokens
* Export Logs in StreamingToken
* Fix tests
Diffstat (limited to 'syncapi/storage')
-rw-r--r-- | syncapi/storage/shared/syncserver.go | 57 | ||||
-rw-r--r-- | syncapi/storage/storage_test.go | 59 |
2 files changed, 64 insertions, 52 deletions
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 9df04943..c0ae3d7a 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -78,8 +78,8 @@ func (d *Database) GetEventsInStreamingRange( backwardOrdering bool, ) (events []types.StreamEvent, err error) { r := types.Range{ - From: from.PDUPosition(), - To: to.PDUPosition(), + From: from.PDUPosition, + To: to.PDUPosition, Backwards: backwardOrdering, } if backwardOrdering { @@ -391,16 +391,16 @@ func (d *Database) GetEventsInTopologicalRange( var minDepth, maxDepth, maxStreamPosForMaxDepth types.StreamPosition if backwardOrdering { // Backward ordering means the 'from' token has a higher depth than the 'to' token - minDepth = to.Depth() - maxDepth = from.Depth() + minDepth = to.Depth + maxDepth = from.Depth // for cases where we have say 5 events with the same depth, the TopologyToken needs to // know which of the 5 the client has seen. This is done by using the PDU position. // Events with the same maxDepth but less than this PDU position will be returned. - maxStreamPosForMaxDepth = from.PDUPosition() + maxStreamPosForMaxDepth = from.PDUPosition } else { // Forward ordering means the 'from' token has a lower depth than the 'to' token. - minDepth = from.Depth() - maxDepth = to.Depth() + minDepth = from.Depth + maxDepth = to.Depth } // Select the event IDs from the defined range. @@ -440,9 +440,9 @@ func (d *Database) MaxTopologicalPosition( ) (types.TopologyToken, error) { depth, streamPos, err := d.Topology.SelectMaxPositionInTopology(ctx, nil, roomID) if err != nil { - return types.NewTopologyToken(0, 0), err + return types.TopologyToken{}, err } - return types.NewTopologyToken(depth, streamPos), nil + return types.TopologyToken{Depth: depth, PDUPosition: streamPos}, nil } func (d *Database) EventPositionInTopology( @@ -450,9 +450,9 @@ func (d *Database) EventPositionInTopology( ) (types.TopologyToken, error) { depth, stream, err := d.Topology.SelectPositionInTopology(ctx, nil, eventID) if err != nil { - return types.NewTopologyToken(0, 0), err + return types.TopologyToken{}, err } - return types.NewTopologyToken(depth, stream), nil + return types.TopologyToken{Depth: depth, PDUPosition: stream}, nil } func (d *Database) syncPositionTx( @@ -483,7 +483,11 @@ func (d *Database) syncPositionTx( if maxPeekID > maxEventID { maxEventID = maxPeekID } - sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.EDUCache.GetLatestSyncPosition()), nil) + // TODO: complete these positions + sp = types.StreamingToken{ + PDUPosition: types.StreamPosition(maxEventID), + TypingPosition: types.StreamPosition(d.EDUCache.GetLatestSyncPosition()), + } return } @@ -555,7 +559,7 @@ func (d *Database) addTypingDeltaToResponse( for _, roomID := range joinedRoomIDs { var jr types.JoinResponse if typingUsers, updated := d.EDUCache.GetTypingUsersIfUpdatedAfter( - roomID, int64(since.EDUPosition()), + roomID, int64(since.TypingPosition), ); updated { ev := gomatrixserverlib.ClientEvent{ Type: gomatrixserverlib.MTyping, @@ -584,7 +588,7 @@ func (d *Database) addReceiptDeltaToResponse( joinedRoomIDs []string, res *types.Response, ) error { - receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.EDUPosition()) + receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.ReceiptPosition) if err != nil { return fmt.Errorf("unable to select receipts for rooms: %w", err) } @@ -639,7 +643,7 @@ func (d *Database) addEDUDeltaToResponse( joinedRoomIDs []string, res *types.Response, ) error { - if fromPos.EDUPosition() != toPos.EDUPosition() { + if fromPos.TypingPosition != toPos.TypingPosition { // add typing deltas if err := d.addTypingDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil { return fmt.Errorf("unable to apply typing delta to response: %w", err) @@ -647,8 +651,8 @@ func (d *Database) addEDUDeltaToResponse( } // Check on initial sync and if EDUPositions differ - if (fromPos.EDUPosition() == 0 && toPos.EDUPosition() == 0) || - fromPos.EDUPosition() != toPos.EDUPosition() { + if (fromPos.ReceiptPosition == 0 && toPos.ReceiptPosition == 0) || + fromPos.ReceiptPosition != toPos.ReceiptPosition { if err := d.addReceiptDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil { return fmt.Errorf("unable to apply receipts to response: %w", err) } @@ -687,10 +691,10 @@ func (d *Database) IncrementalSync( var joinedRoomIDs []string var err error - if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState { + if fromPos.PDUPosition != toPos.PDUPosition || wantFullState { r := types.Range{ - From: fromPos.PDUPosition(), - To: toPos.PDUPosition(), + From: fromPos.PDUPosition, + To: toPos.PDUPosition, } joinedRoomIDs, err = d.addPDUDeltaToResponse( ctx, device, r, numRecentEventsPerRoom, wantFullState, res, @@ -772,7 +776,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync( } r := types.Range{ From: 0, - To: toPos.PDUPosition(), + To: toPos.PDUPosition, } res.NextBatch = toPos.String() @@ -882,7 +886,10 @@ func (d *Database) getJoinResponseForCompleteSync( if err != nil { return } - prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos) + prevBatch := types.TopologyToken{ + Depth: backwardTopologyPos, + PDUPosition: backwardStreamPos, + } prevBatch.Decrement() prevBatchStr = prevBatch.String() } @@ -915,7 +922,7 @@ func (d *Database) CompleteSync( // Use a zero value SyncPosition for fromPos so all EDU states are added. err = d.addEDUDeltaToResponse( - types.NewStreamToken(0, 0, nil), toPos, joinedRoomIDs, res, + types.StreamingToken{}, toPos, joinedRoomIDs, res, ) if err != nil { return nil, fmt.Errorf("d.addEDUDeltaToResponse: %w", err) @@ -965,7 +972,7 @@ func (d *Database) getBackwardTopologyPos( ctx context.Context, txn *sql.Tx, events []types.StreamEvent, ) (types.TopologyToken, error) { - zeroToken := types.NewTopologyToken(0, 0) + zeroToken := types.TopologyToken{} if len(events) == 0 { return zeroToken, nil } @@ -973,7 +980,7 @@ func (d *Database) getBackwardTopologyPos( if err != nil { return zeroToken, err } - tok := types.NewTopologyToken(pos, spos) + tok := types.TopologyToken{Depth: pos, PDUPosition: spos} tok.Decrement() return tok, nil } diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index b1b0d254..8387543f 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -165,9 +165,9 @@ func TestSyncResponse(t *testing.T) { { Name: "IncrementalSync penultimate", DoSync: func() (*types.Response, error) { - from := types.NewStreamToken( // pretend we are at the penultimate event - positions[len(positions)-2], types.StreamPosition(0), nil, - ) + from := types.StreamingToken{ // pretend we are at the penultimate event + PDUPosition: positions[len(positions)-2], + } res := types.NewResponse() return db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false) }, @@ -178,9 +178,9 @@ func TestSyncResponse(t *testing.T) { { Name: "IncrementalSync limited", DoSync: func() (*types.Response, error) { - from := types.NewStreamToken( // pretend we are 10 events behind - positions[len(positions)-11], types.StreamPosition(0), nil, - ) + from := types.StreamingToken{ // pretend we are 10 events behind + PDUPosition: positions[len(positions)-11], + } res := types.NewResponse() // limit is set to 5 return db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false) @@ -222,7 +222,12 @@ func TestSyncResponse(t *testing.T) { if err != nil { st.Fatalf("failed to do sync: %s", err) } - next := types.NewStreamToken(latest.PDUPosition(), latest.EDUPosition(), nil) + next := types.StreamingToken{ + PDUPosition: latest.PDUPosition, + TypingPosition: latest.TypingPosition, + ReceiptPosition: latest.ReceiptPosition, + SendToDevicePosition: latest.SendToDevicePosition, + } if res.NextBatch != next.String() { st.Errorf("NextBatch got %s want %s", res.NextBatch, next.String()) } @@ -245,9 +250,9 @@ func TestGetEventsInRangeWithPrevBatch(t *testing.T) { if err != nil { t.Fatalf("failed to get SyncPosition: %s", err) } - from := types.NewStreamToken( - positions[len(positions)-2], types.StreamPosition(0), nil, - ) + from := types.StreamingToken{ + PDUPosition: positions[len(positions)-2], + } res := types.NewResponse() res, err = db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false) @@ -271,7 +276,7 @@ func TestGetEventsInRangeWithPrevBatch(t *testing.T) { } // backpaginate 5 messages starting at the latest position. // head towards the beginning of time - to := types.NewTopologyToken(0, 0) + to := types.TopologyToken{} paginatedEvents, err := db.GetEventsInTopologicalRange(ctx, &prevBatchToken, &to, testRoomID, 5, true) if err != nil { t.Fatalf("GetEventsInRange returned an error: %s", err) @@ -291,7 +296,7 @@ func TestGetEventsInRangeWithStreamToken(t *testing.T) { t.Fatalf("failed to get SyncPosition: %s", err) } // head towards the beginning of time - to := types.NewStreamToken(0, 0, nil) + to := types.StreamingToken{} // backpaginate 5 messages starting at the latest position. paginatedEvents, err := db.GetEventsInStreamingRange(ctx, &latest, &to, testRoomID, 5, true) @@ -313,7 +318,7 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) { t.Fatalf("failed to get MaxTopologicalPosition: %s", err) } // head towards the beginning of time - to := types.NewTopologyToken(0, 0) + to := types.TopologyToken{} // backpaginate 5 messages starting at the latest position. paginatedEvents, err := db.GetEventsInTopologicalRange(ctx, &from, &to, testRoomID, 5, true) @@ -382,7 +387,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) { t.Fatalf("failed to get EventPositionInTopology for event: %s", err) } // head towards the beginning of time - to := types.NewTopologyToken(0, 0) + to := types.TopologyToken{} testCases := []struct { Name string @@ -458,7 +463,7 @@ func TestGetEventsInTopologicalRangeMultiRoom(t *testing.T) { t.Fatalf("failed to get MaxTopologicalPosition: %s", err) } // head towards the beginning of time - to := types.NewTopologyToken(0, 0) + to := types.TopologyToken{} // Query using room B as room A was inserted first and hence A will have lower stream positions but identical depths, // allowing this bug to surface. @@ -508,7 +513,7 @@ func TestGetEventsInRangeWithEventsInsertedLikeBackfill(t *testing.T) { } // head towards the beginning of time - to := types.NewTopologyToken(0, 0) + to := types.TopologyToken{} // starting at `from`, backpaginate to the beginning of time, asserting as we go. chunkSize = 3 @@ -534,14 +539,14 @@ func TestSendToDeviceBehaviour(t *testing.T) { // At this point there should be no messages. We haven't sent anything // yet. - events, updates, deletions, err := db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, nil)) + events, updates, deletions, err := db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{}) if err != nil { t.Fatal(err) } if len(events) != 0 || len(updates) != 0 || len(deletions) != 0 { t.Fatal("first call should have no updates") } - err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0, nil)) + err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{}) if err != nil { return } @@ -559,14 +564,14 @@ func TestSendToDeviceBehaviour(t *testing.T) { // At this point we should get exactly one message. We're sending the sync position // that we were given from the update and the send-to-device update will be updated // in the database to reflect that this was the sync position we sent the message at. - events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos, nil)) + events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos}) if err != nil { t.Fatal(err) } if len(events) != 1 || len(updates) != 1 || len(deletions) != 0 { t.Fatal("second call should have one update") } - err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos, nil)) + err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos}) if err != nil { return } @@ -574,35 +579,35 @@ func TestSendToDeviceBehaviour(t *testing.T) { // At this point we should still have one message because we haven't progressed the // sync position yet. This is equivalent to the client failing to /sync and retrying // with the same position. - events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos, nil)) + events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos}) if err != nil { t.Fatal(err) } if len(events) != 1 || len(updates) != 0 || len(deletions) != 0 { t.Fatal("third call should have one update still") } - err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos, nil)) + err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos}) if err != nil { return } // At this point we should now have no updates, because we've progressed the sync // position. Therefore the update from before will not be sent again. - events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos+1, nil)) + events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos + 1}) if err != nil { t.Fatal(err) } if len(events) != 0 || len(updates) != 0 || len(deletions) != 1 { t.Fatal("fourth call should have no updates") } - err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos+1, nil)) + err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos + 1}) if err != nil { return } // At this point we should still have no updates, because no new updates have been // sent. - events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos+2, nil)) + events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos + 2}) if err != nil { t.Fatal(err) } @@ -639,7 +644,7 @@ func TestInviteBehaviour(t *testing.T) { } // both invite events should appear in a new sync beforeRetireRes := types.NewResponse() - beforeRetireRes, err = db.IncrementalSync(ctx, beforeRetireRes, testUserDeviceA, types.NewStreamToken(0, 0, nil), latest, 0, false) + beforeRetireRes, err = db.IncrementalSync(ctx, beforeRetireRes, testUserDeviceA, types.StreamingToken{}, latest, 0, false) if err != nil { t.Fatalf("IncrementalSync failed: %s", err) } @@ -654,7 +659,7 @@ func TestInviteBehaviour(t *testing.T) { t.Fatalf("failed to get SyncPosition: %s", err) } res := types.NewResponse() - res, err = db.IncrementalSync(ctx, res, testUserDeviceA, types.NewStreamToken(0, 0, nil), latest, 0, false) + res, err = db.IncrementalSync(ctx, res, testUserDeviceA, types.StreamingToken{}, latest, 0, false) if err != nil { t.Fatalf("IncrementalSync failed: %s", err) } |