diff options
Diffstat (limited to 'syncapi/syncapi_test.go')
-rw-r--r-- | syncapi/syncapi_test.go | 136 |
1 files changed, 136 insertions, 0 deletions
diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go index 3ce7c64b..b10864ff 100644 --- a/syncapi/syncapi_test.go +++ b/syncapi/syncapi_test.go @@ -3,11 +3,14 @@ package syncapi import ( "context" "encoding/json" + "fmt" "net/http" "net/http/httptest" + "reflect" "testing" "time" + "github.com/matrix-org/dendrite/clientapi/producers" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api" @@ -311,6 +314,139 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) { } +func TestSendToDevice(t *testing.T) { + test.WithAllDatabases(t, testSendToDevice) +} + +func testSendToDevice(t *testing.T, dbType test.DBType) { + user := test.NewUser(t) + alice := userapi.Device{ + ID: "ALICEID", + UserID: user.ID, + AccessToken: "ALICE_BEARER_TOKEN", + DisplayName: "Alice", + AccountType: userapi.AccountTypeUser, + } + + base, close := testrig.CreateBaseDendrite(t, dbType) + defer close() + + jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) + + AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, &syncKeyAPI{}) + + producer := producers.SyncAPIProducer{ + TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), + JetStream: jsctx, + } + + msgCounter := 0 + + testCases := []struct { + name string + since string + want []string + sendMessagesCount int + }{ + { + name: "initial sync, no messages", + want: []string{}, + }, + { + name: "initial sync, one new message", + sendMessagesCount: 1, + want: []string{ + "message 1", + }, + }, + { + name: "initial sync, two new messages", // we didn't advance the since token, so we'll receive two messages + sendMessagesCount: 1, + want: []string{ + "message 1", + "message 2", + }, + }, + { + name: "incremental sync, one message", // this deletes message 1, as we advanced the since token + since: types.StreamingToken{SendToDevicePosition: 1}.String(), + want: []string{ + "message 2", + }, + }, + { + name: "failed incremental sync, one message", // didn't advance since, so still the same message + since: types.StreamingToken{SendToDevicePosition: 1}.String(), + want: []string{ + "message 2", + }, + }, + { + name: "incremental sync, no message", // this should delete message 2 + since: types.StreamingToken{SendToDevicePosition: 2}.String(), // next_batch from previous sync + want: []string{}, + }, + { + name: "incremental sync, three new messages", + since: types.StreamingToken{SendToDevicePosition: 2}.String(), + sendMessagesCount: 3, + want: []string{ + "message 3", // message 2 was deleted in the previous test + "message 4", + "message 5", + }, + }, + { + name: "initial sync, three messages", // we expect three messages, as we didn't go beyond "2" + want: []string{ + "message 3", + "message 4", + "message 5", + }, + }, + { + name: "incremental sync, no messages", // advance the sync token, no new messages + since: types.StreamingToken{SendToDevicePosition: 5}.String(), + want: []string{}, + }, + } + + ctx := context.Background() + for _, tc := range testCases { + // Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}` + for i := 0; i < tc.sendMessagesCount; i++ { + msgCounter++ + msg := map[string]string{ + "dummy": fmt.Sprintf("message %d", msgCounter), + } + if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil { + t.Fatalf("unable to send to device message: %v", err) + } + } + time.Sleep((time.Millisecond * 15) * time.Duration(tc.sendMessagesCount)) // wait a bit, so the messages can be processed + // Execute a /sync request, recording the response + w := httptest.NewRecorder() + base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + "access_token": alice.AccessToken, + "since": tc.since, + }))) + + // Extract the to_device.events, # gets all values of an array, in this case a string slice with "message $counter" entries + events := gjson.Get(w.Body.String(), "to_device.events.#.content.dummy").Array() + got := make([]string, len(events)) + for i := range events { + got[i] = events[i].String() + } + + // Ensure the messages we received are as we expect them to be + if !reflect.DeepEqual(got, tc.want) { + t.Logf("[%s|since=%s]: Sync: %s", tc.name, tc.since, w.Body.String()) + t.Fatalf("[%s|since=%s]: got: %+v, want: %+v", tc.name, tc.since, got, tc.want) + } + } +} + func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input []*gomatrixserverlib.HeaderedEvent) []*nats.Msg { result := make([]*nats.Msg, len(input)) for i, ev := range input { |