aboutsummaryrefslogtreecommitdiff
path: root/roomserver/internal/input
diff options
context:
space:
mode:
Diffstat (limited to 'roomserver/internal/input')
-rw-r--r--roomserver/internal/input/input.go9
-rw-r--r--roomserver/internal/input/input_test.go117
2 files changed, 55 insertions, 71 deletions
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index 2ec19f01..cc0c673d 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -39,7 +39,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/producers"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
- "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
@@ -74,7 +73,6 @@ import (
// or C.
type Inputer struct {
Cfg *config.RoomServer
- Base *base.BaseDendrite
ProcessContext *process.ProcessContext
DB storage.RoomDatabase
NATSClient *nats.Conn
@@ -89,8 +87,9 @@ type Inputer struct {
OutputProducer *producers.RoomEventProducer
workers sync.Map // room ID -> *worker
- Queryer *query.Queryer
- UserAPI userapi.RoomserverUserAPI
+ Queryer *query.Queryer
+ UserAPI userapi.RoomserverUserAPI
+ enableMetrics bool
}
// If a room consumer is inactive for a while then we will allow NATS
@@ -177,7 +176,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
// will look to see if we have a worker for that room which has its
// own consumer. If we don't, we'll start one.
func (r *Inputer) Start() error {
- if r.Base.EnableMetrics {
+ if r.enableMetrics {
prometheus.MustRegister(roomserverInputBackpressure, processRoomEventDuration)
}
_, err := r.JetStream.Subscribe(
diff --git a/roomserver/internal/input/input_test.go b/roomserver/internal/input/input_test.go
index 555ec9c6..51c50c37 100644
--- a/roomserver/internal/input/input_test.go
+++ b/roomserver/internal/input/input_test.go
@@ -2,84 +2,69 @@ package input_test
import (
"context"
- "os"
"testing"
"time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/input"
- "github.com/matrix-org/dendrite/roomserver/storage"
- "github.com/matrix-org/dendrite/setup/base"
- "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
"github.com/matrix-org/gomatrixserverlib"
- "github.com/nats-io/nats.go"
)
-var js nats.JetStreamContext
-var jc *nats.Conn
+func TestSingleTransactionOnInput(t *testing.T) {
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ defer close()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
-func TestMain(m *testing.M) {
- var b *base.BaseDendrite
- b, js, jc = testrig.Base(nil)
- code := m.Run()
- b.ShutdownDendrite()
- b.WaitForComponentsToFinish()
- os.Exit(code)
-}
+ natsInstance := &jetstream.NATSInstance{}
+ js, jc := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+ caches := caching.NewRistrettoCache(8*1024*1024, time.Hour, caching.DisableMetrics)
+ rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
+ rsAPI.SetFederationAPI(nil, nil)
-func TestSingleTransactionOnInput(t *testing.T) {
- deadline, _ := t.Deadline()
- if max := time.Now().Add(time.Second * 3); deadline.After(max) {
- deadline = max
- }
- ctx, cancel := context.WithDeadline(context.Background(), deadline)
- defer cancel()
+ deadline, _ := t.Deadline()
+ if max := time.Now().Add(time.Second * 3); deadline.Before(max) {
+ deadline = max
+ }
+ ctx, cancel := context.WithDeadline(processCtx.Context(), deadline)
+ defer cancel()
+
+ event, err := gomatrixserverlib.NewEventFromTrustedJSON(
+ []byte(`{"auth_events":[],"content":{"creator":"@neilalexander:dendrite.matrix.org","room_version":"6"},"depth":1,"hashes":{"sha256":"jqOqdNEH5r0NiN3xJtj0u5XUVmRqq9YvGbki1wxxuuM"},"origin":"dendrite.matrix.org","origin_server_ts":1644595362726,"prev_events":[],"prev_state":[],"room_id":"!jSZZRknA6GkTBXNP:dendrite.matrix.org","sender":"@neilalexander:dendrite.matrix.org","signatures":{"dendrite.matrix.org":{"ed25519:6jB2aB":"bsQXO1wketf1OSe9xlndDIWe71W9KIundc6rBw4KEZdGPW7x4Tv4zDWWvbxDsG64sS2IPWfIm+J0OOozbrWIDw"}},"state_key":"","type":"m.room.create"}`),
+ false, gomatrixserverlib.RoomVersionV6,
+ )
+ if err != nil {
+ t.Fatal(err)
+ }
+ in := api.InputRoomEvent{
+ Kind: api.KindOutlier, // don't panic if we generate an output event
+ Event: event.Headered(gomatrixserverlib.RoomVersionV6),
+ }
- event, err := gomatrixserverlib.NewEventFromTrustedJSON(
- []byte(`{"auth_events":[],"content":{"creator":"@neilalexander:dendrite.matrix.org","room_version":"6"},"depth":1,"hashes":{"sha256":"jqOqdNEH5r0NiN3xJtj0u5XUVmRqq9YvGbki1wxxuuM"},"origin":"dendrite.matrix.org","origin_server_ts":1644595362726,"prev_events":[],"prev_state":[],"room_id":"!jSZZRknA6GkTBXNP:dendrite.matrix.org","sender":"@neilalexander:dendrite.matrix.org","signatures":{"dendrite.matrix.org":{"ed25519:6jB2aB":"bsQXO1wketf1OSe9xlndDIWe71W9KIundc6rBw4KEZdGPW7x4Tv4zDWWvbxDsG64sS2IPWfIm+J0OOozbrWIDw"}},"state_key":"","type":"m.room.create"}`),
- false, gomatrixserverlib.RoomVersionV6,
- )
- if err != nil {
- t.Fatal(err)
- }
- in := api.InputRoomEvent{
- Kind: api.KindOutlier, // don't panic if we generate an output event
- Event: event.Headered(gomatrixserverlib.RoomVersionV6),
- }
- cm := sqlutil.NewConnectionManager()
- db, err := storage.Open(
- context.Background(), cm,
- &config.DatabaseOptions{
- ConnectionString: "",
- MaxOpenConnections: 1,
- MaxIdleConnections: 1,
- },
- caching.NewRistrettoCache(8*1024*1024, time.Hour, caching.DisableMetrics),
- )
- if err != nil {
- t.Logf("PostgreSQL not available (%s), skipping", err)
- t.SkipNow()
- }
- inputter := &input.Inputer{
- DB: db,
- JetStream: js,
- NATSClient: jc,
- }
- res := &api.InputRoomEventsResponse{}
- inputter.InputRoomEvents(
- ctx,
- &api.InputRoomEventsRequest{
- InputRoomEvents: []api.InputRoomEvent{in},
- Asynchronous: false,
- },
- res,
- )
- // If we fail here then it's because we've hit the test deadline,
- // so we probably deadlocked
- if err := res.Err(); err != nil {
- t.Fatal(err)
- }
+ inputter := &input.Inputer{
+ JetStream: js,
+ NATSClient: jc,
+ Cfg: &cfg.RoomServer,
+ }
+ res := &api.InputRoomEventsResponse{}
+ inputter.InputRoomEvents(
+ ctx,
+ &api.InputRoomEventsRequest{
+ InputRoomEvents: []api.InputRoomEvent{in},
+ Asynchronous: false,
+ },
+ res,
+ )
+ // If we fail here then it's because we've hit the test deadline,
+ // so we probably deadlocked
+ if err := res.Err(); err != nil {
+ t.Fatal(err)
+ }
+ })
}