aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--federationapi/federationapi_keys_test.go2
-rw-r--r--setup/base/base.go12
-rw-r--r--setup/jetstream/nats.go8
-rw-r--r--syncapi/sync/requestpool.go10
-rw-r--r--syncapi/syncapi.go2
-rw-r--r--syncapi/syncapi_test.go162
-rw-r--r--test/base.go72
-rw-r--r--test/http.go45
-rw-r--r--test/jetstream.go35
9 files changed, 337 insertions, 11 deletions
diff --git a/federationapi/federationapi_keys_test.go b/federationapi/federationapi_keys_test.go
index 4774c882..31e9a4c7 100644
--- a/federationapi/federationapi_keys_test.go
+++ b/federationapi/federationapi_keys_test.go
@@ -102,7 +102,7 @@ func TestMain(m *testing.M) {
)
// Finally, build the server key APIs.
- sbase := base.NewBaseDendrite(cfg, "Monolith", base.NoCacheMetrics)
+ sbase := base.NewBaseDendrite(cfg, "Monolith", base.DisableMetrics)
s.api = NewInternalAPI(sbase, s.fedclient, nil, s.cache, nil, true)
}
diff --git a/setup/base/base.go b/setup/base/base.go
index 0e7528a0..5cbd7da9 100644
--- a/setup/base/base.go
+++ b/setup/base/base.go
@@ -86,6 +86,7 @@ type BaseDendrite struct {
DNSCache *gomatrixserverlib.DNSCache
Database *sql.DB
DatabaseWriter sqlutil.Writer
+ EnableMetrics bool
}
const NoListener = ""
@@ -96,7 +97,7 @@ const HTTPClientTimeout = time.Second * 30
type BaseDendriteOptions int
const (
- NoCacheMetrics BaseDendriteOptions = iota
+ DisableMetrics BaseDendriteOptions = iota
UseHTTPAPIs
PolylithMode
)
@@ -107,12 +108,12 @@ const (
func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...BaseDendriteOptions) *BaseDendrite {
platformSanityChecks()
useHTTPAPIs := false
- cacheMetrics := true
+ enableMetrics := true
isMonolith := true
for _, opt := range options {
switch opt {
- case NoCacheMetrics:
- cacheMetrics = false
+ case DisableMetrics:
+ enableMetrics = false
case UseHTTPAPIs:
useHTTPAPIs = true
case PolylithMode:
@@ -160,7 +161,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base
}
}
- cache, err := caching.NewInMemoryLRUCache(cacheMetrics)
+ cache, err := caching.NewInMemoryLRUCache(enableMetrics)
if err != nil {
logrus.WithError(err).Warnf("Failed to create cache")
}
@@ -246,6 +247,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base
apiHttpClient: &apiClient,
Database: db, // set if monolith with global connection pool only
DatabaseWriter: writer, // set if monolith with global connection pool only
+ EnableMetrics: enableMetrics,
}
}
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index 426f02bb..248b0e65 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -13,6 +13,7 @@ import (
"github.com/sirupsen/logrus"
natsserver "github.com/nats-io/nats-server/v2/server"
+ "github.com/nats-io/nats.go"
natsclient "github.com/nats-io/nats.go"
)
@@ -21,6 +22,13 @@ type NATSInstance struct {
sync.Mutex
}
+func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) {
+ for _, stream := range streams { // streams are defined in streams.go
+ name := cfg.Prefixed(stream.Name)
+ _ = js.DeleteStream(name)
+ }
+}
+
func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
// check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 {
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index 99d1e40c..8ab13091 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -65,11 +65,13 @@ func NewRequestPool(
userAPI userapi.SyncUserAPI, keyAPI keyapi.SyncKeyAPI,
rsAPI roomserverAPI.SyncRoomserverAPI,
streams *streams.Streams, notifier *notifier.Notifier,
- producer PresencePublisher,
+ producer PresencePublisher, enableMetrics bool,
) *RequestPool {
- prometheus.MustRegister(
- activeSyncRequests, waitingSyncRequests,
- )
+ if enableMetrics {
+ prometheus.MustRegister(
+ activeSyncRequests, waitingSyncRequests,
+ )
+ }
rp := &RequestPool{
db: db,
cfg: cfg,
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index dbc6e240..d8bacb2d 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -65,7 +65,7 @@ func AddPublicRoutes(
JetStream: js,
}
- requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer)
+ requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer, base.EnableMetrics)
userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{
JetStream: js,
diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go
new file mode 100644
index 00000000..12b5178d
--- /dev/null
+++ b/syncapi/syncapi_test.go
@@ -0,0 +1,162 @@
+package syncapi
+
+import (
+ "context"
+ "encoding/json"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+ "time"
+
+ keyapi "github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ rsapi "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/dendrite/test"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/nats-io/nats.go"
+)
+
+type syncRoomserverAPI struct {
+ rsapi.SyncRoomserverAPI
+ rooms []*test.Room
+}
+
+func (s *syncRoomserverAPI) QueryLatestEventsAndState(ctx context.Context, req *rsapi.QueryLatestEventsAndStateRequest, res *rsapi.QueryLatestEventsAndStateResponse) error {
+ var room *test.Room
+ for _, r := range s.rooms {
+ if r.ID == req.RoomID {
+ room = r
+ break
+ }
+ }
+ if room == nil {
+ res.RoomExists = false
+ return nil
+ }
+ res.RoomVersion = room.Version
+ return nil // TODO: return state
+}
+
+type syncUserAPI struct {
+ userapi.SyncUserAPI
+ accounts []userapi.Device
+}
+
+func (s *syncUserAPI) QueryAccessToken(ctx context.Context, req *userapi.QueryAccessTokenRequest, res *userapi.QueryAccessTokenResponse) error {
+ for _, acc := range s.accounts {
+ if acc.AccessToken == req.AccessToken {
+ res.Device = &acc
+ return nil
+ }
+ }
+ res.Err = "unknown user"
+ return nil
+}
+
+func (s *syncUserAPI) PerformLastSeenUpdate(ctx context.Context, req *userapi.PerformLastSeenUpdateRequest, res *userapi.PerformLastSeenUpdateResponse) error {
+ return nil
+}
+
+type syncKeyAPI struct {
+ keyapi.KeyInternalAPI
+}
+
+func TestSyncAPI(t *testing.T) {
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ testSync(t, dbType)
+ })
+}
+
+func testSync(t *testing.T, dbType test.DBType) {
+ user := test.NewUser()
+ room := test.NewRoom(t, user)
+ alice := userapi.Device{
+ ID: "ALICEID",
+ UserID: user.ID,
+ AccessToken: "ALICE_BEARER_TOKEN",
+ DisplayName: "Alice",
+ AccountType: userapi.AccountTypeUser,
+ }
+
+ base, close := test.CreateBaseDendrite(t, dbType)
+ defer close()
+
+ jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
+ var msgs []*nats.Msg
+ for _, ev := range room.Events() {
+ var addsStateIDs []string
+ if ev.StateKey() != nil {
+ addsStateIDs = append(addsStateIDs, ev.EventID())
+ }
+ msgs = append(msgs, test.NewOutputEventMsg(t, base, room.ID, api.OutputEvent{
+ Type: rsapi.OutputTypeNewRoomEvent,
+ NewRoomEvent: &rsapi.OutputNewRoomEvent{
+ Event: ev,
+ AddsStateEventIDs: addsStateIDs,
+ },
+ }))
+ }
+ AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, &syncKeyAPI{})
+ test.MustPublishMsgs(t, jsctx, msgs...)
+
+ testCases := []struct {
+ name string
+ req *http.Request
+ wantCode int
+ wantJoinedRooms []string
+ }{
+ {
+ name: "missing access token",
+ req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ "timeout": "0",
+ })),
+ wantCode: 401,
+ },
+ {
+ name: "unknown access token",
+ req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ "access_token": "foo",
+ "timeout": "0",
+ })),
+ wantCode: 401,
+ },
+ {
+ name: "valid access token",
+ req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
+ "access_token": alice.AccessToken,
+ "timeout": "0",
+ })),
+ wantCode: 200,
+ wantJoinedRooms: []string{room.ID},
+ },
+ }
+ // TODO: find a better way
+ time.Sleep(500 * time.Millisecond)
+
+ for _, tc := range testCases {
+ w := httptest.NewRecorder()
+ base.PublicClientAPIMux.ServeHTTP(w, tc.req)
+ if w.Code != tc.wantCode {
+ t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
+ }
+ if tc.wantJoinedRooms != nil {
+ var res types.Response
+ if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
+ t.Fatalf("%s: failed to decode response body: %s", tc.name, err)
+ }
+ if len(res.Rooms.Join) != len(tc.wantJoinedRooms) {
+ t.Errorf("%s: got %v joined rooms, want %v.\nResponse: %+v", tc.name, len(res.Rooms.Join), len(tc.wantJoinedRooms), res)
+ }
+ t.Logf("res: %+v", res.Rooms.Join[room.ID])
+
+ gotEventIDs := make([]string, len(res.Rooms.Join[room.ID].Timeline.Events))
+ for i, ev := range res.Rooms.Join[room.ID].Timeline.Events {
+ gotEventIDs[i] = ev.EventID
+ }
+ test.AssertEventIDsEqual(t, gotEventIDs, room.Events())
+ }
+ }
+}
diff --git a/test/base.go b/test/base.go
index 32fc8dc5..664442c0 100644
--- a/test/base.go
+++ b/test/base.go
@@ -1,11 +1,83 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
package test
import (
+ "errors"
+ "fmt"
+ "io/fs"
+ "os"
+ "strings"
+ "testing"
+
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/nats-io/nats.go"
)
+func CreateBaseDendrite(t *testing.T, dbType DBType) (*base.BaseDendrite, func()) {
+ var cfg config.Dendrite
+ cfg.Defaults(false)
+ cfg.Global.JetStream.InMemory = true
+
+ switch dbType {
+ case DBTypePostgres:
+ cfg.Global.Defaults(true) // autogen a signing key
+ cfg.MediaAPI.Defaults(true) // autogen a media path
+ // use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use
+ // the file system event with InMemory=true :(
+ cfg.Global.JetStream.TopicPrefix = fmt.Sprintf("Test_%d_", dbType)
+ connStr, close := PrepareDBConnectionString(t, dbType)
+ cfg.Global.DatabaseOptions = config.DatabaseOptions{
+ ConnectionString: config.DataSource(connStr),
+ MaxOpenConnections: 10,
+ MaxIdleConnections: 2,
+ ConnMaxLifetimeSeconds: 60,
+ }
+ return base.NewBaseDendrite(&cfg, "Test", base.DisableMetrics), close
+ case DBTypeSQLite:
+ cfg.Defaults(true) // sets a sqlite db per component
+ // use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use
+ // the file system event with InMemory=true :(
+ cfg.Global.JetStream.TopicPrefix = fmt.Sprintf("Test_%d_", dbType)
+ return base.NewBaseDendrite(&cfg, "Test", base.DisableMetrics), func() {
+ // cleanup db files. This risks getting out of sync as we add more database strings :(
+ dbFiles := []config.DataSource{
+ cfg.AppServiceAPI.Database.ConnectionString,
+ cfg.FederationAPI.Database.ConnectionString,
+ cfg.KeyServer.Database.ConnectionString,
+ cfg.MSCs.Database.ConnectionString,
+ cfg.MediaAPI.Database.ConnectionString,
+ cfg.RoomServer.Database.ConnectionString,
+ cfg.SyncAPI.Database.ConnectionString,
+ cfg.UserAPI.AccountDatabase.ConnectionString,
+ }
+ for _, fileURI := range dbFiles {
+ path := strings.TrimPrefix(string(fileURI), "file:")
+ err := os.Remove(path)
+ if err != nil && !errors.Is(err, fs.ErrNotExist) {
+ t.Fatalf("failed to cleanup sqlite db '%s': %s", fileURI, err)
+ }
+ }
+ }
+ default:
+ t.Fatalf("unknown db type: %v", dbType)
+ }
+ return nil, nil
+}
+
func Base(cfg *config.Dendrite) (*base.BaseDendrite, nats.JetStreamContext, *nats.Conn) {
if cfg == nil {
cfg = &config.Dendrite{}
diff --git a/test/http.go b/test/http.go
new file mode 100644
index 00000000..a458a338
--- /dev/null
+++ b/test/http.go
@@ -0,0 +1,45 @@
+package test
+
+import (
+ "bytes"
+ "encoding/json"
+ "io"
+ "net/http"
+ "net/url"
+ "testing"
+)
+
+type HTTPRequestOpt func(req *http.Request)
+
+func WithJSONBody(t *testing.T, body interface{}) HTTPRequestOpt {
+ t.Helper()
+ b, err := json.Marshal(body)
+ if err != nil {
+ t.Fatalf("WithJSONBody: %s", err)
+ }
+ return func(req *http.Request) {
+ req.Body = io.NopCloser(bytes.NewBuffer(b))
+ }
+}
+
+func WithQueryParams(qps map[string]string) HTTPRequestOpt {
+ var vals url.Values = map[string][]string{}
+ for k, v := range qps {
+ vals.Set(k, v)
+ }
+ return func(req *http.Request) {
+ req.URL.RawQuery = vals.Encode()
+ }
+}
+
+func NewRequest(t *testing.T, method, path string, opts ...HTTPRequestOpt) *http.Request {
+ t.Helper()
+ req, err := http.NewRequest(method, "http://localhost"+path, nil)
+ if err != nil {
+ t.Fatalf("failed to make new HTTP request %v %v : %v", method, path, err)
+ }
+ for _, o := range opts {
+ o(req)
+ }
+ return req
+}
diff --git a/test/jetstream.go b/test/jetstream.go
new file mode 100644
index 00000000..488c22be
--- /dev/null
+++ b/test/jetstream.go
@@ -0,0 +1,35 @@
+package test
+
+import (
+ "encoding/json"
+ "testing"
+
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/base"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/nats-io/nats.go"
+)
+
+func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Msg) {
+ t.Helper()
+ for _, msg := range msgs {
+ if _, err := jsctx.PublishMsg(msg); err != nil {
+ t.Fatalf("MustPublishMsgs: failed to publish message: %s", err)
+ }
+ }
+}
+
+func NewOutputEventMsg(t *testing.T, base *base.BaseDendrite, roomID string, update api.OutputEvent) *nats.Msg {
+ t.Helper()
+ msg := &nats.Msg{
+ Subject: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent),
+ Header: nats.Header{},
+ }
+ msg.Header.Set(jetstream.RoomID, roomID)
+ var err error
+ msg.Data, err = json.Marshal(update)
+ if err != nil {
+ t.Fatalf("failed to marshal update: %s", err)
+ }
+ return msg
+}