aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-08-19 11:03:55 +0200
committerGitHub <noreply@github.com>2022-08-19 11:03:55 +0200
commit5cacca92d2b888d022f9fa346b8068ce13087b00 (patch)
treed04ec1f1c91751235ec94fa27c631c78c2b00519 /syncapi
parenta379d3e8141cfceb0ca49435ffb7a596d57619bb (diff)
Make SyncAPI unit tests more reliable (#2655)
This should hopefully make some SyncAPI tests more reliable
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/syncapi_test.go86
1 files changed, 76 insertions, 10 deletions
diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go
index 089bdafa..8b33c5e4 100644
--- a/syncapi/syncapi_test.go
+++ b/syncapi/syncapi_test.go
@@ -154,8 +154,12 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
wantJoinedRooms: []string{room.ID},
},
}
- // TODO: find a better way
- time.Sleep(500 * time.Millisecond)
+
+ syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool {
+ // wait for the last sent eventID to come down sync
+ path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
+ return gjson.Get(syncBody, path).Exists()
+ })
for _, tc := range testCases {
w := httptest.NewRecorder()
@@ -343,6 +347,13 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// create the users
alice := test.NewUser(t)
+ aliceDev := userapi.Device{
+ ID: "ALICEID",
+ UserID: alice.ID,
+ AccessToken: "ALICE_BEARER_TOKEN",
+ DisplayName: "ALICE",
+ }
+
bob := test.NewUser(t)
bobDev := userapi.Device{
@@ -409,7 +420,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
rsAPI := roomserver.NewInternalAPI(base)
rsAPI.SetFederationAPI(nil, nil)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{bobDev}}, rsAPI, &syncKeyAPI{})
+ AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, &syncKeyAPI{})
for _, tc := range testCases {
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
@@ -418,12 +429,18 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
room := test.NewRoom(t, alice, test.RoomHistoryVisibility(tc.historyVisibility))
// send the events/messages to NATS to create the rooms
- beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)})
+ beforeJoinBody := fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)
+ beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": beforeJoinBody})
eventsToSend := append(room.Events(), beforeJoinEv)
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
- time.Sleep(100 * time.Millisecond) // TODO: find a better way
+ syncUntil(t, base, aliceDev.AccessToken, false,
+ func(syncBody string) bool {
+ path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, beforeJoinBody)
+ return gjson.Get(syncBody, path).Exists()
+ },
+ )
// There is only one event, we expect only to be able to see this, if the room is world_readable
w := httptest.NewRecorder()
@@ -449,14 +466,20 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
inviteEv := room.CreateAndInsert(t, alice, "m.room.member", map[string]interface{}{"membership": "invite"}, test.WithStateKey(bob.ID))
afterInviteEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After invite in a %s room", tc.historyVisibility)})
joinEv := room.CreateAndInsert(t, bob, "m.room.member", map[string]interface{}{"membership": "join"}, test.WithStateKey(bob.ID))
- msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After join in a %s room", tc.historyVisibility)})
+ afterJoinBody := fmt.Sprintf("After join in a %s room", tc.historyVisibility)
+ msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": afterJoinBody})
eventsToSend = append([]*gomatrixserverlib.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv)
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
- time.Sleep(100 * time.Millisecond) // TODO: find a better way
+ syncUntil(t, base, aliceDev.AccessToken, false,
+ func(syncBody string) bool {
+ path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, afterJoinBody)
+ return gjson.Get(syncBody, path).Exists()
+ },
+ )
// Verify the messages after/before invite are visible or not
w = httptest.NewRecorder()
@@ -511,8 +534,8 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
- base, close := testrig.CreateBaseDendrite(t, dbType)
- defer close()
+ base, baseClose := testrig.CreateBaseDendrite(t, dbType)
+ defer baseClose()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
@@ -607,7 +630,14 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
t.Fatalf("unable to send to device message: %v", err)
}
}
- time.Sleep((time.Millisecond * 15) * time.Duration(tc.sendMessagesCount)) // wait a bit, so the messages can be processed
+
+ syncUntil(t, base, alice.AccessToken,
+ len(tc.want) == 0,
+ func(body string) bool {
+ return gjson.Get(body, fmt.Sprintf(`to_device.events.#(content.dummy=="message %d")`, msgCounter)).Exists()
+ },
+ )
+
// Execute a /sync request, recording the response
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
@@ -630,6 +660,42 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
}
}
+func syncUntil(t *testing.T,
+ base *base.BaseDendrite, accessToken string,
+ skip bool,
+ checkFunc func(syncBody string) bool,
+) {
+ if checkFunc == nil {
+ t.Fatalf("No checkFunc defined")
+ }
+ if skip {
+ return
+ }
+ // loop on /sync until we receive the last send message or timeout after 5 seconds, since we don't know if the message made it
+ // to the syncAPI when hitting /sync
+ done := make(chan bool)
+ defer close(done)
+ go func() {
+ for {
+ w := httptest.NewRecorder()
+ base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ "access_token": accessToken,
+ "timeout": "1000",
+ })))
+ if checkFunc(w.Body.String()) {
+ done <- true
+ return
+ }
+ }
+ }()
+
+ select {
+ case <-done:
+ case <-time.After(time.Second * 5):
+ t.Fatalf("Timed out waiting for messages")
+ }
+}
+
func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
result := make([]*nats.Msg, len(input))
for i, ev := range input {