diff options
Diffstat (limited to 'roomserver/internal')
-rw-r--r-- | roomserver/internal/alias.go | 8 | ||||
-rw-r--r-- | roomserver/internal/api.go | 50 | ||||
-rw-r--r-- | roomserver/internal/helpers/helpers_test.go | 2 | ||||
-rw-r--r-- | roomserver/internal/input/input.go | 9 | ||||
-rw-r--r-- | roomserver/internal/input/input_test.go | 117 |
5 files changed, 84 insertions, 102 deletions
diff --git a/roomserver/internal/alias.go b/roomserver/internal/alias.go index fc61b7f4..94b8b16c 100644 --- a/roomserver/internal/alias.go +++ b/roomserver/internal/alias.go @@ -117,7 +117,7 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias( request *api.RemoveRoomAliasRequest, response *api.RemoveRoomAliasResponse, ) error { - _, virtualHost, err := r.Cfg.Matrix.SplitLocalID('@', request.UserID) + _, virtualHost, err := r.Cfg.Global.SplitLocalID('@', request.UserID) if err != nil { return err } @@ -175,12 +175,12 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias( sender = ev.Sender() } - _, senderDomain, err := r.Cfg.Matrix.SplitLocalID('@', sender) + _, senderDomain, err := r.Cfg.Global.SplitLocalID('@', sender) if err != nil { return err } - identity, err := r.Cfg.Matrix.SigningIdentityFor(senderDomain) + identity, err := r.Cfg.Global.SigningIdentityFor(senderDomain) if err != nil { return err } @@ -206,7 +206,7 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias( return err } - newEvent, err := eventutil.BuildEvent(ctx, builder, r.Cfg.Matrix, identity, time.Now(), &eventsNeeded, stateRes) + newEvent, err := eventutil.BuildEvent(ctx, builder, &r.Cfg.Global, identity, time.Now(), &eventsNeeded, stateRes) if err != nil { return err } diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 2e987d68..7ca3675d 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -18,7 +18,6 @@ import ( "github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/producers" "github.com/matrix-org/dendrite/roomserver/storage" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" @@ -41,9 +40,8 @@ type RoomserverInternalAPI struct { *perform.Upgrader *perform.Admin ProcessContext *process.ProcessContext - Base *base.BaseDendrite DB storage.Database - Cfg *config.RoomServer + Cfg *config.Dendrite Cache caching.RoomServerCaches ServerName gomatrixserverlib.ServerName KeyRing gomatrixserverlib.JSONVerifier @@ -56,43 +54,44 @@ type RoomserverInternalAPI struct { InputRoomEventTopic string // JetStream topic for new input room events OutputProducer *producers.RoomEventProducer PerspectiveServerNames []gomatrixserverlib.ServerName + enableMetrics bool } func NewRoomserverAPI( - base *base.BaseDendrite, roomserverDB storage.Database, - js nats.JetStreamContext, nc *nats.Conn, caches caching.RoomServerCaches, + processContext *process.ProcessContext, dendriteCfg *config.Dendrite, roomserverDB storage.Database, + js nats.JetStreamContext, nc *nats.Conn, caches caching.RoomServerCaches, enableMetrics bool, ) *RoomserverInternalAPI { var perspectiveServerNames []gomatrixserverlib.ServerName - for _, kp := range base.Cfg.FederationAPI.KeyPerspectives { + for _, kp := range dendriteCfg.FederationAPI.KeyPerspectives { perspectiveServerNames = append(perspectiveServerNames, kp.ServerName) } serverACLs := acls.NewServerACLs(roomserverDB) producer := &producers.RoomEventProducer{ - Topic: string(base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)), + Topic: string(dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)), JetStream: js, ACLs: serverACLs, } a := &RoomserverInternalAPI{ - ProcessContext: base.ProcessContext, + ProcessContext: processContext, DB: roomserverDB, - Base: base, - Cfg: &base.Cfg.RoomServer, + Cfg: dendriteCfg, Cache: caches, - ServerName: base.Cfg.Global.ServerName, + ServerName: dendriteCfg.Global.ServerName, PerspectiveServerNames: perspectiveServerNames, - InputRoomEventTopic: base.Cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent), + InputRoomEventTopic: dendriteCfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent), OutputProducer: producer, JetStream: js, NATSClient: nc, - Durable: base.Cfg.Global.JetStream.Durable("RoomserverInputConsumer"), + Durable: dendriteCfg.Global.JetStream.Durable("RoomserverInputConsumer"), ServerACLs: serverACLs, Queryer: &query.Queryer{ DB: roomserverDB, Cache: caches, - IsLocalServerName: base.Cfg.Global.IsLocalServerName, + IsLocalServerName: dendriteCfg.Global.IsLocalServerName, ServerACLs: serverACLs, }, + enableMetrics: enableMetrics, // perform-er structs get initialised when we have a federation sender to use } return a @@ -105,15 +104,14 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio r.fsAPI = fsAPI r.KeyRing = keyRing - identity, err := r.Cfg.Matrix.SigningIdentityFor(r.ServerName) + identity, err := r.Cfg.Global.SigningIdentityFor(r.ServerName) if err != nil { logrus.Panic(err) } r.Inputer = &input.Inputer{ - Cfg: &r.Base.Cfg.RoomServer, - Base: r.Base, - ProcessContext: r.Base.ProcessContext, + Cfg: &r.Cfg.RoomServer, + ProcessContext: r.ProcessContext, DB: r.DB, InputRoomEventTopic: r.InputRoomEventTopic, OutputProducer: r.OutputProducer, @@ -129,12 +127,12 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio } r.Inviter = &perform.Inviter{ DB: r.DB, - Cfg: r.Cfg, + Cfg: &r.Cfg.RoomServer, FSAPI: r.fsAPI, Inputer: r.Inputer, } r.Joiner = &perform.Joiner{ - Cfg: r.Cfg, + Cfg: &r.Cfg.RoomServer, DB: r.DB, FSAPI: r.fsAPI, RSAPI: r, @@ -143,7 +141,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio } r.Peeker = &perform.Peeker{ ServerName: r.ServerName, - Cfg: r.Cfg, + Cfg: &r.Cfg.RoomServer, DB: r.DB, FSAPI: r.fsAPI, Inputer: r.Inputer, @@ -154,12 +152,12 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio } r.Unpeeker = &perform.Unpeeker{ ServerName: r.ServerName, - Cfg: r.Cfg, + Cfg: &r.Cfg.RoomServer, FSAPI: r.fsAPI, Inputer: r.Inputer, } r.Leaver = &perform.Leaver{ - Cfg: r.Cfg, + Cfg: &r.Cfg.RoomServer, DB: r.DB, FSAPI: r.fsAPI, Inputer: r.Inputer, @@ -168,7 +166,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio DB: r.DB, } r.Backfiller = &perform.Backfiller{ - IsLocalServerName: r.Cfg.Matrix.IsLocalServerName, + IsLocalServerName: r.Cfg.Global.IsLocalServerName, DB: r.DB, FSAPI: r.fsAPI, KeyRing: r.KeyRing, @@ -181,12 +179,12 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio DB: r.DB, } r.Upgrader = &perform.Upgrader{ - Cfg: r.Cfg, + Cfg: &r.Cfg.RoomServer, URSAPI: r, } r.Admin = &perform.Admin{ DB: r.DB, - Cfg: r.Cfg, + Cfg: &r.Cfg.RoomServer, Inputer: r.Inputer, Queryer: r.Queryer, Leaver: r.Leaver, diff --git a/roomserver/internal/helpers/helpers_test.go b/roomserver/internal/helpers/helpers_test.go index 03a8bf57..dd74b844 100644 --- a/roomserver/internal/helpers/helpers_test.go +++ b/roomserver/internal/helpers/helpers_test.go @@ -19,7 +19,7 @@ import ( func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { conStr, close := test.PrepareDBConnectionString(t, dbType) caches := caching.NewRistrettoCache(8*1024*1024, time.Hour, caching.DisableMetrics) - cm := sqlutil.NewConnectionManager() + cm := sqlutil.NewConnectionManager(nil, config.DatabaseOptions{}) db, err := storage.Open(context.Background(), cm, &config.DatabaseOptions{ConnectionString: config.DataSource(conStr)}, caches) if err != nil { t.Fatalf("failed to create Database: %v", err) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 2ec19f01..cc0c673d 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -39,7 +39,6 @@ import ( "github.com/matrix-org/dendrite/roomserver/producers" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/types" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" @@ -74,7 +73,6 @@ import ( // or C. type Inputer struct { Cfg *config.RoomServer - Base *base.BaseDendrite ProcessContext *process.ProcessContext DB storage.RoomDatabase NATSClient *nats.Conn @@ -89,8 +87,9 @@ type Inputer struct { OutputProducer *producers.RoomEventProducer workers sync.Map // room ID -> *worker - Queryer *query.Queryer - UserAPI userapi.RoomserverUserAPI + Queryer *query.Queryer + UserAPI userapi.RoomserverUserAPI + enableMetrics bool } // If a room consumer is inactive for a while then we will allow NATS @@ -177,7 +176,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) { // will look to see if we have a worker for that room which has its // own consumer. If we don't, we'll start one. func (r *Inputer) Start() error { - if r.Base.EnableMetrics { + if r.enableMetrics { prometheus.MustRegister(roomserverInputBackpressure, processRoomEventDuration) } _, err := r.JetStream.Subscribe( diff --git a/roomserver/internal/input/input_test.go b/roomserver/internal/input/input_test.go index 555ec9c6..51c50c37 100644 --- a/roomserver/internal/input/input_test.go +++ b/roomserver/internal/input/input_test.go @@ -2,84 +2,69 @@ package input_test import ( "context" - "os" "testing" "time" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/input" - "github.com/matrix-org/dendrite/roomserver/storage" - "github.com/matrix-org/dendrite/setup/base" - "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/gomatrixserverlib" - "github.com/nats-io/nats.go" ) -var js nats.JetStreamContext -var jc *nats.Conn +func TestSingleTransactionOnInput(t *testing.T) { + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + cfg, processCtx, close := testrig.CreateConfig(t, dbType) + defer close() + cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) -func TestMain(m *testing.M) { - var b *base.BaseDendrite - b, js, jc = testrig.Base(nil) - code := m.Run() - b.ShutdownDendrite() - b.WaitForComponentsToFinish() - os.Exit(code) -} + natsInstance := &jetstream.NATSInstance{} + js, jc := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) + caches := caching.NewRistrettoCache(8*1024*1024, time.Hour, caching.DisableMetrics) + rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics) + rsAPI.SetFederationAPI(nil, nil) -func TestSingleTransactionOnInput(t *testing.T) { - deadline, _ := t.Deadline() - if max := time.Now().Add(time.Second * 3); deadline.After(max) { - deadline = max - } - ctx, cancel := context.WithDeadline(context.Background(), deadline) - defer cancel() + deadline, _ := t.Deadline() + if max := time.Now().Add(time.Second * 3); deadline.Before(max) { + deadline = max + } + ctx, cancel := context.WithDeadline(processCtx.Context(), deadline) + defer cancel() + + event, err := gomatrixserverlib.NewEventFromTrustedJSON( + []byte(`{"auth_events":[],"content":{"creator":"@neilalexander:dendrite.matrix.org","room_version":"6"},"depth":1,"hashes":{"sha256":"jqOqdNEH5r0NiN3xJtj0u5XUVmRqq9YvGbki1wxxuuM"},"origin":"dendrite.matrix.org","origin_server_ts":1644595362726,"prev_events":[],"prev_state":[],"room_id":"!jSZZRknA6GkTBXNP:dendrite.matrix.org","sender":"@neilalexander:dendrite.matrix.org","signatures":{"dendrite.matrix.org":{"ed25519:6jB2aB":"bsQXO1wketf1OSe9xlndDIWe71W9KIundc6rBw4KEZdGPW7x4Tv4zDWWvbxDsG64sS2IPWfIm+J0OOozbrWIDw"}},"state_key":"","type":"m.room.create"}`), + false, gomatrixserverlib.RoomVersionV6, + ) + if err != nil { + t.Fatal(err) + } + in := api.InputRoomEvent{ + Kind: api.KindOutlier, // don't panic if we generate an output event + Event: event.Headered(gomatrixserverlib.RoomVersionV6), + } - event, err := gomatrixserverlib.NewEventFromTrustedJSON( - []byte(`{"auth_events":[],"content":{"creator":"@neilalexander:dendrite.matrix.org","room_version":"6"},"depth":1,"hashes":{"sha256":"jqOqdNEH5r0NiN3xJtj0u5XUVmRqq9YvGbki1wxxuuM"},"origin":"dendrite.matrix.org","origin_server_ts":1644595362726,"prev_events":[],"prev_state":[],"room_id":"!jSZZRknA6GkTBXNP:dendrite.matrix.org","sender":"@neilalexander:dendrite.matrix.org","signatures":{"dendrite.matrix.org":{"ed25519:6jB2aB":"bsQXO1wketf1OSe9xlndDIWe71W9KIundc6rBw4KEZdGPW7x4Tv4zDWWvbxDsG64sS2IPWfIm+J0OOozbrWIDw"}},"state_key":"","type":"m.room.create"}`), - false, gomatrixserverlib.RoomVersionV6, - ) - if err != nil { - t.Fatal(err) - } - in := api.InputRoomEvent{ - Kind: api.KindOutlier, // don't panic if we generate an output event - Event: event.Headered(gomatrixserverlib.RoomVersionV6), - } - cm := sqlutil.NewConnectionManager() - db, err := storage.Open( - context.Background(), cm, - &config.DatabaseOptions{ - ConnectionString: "", - MaxOpenConnections: 1, - MaxIdleConnections: 1, - }, - caching.NewRistrettoCache(8*1024*1024, time.Hour, caching.DisableMetrics), - ) - if err != nil { - t.Logf("PostgreSQL not available (%s), skipping", err) - t.SkipNow() - } - inputter := &input.Inputer{ - DB: db, - JetStream: js, - NATSClient: jc, - } - res := &api.InputRoomEventsResponse{} - inputter.InputRoomEvents( - ctx, - &api.InputRoomEventsRequest{ - InputRoomEvents: []api.InputRoomEvent{in}, - Asynchronous: false, - }, - res, - ) - // If we fail here then it's because we've hit the test deadline, - // so we probably deadlocked - if err := res.Err(); err != nil { - t.Fatal(err) - } + inputter := &input.Inputer{ + JetStream: js, + NATSClient: jc, + Cfg: &cfg.RoomServer, + } + res := &api.InputRoomEventsResponse{} + inputter.InputRoomEvents( + ctx, + &api.InputRoomEventsRequest{ + InputRoomEvents: []api.InputRoomEvent{in}, + Asynchronous: false, + }, + res, + ) + // If we fail here then it's because we've hit the test deadline, + // so we probably deadlocked + if err := res.Err(); err != nil { + t.Fatal(err) + } + }) } |