aboutsummaryrefslogtreecommitdiff
path: root/syncapi/syncapi_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/syncapi_test.go')
-rw-r--r--syncapi/syncapi_test.go136
1 files changed, 136 insertions, 0 deletions
diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go
index 3ce7c64b..b10864ff 100644
--- a/syncapi/syncapi_test.go
+++ b/syncapi/syncapi_test.go
@@ -3,11 +3,14 @@ package syncapi
import (
"context"
"encoding/json"
+ "fmt"
"net/http"
"net/http/httptest"
+ "reflect"
"testing"
"time"
+ "github.com/matrix-org/dendrite/clientapi/producers"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
@@ -311,6 +314,139 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) {
}
+func TestSendToDevice(t *testing.T) {
+ test.WithAllDatabases(t, testSendToDevice)
+}
+
+func testSendToDevice(t *testing.T, dbType test.DBType) {
+ user := test.NewUser(t)
+ alice := userapi.Device{
+ ID: "ALICEID",
+ UserID: user.ID,
+ AccessToken: "ALICE_BEARER_TOKEN",
+ DisplayName: "Alice",
+ AccountType: userapi.AccountTypeUser,
+ }
+
+ base, close := testrig.CreateBaseDendrite(t, dbType)
+ defer close()
+
+ jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
+
+ AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, &syncKeyAPI{})
+
+ producer := producers.SyncAPIProducer{
+ TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
+ JetStream: jsctx,
+ }
+
+ msgCounter := 0
+
+ testCases := []struct {
+ name string
+ since string
+ want []string
+ sendMessagesCount int
+ }{
+ {
+ name: "initial sync, no messages",
+ want: []string{},
+ },
+ {
+ name: "initial sync, one new message",
+ sendMessagesCount: 1,
+ want: []string{
+ "message 1",
+ },
+ },
+ {
+ name: "initial sync, two new messages", // we didn't advance the since token, so we'll receive two messages
+ sendMessagesCount: 1,
+ want: []string{
+ "message 1",
+ "message 2",
+ },
+ },
+ {
+ name: "incremental sync, one message", // this deletes message 1, as we advanced the since token
+ since: types.StreamingToken{SendToDevicePosition: 1}.String(),
+ want: []string{
+ "message 2",
+ },
+ },
+ {
+ name: "failed incremental sync, one message", // didn't advance since, so still the same message
+ since: types.StreamingToken{SendToDevicePosition: 1}.String(),
+ want: []string{
+ "message 2",
+ },
+ },
+ {
+ name: "incremental sync, no message", // this should delete message 2
+ since: types.StreamingToken{SendToDevicePosition: 2}.String(), // next_batch from previous sync
+ want: []string{},
+ },
+ {
+ name: "incremental sync, three new messages",
+ since: types.StreamingToken{SendToDevicePosition: 2}.String(),
+ sendMessagesCount: 3,
+ want: []string{
+ "message 3", // message 2 was deleted in the previous test
+ "message 4",
+ "message 5",
+ },
+ },
+ {
+ name: "initial sync, three messages", // we expect three messages, as we didn't go beyond "2"
+ want: []string{
+ "message 3",
+ "message 4",
+ "message 5",
+ },
+ },
+ {
+ name: "incremental sync, no messages", // advance the sync token, no new messages
+ since: types.StreamingToken{SendToDevicePosition: 5}.String(),
+ want: []string{},
+ },
+ }
+
+ ctx := context.Background()
+ for _, tc := range testCases {
+ // Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
+ for i := 0; i < tc.sendMessagesCount; i++ {
+ msgCounter++
+ msg := map[string]string{
+ "dummy": fmt.Sprintf("message %d", msgCounter),
+ }
+ if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil {
+ t.Fatalf("unable to send to device message: %v", err)
+ }
+ }
+ time.Sleep((time.Millisecond * 15) * time.Duration(tc.sendMessagesCount)) // wait a bit, so the messages can be processed
+ // Execute a /sync request, recording the response
+ w := httptest.NewRecorder()
+ base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ "access_token": alice.AccessToken,
+ "since": tc.since,
+ })))
+
+ // Extract the to_device.events, # gets all values of an array, in this case a string slice with "message $counter" entries
+ events := gjson.Get(w.Body.String(), "to_device.events.#.content.dummy").Array()
+ got := make([]string, len(events))
+ for i := range events {
+ got[i] = events[i].String()
+ }
+
+ // Ensure the messages we received are as we expect them to be
+ if !reflect.DeepEqual(got, tc.want) {
+ t.Logf("[%s|since=%s]: Sync: %s", tc.name, tc.since, w.Body.String())
+ t.Fatalf("[%s|since=%s]: got: %+v, want: %+v", tc.name, tc.since, got, tc.want)
+ }
+ }
+}
+
func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input []*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
result := make([]*nats.Msg, len(input))
for i, ev := range input {