aboutsummaryrefslogtreecommitdiff
path: root/roomserver/internal
diff options
context:
space:
mode:
Diffstat (limited to 'roomserver/internal')
-rw-r--r--roomserver/internal/alias.go8
-rw-r--r--roomserver/internal/api.go50
-rw-r--r--roomserver/internal/helpers/helpers_test.go2
-rw-r--r--roomserver/internal/input/input.go9
-rw-r--r--roomserver/internal/input/input_test.go117
5 files changed, 84 insertions, 102 deletions
diff --git a/roomserver/internal/alias.go b/roomserver/internal/alias.go
index fc61b7f4..94b8b16c 100644
--- a/roomserver/internal/alias.go
+++ b/roomserver/internal/alias.go
@@ -117,7 +117,7 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
request *api.RemoveRoomAliasRequest,
response *api.RemoveRoomAliasResponse,
) error {
- _, virtualHost, err := r.Cfg.Matrix.SplitLocalID('@', request.UserID)
+ _, virtualHost, err := r.Cfg.Global.SplitLocalID('@', request.UserID)
if err != nil {
return err
}
@@ -175,12 +175,12 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
sender = ev.Sender()
}
- _, senderDomain, err := r.Cfg.Matrix.SplitLocalID('@', sender)
+ _, senderDomain, err := r.Cfg.Global.SplitLocalID('@', sender)
if err != nil {
return err
}
- identity, err := r.Cfg.Matrix.SigningIdentityFor(senderDomain)
+ identity, err := r.Cfg.Global.SigningIdentityFor(senderDomain)
if err != nil {
return err
}
@@ -206,7 +206,7 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
return err
}
- newEvent, err := eventutil.BuildEvent(ctx, builder, r.Cfg.Matrix, identity, time.Now(), &eventsNeeded, stateRes)
+ newEvent, err := eventutil.BuildEvent(ctx, builder, &r.Cfg.Global, identity, time.Now(), &eventsNeeded, stateRes)
if err != nil {
return err
}
diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go
index 2e987d68..7ca3675d 100644
--- a/roomserver/internal/api.go
+++ b/roomserver/internal/api.go
@@ -18,7 +18,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/producers"
"github.com/matrix-org/dendrite/roomserver/storage"
- "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
@@ -41,9 +40,8 @@ type RoomserverInternalAPI struct {
*perform.Upgrader
*perform.Admin
ProcessContext *process.ProcessContext
- Base *base.BaseDendrite
DB storage.Database
- Cfg *config.RoomServer
+ Cfg *config.Dendrite
Cache caching.RoomServerCaches
ServerName gomatrixserverlib.ServerName
KeyRing gomatrixserverlib.JSONVerifier
@@ -56,43 +54,44 @@ type RoomserverInternalAPI struct {
InputRoomEventTopic string // JetStream topic for new input room events
OutputProducer *producers.RoomEventProducer
PerspectiveServerNames []gomatrixserverlib.ServerName
+ enableMetrics bool
}
func NewRoomserverAPI(
- base *base.BaseDendrite, roomserverDB storage.Database,
- js nats.JetStreamContext, nc *nats.Conn, caches caching.RoomServerCaches,
+ processContext *process.ProcessContext, dendriteCfg *config.Dendrite, roomserverDB storage.Database,
+ js nats.JetStreamContext, nc *nats.Conn, caches caching.RoomServerCaches, enableMetrics bool,
) *RoomserverInternalAPI {
var perspectiveServerNames []gomatrixserverlib.ServerName
- for _, kp := range base.Cfg.FederationAPI.KeyPerspectives {
+ for _, kp := range dendriteCfg.FederationAPI.KeyPerspectives {
perspectiveServerNames = append(perspectiveServerNames, kp.ServerName)
}
serverACLs := acls.NewServerACLs(roomserverDB)
producer := &producers.RoomEventProducer{
- Topic: string(base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)),
+ Topic: string(dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)),
JetStream: js,
ACLs: serverACLs,
}
a := &RoomserverInternalAPI{
- ProcessContext: base.ProcessContext,
+ ProcessContext: processContext,
DB: roomserverDB,
- Base: base,
- Cfg: &base.Cfg.RoomServer,
+ Cfg: dendriteCfg,
Cache: caches,
- ServerName: base.Cfg.Global.ServerName,
+ ServerName: dendriteCfg.Global.ServerName,
PerspectiveServerNames: perspectiveServerNames,
- InputRoomEventTopic: base.Cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent),
+ InputRoomEventTopic: dendriteCfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent),
OutputProducer: producer,
JetStream: js,
NATSClient: nc,
- Durable: base.Cfg.Global.JetStream.Durable("RoomserverInputConsumer"),
+ Durable: dendriteCfg.Global.JetStream.Durable("RoomserverInputConsumer"),
ServerACLs: serverACLs,
Queryer: &query.Queryer{
DB: roomserverDB,
Cache: caches,
- IsLocalServerName: base.Cfg.Global.IsLocalServerName,
+ IsLocalServerName: dendriteCfg.Global.IsLocalServerName,
ServerACLs: serverACLs,
},
+ enableMetrics: enableMetrics,
// perform-er structs get initialised when we have a federation sender to use
}
return a
@@ -105,15 +104,14 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
r.fsAPI = fsAPI
r.KeyRing = keyRing
- identity, err := r.Cfg.Matrix.SigningIdentityFor(r.ServerName)
+ identity, err := r.Cfg.Global.SigningIdentityFor(r.ServerName)
if err != nil {
logrus.Panic(err)
}
r.Inputer = &input.Inputer{
- Cfg: &r.Base.Cfg.RoomServer,
- Base: r.Base,
- ProcessContext: r.Base.ProcessContext,
+ Cfg: &r.Cfg.RoomServer,
+ ProcessContext: r.ProcessContext,
DB: r.DB,
InputRoomEventTopic: r.InputRoomEventTopic,
OutputProducer: r.OutputProducer,
@@ -129,12 +127,12 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
}
r.Inviter = &perform.Inviter{
DB: r.DB,
- Cfg: r.Cfg,
+ Cfg: &r.Cfg.RoomServer,
FSAPI: r.fsAPI,
Inputer: r.Inputer,
}
r.Joiner = &perform.Joiner{
- Cfg: r.Cfg,
+ Cfg: &r.Cfg.RoomServer,
DB: r.DB,
FSAPI: r.fsAPI,
RSAPI: r,
@@ -143,7 +141,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
}
r.Peeker = &perform.Peeker{
ServerName: r.ServerName,
- Cfg: r.Cfg,
+ Cfg: &r.Cfg.RoomServer,
DB: r.DB,
FSAPI: r.fsAPI,
Inputer: r.Inputer,
@@ -154,12 +152,12 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
}
r.Unpeeker = &perform.Unpeeker{
ServerName: r.ServerName,
- Cfg: r.Cfg,
+ Cfg: &r.Cfg.RoomServer,
FSAPI: r.fsAPI,
Inputer: r.Inputer,
}
r.Leaver = &perform.Leaver{
- Cfg: r.Cfg,
+ Cfg: &r.Cfg.RoomServer,
DB: r.DB,
FSAPI: r.fsAPI,
Inputer: r.Inputer,
@@ -168,7 +166,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
DB: r.DB,
}
r.Backfiller = &perform.Backfiller{
- IsLocalServerName: r.Cfg.Matrix.IsLocalServerName,
+ IsLocalServerName: r.Cfg.Global.IsLocalServerName,
DB: r.DB,
FSAPI: r.fsAPI,
KeyRing: r.KeyRing,
@@ -181,12 +179,12 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
DB: r.DB,
}
r.Upgrader = &perform.Upgrader{
- Cfg: r.Cfg,
+ Cfg: &r.Cfg.RoomServer,
URSAPI: r,
}
r.Admin = &perform.Admin{
DB: r.DB,
- Cfg: r.Cfg,
+ Cfg: &r.Cfg.RoomServer,
Inputer: r.Inputer,
Queryer: r.Queryer,
Leaver: r.Leaver,
diff --git a/roomserver/internal/helpers/helpers_test.go b/roomserver/internal/helpers/helpers_test.go
index 03a8bf57..dd74b844 100644
--- a/roomserver/internal/helpers/helpers_test.go
+++ b/roomserver/internal/helpers/helpers_test.go
@@ -19,7 +19,7 @@ import (
func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
conStr, close := test.PrepareDBConnectionString(t, dbType)
caches := caching.NewRistrettoCache(8*1024*1024, time.Hour, caching.DisableMetrics)
- cm := sqlutil.NewConnectionManager()
+ cm := sqlutil.NewConnectionManager(nil, config.DatabaseOptions{})
db, err := storage.Open(context.Background(), cm, &config.DatabaseOptions{ConnectionString: config.DataSource(conStr)}, caches)
if err != nil {
t.Fatalf("failed to create Database: %v", err)
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index 2ec19f01..cc0c673d 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -39,7 +39,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/producers"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
- "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
@@ -74,7 +73,6 @@ import (
// or C.
type Inputer struct {
Cfg *config.RoomServer
- Base *base.BaseDendrite
ProcessContext *process.ProcessContext
DB storage.RoomDatabase
NATSClient *nats.Conn
@@ -89,8 +87,9 @@ type Inputer struct {
OutputProducer *producers.RoomEventProducer
workers sync.Map // room ID -> *worker
- Queryer *query.Queryer
- UserAPI userapi.RoomserverUserAPI
+ Queryer *query.Queryer
+ UserAPI userapi.RoomserverUserAPI
+ enableMetrics bool
}
// If a room consumer is inactive for a while then we will allow NATS
@@ -177,7 +176,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
// will look to see if we have a worker for that room which has its
// own consumer. If we don't, we'll start one.
func (r *Inputer) Start() error {
- if r.Base.EnableMetrics {
+ if r.enableMetrics {
prometheus.MustRegister(roomserverInputBackpressure, processRoomEventDuration)
}
_, err := r.JetStream.Subscribe(
diff --git a/roomserver/internal/input/input_test.go b/roomserver/internal/input/input_test.go
index 555ec9c6..51c50c37 100644
--- a/roomserver/internal/input/input_test.go
+++ b/roomserver/internal/input/input_test.go
@@ -2,84 +2,69 @@ package input_test
import (
"context"
- "os"
"testing"
"time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/input"
- "github.com/matrix-org/dendrite/roomserver/storage"
- "github.com/matrix-org/dendrite/setup/base"
- "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
"github.com/matrix-org/gomatrixserverlib"
- "github.com/nats-io/nats.go"
)
-var js nats.JetStreamContext
-var jc *nats.Conn
+func TestSingleTransactionOnInput(t *testing.T) {
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ cfg, processCtx, close := testrig.CreateConfig(t, dbType)
+ defer close()
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
-func TestMain(m *testing.M) {
- var b *base.BaseDendrite
- b, js, jc = testrig.Base(nil)
- code := m.Run()
- b.ShutdownDendrite()
- b.WaitForComponentsToFinish()
- os.Exit(code)
-}
+ natsInstance := &jetstream.NATSInstance{}
+ js, jc := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
+ caches := caching.NewRistrettoCache(8*1024*1024, time.Hour, caching.DisableMetrics)
+ rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
+ rsAPI.SetFederationAPI(nil, nil)
-func TestSingleTransactionOnInput(t *testing.T) {
- deadline, _ := t.Deadline()
- if max := time.Now().Add(time.Second * 3); deadline.After(max) {
- deadline = max
- }
- ctx, cancel := context.WithDeadline(context.Background(), deadline)
- defer cancel()
+ deadline, _ := t.Deadline()
+ if max := time.Now().Add(time.Second * 3); deadline.Before(max) {
+ deadline = max
+ }
+ ctx, cancel := context.WithDeadline(processCtx.Context(), deadline)
+ defer cancel()
+
+ event, err := gomatrixserverlib.NewEventFromTrustedJSON(
+ []byte(`{"auth_events":[],"content":{"creator":"@neilalexander:dendrite.matrix.org","room_version":"6"},"depth":1,"hashes":{"sha256":"jqOqdNEH5r0NiN3xJtj0u5XUVmRqq9YvGbki1wxxuuM"},"origin":"dendrite.matrix.org","origin_server_ts":1644595362726,"prev_events":[],"prev_state":[],"room_id":"!jSZZRknA6GkTBXNP:dendrite.matrix.org","sender":"@neilalexander:dendrite.matrix.org","signatures":{"dendrite.matrix.org":{"ed25519:6jB2aB":"bsQXO1wketf1OSe9xlndDIWe71W9KIundc6rBw4KEZdGPW7x4Tv4zDWWvbxDsG64sS2IPWfIm+J0OOozbrWIDw"}},"state_key":"","type":"m.room.create"}`),
+ false, gomatrixserverlib.RoomVersionV6,
+ )
+ if err != nil {
+ t.Fatal(err)
+ }
+ in := api.InputRoomEvent{
+ Kind: api.KindOutlier, // don't panic if we generate an output event
+ Event: event.Headered(gomatrixserverlib.RoomVersionV6),
+ }
- event, err := gomatrixserverlib.NewEventFromTrustedJSON(
- []byte(`{"auth_events":[],"content":{"creator":"@neilalexander:dendrite.matrix.org","room_version":"6"},"depth":1,"hashes":{"sha256":"jqOqdNEH5r0NiN3xJtj0u5XUVmRqq9YvGbki1wxxuuM"},"origin":"dendrite.matrix.org","origin_server_ts":1644595362726,"prev_events":[],"prev_state":[],"room_id":"!jSZZRknA6GkTBXNP:dendrite.matrix.org","sender":"@neilalexander:dendrite.matrix.org","signatures":{"dendrite.matrix.org":{"ed25519:6jB2aB":"bsQXO1wketf1OSe9xlndDIWe71W9KIundc6rBw4KEZdGPW7x4Tv4zDWWvbxDsG64sS2IPWfIm+J0OOozbrWIDw"}},"state_key":"","type":"m.room.create"}`),
- false, gomatrixserverlib.RoomVersionV6,
- )
- if err != nil {
- t.Fatal(err)
- }
- in := api.InputRoomEvent{
- Kind: api.KindOutlier, // don't panic if we generate an output event
- Event: event.Headered(gomatrixserverlib.RoomVersionV6),
- }
- cm := sqlutil.NewConnectionManager()
- db, err := storage.Open(
- context.Background(), cm,
- &config.DatabaseOptions{
- ConnectionString: "",
- MaxOpenConnections: 1,
- MaxIdleConnections: 1,
- },
- caching.NewRistrettoCache(8*1024*1024, time.Hour, caching.DisableMetrics),
- )
- if err != nil {
- t.Logf("PostgreSQL not available (%s), skipping", err)
- t.SkipNow()
- }
- inputter := &input.Inputer{
- DB: db,
- JetStream: js,
- NATSClient: jc,
- }
- res := &api.InputRoomEventsResponse{}
- inputter.InputRoomEvents(
- ctx,
- &api.InputRoomEventsRequest{
- InputRoomEvents: []api.InputRoomEvent{in},
- Asynchronous: false,
- },
- res,
- )
- // If we fail here then it's because we've hit the test deadline,
- // so we probably deadlocked
- if err := res.Err(); err != nil {
- t.Fatal(err)
- }
+ inputter := &input.Inputer{
+ JetStream: js,
+ NATSClient: jc,
+ Cfg: &cfg.RoomServer,
+ }
+ res := &api.InputRoomEventsResponse{}
+ inputter.InputRoomEvents(
+ ctx,
+ &api.InputRoomEventsRequest{
+ InputRoomEvents: []api.InputRoomEvent{in},
+ Asynchronous: false,
+ },
+ res,
+ )
+ // If we fail here then it's because we've hit the test deadline,
+ // so we probably deadlocked
+ if err := res.Err(); err != nil {
+ t.Fatal(err)
+ }
+ })
}