aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/clientapi.go2
-rw-r--r--syncapi/consumers/roomserver.go2
-rw-r--r--syncapi/routing/routing.go2
-rw-r--r--syncapi/routing/search.go2
-rw-r--r--syncapi/storage/postgres/syncserver.go8
-rw-r--r--syncapi/storage/sqlite3/syncserver.go8
-rw-r--r--syncapi/storage/storage.go9
-rw-r--r--syncapi/storage/storage_test.go35
-rw-r--r--syncapi/storage/storage_wasm.go7
-rw-r--r--syncapi/syncapi.go23
-rw-r--r--syncapi/syncapi_test.go52
11 files changed, 80 insertions, 70 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index 735f6718..43dc0f51 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -50,7 +50,7 @@ type OutputClientDataConsumer struct {
stream streams.StreamProvider
notifier *notifier.Notifier
serverName gomatrixserverlib.ServerName
- fts *fulltext.Search
+ fts fulltext.Indexer
cfg *config.SyncAPI
}
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index a8d4d2b2..21f6104d 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -51,7 +51,7 @@ type OutputRoomEventConsumer struct {
pduStream streams.StreamProvider
inviteStream streams.StreamProvider
notifier *notifier.Notifier
- fts *fulltext.Search
+ fts fulltext.Indexer
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go
index 4cc1a6a8..b1283247 100644
--- a/syncapi/routing/routing.go
+++ b/syncapi/routing/routing.go
@@ -43,7 +43,7 @@ func Setup(
rsAPI api.SyncRoomserverAPI,
cfg *config.SyncAPI,
lazyLoadCache caching.LazyLoadCache,
- fts *fulltext.Search,
+ fts fulltext.Indexer,
) {
v1unstablemux := csMux.PathPrefix("/{apiversion:(?:v1|unstable)}/").Subrouter()
v3mux := csMux.PathPrefix("/{apiversion:(?:r0|v3)}/").Subrouter()
diff --git a/syncapi/routing/search.go b/syncapi/routing/search.go
index 081ec6cb..13625b9c 100644
--- a/syncapi/routing/search.go
+++ b/syncapi/routing/search.go
@@ -37,7 +37,7 @@ import (
)
// nolint:gocyclo
-func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts *fulltext.Search, from *string) util.JSONResponse {
+func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts fulltext.Indexer, from *string) util.JSONResponse {
start := time.Now()
var (
searchReq SearchRequest
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
index 850d24a0..9f9de28d 100644
--- a/syncapi/storage/postgres/syncserver.go
+++ b/syncapi/storage/postgres/syncserver.go
@@ -16,12 +16,12 @@
package postgres
import (
+ "context"
"database/sql"
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/internal/sqlutil"
- "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
@@ -36,10 +36,10 @@ type SyncServerDatasource struct {
}
// NewDatabase creates a new sync server database
-func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
+func NewDatabase(ctx context.Context, cm sqlutil.Connections, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
var d SyncServerDatasource
var err error
- if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil {
+ if d.db, d.writer, err = cm.Connection(dbProperties); err != nil {
return nil, err
}
accountData, err := NewPostgresAccountDataTable(d.db)
@@ -111,7 +111,7 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
Up: deltas.UpSetHistoryVisibility, // Requires current_room_state and output_room_events to be created.
},
)
- err = m.Up(base.Context())
+ err = m.Up(ctx)
if err != nil {
return nil, err
}
diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go
index 51054690..3f1ca355 100644
--- a/syncapi/storage/sqlite3/syncserver.go
+++ b/syncapi/storage/sqlite3/syncserver.go
@@ -20,7 +20,6 @@ import (
"database/sql"
"github.com/matrix-org/dendrite/internal/sqlutil"
- "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
@@ -37,13 +36,14 @@ type SyncServerDatasource struct {
// NewDatabase creates a new sync server database
// nolint: gocyclo
-func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
+func NewDatabase(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
var d SyncServerDatasource
var err error
- if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()); err != nil {
+
+ if d.db, d.writer, err = conMan.Connection(dbProperties); err != nil {
return nil, err
}
- if err = d.prepare(base.Context()); err != nil {
+ if err = d.prepare(ctx); err != nil {
return nil, err
}
return &d, nil
diff --git a/syncapi/storage/storage.go b/syncapi/storage/storage.go
index 5b20c6cc..8714ec5e 100644
--- a/syncapi/storage/storage.go
+++ b/syncapi/storage/storage.go
@@ -18,21 +18,22 @@
package storage
import (
+ "context"
"fmt"
- "github.com/matrix-org/dendrite/setup/base"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/postgres"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3"
)
// NewSyncServerDatasource opens a database connection.
-func NewSyncServerDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) {
+func NewSyncServerDatasource(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (Database, error) {
switch {
case dbProperties.ConnectionString.IsSQLite():
- return sqlite3.NewDatabase(base, dbProperties)
+ return sqlite3.NewDatabase(ctx, conMan, dbProperties)
case dbProperties.ConnectionString.IsPostgres():
- return postgres.NewDatabase(base, dbProperties)
+ return postgres.NewDatabase(ctx, conMan, dbProperties)
default:
return nil, fmt.Errorf("unexpected database type")
}
diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go
index 05d498bc..9f006490 100644
--- a/syncapi/storage/storage_test.go
+++ b/syncapi/storage/storage_test.go
@@ -9,27 +9,27 @@ import (
"reflect"
"testing"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/test"
- "github.com/matrix-org/dendrite/test/testrig"
"github.com/matrix-org/gomatrixserverlib"
"github.com/stretchr/testify/assert"
)
var ctx = context.Background()
-func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func(), func()) {
+func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
connStr, close := test.PrepareDBConnectionString(t, dbType)
- base, closeBase := testrig.CreateBaseDendrite(t, dbType)
- db, err := storage.NewSyncServerDatasource(base, &config.DatabaseOptions{
+ cm := sqlutil.NewConnectionManager()
+ db, err := storage.NewSyncServerDatasource(context.Background(), cm, &config.DatabaseOptions{
ConnectionString: config.DataSource(connStr),
})
if err != nil {
t.Fatalf("NewSyncServerDatasource returned %s", err)
}
- return db, close, closeBase
+ return db, close
}
func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (positions []types.StreamPosition) {
@@ -55,9 +55,8 @@ func TestWriteEvents(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
alice := test.NewUser(t)
r := test.NewRoom(t, alice)
- db, close, closeBase := MustCreateDatabase(t, dbType)
+ db, close := MustCreateDatabase(t, dbType)
defer close()
- defer closeBase()
MustWriteEvents(t, db, r.Events())
})
}
@@ -76,9 +75,8 @@ func WithSnapshot(t *testing.T, db storage.Database, f func(snapshot storage.Dat
// These tests assert basic functionality of RecentEvents for PDUs
func TestRecentEventsPDU(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
- db, close, closeBase := MustCreateDatabase(t, dbType)
+ db, close := MustCreateDatabase(t, dbType)
defer close()
- defer closeBase()
alice := test.NewUser(t)
// dummy room to make sure SQL queries are filtering on room ID
MustWriteEvents(t, db, test.NewRoom(t, alice).Events())
@@ -191,9 +189,8 @@ func TestRecentEventsPDU(t *testing.T) {
// The purpose of this test is to ensure that backfill does indeed go backwards, using a topology token
func TestGetEventsInRangeWithTopologyToken(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
- db, close, closeBase := MustCreateDatabase(t, dbType)
+ db, close := MustCreateDatabase(t, dbType)
defer close()
- defer closeBase()
alice := test.NewUser(t)
r := test.NewRoom(t, alice)
for i := 0; i < 10; i++ {
@@ -276,9 +273,8 @@ func TestStreamToTopologicalPosition(t *testing.T) {
}
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
- db, close, closeBase := MustCreateDatabase(t, dbType)
+ db, close := MustCreateDatabase(t, dbType)
defer close()
- defer closeBase()
txn, err := db.NewDatabaseTransaction(ctx)
if err != nil {
@@ -514,9 +510,8 @@ func TestSendToDeviceBehaviour(t *testing.T) {
bob := test.NewUser(t)
deviceID := "one"
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
- db, close, closeBase := MustCreateDatabase(t, dbType)
+ db, close := MustCreateDatabase(t, dbType)
defer close()
- defer closeBase()
// At this point there should be no messages. We haven't sent anything
// yet.
@@ -899,9 +894,8 @@ func TestRoomSummary(t *testing.T) {
}
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
- db, close, closeBase := MustCreateDatabase(t, dbType)
+ db, close := MustCreateDatabase(t, dbType)
defer close()
- defer closeBase()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@@ -939,11 +933,8 @@ func TestRecentEvents(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
filter := gomatrixserverlib.DefaultRoomEventFilter()
- db, close, closeBase := MustCreateDatabase(t, dbType)
- t.Cleanup(func() {
- close()
- closeBase()
- })
+ db, close := MustCreateDatabase(t, dbType)
+ t.Cleanup(close)
MustWriteEvents(t, db, room1.Events())
MustWriteEvents(t, db, room2.Events())
diff --git a/syncapi/storage/storage_wasm.go b/syncapi/storage/storage_wasm.go
index c1544474..db0b173b 100644
--- a/syncapi/storage/storage_wasm.go
+++ b/syncapi/storage/storage_wasm.go
@@ -15,18 +15,19 @@
package storage
import (
+ "context"
"fmt"
- "github.com/matrix-org/dendrite/setup/base"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3"
)
// NewPublicRoomsServerDatabase opens a database connection.
-func NewSyncServerDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) {
+func NewSyncServerDatasource(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (Database, error) {
switch {
case dbProperties.ConnectionString.IsSQLite():
- return sqlite3.NewDatabase(base, dbProperties)
+ return sqlite3.NewDatabase(ctx, conMan, dbProperties)
case dbProperties.ConnectionString.IsPostgres():
return nil, fmt.Errorf("can't use Postgres implementation")
default:
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 153f7af5..e0cc8462 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -17,6 +17,7 @@ package syncapi
import (
"context"
+ "github.com/matrix-org/dendrite/internal/fulltext"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/caching"
@@ -41,24 +42,34 @@ func AddPublicRoutes(
base *base.BaseDendrite,
userAPI userapi.SyncUserAPI,
rsAPI api.SyncRoomserverAPI,
+ caches caching.LazyLoadCache,
) {
cfg := &base.Cfg.SyncAPI
js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
- syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database)
+ syncDB, err := storage.NewSyncServerDatasource(base.Context(), base.ConnectionManager, &cfg.Database)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to sync db")
}
eduCache := caching.NewTypingCache()
notifier := notifier.NewNotifier()
- streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, eduCache, base.Caches, notifier)
+ streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, eduCache, caches, notifier)
notifier.SetCurrentPosition(streams.Latest(context.Background()))
if err = notifier.Load(context.Background(), syncDB); err != nil {
logrus.WithError(err).Panicf("failed to load notifier ")
}
+ var fts *fulltext.Search
+ if cfg.Fulltext.Enabled {
+ fts, err = fulltext.New(base.ProcessContext.Context(), cfg.Fulltext)
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to create full text")
+ }
+ base.ProcessContext.ComponentStarted()
+ }
+
federationPresenceProducer := &producers.FederationAPIPresenceProducer{
Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
JetStream: js,
@@ -86,7 +97,7 @@ func AddPublicRoutes(
roomConsumer := consumers.NewOutputRoomEventConsumer(
base.ProcessContext, cfg, js, syncDB, notifier, streams.PDUStreamProvider,
- streams.InviteStreamProvider, rsAPI, base.Fulltext,
+ streams.InviteStreamProvider, rsAPI, fts,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
@@ -94,7 +105,7 @@ func AddPublicRoutes(
clientConsumer := consumers.NewOutputClientDataConsumer(
base.ProcessContext, cfg, js, natsClient, syncDB, notifier,
- streams.AccountDataStreamProvider, base.Fulltext,
+ streams.AccountDataStreamProvider, fts,
)
if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer")
@@ -129,7 +140,7 @@ func AddPublicRoutes(
}
routing.Setup(
- base.PublicClientAPIMux, requestPool, syncDB, userAPI,
- rsAPI, cfg, base.Caches, base.Fulltext,
+ base.Routers.Client, requestPool, syncDB, userAPI,
+ rsAPI, cfg, caches, fts,
)
}
diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go
index 1cb82ce1..13a07865 100644
--- a/syncapi/syncapi_test.go
+++ b/syncapi/syncapi_test.go
@@ -10,6 +10,7 @@ import (
"testing"
"time"
+ "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/tidwall/gjson"
@@ -114,12 +115,13 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
}
base, close := testrig.CreateBaseDendrite(t, dbType)
+ caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
defer close()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
msgs := toNATSMsgs(t, base, room.Events()...)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}})
+ AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
testrig.MustPublishMsgs(t, jsctx, msgs...)
testCases := []struct {
@@ -162,7 +164,7 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
for _, tc := range testCases {
w := httptest.NewRecorder()
- base.PublicClientAPIMux.ServeHTTP(w, tc.req)
+ base.Routers.Client.ServeHTTP(w, tc.req)
if w.Code != tc.wantCode {
t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
}
@@ -218,12 +220,13 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
// m.room.history_visibility
msgs := toNATSMsgs(t, base, room.Events()...)
sinceTokens := make([]string, len(msgs))
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}})
+ caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
+ AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
for i, msg := range msgs {
testrig.MustPublishMsgs(t, jsctx, msg)
time.Sleep(100 * time.Millisecond)
w := httptest.NewRecorder()
- base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
})))
@@ -253,7 +256,7 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
t.Logf("waited for events to be consumed; syncing with %v", sinceTokens)
for i, since := range sinceTokens {
w := httptest.NewRecorder()
- base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
"since": since,
@@ -302,9 +305,10 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) {
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{})
+ caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
+ AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
w := httptest.NewRecorder()
- base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
"set_presence": "online",
@@ -417,10 +421,10 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
// Use the actual internal roomserver API
- rsAPI := roomserver.NewInternalAPI(base)
+ caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
+ rsAPI := roomserver.NewInternalAPI(base, caches)
rsAPI.SetFederationAPI(nil, nil)
-
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI)
+ AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
for _, tc := range testCases {
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
@@ -444,7 +448,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// There is only one event, we expect only to be able to see this, if the room is world_readable
w := httptest.NewRecorder()
- base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
+ base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
"access_token": bobDev.AccessToken,
"dir": "b",
"filter": `{"lazy_load_members":true}`, // check that lazy loading doesn't break history visibility
@@ -484,7 +488,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// Verify the messages after/before invite are visible or not
w = httptest.NewRecorder()
- base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
+ base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
"access_token": bobDev.AccessToken,
"dir": "b",
})))
@@ -717,10 +721,11 @@ func TestGetMembership(t *testing.T) {
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
// Use an actual roomserver for this
- rsAPI := roomserver.NewInternalAPI(base)
+ caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
+ rsAPI := roomserver.NewInternalAPI(base, caches)
rsAPI.SetFederationAPI(nil, nil)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI)
+ AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@@ -748,7 +753,7 @@ func TestGetMembership(t *testing.T) {
}
w := httptest.NewRecorder()
- base.PublicClientAPIMux.ServeHTTP(w, tc.request(t, room))
+ base.Routers.Client.ServeHTTP(w, tc.request(t, room))
if w.Code != 200 && tc.wantOK {
t.Logf("%s", w.Body.String())
t.Fatalf("got HTTP %d want %d", w.Code, 200)
@@ -786,8 +791,8 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
-
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{})
+ caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
+ AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
producer := producers.SyncAPIProducer{
TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
@@ -885,7 +890,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
// Execute a /sync request, recording the response
w := httptest.NewRecorder()
- base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"since": tc.since,
})))
@@ -1003,10 +1008,11 @@ func testContext(t *testing.T, dbType test.DBType) {
defer baseClose()
// Use an actual roomserver for this
- rsAPI := roomserver.NewInternalAPI(base)
+ caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
+ rsAPI := roomserver.NewInternalAPI(base, caches)
rsAPI.SetFederationAPI(nil, nil)
- AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI)
+ AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches)
room := test.NewRoom(t, user)
@@ -1049,7 +1055,7 @@ func testContext(t *testing.T, dbType test.DBType) {
params[k] = v
}
}
- base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", requestPath, test.WithQueryParams(params)))
+ base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", requestPath, test.WithQueryParams(params)))
if tc.wantError && w.Code == 200 {
t.Fatalf("Expected an error, but got none")
@@ -1139,7 +1145,7 @@ func TestUpdateRelations(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, shutdownBase := testrig.CreateBaseDendrite(t, dbType)
t.Cleanup(shutdownBase)
- db, err := storage.NewSyncServerDatasource(base, &base.Cfg.SyncAPI.Database)
+ db, err := storage.NewSyncServerDatasource(base.Context(), base.ConnectionManager, &base.Cfg.SyncAPI.Database)
if err != nil {
t.Fatal(err)
}
@@ -1178,7 +1184,7 @@ func syncUntil(t *testing.T,
go func() {
for {
w := httptest.NewRecorder()
- base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": accessToken,
"timeout": "1000",
})))