diff options
Diffstat (limited to 'syncapi/storage/storage_test.go')
-rw-r--r-- | syncapi/storage/storage_test.go | 234 |
1 files changed, 87 insertions, 147 deletions
diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 403b50ea..4e1634ec 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -3,6 +3,7 @@ package storage_test import ( "context" "fmt" + "reflect" "testing" "github.com/matrix-org/dendrite/setup/config" @@ -38,7 +39,7 @@ func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserver if err != nil { t.Fatalf("WriteEvent failed: %s", err) } - fmt.Println("Event ID", ev.EventID(), " spos=", pos, "depth=", ev.Depth()) + t.Logf("Event ID %s spos=%v depth=%v", ev.EventID(), pos, ev.Depth()) positions = append(positions, pos) } return @@ -46,7 +47,6 @@ func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserver func TestWriteEvents(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { - t.Parallel() alice := test.NewUser() r := test.NewRoom(t, alice) db, close := MustCreateDatabase(t, dbType) @@ -61,84 +61,84 @@ func TestRecentEventsPDU(t *testing.T) { db, close := MustCreateDatabase(t, dbType) defer close() alice := test.NewUser() - var filter gomatrixserverlib.RoomEventFilter - filter.Limit = 100 + // dummy room to make sure SQL queries are filtering on room ID + MustWriteEvents(t, db, test.NewRoom(t, alice).Events()) + + // actual test room r := test.NewRoom(t, alice) r.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": "hi"}) events := r.Events() positions := MustWriteEvents(t, db, events) + + // dummy room to make sure SQL queries are filtering on room ID + MustWriteEvents(t, db, test.NewRoom(t, alice).Events()) + latest, err := db.MaxStreamPositionForPDUs(ctx) if err != nil { t.Fatalf("failed to get MaxStreamPositionForPDUs: %s", err) } testCases := []struct { - Name string - From types.StreamPosition - To types.StreamPosition - WantEvents []*gomatrixserverlib.HeaderedEvent - WantLimited bool + Name string + From types.StreamPosition + To types.StreamPosition + Limit int + ReverseOrder bool + WantEvents []*gomatrixserverlib.HeaderedEvent + WantLimited bool }{ // The purpose of this test is to make sure that incremental syncs are including up to the latest events. - // It's a basic sanity test that sync works. It creates a `since` token that is on the penultimate event. + // It's a basic sanity test that sync works. It creates a streaming position that is on the penultimate event. // It makes sure the response includes the final event. { - Name: "IncrementalSync penultimate", + Name: "penultimate", From: positions[len(positions)-2], // pretend we are at the penultimate event To: latest, + Limit: 100, WantEvents: events[len(events)-1:], WantLimited: false, }, - /* - // The purpose of this test is to check that passing a `numRecentEventsPerRoom` correctly limits the - // number of returned events. This is critical for big rooms hence the test here. - { - Name: "IncrementalSync limited", - DoSync: func() (*types.Response, error) { - from := types.StreamingToken{ // pretend we are 10 events behind - PDUPosition: positions[len(positions)-11], - } - res := types.NewResponse() - // limit is set to 5 - return db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false) - }, - // want the last 5 events, NOT the last 10. - WantTimeline: events[len(events)-5:], - }, - // The purpose of this test is to check that CompleteSync returns all the current state as well as - // honouring the `numRecentEventsPerRoom` value - { - Name: "CompleteSync limited", - DoSync: func() (*types.Response, error) { - res := types.NewResponse() - // limit set to 5 - return db.CompleteSync(ctx, res, testUserDeviceA, 5) - }, - // want the last 5 events - WantTimeline: events[len(events)-5:], - // want all state for the room - WantState: state, - }, - // The purpose of this test is to check that CompleteSync can return everything with a high enough - // `numRecentEventsPerRoom`. - { - Name: "CompleteSync", - DoSync: func() (*types.Response, error) { - res := types.NewResponse() - return db.CompleteSync(ctx, res, testUserDeviceA, len(events)+1) - }, - WantTimeline: events, - // We want no state at all as that field in /sync is the delta between the token (beginning of time) - // and the START of the timeline. - }, */ + // The purpose of this test is to check that limits can be applied and work. + // This is critical for big rooms hence the test here. + { + Name: "limited", + From: 0, + To: latest, + Limit: 1, + WantEvents: events[len(events)-1:], + WantLimited: true, + }, + // The purpose of this test is to check that we can return every event with a high + // enough limit + { + Name: "large limited", + From: 0, + To: latest, + Limit: 100, + WantEvents: events, + WantLimited: false, + }, + // The purpose of this test is to check that we can return events in reverse order + { + Name: "reverse", + From: positions[len(positions)-3], // 2 events back + To: latest, + Limit: 100, + ReverseOrder: true, + WantEvents: test.Reversed(events[len(events)-2:]), + WantLimited: false, + }, } - for _, tc := range testCases { + for i := range testCases { + tc := testCases[i] t.Run(tc.Name, func(st *testing.T) { + var filter gomatrixserverlib.RoomEventFilter + filter.Limit = tc.Limit gotEvents, limited, err := db.RecentEvents(ctx, r.ID, types.Range{ From: tc.From, To: tc.To, - }, &filter, true, true) + }, &filter, !tc.ReverseOrder, true) if err != nil { st.Fatalf("failed to do sync: %s", err) } @@ -148,100 +148,48 @@ func TestRecentEventsPDU(t *testing.T) { if len(gotEvents) != len(tc.WantEvents) { st.Errorf("got %d events, want %d", len(gotEvents), len(tc.WantEvents)) } + for j := range gotEvents { + if !reflect.DeepEqual(gotEvents[j].JSON(), tc.WantEvents[j].JSON()) { + st.Errorf("event %d got %s want %s", j, string(gotEvents[j].JSON()), string(tc.WantEvents[j].JSON())) + } + } }) } }) } -/* -func TestGetEventsInRangeWithPrevBatch(t *testing.T) { - t.Parallel() - db := MustCreateDatabase(t) - events, _ := SimpleRoom(t, testRoomID, testUserIDA, testUserIDB) - positions := MustWriteEvents(t, db, events) - latest, err := db.SyncPosition(ctx) - if err != nil { - t.Fatalf("failed to get SyncPosition: %s", err) - } - from := types.StreamingToken{ - PDUPosition: positions[len(positions)-2], - } - - res := types.NewResponse() - res, err = db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false) - if err != nil { - t.Fatalf("failed to IncrementalSync with latest token") - } - roomRes, ok := res.Rooms.Join[testRoomID] - if !ok { - t.Fatalf("IncrementalSync response missing room %s - response: %+v", testRoomID, res) - } - // returns the last event "Message 10" - assertEventsEqual(t, "IncrementalSync Timeline", false, roomRes.Timeline.Events, reversed(events[len(events)-1:])) - - prev := roomRes.Timeline.PrevBatch.String() - if prev == "" { - t.Fatalf("IncrementalSync expected prev_batch token") - } - prevBatchToken, err := types.NewTopologyTokenFromString(prev) - if err != nil { - t.Fatalf("failed to NewTopologyTokenFromString : %s", err) - } - // backpaginate 5 messages starting at the latest position. - // head towards the beginning of time - to := types.TopologyToken{} - paginatedEvents, err := db.GetEventsInTopologicalRange(ctx, &prevBatchToken, &to, testRoomID, 5, true) - if err != nil { - t.Fatalf("GetEventsInRange returned an error: %s", err) - } - gots := gomatrixserverlib.HeaderedToClientEvents(db.StreamEventsToEvents(&testUserDeviceA, paginatedEvents), gomatrixserverlib.FormatAll) - assertEventsEqual(t, "", true, gots, reversed(events[len(events)-6:len(events)-1])) -} - -// The purpose of this test is to ensure that backfill does indeed go backwards, using a stream token. -func TestGetEventsInRangeWithStreamToken(t *testing.T) { - t.Parallel() - db := MustCreateDatabase(t) - events, _ := SimpleRoom(t, testRoomID, testUserIDA, testUserIDB) - MustWriteEvents(t, db, events) - latest, err := db.SyncPosition(ctx) - if err != nil { - t.Fatalf("failed to get SyncPosition: %s", err) - } - // head towards the beginning of time - to := types.StreamingToken{} - - // backpaginate 5 messages starting at the latest position. - paginatedEvents, err := db.GetEventsInStreamingRange(ctx, &latest, &to, testRoomID, 5, true) - if err != nil { - t.Fatalf("GetEventsInRange returned an error: %s", err) - } - gots := gomatrixserverlib.HeaderedToClientEvents(db.StreamEventsToEvents(&testUserDeviceA, paginatedEvents), gomatrixserverlib.FormatAll) - assertEventsEqual(t, "", true, gots, reversed(events[len(events)-5:])) -} - // The purpose of this test is to ensure that backfill does indeed go backwards, using a topology token func TestGetEventsInRangeWithTopologyToken(t *testing.T) { - t.Parallel() - db := MustCreateDatabase(t) - events, _ := SimpleRoom(t, testRoomID, testUserIDA, testUserIDB) - MustWriteEvents(t, db, events) - from, err := db.MaxTopologicalPosition(ctx, testRoomID) - if err != nil { - t.Fatalf("failed to get MaxTopologicalPosition: %s", err) - } - // head towards the beginning of time - to := types.TopologyToken{} + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + db, close := MustCreateDatabase(t, dbType) + defer close() + alice := test.NewUser() + r := test.NewRoom(t, alice) + for i := 0; i < 10; i++ { + r.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("hi %d", i)}) + } + events := r.Events() + _ = MustWriteEvents(t, db, events) - // backpaginate 5 messages starting at the latest position. - paginatedEvents, err := db.GetEventsInTopologicalRange(ctx, &from, &to, testRoomID, 5, true) - if err != nil { - t.Fatalf("GetEventsInRange returned an error: %s", err) - } - gots := gomatrixserverlib.HeaderedToClientEvents(db.StreamEventsToEvents(&testUserDeviceA, paginatedEvents), gomatrixserverlib.FormatAll) - assertEventsEqual(t, "", true, gots, reversed(events[len(events)-5:])) + from, err := db.MaxTopologicalPosition(ctx, r.ID) + if err != nil { + t.Fatalf("failed to get MaxTopologicalPosition: %s", err) + } + t.Logf("max topo pos = %+v", from) + // head towards the beginning of time + to := types.TopologyToken{} + + // backpaginate 5 messages starting at the latest position. + paginatedEvents, err := db.GetEventsInTopologicalRange(ctx, &from, &to, r.ID, 5, true) + if err != nil { + t.Fatalf("GetEventsInTopologicalRange returned an error: %s", err) + } + gots := db.StreamEventsToEvents(nil, paginatedEvents) + test.AssertEventsEqual(t, gots, test.Reversed(events[len(events)-5:])) + }) } +/* // The purpose of this test is to make sure that backpagination returns all events, even if some events have the same depth. // For cases where events have the same depth, the streaming token should be used to tie break so events written via WriteEvent // will appear FIRST when going backwards. This test creates a DAG like: @@ -651,12 +599,4 @@ func topologyTokenBefore(t *testing.T, db storage.Database, eventID string) *typ tok.Decrement() return &tok } - -func reversed(in []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent { - out := make([]*gomatrixserverlib.HeaderedEvent, len(in)) - for i := 0; i < len(in); i++ { - out[i] = in[len(in)-i-1] - } - return out -} */ |