aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2023-03-22 09:21:32 +0100
committerGitHub <noreply@github.com>2023-03-22 09:21:32 +0100
commit5e85a00cb36c3d343cd5b6f6a18435989724a135 (patch)
treeb22c34dd0a6cdc04025b90086843f9084a876412 /syncapi
parentec6879e5ae2919c903707475ce8d72244b2a6847 (diff)
Remove `BaseDendrite` (#3023)
Removes `BaseDendrite` to, hopefully, make testing and composing of components easier in the future.
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/storage/storage_test.go2
-rw-r--r--syncapi/syncapi.go48
-rw-r--r--syncapi/syncapi_test.go162
3 files changed, 121 insertions, 91 deletions
diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go
index 9f006490..e81a341f 100644
--- a/syncapi/storage/storage_test.go
+++ b/syncapi/storage/storage_test.go
@@ -22,7 +22,7 @@ var ctx = context.Background()
func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
connStr, close := test.PrepareDBConnectionString(t, dbType)
- cm := sqlutil.NewConnectionManager()
+ cm := sqlutil.NewConnectionManager(nil, config.DatabaseOptions{})
db, err := storage.NewSyncServerDatasource(context.Background(), cm, &config.DatabaseOptions{
ConnectionString: config.DataSource(connStr),
})
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index e0cc8462..9a27b954 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -18,12 +18,15 @@ import (
"context"
"github.com/matrix-org/dendrite/internal/fulltext"
+ "github.com/matrix-org/dendrite/internal/httputil"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/process"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver/api"
- "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api"
@@ -39,16 +42,19 @@ import (
// AddPublicRoutes sets up and registers HTTP handlers for the SyncAPI
// component.
func AddPublicRoutes(
- base *base.BaseDendrite,
+ processContext *process.ProcessContext,
+ routers httputil.Routers,
+ dendriteCfg *config.Dendrite,
+ cm sqlutil.Connections,
+ natsInstance *jetstream.NATSInstance,
userAPI userapi.SyncUserAPI,
rsAPI api.SyncRoomserverAPI,
caches caching.LazyLoadCache,
+ enableMetrics bool,
) {
- cfg := &base.Cfg.SyncAPI
+ js, natsClient := natsInstance.Prepare(processContext, &dendriteCfg.Global.JetStream)
- js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
-
- syncDB, err := storage.NewSyncServerDatasource(base.Context(), base.ConnectionManager, &cfg.Database)
+ syncDB, err := storage.NewSyncServerDatasource(processContext.Context(), cm, &dendriteCfg.SyncAPI.Database)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to sync db")
}
@@ -62,32 +68,32 @@ func AddPublicRoutes(
}
var fts *fulltext.Search
- if cfg.Fulltext.Enabled {
- fts, err = fulltext.New(base.ProcessContext.Context(), cfg.Fulltext)
+ if dendriteCfg.SyncAPI.Fulltext.Enabled {
+ fts, err = fulltext.New(processContext.Context(), dendriteCfg.SyncAPI.Fulltext)
if err != nil {
logrus.WithError(err).Panicf("failed to create full text")
}
- base.ProcessContext.ComponentStarted()
+ processContext.ComponentStarted()
}
federationPresenceProducer := &producers.FederationAPIPresenceProducer{
- Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
+ Topic: dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputPresenceEvent),
JetStream: js,
}
presenceConsumer := consumers.NewPresenceConsumer(
- base.ProcessContext, cfg, js, natsClient, syncDB,
+ processContext, &dendriteCfg.SyncAPI, js, natsClient, syncDB,
notifier, streams.PresenceStreamProvider,
userAPI,
)
- requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, rsAPI, streams, notifier, federationPresenceProducer, presenceConsumer, base.EnableMetrics)
+ requestPool := sync.NewRequestPool(syncDB, &dendriteCfg.SyncAPI, userAPI, rsAPI, streams, notifier, federationPresenceProducer, presenceConsumer, enableMetrics)
if err = presenceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start presence consumer")
}
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
- base.ProcessContext, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
+ processContext, &dendriteCfg.SyncAPI, dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
js, rsAPI, syncDB, notifier,
streams.DeviceListStreamProvider,
)
@@ -96,7 +102,7 @@ func AddPublicRoutes(
}
roomConsumer := consumers.NewOutputRoomEventConsumer(
- base.ProcessContext, cfg, js, syncDB, notifier, streams.PDUStreamProvider,
+ processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.PDUStreamProvider,
streams.InviteStreamProvider, rsAPI, fts,
)
if err = roomConsumer.Start(); err != nil {
@@ -104,7 +110,7 @@ func AddPublicRoutes(
}
clientConsumer := consumers.NewOutputClientDataConsumer(
- base.ProcessContext, cfg, js, natsClient, syncDB, notifier,
+ processContext, &dendriteCfg.SyncAPI, js, natsClient, syncDB, notifier,
streams.AccountDataStreamProvider, fts,
)
if err = clientConsumer.Start(); err != nil {
@@ -112,35 +118,35 @@ func AddPublicRoutes(
}
notificationConsumer := consumers.NewOutputNotificationDataConsumer(
- base.ProcessContext, cfg, js, syncDB, notifier, streams.NotificationDataStreamProvider,
+ processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.NotificationDataStreamProvider,
)
if err = notificationConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start notification data consumer")
}
typingConsumer := consumers.NewOutputTypingEventConsumer(
- base.ProcessContext, cfg, js, eduCache, notifier, streams.TypingStreamProvider,
+ processContext, &dendriteCfg.SyncAPI, js, eduCache, notifier, streams.TypingStreamProvider,
)
if err = typingConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start typing consumer")
}
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
- base.ProcessContext, cfg, js, syncDB, userAPI, notifier, streams.SendToDeviceStreamProvider,
+ processContext, &dendriteCfg.SyncAPI, js, syncDB, userAPI, notifier, streams.SendToDeviceStreamProvider,
)
if err = sendToDeviceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
}
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
- base.ProcessContext, cfg, js, syncDB, notifier, streams.ReceiptStreamProvider,
+ processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.ReceiptStreamProvider,
)
if err = receiptConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start receipts consumer")
}
routing.Setup(
- base.Routers.Client, requestPool, syncDB, userAPI,
- rsAPI, cfg, caches, fts,
+ routers.Client, requestPool, syncDB, userAPI,
+ rsAPI, &dendriteCfg.SyncAPI, caches, fts,
)
}
diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go
index 13a07865..584782af 100644
--- a/syncapi/syncapi_test.go
+++ b/syncapi/syncapi_test.go
@@ -11,6 +11,9 @@ import (
"time"
"github.com/matrix-org/dendrite/internal/caching"
+ "github.com/matrix-org/dendrite/internal/httputil"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/tidwall/gjson"
@@ -22,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/roomserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
- "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/test"
@@ -114,14 +116,17 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
- base, close := testrig.CreateBaseDendrite(t, dbType)
- caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ routers := httputil.NewRouters()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
+ natsInstance := jetstream.NATSInstance{}
defer close()
- jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
- defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
- msgs := toNATSMsgs(t, base, room.Events()...)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
+ jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
+ msgs := toNATSMsgs(t, cfg, room.Events()...)
+ AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, caching.DisableMetrics)
testrig.MustPublishMsgs(t, jsctx, msgs...)
testCases := []struct {
@@ -156,7 +161,7 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
},
}
- syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool {
+ syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
return gjson.Get(syncBody, path).Exists()
@@ -164,7 +169,7 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
for _, tc := range testCases {
w := httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, tc.req)
+ routers.Client.ServeHTTP(w, tc.req)
if w.Code != tc.wantCode {
t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
}
@@ -207,26 +212,29 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
- base, close := testrig.CreateBaseDendrite(t, dbType)
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ routers := httputil.NewRouters()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close()
+ natsInstance := jetstream.NATSInstance{}
- jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
- defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
+ jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
// order is:
// m.room.create
// m.room.member
// m.room.power_levels
// m.room.join_rules
// m.room.history_visibility
- msgs := toNATSMsgs(t, base, room.Events()...)
+ msgs := toNATSMsgs(t, cfg, room.Events()...)
sinceTokens := make([]string, len(msgs))
- caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
+ AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, caching.DisableMetrics)
for i, msg := range msgs {
testrig.MustPublishMsgs(t, jsctx, msg)
time.Sleep(100 * time.Millisecond)
w := httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
})))
@@ -256,7 +264,7 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
t.Logf("waited for events to be consumed; syncing with %v", sinceTokens)
for i, since := range sinceTokens {
w := httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
"since": since,
@@ -298,17 +306,20 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
- base, close := testrig.CreateBaseDendrite(t, dbType)
- base.Cfg.Global.Presence.EnableOutbound = true
- base.Cfg.Global.Presence.EnableInbound = true
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ routers := httputil.NewRouters()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
+ cfg.Global.Presence.EnableOutbound = true
+ cfg.Global.Presence.EnableInbound = true
defer close()
+ natsInstance := jetstream.NATSInstance{}
- jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
- defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
- caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
+ jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
+ AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, caching.DisableMetrics)
w := httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
"set_presence": "online",
@@ -414,17 +425,20 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
userType = "real user"
}
- base, close := testrig.CreateBaseDendrite(t, dbType)
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ routers := httputil.NewRouters()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close()
+ natsInstance := jetstream.NATSInstance{}
- jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
- defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
+ jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
// Use the actual internal roomserver API
- caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
- rsAPI := roomserver.NewInternalAPI(base, caches)
+ rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
+ AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, caching.DisableMetrics)
for _, tc := range testCases {
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
@@ -439,7 +453,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
- syncUntil(t, base, aliceDev.AccessToken, false,
+ syncUntil(t, routers, aliceDev.AccessToken, false,
func(syncBody string) bool {
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, beforeJoinBody)
return gjson.Get(syncBody, path).Exists()
@@ -448,7 +462,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// There is only one event, we expect only to be able to see this, if the room is world_readable
w := httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
+ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
"access_token": bobDev.AccessToken,
"dir": "b",
"filter": `{"lazy_load_members":true}`, // check that lazy loading doesn't break history visibility
@@ -479,7 +493,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
- syncUntil(t, base, aliceDev.AccessToken, false,
+ syncUntil(t, routers, aliceDev.AccessToken, false,
func(syncBody string) bool {
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, afterJoinBody)
return gjson.Get(syncBody, path).Exists()
@@ -488,7 +502,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// Verify the messages after/before invite are visible or not
w = httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
+ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
"access_token": bobDev.AccessToken,
"dir": "b",
})))
@@ -714,18 +728,20 @@ func TestGetMembership(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
- base, close := testrig.CreateBaseDendrite(t, dbType)
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ routers := httputil.NewRouters()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close()
-
- jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
- defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
+ natsInstance := jetstream.NATSInstance{}
+ jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
// Use an actual roomserver for this
- caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
- rsAPI := roomserver.NewInternalAPI(base, caches)
+ rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
+ AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, caching.DisableMetrics)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@@ -745,7 +761,7 @@ func TestGetMembership(t *testing.T) {
if tc.useSleep {
time.Sleep(time.Millisecond * 100)
} else {
- syncUntil(t, base, aliceDev.AccessToken, false, func(syncBody string) bool {
+ syncUntil(t, routers, aliceDev.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
return gjson.Get(syncBody, path).Exists()
@@ -753,7 +769,7 @@ func TestGetMembership(t *testing.T) {
}
w := httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, tc.request(t, room))
+ routers.Client.ServeHTTP(w, tc.request(t, room))
if w.Code != 200 && tc.wantOK {
t.Logf("%s", w.Body.String())
t.Fatalf("got HTTP %d want %d", w.Code, 200)
@@ -786,16 +802,19 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
- base, baseClose := testrig.CreateBaseDendrite(t, dbType)
- defer baseClose()
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ routers := httputil.NewRouters()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
+ defer close()
+ natsInstance := jetstream.NATSInstance{}
- jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
- defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
- caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
+ jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
+ AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, caching.DisableMetrics)
producer := producers.SyncAPIProducer{
- TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
+ TopicSendToDeviceEvent: cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
JetStream: jsctx,
}
@@ -881,7 +900,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
}
}
- syncUntil(t, base, alice.AccessToken,
+ syncUntil(t, routers, alice.AccessToken,
len(tc.want) == 0,
func(body string) bool {
return gjson.Get(body, fmt.Sprintf(`to_device.events.#(content.dummy=="message %d")`, msgCounter)).Exists()
@@ -890,7 +909,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
// Execute a /sync request, recording the response
w := httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"since": tc.since,
})))
@@ -1004,15 +1023,18 @@ func testContext(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
- base, baseClose := testrig.CreateBaseDendrite(t, dbType)
- defer baseClose()
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ routers := httputil.NewRouters()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
+ defer close()
// Use an actual roomserver for this
- caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
- rsAPI := roomserver.NewInternalAPI(base, caches)
+ natsInstance := jetstream.NATSInstance{}
+ rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches)
+ AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches, caching.DisableMetrics)
room := test.NewRoom(t, user)
@@ -1025,10 +1047,10 @@ func testContext(t *testing.T, dbType test.DBType) {
t.Fatalf("failed to send events: %v", err)
}
- jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
- defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
+ jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
- syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool {
+ syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, thirdMsg.EventID())
return gjson.Get(syncBody, path).Exists()
@@ -1055,7 +1077,7 @@ func testContext(t *testing.T, dbType test.DBType) {
params[k] = v
}
}
- base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", requestPath, test.WithQueryParams(params)))
+ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", requestPath, test.WithQueryParams(params)))
if tc.wantError && w.Code == 200 {
t.Fatalf("Expected an error, but got none")
@@ -1143,9 +1165,10 @@ func TestUpdateRelations(t *testing.T) {
room := test.NewRoom(t, alice)
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
- base, shutdownBase := testrig.CreateBaseDendrite(t, dbType)
- t.Cleanup(shutdownBase)
- db, err := storage.NewSyncServerDatasource(base.Context(), base.ConnectionManager, &base.Cfg.SyncAPI.Database)
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ t.Cleanup(close)
+ db, err := storage.NewSyncServerDatasource(processCtx.Context(), cm, &cfg.SyncAPI.Database)
if err != nil {
t.Fatal(err)
}
@@ -1167,10 +1190,11 @@ func TestUpdateRelations(t *testing.T) {
}
func syncUntil(t *testing.T,
- base *base.BaseDendrite, accessToken string,
+ routers httputil.Routers, accessToken string,
skip bool,
checkFunc func(syncBody string) bool,
) {
+ t.Helper()
if checkFunc == nil {
t.Fatalf("No checkFunc defined")
}
@@ -1184,7 +1208,7 @@ func syncUntil(t *testing.T,
go func() {
for {
w := httptest.NewRecorder()
- base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": accessToken,
"timeout": "1000",
})))
@@ -1202,14 +1226,14 @@ func syncUntil(t *testing.T,
}
}
-func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
+func toNATSMsgs(t *testing.T, cfg *config.Dendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
result := make([]*nats.Msg, len(input))
for i, ev := range input {
var addsStateIDs []string
if ev.StateKey() != nil {
addsStateIDs = append(addsStateIDs, ev.EventID())
}
- result[i] = testrig.NewOutputEventMsg(t, base, ev.RoomID(), api.OutputEvent{
+ result[i] = testrig.NewOutputEventMsg(t, cfg, ev.RoomID(), api.OutputEvent{
Type: rsapi.OutputTypeNewRoomEvent,
NewRoomEvent: &rsapi.OutputNewRoomEvent{
Event: ev,