aboutsummaryrefslogtreecommitdiff
path: root/syncapi/syncapi_test.go
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2023-03-22 09:21:32 +0100
committerGitHub <noreply@github.com>2023-03-22 09:21:32 +0100
commit5e85a00cb36c3d343cd5b6f6a18435989724a135 (patch)
treeb22c34dd0a6cdc04025b90086843f9084a876412 /syncapi/syncapi_test.go
parentec6879e5ae2919c903707475ce8d72244b2a6847 (diff)
Remove `BaseDendrite` (#3023)
Removes `BaseDendrite` to, hopefully, make testing and composing of components easier in the future.
Diffstat (limited to 'syncapi/syncapi_test.go')
-rw-r--r--syncapi/syncapi_test.go162
1 files changed, 93 insertions, 69 deletions
diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go
index 13a07865..584782af 100644
--- a/syncapi/syncapi_test.go
+++ b/syncapi/syncapi_test.go
@@ -11,6 +11,9 @@ import (
"time"
"github.com/matrix-org/dendrite/internal/caching"
+ "github.com/matrix-org/dendrite/internal/httputil"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/tidwall/gjson"
@@ -22,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/roomserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
- "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/test"
@@ -114,14 +116,17 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
- base, close := testrig.CreateBaseDendrite(t, dbType)
- caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ routers := httputil.NewRouters()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
+ natsInstance := jetstream.NATSInstance{}
defer close()
- jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
- defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
- msgs := toNATSMsgs(t, base, room.Events()...)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
+ jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
+ msgs := toNATSMsgs(t, cfg, room.Events()...)
+ AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, caching.DisableMetrics)
testrig.MustPublishMsgs(t, jsctx, msgs...)
testCases := []struct {
@@ -156,7 +161,7 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
},
}
- syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool {
+ syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
return gjson.Get(syncBody, path).Exists()
@@ -164,7 +169,7 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
for _, tc := range testCases {
w := httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, tc.req)
+ routers.Client.ServeHTTP(w, tc.req)
if w.Code != tc.wantCode {
t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
}
@@ -207,26 +212,29 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
- base, close := testrig.CreateBaseDendrite(t, dbType)
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ routers := httputil.NewRouters()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close()
+ natsInstance := jetstream.NATSInstance{}
- jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
- defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
+ jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
// order is:
// m.room.create
// m.room.member
// m.room.power_levels
// m.room.join_rules
// m.room.history_visibility
- msgs := toNATSMsgs(t, base, room.Events()...)
+ msgs := toNATSMsgs(t, cfg, room.Events()...)
sinceTokens := make([]string, len(msgs))
- caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
+ AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, caching.DisableMetrics)
for i, msg := range msgs {
testrig.MustPublishMsgs(t, jsctx, msg)
time.Sleep(100 * time.Millisecond)
w := httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
})))
@@ -256,7 +264,7 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
t.Logf("waited for events to be consumed; syncing with %v", sinceTokens)
for i, since := range sinceTokens {
w := httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
"since": since,
@@ -298,17 +306,20 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
- base, close := testrig.CreateBaseDendrite(t, dbType)
- base.Cfg.Global.Presence.EnableOutbound = true
- base.Cfg.Global.Presence.EnableInbound = true
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ routers := httputil.NewRouters()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
+ cfg.Global.Presence.EnableOutbound = true
+ cfg.Global.Presence.EnableInbound = true
defer close()
+ natsInstance := jetstream.NATSInstance{}
- jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
- defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
- caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
+ jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
+ AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, caching.DisableMetrics)
w := httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
"set_presence": "online",
@@ -414,17 +425,20 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
userType = "real user"
}
- base, close := testrig.CreateBaseDendrite(t, dbType)
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ routers := httputil.NewRouters()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close()
+ natsInstance := jetstream.NATSInstance{}
- jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
- defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
+ jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
// Use the actual internal roomserver API
- caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
- rsAPI := roomserver.NewInternalAPI(base, caches)
+ rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
+ AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, caching.DisableMetrics)
for _, tc := range testCases {
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
@@ -439,7 +453,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
- syncUntil(t, base, aliceDev.AccessToken, false,
+ syncUntil(t, routers, aliceDev.AccessToken, false,
func(syncBody string) bool {
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, beforeJoinBody)
return gjson.Get(syncBody, path).Exists()
@@ -448,7 +462,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// There is only one event, we expect only to be able to see this, if the room is world_readable
w := httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
+ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
"access_token": bobDev.AccessToken,
"dir": "b",
"filter": `{"lazy_load_members":true}`, // check that lazy loading doesn't break history visibility
@@ -479,7 +493,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
- syncUntil(t, base, aliceDev.AccessToken, false,
+ syncUntil(t, routers, aliceDev.AccessToken, false,
func(syncBody string) bool {
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, afterJoinBody)
return gjson.Get(syncBody, path).Exists()
@@ -488,7 +502,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// Verify the messages after/before invite are visible or not
w = httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
+ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
"access_token": bobDev.AccessToken,
"dir": "b",
})))
@@ -714,18 +728,20 @@ func TestGetMembership(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
- base, close := testrig.CreateBaseDendrite(t, dbType)
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ routers := httputil.NewRouters()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close()
-
- jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
- defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
+ natsInstance := jetstream.NATSInstance{}
+ jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
// Use an actual roomserver for this
- caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
- rsAPI := roomserver.NewInternalAPI(base, caches)
+ rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
+ AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, caching.DisableMetrics)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@@ -745,7 +761,7 @@ func TestGetMembership(t *testing.T) {
if tc.useSleep {
time.Sleep(time.Millisecond * 100)
} else {
- syncUntil(t, base, aliceDev.AccessToken, false, func(syncBody string) bool {
+ syncUntil(t, routers, aliceDev.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
return gjson.Get(syncBody, path).Exists()
@@ -753,7 +769,7 @@ func TestGetMembership(t *testing.T) {
}
w := httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, tc.request(t, room))
+ routers.Client.ServeHTTP(w, tc.request(t, room))
if w.Code != 200 && tc.wantOK {
t.Logf("%s", w.Body.String())
t.Fatalf("got HTTP %d want %d", w.Code, 200)
@@ -786,16 +802,19 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
- base, baseClose := testrig.CreateBaseDendrite(t, dbType)
- defer baseClose()
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ routers := httputil.NewRouters()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
+ defer close()
+ natsInstance := jetstream.NATSInstance{}
- jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
- defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
- caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
+ jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
+ AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, caching.DisableMetrics)
producer := producers.SyncAPIProducer{
- TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
+ TopicSendToDeviceEvent: cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
JetStream: jsctx,
}
@@ -881,7 +900,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
}
}
- syncUntil(t, base, alice.AccessToken,
+ syncUntil(t, routers, alice.AccessToken,
len(tc.want) == 0,
func(body string) bool {
return gjson.Get(body, fmt.Sprintf(`to_device.events.#(content.dummy=="message %d")`, msgCounter)).Exists()
@@ -890,7 +909,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
// Execute a /sync request, recording the response
w := httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"since": tc.since,
})))
@@ -1004,15 +1023,18 @@ func testContext(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
- base, baseClose := testrig.CreateBaseDendrite(t, dbType)
- defer baseClose()
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ routers := httputil.NewRouters()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
+ defer close()
// Use an actual roomserver for this
- caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
- rsAPI := roomserver.NewInternalAPI(base, caches)
+ natsInstance := jetstream.NATSInstance{}
+ rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches)
+ AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches, caching.DisableMetrics)
room := test.NewRoom(t, user)
@@ -1025,10 +1047,10 @@ func testContext(t *testing.T, dbType test.DBType) {
t.Fatalf("failed to send events: %v", err)
}
- jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
- defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
+ jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
- syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool {
+ syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, thirdMsg.EventID())
return gjson.Get(syncBody, path).Exists()
@@ -1055,7 +1077,7 @@ func testContext(t *testing.T, dbType test.DBType) {
params[k] = v
}
}
- base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", requestPath, test.WithQueryParams(params)))
+ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", requestPath, test.WithQueryParams(params)))
if tc.wantError && w.Code == 200 {
t.Fatalf("Expected an error, but got none")
@@ -1143,9 +1165,10 @@ func TestUpdateRelations(t *testing.T) {
room := test.NewRoom(t, alice)
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
- base, shutdownBase := testrig.CreateBaseDendrite(t, dbType)
- t.Cleanup(shutdownBase)
- db, err := storage.NewSyncServerDatasource(base.Context(), base.ConnectionManager, &base.Cfg.SyncAPI.Database)
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ t.Cleanup(close)
+ db, err := storage.NewSyncServerDatasource(processCtx.Context(), cm, &cfg.SyncAPI.Database)
if err != nil {
t.Fatal(err)
}
@@ -1167,10 +1190,11 @@ func TestUpdateRelations(t *testing.T) {
}
func syncUntil(t *testing.T,
- base *base.BaseDendrite, accessToken string,
+ routers httputil.Routers, accessToken string,
skip bool,
checkFunc func(syncBody string) bool,
) {
+ t.Helper()
if checkFunc == nil {
t.Fatalf("No checkFunc defined")
}
@@ -1184,7 +1208,7 @@ func syncUntil(t *testing.T,
go func() {
for {
w := httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": accessToken,
"timeout": "1000",
})))
@@ -1202,14 +1226,14 @@ func syncUntil(t *testing.T,
}
}
-func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
+func toNATSMsgs(t *testing.T, cfg *config.Dendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
result := make([]*nats.Msg, len(input))
for i, ev := range input {
var addsStateIDs []string
if ev.StateKey() != nil {
addsStateIDs = append(addsStateIDs, ev.EventID())
}
- result[i] = testrig.NewOutputEventMsg(t, base, ev.RoomID(), api.OutputEvent{
+ result[i] = testrig.NewOutputEventMsg(t, cfg, ev.RoomID(), api.OutputEvent{
Type: rsapi.OutputTypeNewRoomEvent,
NewRoomEvent: &rsapi.OutputNewRoomEvent{
Event: ev,