aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/storage_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/storage_test.go')
-rw-r--r--syncapi/storage/storage_test.go234
1 files changed, 87 insertions, 147 deletions
diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go
index 403b50ea..4e1634ec 100644
--- a/syncapi/storage/storage_test.go
+++ b/syncapi/storage/storage_test.go
@@ -3,6 +3,7 @@ package storage_test
import (
"context"
"fmt"
+ "reflect"
"testing"
"github.com/matrix-org/dendrite/setup/config"
@@ -38,7 +39,7 @@ func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserver
if err != nil {
t.Fatalf("WriteEvent failed: %s", err)
}
- fmt.Println("Event ID", ev.EventID(), " spos=", pos, "depth=", ev.Depth())
+ t.Logf("Event ID %s spos=%v depth=%v", ev.EventID(), pos, ev.Depth())
positions = append(positions, pos)
}
return
@@ -46,7 +47,6 @@ func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserver
func TestWriteEvents(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
- t.Parallel()
alice := test.NewUser()
r := test.NewRoom(t, alice)
db, close := MustCreateDatabase(t, dbType)
@@ -61,84 +61,84 @@ func TestRecentEventsPDU(t *testing.T) {
db, close := MustCreateDatabase(t, dbType)
defer close()
alice := test.NewUser()
- var filter gomatrixserverlib.RoomEventFilter
- filter.Limit = 100
+ // dummy room to make sure SQL queries are filtering on room ID
+ MustWriteEvents(t, db, test.NewRoom(t, alice).Events())
+
+ // actual test room
r := test.NewRoom(t, alice)
r.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": "hi"})
events := r.Events()
positions := MustWriteEvents(t, db, events)
+
+ // dummy room to make sure SQL queries are filtering on room ID
+ MustWriteEvents(t, db, test.NewRoom(t, alice).Events())
+
latest, err := db.MaxStreamPositionForPDUs(ctx)
if err != nil {
t.Fatalf("failed to get MaxStreamPositionForPDUs: %s", err)
}
testCases := []struct {
- Name string
- From types.StreamPosition
- To types.StreamPosition
- WantEvents []*gomatrixserverlib.HeaderedEvent
- WantLimited bool
+ Name string
+ From types.StreamPosition
+ To types.StreamPosition
+ Limit int
+ ReverseOrder bool
+ WantEvents []*gomatrixserverlib.HeaderedEvent
+ WantLimited bool
}{
// The purpose of this test is to make sure that incremental syncs are including up to the latest events.
- // It's a basic sanity test that sync works. It creates a `since` token that is on the penultimate event.
+ // It's a basic sanity test that sync works. It creates a streaming position that is on the penultimate event.
// It makes sure the response includes the final event.
{
- Name: "IncrementalSync penultimate",
+ Name: "penultimate",
From: positions[len(positions)-2], // pretend we are at the penultimate event
To: latest,
+ Limit: 100,
WantEvents: events[len(events)-1:],
WantLimited: false,
},
- /*
- // The purpose of this test is to check that passing a `numRecentEventsPerRoom` correctly limits the
- // number of returned events. This is critical for big rooms hence the test here.
- {
- Name: "IncrementalSync limited",
- DoSync: func() (*types.Response, error) {
- from := types.StreamingToken{ // pretend we are 10 events behind
- PDUPosition: positions[len(positions)-11],
- }
- res := types.NewResponse()
- // limit is set to 5
- return db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false)
- },
- // want the last 5 events, NOT the last 10.
- WantTimeline: events[len(events)-5:],
- },
- // The purpose of this test is to check that CompleteSync returns all the current state as well as
- // honouring the `numRecentEventsPerRoom` value
- {
- Name: "CompleteSync limited",
- DoSync: func() (*types.Response, error) {
- res := types.NewResponse()
- // limit set to 5
- return db.CompleteSync(ctx, res, testUserDeviceA, 5)
- },
- // want the last 5 events
- WantTimeline: events[len(events)-5:],
- // want all state for the room
- WantState: state,
- },
- // The purpose of this test is to check that CompleteSync can return everything with a high enough
- // `numRecentEventsPerRoom`.
- {
- Name: "CompleteSync",
- DoSync: func() (*types.Response, error) {
- res := types.NewResponse()
- return db.CompleteSync(ctx, res, testUserDeviceA, len(events)+1)
- },
- WantTimeline: events,
- // We want no state at all as that field in /sync is the delta between the token (beginning of time)
- // and the START of the timeline.
- }, */
+ // The purpose of this test is to check that limits can be applied and work.
+ // This is critical for big rooms hence the test here.
+ {
+ Name: "limited",
+ From: 0,
+ To: latest,
+ Limit: 1,
+ WantEvents: events[len(events)-1:],
+ WantLimited: true,
+ },
+ // The purpose of this test is to check that we can return every event with a high
+ // enough limit
+ {
+ Name: "large limited",
+ From: 0,
+ To: latest,
+ Limit: 100,
+ WantEvents: events,
+ WantLimited: false,
+ },
+ // The purpose of this test is to check that we can return events in reverse order
+ {
+ Name: "reverse",
+ From: positions[len(positions)-3], // 2 events back
+ To: latest,
+ Limit: 100,
+ ReverseOrder: true,
+ WantEvents: test.Reversed(events[len(events)-2:]),
+ WantLimited: false,
+ },
}
- for _, tc := range testCases {
+ for i := range testCases {
+ tc := testCases[i]
t.Run(tc.Name, func(st *testing.T) {
+ var filter gomatrixserverlib.RoomEventFilter
+ filter.Limit = tc.Limit
gotEvents, limited, err := db.RecentEvents(ctx, r.ID, types.Range{
From: tc.From,
To: tc.To,
- }, &filter, true, true)
+ }, &filter, !tc.ReverseOrder, true)
if err != nil {
st.Fatalf("failed to do sync: %s", err)
}
@@ -148,100 +148,48 @@ func TestRecentEventsPDU(t *testing.T) {
if len(gotEvents) != len(tc.WantEvents) {
st.Errorf("got %d events, want %d", len(gotEvents), len(tc.WantEvents))
}
+ for j := range gotEvents {
+ if !reflect.DeepEqual(gotEvents[j].JSON(), tc.WantEvents[j].JSON()) {
+ st.Errorf("event %d got %s want %s", j, string(gotEvents[j].JSON()), string(tc.WantEvents[j].JSON()))
+ }
+ }
})
}
})
}
-/*
-func TestGetEventsInRangeWithPrevBatch(t *testing.T) {
- t.Parallel()
- db := MustCreateDatabase(t)
- events, _ := SimpleRoom(t, testRoomID, testUserIDA, testUserIDB)
- positions := MustWriteEvents(t, db, events)
- latest, err := db.SyncPosition(ctx)
- if err != nil {
- t.Fatalf("failed to get SyncPosition: %s", err)
- }
- from := types.StreamingToken{
- PDUPosition: positions[len(positions)-2],
- }
-
- res := types.NewResponse()
- res, err = db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false)
- if err != nil {
- t.Fatalf("failed to IncrementalSync with latest token")
- }
- roomRes, ok := res.Rooms.Join[testRoomID]
- if !ok {
- t.Fatalf("IncrementalSync response missing room %s - response: %+v", testRoomID, res)
- }
- // returns the last event "Message 10"
- assertEventsEqual(t, "IncrementalSync Timeline", false, roomRes.Timeline.Events, reversed(events[len(events)-1:]))
-
- prev := roomRes.Timeline.PrevBatch.String()
- if prev == "" {
- t.Fatalf("IncrementalSync expected prev_batch token")
- }
- prevBatchToken, err := types.NewTopologyTokenFromString(prev)
- if err != nil {
- t.Fatalf("failed to NewTopologyTokenFromString : %s", err)
- }
- // backpaginate 5 messages starting at the latest position.
- // head towards the beginning of time
- to := types.TopologyToken{}
- paginatedEvents, err := db.GetEventsInTopologicalRange(ctx, &prevBatchToken, &to, testRoomID, 5, true)
- if err != nil {
- t.Fatalf("GetEventsInRange returned an error: %s", err)
- }
- gots := gomatrixserverlib.HeaderedToClientEvents(db.StreamEventsToEvents(&testUserDeviceA, paginatedEvents), gomatrixserverlib.FormatAll)
- assertEventsEqual(t, "", true, gots, reversed(events[len(events)-6:len(events)-1]))
-}
-
-// The purpose of this test is to ensure that backfill does indeed go backwards, using a stream token.
-func TestGetEventsInRangeWithStreamToken(t *testing.T) {
- t.Parallel()
- db := MustCreateDatabase(t)
- events, _ := SimpleRoom(t, testRoomID, testUserIDA, testUserIDB)
- MustWriteEvents(t, db, events)
- latest, err := db.SyncPosition(ctx)
- if err != nil {
- t.Fatalf("failed to get SyncPosition: %s", err)
- }
- // head towards the beginning of time
- to := types.StreamingToken{}
-
- // backpaginate 5 messages starting at the latest position.
- paginatedEvents, err := db.GetEventsInStreamingRange(ctx, &latest, &to, testRoomID, 5, true)
- if err != nil {
- t.Fatalf("GetEventsInRange returned an error: %s", err)
- }
- gots := gomatrixserverlib.HeaderedToClientEvents(db.StreamEventsToEvents(&testUserDeviceA, paginatedEvents), gomatrixserverlib.FormatAll)
- assertEventsEqual(t, "", true, gots, reversed(events[len(events)-5:]))
-}
-
// The purpose of this test is to ensure that backfill does indeed go backwards, using a topology token
func TestGetEventsInRangeWithTopologyToken(t *testing.T) {
- t.Parallel()
- db := MustCreateDatabase(t)
- events, _ := SimpleRoom(t, testRoomID, testUserIDA, testUserIDB)
- MustWriteEvents(t, db, events)
- from, err := db.MaxTopologicalPosition(ctx, testRoomID)
- if err != nil {
- t.Fatalf("failed to get MaxTopologicalPosition: %s", err)
- }
- // head towards the beginning of time
- to := types.TopologyToken{}
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ db, close := MustCreateDatabase(t, dbType)
+ defer close()
+ alice := test.NewUser()
+ r := test.NewRoom(t, alice)
+ for i := 0; i < 10; i++ {
+ r.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("hi %d", i)})
+ }
+ events := r.Events()
+ _ = MustWriteEvents(t, db, events)
- // backpaginate 5 messages starting at the latest position.
- paginatedEvents, err := db.GetEventsInTopologicalRange(ctx, &from, &to, testRoomID, 5, true)
- if err != nil {
- t.Fatalf("GetEventsInRange returned an error: %s", err)
- }
- gots := gomatrixserverlib.HeaderedToClientEvents(db.StreamEventsToEvents(&testUserDeviceA, paginatedEvents), gomatrixserverlib.FormatAll)
- assertEventsEqual(t, "", true, gots, reversed(events[len(events)-5:]))
+ from, err := db.MaxTopologicalPosition(ctx, r.ID)
+ if err != nil {
+ t.Fatalf("failed to get MaxTopologicalPosition: %s", err)
+ }
+ t.Logf("max topo pos = %+v", from)
+ // head towards the beginning of time
+ to := types.TopologyToken{}
+
+ // backpaginate 5 messages starting at the latest position.
+ paginatedEvents, err := db.GetEventsInTopologicalRange(ctx, &from, &to, r.ID, 5, true)
+ if err != nil {
+ t.Fatalf("GetEventsInTopologicalRange returned an error: %s", err)
+ }
+ gots := db.StreamEventsToEvents(nil, paginatedEvents)
+ test.AssertEventsEqual(t, gots, test.Reversed(events[len(events)-5:]))
+ })
}
+/*
// The purpose of this test is to make sure that backpagination returns all events, even if some events have the same depth.
// For cases where events have the same depth, the streaming token should be used to tie break so events written via WriteEvent
// will appear FIRST when going backwards. This test creates a DAG like:
@@ -651,12 +599,4 @@ func topologyTokenBefore(t *testing.T, db storage.Database, eventID string) *typ
tok.Decrement()
return &tok
}
-
-func reversed(in []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
- out := make([]*gomatrixserverlib.HeaderedEvent, len(in))
- for i := 0; i < len(in); i++ {
- out[i] = in[len(in)-i-1]
- }
- return out
-}
*/