aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/storage_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/storage_test.go')
-rw-r--r--syncapi/storage/storage_test.go185
1 files changed, 111 insertions, 74 deletions
diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go
index 563c92e3..c7415170 100644
--- a/syncapi/storage/storage_test.go
+++ b/syncapi/storage/storage_test.go
@@ -1,7 +1,9 @@
package storage_test
import (
+ "bytes"
"context"
+ "encoding/json"
"fmt"
"reflect"
"testing"
@@ -394,90 +396,125 @@ func TestGetEventsInRangeWithEventsInsertedLikeBackfill(t *testing.T) {
from = topologyTokenBefore(t, db, paginatedEvents[len(paginatedEvents)-1].EventID())
}
}
+*/
func TestSendToDeviceBehaviour(t *testing.T) {
- //t.Parallel()
- db := MustCreateDatabase(t)
+ t.Parallel()
+ alice := test.NewUser(t)
+ bob := test.NewUser(t)
+ deviceID := "one"
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ db, close := MustCreateDatabase(t, dbType)
+ defer close()
+ // At this point there should be no messages. We haven't sent anything
+ // yet.
+ _, events, err := db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, 100)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(events) != 0 {
+ t.Fatal("first call should have no updates")
+ }
- // At this point there should be no messages. We haven't sent anything
- // yet.
- _, events, updates, deletions, err := db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{})
- if err != nil {
- t.Fatal(err)
- }
- if len(events) != 0 || len(updates) != 0 || len(deletions) != 0 {
- t.Fatal("first call should have no updates")
- }
- err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{})
- if err != nil {
- return
- }
+ err = db.CleanSendToDeviceUpdates(context.Background(), alice.ID, deviceID, 100)
+ if err != nil {
+ return
+ }
- // Try sending a message.
- streamPos, err := db.StoreNewSendForDeviceMessage(ctx, "alice", "one", gomatrixserverlib.SendToDeviceEvent{
- Sender: "bob",
- Type: "m.type",
- Content: json.RawMessage("{}"),
- })
- if err != nil {
- t.Fatal(err)
- }
+ // Try sending a message.
+ streamPos, err := db.StoreNewSendForDeviceMessage(ctx, alice.ID, deviceID, gomatrixserverlib.SendToDeviceEvent{
+ Sender: bob.ID,
+ Type: "m.type",
+ Content: json.RawMessage("{}"),
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
- // At this point we should get exactly one message. We're sending the sync position
- // that we were given from the update and the send-to-device update will be updated
- // in the database to reflect that this was the sync position we sent the message at.
- _, events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos})
- if err != nil {
- t.Fatal(err)
- }
- if len(events) != 1 || len(updates) != 1 || len(deletions) != 0 {
- t.Fatal("second call should have one update")
- }
- err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos})
- if err != nil {
- return
- }
+ // At this point we should get exactly one message. We're sending the sync position
+ // that we were given from the update and the send-to-device update will be updated
+ // in the database to reflect that this was the sync position we sent the message at.
+ streamPos, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, streamPos)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if count := len(events); count != 1 {
+ t.Fatalf("second call should have one update, got %d", count)
+ }
+ err = db.CleanSendToDeviceUpdates(context.Background(), alice.ID, deviceID, streamPos)
+ if err != nil {
+ return
+ }
- // At this point we should still have one message because we haven't progressed the
- // sync position yet. This is equivalent to the client failing to /sync and retrying
- // with the same position.
- _, events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos})
- if err != nil {
- t.Fatal(err)
- }
- if len(events) != 1 || len(updates) != 0 || len(deletions) != 0 {
- t.Fatal("third call should have one update still")
- }
- err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos})
- if err != nil {
- return
- }
+ // At this point we should still have one message because we haven't progressed the
+ // sync position yet. This is equivalent to the client failing to /sync and retrying
+ // with the same position.
+ streamPos, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, 100)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(events) != 1 {
+ t.Fatal("third call should have one update still")
+ }
+ err = db.CleanSendToDeviceUpdates(context.Background(), alice.ID, deviceID, streamPos+1)
+ if err != nil {
+ return
+ }
- // At this point we should now have no updates, because we've progressed the sync
- // position. Therefore the update from before will not be sent again.
- _, events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos + 1})
- if err != nil {
- t.Fatal(err)
- }
- if len(events) != 0 || len(updates) != 0 || len(deletions) != 1 {
- t.Fatal("fourth call should have no updates")
- }
- err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos + 1})
- if err != nil {
- return
- }
+ // At this point we should now have no updates, because we've progressed the sync
+ // position. Therefore the update from before will not be sent again.
+ _, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, streamPos+1, streamPos+2)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(events) != 0 {
+ t.Fatal("fourth call should have no updates")
+ }
+ err = db.CleanSendToDeviceUpdates(context.Background(), alice.ID, deviceID, streamPos+1)
+ if err != nil {
+ return
+ }
- // At this point we should still have no updates, because no new updates have been
- // sent.
- _, events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos + 2})
- if err != nil {
- t.Fatal(err)
- }
- if len(events) != 0 || len(updates) != 0 || len(deletions) != 0 {
- t.Fatal("fifth call should have no updates")
- }
+ // At this point we should still have no updates, because no new updates have been
+ // sent.
+ _, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, streamPos, streamPos+2)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(events) != 0 {
+ t.Fatal("fifth call should have no updates")
+ }
+
+ // Send some more messages and verify the ordering is correct ("in order of arrival")
+ var lastPos types.StreamPosition = 0
+ for i := 0; i < 10; i++ {
+ streamPos, err = db.StoreNewSendForDeviceMessage(ctx, alice.ID, deviceID, gomatrixserverlib.SendToDeviceEvent{
+ Sender: bob.ID,
+ Type: "m.type",
+ Content: json.RawMessage(fmt.Sprintf(`{ "count": %d }`, i)),
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ lastPos = streamPos
+ }
+
+ _, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, lastPos)
+ if err != nil {
+ t.Fatalf("unable to get events: %v", err)
+ }
+
+ for i := 0; i < 10; i++ {
+ want := json.RawMessage(fmt.Sprintf(`{"count":%d}`, i))
+ got := events[i].Content
+ if !bytes.Equal(got, want) {
+ t.Fatalf("messages are out of order\nwant: %s\ngot: %s", string(want), string(got))
+ }
+ }
+ })
}
+/*
func TestInviteBehaviour(t *testing.T) {
db := MustCreateDatabase(t)
inviteRoom1 := "!inviteRoom1:somewhere"