aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/storage_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/storage_test.go')
-rw-r--r--syncapi/storage/storage_test.go59
1 files changed, 32 insertions, 27 deletions
diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go
index b1b0d254..8387543f 100644
--- a/syncapi/storage/storage_test.go
+++ b/syncapi/storage/storage_test.go
@@ -165,9 +165,9 @@ func TestSyncResponse(t *testing.T) {
{
Name: "IncrementalSync penultimate",
DoSync: func() (*types.Response, error) {
- from := types.NewStreamToken( // pretend we are at the penultimate event
- positions[len(positions)-2], types.StreamPosition(0), nil,
- )
+ from := types.StreamingToken{ // pretend we are at the penultimate event
+ PDUPosition: positions[len(positions)-2],
+ }
res := types.NewResponse()
return db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false)
},
@@ -178,9 +178,9 @@ func TestSyncResponse(t *testing.T) {
{
Name: "IncrementalSync limited",
DoSync: func() (*types.Response, error) {
- from := types.NewStreamToken( // pretend we are 10 events behind
- positions[len(positions)-11], types.StreamPosition(0), nil,
- )
+ from := types.StreamingToken{ // pretend we are 10 events behind
+ PDUPosition: positions[len(positions)-11],
+ }
res := types.NewResponse()
// limit is set to 5
return db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false)
@@ -222,7 +222,12 @@ func TestSyncResponse(t *testing.T) {
if err != nil {
st.Fatalf("failed to do sync: %s", err)
}
- next := types.NewStreamToken(latest.PDUPosition(), latest.EDUPosition(), nil)
+ next := types.StreamingToken{
+ PDUPosition: latest.PDUPosition,
+ TypingPosition: latest.TypingPosition,
+ ReceiptPosition: latest.ReceiptPosition,
+ SendToDevicePosition: latest.SendToDevicePosition,
+ }
if res.NextBatch != next.String() {
st.Errorf("NextBatch got %s want %s", res.NextBatch, next.String())
}
@@ -245,9 +250,9 @@ func TestGetEventsInRangeWithPrevBatch(t *testing.T) {
if err != nil {
t.Fatalf("failed to get SyncPosition: %s", err)
}
- from := types.NewStreamToken(
- positions[len(positions)-2], types.StreamPosition(0), nil,
- )
+ from := types.StreamingToken{
+ PDUPosition: positions[len(positions)-2],
+ }
res := types.NewResponse()
res, err = db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false)
@@ -271,7 +276,7 @@ func TestGetEventsInRangeWithPrevBatch(t *testing.T) {
}
// backpaginate 5 messages starting at the latest position.
// head towards the beginning of time
- to := types.NewTopologyToken(0, 0)
+ to := types.TopologyToken{}
paginatedEvents, err := db.GetEventsInTopologicalRange(ctx, &prevBatchToken, &to, testRoomID, 5, true)
if err != nil {
t.Fatalf("GetEventsInRange returned an error: %s", err)
@@ -291,7 +296,7 @@ func TestGetEventsInRangeWithStreamToken(t *testing.T) {
t.Fatalf("failed to get SyncPosition: %s", err)
}
// head towards the beginning of time
- to := types.NewStreamToken(0, 0, nil)
+ to := types.StreamingToken{}
// backpaginate 5 messages starting at the latest position.
paginatedEvents, err := db.GetEventsInStreamingRange(ctx, &latest, &to, testRoomID, 5, true)
@@ -313,7 +318,7 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) {
t.Fatalf("failed to get MaxTopologicalPosition: %s", err)
}
// head towards the beginning of time
- to := types.NewTopologyToken(0, 0)
+ to := types.TopologyToken{}
// backpaginate 5 messages starting at the latest position.
paginatedEvents, err := db.GetEventsInTopologicalRange(ctx, &from, &to, testRoomID, 5, true)
@@ -382,7 +387,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
t.Fatalf("failed to get EventPositionInTopology for event: %s", err)
}
// head towards the beginning of time
- to := types.NewTopologyToken(0, 0)
+ to := types.TopologyToken{}
testCases := []struct {
Name string
@@ -458,7 +463,7 @@ func TestGetEventsInTopologicalRangeMultiRoom(t *testing.T) {
t.Fatalf("failed to get MaxTopologicalPosition: %s", err)
}
// head towards the beginning of time
- to := types.NewTopologyToken(0, 0)
+ to := types.TopologyToken{}
// Query using room B as room A was inserted first and hence A will have lower stream positions but identical depths,
// allowing this bug to surface.
@@ -508,7 +513,7 @@ func TestGetEventsInRangeWithEventsInsertedLikeBackfill(t *testing.T) {
}
// head towards the beginning of time
- to := types.NewTopologyToken(0, 0)
+ to := types.TopologyToken{}
// starting at `from`, backpaginate to the beginning of time, asserting as we go.
chunkSize = 3
@@ -534,14 +539,14 @@ func TestSendToDeviceBehaviour(t *testing.T) {
// At this point there should be no messages. We haven't sent anything
// yet.
- events, updates, deletions, err := db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, nil))
+ events, updates, deletions, err := db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{})
if err != nil {
t.Fatal(err)
}
if len(events) != 0 || len(updates) != 0 || len(deletions) != 0 {
t.Fatal("first call should have no updates")
}
- err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0, nil))
+ err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{})
if err != nil {
return
}
@@ -559,14 +564,14 @@ func TestSendToDeviceBehaviour(t *testing.T) {
// At this point we should get exactly one message. We're sending the sync position
// that we were given from the update and the send-to-device update will be updated
// in the database to reflect that this was the sync position we sent the message at.
- events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos, nil))
+ events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos})
if err != nil {
t.Fatal(err)
}
if len(events) != 1 || len(updates) != 1 || len(deletions) != 0 {
t.Fatal("second call should have one update")
}
- err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos, nil))
+ err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos})
if err != nil {
return
}
@@ -574,35 +579,35 @@ func TestSendToDeviceBehaviour(t *testing.T) {
// At this point we should still have one message because we haven't progressed the
// sync position yet. This is equivalent to the client failing to /sync and retrying
// with the same position.
- events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos, nil))
+ events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos})
if err != nil {
t.Fatal(err)
}
if len(events) != 1 || len(updates) != 0 || len(deletions) != 0 {
t.Fatal("third call should have one update still")
}
- err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos, nil))
+ err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos})
if err != nil {
return
}
// At this point we should now have no updates, because we've progressed the sync
// position. Therefore the update from before will not be sent again.
- events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos+1, nil))
+ events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos + 1})
if err != nil {
t.Fatal(err)
}
if len(events) != 0 || len(updates) != 0 || len(deletions) != 1 {
t.Fatal("fourth call should have no updates")
}
- err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos+1, nil))
+ err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos + 1})
if err != nil {
return
}
// At this point we should still have no updates, because no new updates have been
// sent.
- events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos+2, nil))
+ events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos + 2})
if err != nil {
t.Fatal(err)
}
@@ -639,7 +644,7 @@ func TestInviteBehaviour(t *testing.T) {
}
// both invite events should appear in a new sync
beforeRetireRes := types.NewResponse()
- beforeRetireRes, err = db.IncrementalSync(ctx, beforeRetireRes, testUserDeviceA, types.NewStreamToken(0, 0, nil), latest, 0, false)
+ beforeRetireRes, err = db.IncrementalSync(ctx, beforeRetireRes, testUserDeviceA, types.StreamingToken{}, latest, 0, false)
if err != nil {
t.Fatalf("IncrementalSync failed: %s", err)
}
@@ -654,7 +659,7 @@ func TestInviteBehaviour(t *testing.T) {
t.Fatalf("failed to get SyncPosition: %s", err)
}
res := types.NewResponse()
- res, err = db.IncrementalSync(ctx, res, testUserDeviceA, types.NewStreamToken(0, 0, nil), latest, 0, false)
+ res, err = db.IncrementalSync(ctx, res, testUserDeviceA, types.StreamingToken{}, latest, 0, false)
if err != nil {
t.Fatalf("IncrementalSync failed: %s", err)
}