diff options
Diffstat (limited to 'roomserver/internal/input')
-rw-r--r-- | roomserver/internal/input/input.go | 9 | ||||
-rw-r--r-- | roomserver/internal/input/input_test.go | 117 |
2 files changed, 55 insertions, 71 deletions
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 2ec19f01..cc0c673d 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -39,7 +39,6 @@ import ( "github.com/matrix-org/dendrite/roomserver/producers" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/types" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" @@ -74,7 +73,6 @@ import ( // or C. type Inputer struct { Cfg *config.RoomServer - Base *base.BaseDendrite ProcessContext *process.ProcessContext DB storage.RoomDatabase NATSClient *nats.Conn @@ -89,8 +87,9 @@ type Inputer struct { OutputProducer *producers.RoomEventProducer workers sync.Map // room ID -> *worker - Queryer *query.Queryer - UserAPI userapi.RoomserverUserAPI + Queryer *query.Queryer + UserAPI userapi.RoomserverUserAPI + enableMetrics bool } // If a room consumer is inactive for a while then we will allow NATS @@ -177,7 +176,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) { // will look to see if we have a worker for that room which has its // own consumer. If we don't, we'll start one. func (r *Inputer) Start() error { - if r.Base.EnableMetrics { + if r.enableMetrics { prometheus.MustRegister(roomserverInputBackpressure, processRoomEventDuration) } _, err := r.JetStream.Subscribe( diff --git a/roomserver/internal/input/input_test.go b/roomserver/internal/input/input_test.go index 555ec9c6..51c50c37 100644 --- a/roomserver/internal/input/input_test.go +++ b/roomserver/internal/input/input_test.go @@ -2,84 +2,69 @@ package input_test import ( "context" - "os" "testing" "time" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/input" - "github.com/matrix-org/dendrite/roomserver/storage" - "github.com/matrix-org/dendrite/setup/base" - "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/gomatrixserverlib" - "github.com/nats-io/nats.go" ) -var js nats.JetStreamContext -var jc *nats.Conn +func TestSingleTransactionOnInput(t *testing.T) { + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + cfg, processCtx, close := testrig.CreateConfig(t, dbType) + defer close() + cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) -func TestMain(m *testing.M) { - var b *base.BaseDendrite - b, js, jc = testrig.Base(nil) - code := m.Run() - b.ShutdownDendrite() - b.WaitForComponentsToFinish() - os.Exit(code) -} + natsInstance := &jetstream.NATSInstance{} + js, jc := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) + caches := caching.NewRistrettoCache(8*1024*1024, time.Hour, caching.DisableMetrics) + rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics) + rsAPI.SetFederationAPI(nil, nil) -func TestSingleTransactionOnInput(t *testing.T) { - deadline, _ := t.Deadline() - if max := time.Now().Add(time.Second * 3); deadline.After(max) { - deadline = max - } - ctx, cancel := context.WithDeadline(context.Background(), deadline) - defer cancel() + deadline, _ := t.Deadline() + if max := time.Now().Add(time.Second * 3); deadline.Before(max) { + deadline = max + } + ctx, cancel := context.WithDeadline(processCtx.Context(), deadline) + defer cancel() + + event, err := gomatrixserverlib.NewEventFromTrustedJSON( + []byte(`{"auth_events":[],"content":{"creator":"@neilalexander:dendrite.matrix.org","room_version":"6"},"depth":1,"hashes":{"sha256":"jqOqdNEH5r0NiN3xJtj0u5XUVmRqq9YvGbki1wxxuuM"},"origin":"dendrite.matrix.org","origin_server_ts":1644595362726,"prev_events":[],"prev_state":[],"room_id":"!jSZZRknA6GkTBXNP:dendrite.matrix.org","sender":"@neilalexander:dendrite.matrix.org","signatures":{"dendrite.matrix.org":{"ed25519:6jB2aB":"bsQXO1wketf1OSe9xlndDIWe71W9KIundc6rBw4KEZdGPW7x4Tv4zDWWvbxDsG64sS2IPWfIm+J0OOozbrWIDw"}},"state_key":"","type":"m.room.create"}`), + false, gomatrixserverlib.RoomVersionV6, + ) + if err != nil { + t.Fatal(err) + } + in := api.InputRoomEvent{ + Kind: api.KindOutlier, // don't panic if we generate an output event + Event: event.Headered(gomatrixserverlib.RoomVersionV6), + } - event, err := gomatrixserverlib.NewEventFromTrustedJSON( - []byte(`{"auth_events":[],"content":{"creator":"@neilalexander:dendrite.matrix.org","room_version":"6"},"depth":1,"hashes":{"sha256":"jqOqdNEH5r0NiN3xJtj0u5XUVmRqq9YvGbki1wxxuuM"},"origin":"dendrite.matrix.org","origin_server_ts":1644595362726,"prev_events":[],"prev_state":[],"room_id":"!jSZZRknA6GkTBXNP:dendrite.matrix.org","sender":"@neilalexander:dendrite.matrix.org","signatures":{"dendrite.matrix.org":{"ed25519:6jB2aB":"bsQXO1wketf1OSe9xlndDIWe71W9KIundc6rBw4KEZdGPW7x4Tv4zDWWvbxDsG64sS2IPWfIm+J0OOozbrWIDw"}},"state_key":"","type":"m.room.create"}`), - false, gomatrixserverlib.RoomVersionV6, - ) - if err != nil { - t.Fatal(err) - } - in := api.InputRoomEvent{ - Kind: api.KindOutlier, // don't panic if we generate an output event - Event: event.Headered(gomatrixserverlib.RoomVersionV6), - } - cm := sqlutil.NewConnectionManager() - db, err := storage.Open( - context.Background(), cm, - &config.DatabaseOptions{ - ConnectionString: "", - MaxOpenConnections: 1, - MaxIdleConnections: 1, - }, - caching.NewRistrettoCache(8*1024*1024, time.Hour, caching.DisableMetrics), - ) - if err != nil { - t.Logf("PostgreSQL not available (%s), skipping", err) - t.SkipNow() - } - inputter := &input.Inputer{ - DB: db, - JetStream: js, - NATSClient: jc, - } - res := &api.InputRoomEventsResponse{} - inputter.InputRoomEvents( - ctx, - &api.InputRoomEventsRequest{ - InputRoomEvents: []api.InputRoomEvent{in}, - Asynchronous: false, - }, - res, - ) - // If we fail here then it's because we've hit the test deadline, - // so we probably deadlocked - if err := res.Err(); err != nil { - t.Fatal(err) - } + inputter := &input.Inputer{ + JetStream: js, + NATSClient: jc, + Cfg: &cfg.RoomServer, + } + res := &api.InputRoomEventsResponse{} + inputter.InputRoomEvents( + ctx, + &api.InputRoomEventsRequest{ + InputRoomEvents: []api.InputRoomEvent{in}, + Asynchronous: false, + }, + res, + ) + // If we fail here then it's because we've hit the test deadline, + // so we probably deadlocked + if err := res.Err(); err != nil { + t.Fatal(err) + } + }) } |