aboutsummaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authordevonh <devon.dmytro@gmail.com>2023-01-23 17:55:12 +0000
committerGitHub <noreply@github.com>2023-01-23 17:55:12 +0000
commit5b73592f5a4dddf64184fcbe33f4c1835c656480 (patch)
treeb6dac51b6be7a1e591f24881ee1bfae1b92088e9 /test
parent48fa869fa3578741d1d5775d30f24f6b097ab995 (diff)
Initial Store & Forward Implementation (#2917)
This adds store & forward relays into dendrite for p2p. A few things have changed: - new relay api serves new http endpoints for s&f federation - updated outbound federation queueing which will attempt to forward using s&f if appropriate - database entries to track s&f relays for other nodes
Diffstat (limited to 'test')
-rw-r--r--test/db.go1
-rw-r--r--test/memory_federation_db.go488
-rw-r--r--test/memory_relay_db.go140
-rw-r--r--test/testrig/base.go4
4 files changed, 631 insertions, 2 deletions
diff --git a/test/db.go b/test/db.go
index 54ded6ad..d2f405d4 100644
--- a/test/db.go
+++ b/test/db.go
@@ -101,7 +101,6 @@ func currentUser() string {
// Returns the connection string to use and a close function which must be called when the test finishes.
// Calling this function twice will return the same database, which will have data from previous tests
// unless close() is called.
-// TODO: namespace for concurrent package tests
func PrepareDBConnectionString(t *testing.T, dbType DBType) (connStr string, close func()) {
if dbType == DBTypeSQLite {
// this will be made in the t.TempDir, which is unique per test
diff --git a/test/memory_federation_db.go b/test/memory_federation_db.go
new file mode 100644
index 00000000..cc9e1e8f
--- /dev/null
+++ b/test/memory_federation_db.go
@@ -0,0 +1,488 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package test
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "sync"
+ "time"
+
+ "github.com/matrix-org/dendrite/federationapi/storage/shared/receipt"
+ "github.com/matrix-org/dendrite/federationapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+var nidMutex sync.Mutex
+var nid = int64(0)
+
+type InMemoryFederationDatabase struct {
+ dbMutex sync.Mutex
+ pendingPDUServers map[gomatrixserverlib.ServerName]struct{}
+ pendingEDUServers map[gomatrixserverlib.ServerName]struct{}
+ blacklistedServers map[gomatrixserverlib.ServerName]struct{}
+ assumedOffline map[gomatrixserverlib.ServerName]struct{}
+ pendingPDUs map[*receipt.Receipt]*gomatrixserverlib.HeaderedEvent
+ pendingEDUs map[*receipt.Receipt]*gomatrixserverlib.EDU
+ associatedPDUs map[gomatrixserverlib.ServerName]map[*receipt.Receipt]struct{}
+ associatedEDUs map[gomatrixserverlib.ServerName]map[*receipt.Receipt]struct{}
+ relayServers map[gomatrixserverlib.ServerName][]gomatrixserverlib.ServerName
+}
+
+func NewInMemoryFederationDatabase() *InMemoryFederationDatabase {
+ return &InMemoryFederationDatabase{
+ pendingPDUServers: make(map[gomatrixserverlib.ServerName]struct{}),
+ pendingEDUServers: make(map[gomatrixserverlib.ServerName]struct{}),
+ blacklistedServers: make(map[gomatrixserverlib.ServerName]struct{}),
+ assumedOffline: make(map[gomatrixserverlib.ServerName]struct{}),
+ pendingPDUs: make(map[*receipt.Receipt]*gomatrixserverlib.HeaderedEvent),
+ pendingEDUs: make(map[*receipt.Receipt]*gomatrixserverlib.EDU),
+ associatedPDUs: make(map[gomatrixserverlib.ServerName]map[*receipt.Receipt]struct{}),
+ associatedEDUs: make(map[gomatrixserverlib.ServerName]map[*receipt.Receipt]struct{}),
+ relayServers: make(map[gomatrixserverlib.ServerName][]gomatrixserverlib.ServerName),
+ }
+}
+
+func (d *InMemoryFederationDatabase) StoreJSON(
+ ctx context.Context,
+ js string,
+) (*receipt.Receipt, error) {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ var event gomatrixserverlib.HeaderedEvent
+ if err := json.Unmarshal([]byte(js), &event); err == nil {
+ nidMutex.Lock()
+ defer nidMutex.Unlock()
+ nid++
+ newReceipt := receipt.NewReceipt(nid)
+ d.pendingPDUs[&newReceipt] = &event
+ return &newReceipt, nil
+ }
+
+ var edu gomatrixserverlib.EDU
+ if err := json.Unmarshal([]byte(js), &edu); err == nil {
+ nidMutex.Lock()
+ defer nidMutex.Unlock()
+ nid++
+ newReceipt := receipt.NewReceipt(nid)
+ d.pendingEDUs[&newReceipt] = &edu
+ return &newReceipt, nil
+ }
+
+ return nil, errors.New("Failed to determine type of json to store")
+}
+
+func (d *InMemoryFederationDatabase) GetPendingPDUs(
+ ctx context.Context,
+ serverName gomatrixserverlib.ServerName,
+ limit int,
+) (pdus map[*receipt.Receipt]*gomatrixserverlib.HeaderedEvent, err error) {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ pduCount := 0
+ pdus = make(map[*receipt.Receipt]*gomatrixserverlib.HeaderedEvent)
+ if receipts, ok := d.associatedPDUs[serverName]; ok {
+ for dbReceipt := range receipts {
+ if event, ok := d.pendingPDUs[dbReceipt]; ok {
+ pdus[dbReceipt] = event
+ pduCount++
+ if pduCount == limit {
+ break
+ }
+ }
+ }
+ }
+ return pdus, nil
+}
+
+func (d *InMemoryFederationDatabase) GetPendingEDUs(
+ ctx context.Context,
+ serverName gomatrixserverlib.ServerName,
+ limit int,
+) (edus map[*receipt.Receipt]*gomatrixserverlib.EDU, err error) {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ eduCount := 0
+ edus = make(map[*receipt.Receipt]*gomatrixserverlib.EDU)
+ if receipts, ok := d.associatedEDUs[serverName]; ok {
+ for dbReceipt := range receipts {
+ if event, ok := d.pendingEDUs[dbReceipt]; ok {
+ edus[dbReceipt] = event
+ eduCount++
+ if eduCount == limit {
+ break
+ }
+ }
+ }
+ }
+ return edus, nil
+}
+
+func (d *InMemoryFederationDatabase) AssociatePDUWithDestinations(
+ ctx context.Context,
+ destinations map[gomatrixserverlib.ServerName]struct{},
+ dbReceipt *receipt.Receipt,
+) error {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ if _, ok := d.pendingPDUs[dbReceipt]; ok {
+ for destination := range destinations {
+ if _, ok := d.associatedPDUs[destination]; !ok {
+ d.associatedPDUs[destination] = make(map[*receipt.Receipt]struct{})
+ }
+ d.associatedPDUs[destination][dbReceipt] = struct{}{}
+ }
+
+ return nil
+ } else {
+ return errors.New("PDU doesn't exist")
+ }
+}
+
+func (d *InMemoryFederationDatabase) AssociateEDUWithDestinations(
+ ctx context.Context,
+ destinations map[gomatrixserverlib.ServerName]struct{},
+ dbReceipt *receipt.Receipt,
+ eduType string,
+ expireEDUTypes map[string]time.Duration,
+) error {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ if _, ok := d.pendingEDUs[dbReceipt]; ok {
+ for destination := range destinations {
+ if _, ok := d.associatedEDUs[destination]; !ok {
+ d.associatedEDUs[destination] = make(map[*receipt.Receipt]struct{})
+ }
+ d.associatedEDUs[destination][dbReceipt] = struct{}{}
+ }
+
+ return nil
+ } else {
+ return errors.New("EDU doesn't exist")
+ }
+}
+
+func (d *InMemoryFederationDatabase) CleanPDUs(
+ ctx context.Context,
+ serverName gomatrixserverlib.ServerName,
+ receipts []*receipt.Receipt,
+) error {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ if pdus, ok := d.associatedPDUs[serverName]; ok {
+ for _, dbReceipt := range receipts {
+ delete(pdus, dbReceipt)
+ }
+ }
+
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) CleanEDUs(
+ ctx context.Context,
+ serverName gomatrixserverlib.ServerName,
+ receipts []*receipt.Receipt,
+) error {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ if edus, ok := d.associatedEDUs[serverName]; ok {
+ for _, dbReceipt := range receipts {
+ delete(edus, dbReceipt)
+ }
+ }
+
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) GetPendingPDUCount(
+ ctx context.Context,
+ serverName gomatrixserverlib.ServerName,
+) (int64, error) {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ var count int64
+ if pdus, ok := d.associatedPDUs[serverName]; ok {
+ count = int64(len(pdus))
+ }
+ return count, nil
+}
+
+func (d *InMemoryFederationDatabase) GetPendingEDUCount(
+ ctx context.Context,
+ serverName gomatrixserverlib.ServerName,
+) (int64, error) {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ var count int64
+ if edus, ok := d.associatedEDUs[serverName]; ok {
+ count = int64(len(edus))
+ }
+ return count, nil
+}
+
+func (d *InMemoryFederationDatabase) GetPendingPDUServerNames(
+ ctx context.Context,
+) ([]gomatrixserverlib.ServerName, error) {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ servers := []gomatrixserverlib.ServerName{}
+ for server := range d.pendingPDUServers {
+ servers = append(servers, server)
+ }
+ return servers, nil
+}
+
+func (d *InMemoryFederationDatabase) GetPendingEDUServerNames(
+ ctx context.Context,
+) ([]gomatrixserverlib.ServerName, error) {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ servers := []gomatrixserverlib.ServerName{}
+ for server := range d.pendingEDUServers {
+ servers = append(servers, server)
+ }
+ return servers, nil
+}
+
+func (d *InMemoryFederationDatabase) AddServerToBlacklist(
+ serverName gomatrixserverlib.ServerName,
+) error {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ d.blacklistedServers[serverName] = struct{}{}
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) RemoveServerFromBlacklist(
+ serverName gomatrixserverlib.ServerName,
+) error {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ delete(d.blacklistedServers, serverName)
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) RemoveAllServersFromBlacklist() error {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ d.blacklistedServers = make(map[gomatrixserverlib.ServerName]struct{})
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) IsServerBlacklisted(
+ serverName gomatrixserverlib.ServerName,
+) (bool, error) {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ isBlacklisted := false
+ if _, ok := d.blacklistedServers[serverName]; ok {
+ isBlacklisted = true
+ }
+
+ return isBlacklisted, nil
+}
+
+func (d *InMemoryFederationDatabase) SetServerAssumedOffline(
+ ctx context.Context,
+ serverName gomatrixserverlib.ServerName,
+) error {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ d.assumedOffline[serverName] = struct{}{}
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) RemoveServerAssumedOffline(
+ ctx context.Context,
+ serverName gomatrixserverlib.ServerName,
+) error {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ delete(d.assumedOffline, serverName)
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) RemoveAllServersAssumedOffine(
+ ctx context.Context,
+) error {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ d.assumedOffline = make(map[gomatrixserverlib.ServerName]struct{})
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) IsServerAssumedOffline(
+ ctx context.Context,
+ serverName gomatrixserverlib.ServerName,
+) (bool, error) {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ assumedOffline := false
+ if _, ok := d.assumedOffline[serverName]; ok {
+ assumedOffline = true
+ }
+
+ return assumedOffline, nil
+}
+
+func (d *InMemoryFederationDatabase) P2PGetRelayServersForServer(
+ ctx context.Context,
+ serverName gomatrixserverlib.ServerName,
+) ([]gomatrixserverlib.ServerName, error) {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ knownRelayServers := []gomatrixserverlib.ServerName{}
+ if relayServers, ok := d.relayServers[serverName]; ok {
+ knownRelayServers = relayServers
+ }
+
+ return knownRelayServers, nil
+}
+
+func (d *InMemoryFederationDatabase) P2PAddRelayServersForServer(
+ ctx context.Context,
+ serverName gomatrixserverlib.ServerName,
+ relayServers []gomatrixserverlib.ServerName,
+) error {
+ d.dbMutex.Lock()
+ defer d.dbMutex.Unlock()
+
+ if knownRelayServers, ok := d.relayServers[serverName]; ok {
+ for _, relayServer := range relayServers {
+ alreadyKnown := false
+ for _, knownRelayServer := range knownRelayServers {
+ if relayServer == knownRelayServer {
+ alreadyKnown = true
+ }
+ }
+ if !alreadyKnown {
+ d.relayServers[serverName] = append(d.relayServers[serverName], relayServer)
+ }
+ }
+ } else {
+ d.relayServers[serverName] = relayServers
+ }
+
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) FetchKeys(ctx context.Context, requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) {
+ return nil, nil
+}
+
+func (d *InMemoryFederationDatabase) FetcherName() string {
+ return ""
+}
+
+func (d *InMemoryFederationDatabase) StoreKeys(ctx context.Context, results map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult) error {
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) UpdateRoom(ctx context.Context, roomID string, addHosts []types.JoinedHost, removeHosts []string, purgeRoomFirst bool) (joinedHosts []types.JoinedHost, err error) {
+ return nil, nil
+}
+
+func (d *InMemoryFederationDatabase) GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error) {
+ return nil, nil
+}
+
+func (d *InMemoryFederationDatabase) GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) {
+ return nil, nil
+}
+
+func (d *InMemoryFederationDatabase) GetJoinedHostsForRooms(ctx context.Context, roomIDs []string, excludeSelf, excludeBlacklisted bool) ([]gomatrixserverlib.ServerName, error) {
+ return nil, nil
+}
+
+func (d *InMemoryFederationDatabase) RemoveAllServersAssumedOffline(ctx context.Context) error {
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) P2PRemoveRelayServersForServer(ctx context.Context, serverName gomatrixserverlib.ServerName, relayServers []gomatrixserverlib.ServerName) error {
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) P2PRemoveAllRelayServersForServer(ctx context.Context, serverName gomatrixserverlib.ServerName) error {
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error) {
+ return nil, nil
+}
+
+func (d *InMemoryFederationDatabase) GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) {
+ return nil, nil
+}
+
+func (d *InMemoryFederationDatabase) AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error) {
+ return nil, nil
+}
+
+func (d *InMemoryFederationDatabase) GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error) {
+ return nil, nil
+}
+
+func (d *InMemoryFederationDatabase) UpdateNotaryKeys(ctx context.Context, serverName gomatrixserverlib.ServerName, serverKeys gomatrixserverlib.ServerKeys) error {
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) GetNotaryKeys(ctx context.Context, serverName gomatrixserverlib.ServerName, optKeyIDs []gomatrixserverlib.KeyID) ([]gomatrixserverlib.ServerKeys, error) {
+ return nil, nil
+}
+
+func (d *InMemoryFederationDatabase) DeleteExpiredEDUs(ctx context.Context) error {
+ return nil
+}
+
+func (d *InMemoryFederationDatabase) PurgeRoom(ctx context.Context, roomID string) error {
+ return nil
+}
diff --git a/test/memory_relay_db.go b/test/memory_relay_db.go
new file mode 100644
index 00000000..db93919d
--- /dev/null
+++ b/test/memory_relay_db.go
@@ -0,0 +1,140 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package test
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+ "sync"
+
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type InMemoryRelayDatabase struct {
+ nid int64
+ nidMutex sync.Mutex
+ transactions map[int64]json.RawMessage
+ associations map[gomatrixserverlib.ServerName][]int64
+}
+
+func NewInMemoryRelayDatabase() *InMemoryRelayDatabase {
+ return &InMemoryRelayDatabase{
+ nid: 1,
+ nidMutex: sync.Mutex{},
+ transactions: make(map[int64]json.RawMessage),
+ associations: make(map[gomatrixserverlib.ServerName][]int64),
+ }
+}
+
+func (d *InMemoryRelayDatabase) InsertQueueEntry(
+ ctx context.Context,
+ txn *sql.Tx,
+ transactionID gomatrixserverlib.TransactionID,
+ serverName gomatrixserverlib.ServerName,
+ nid int64,
+) error {
+ if _, ok := d.associations[serverName]; !ok {
+ d.associations[serverName] = []int64{}
+ }
+ d.associations[serverName] = append(d.associations[serverName], nid)
+ return nil
+}
+
+func (d *InMemoryRelayDatabase) DeleteQueueEntries(
+ ctx context.Context,
+ txn *sql.Tx,
+ serverName gomatrixserverlib.ServerName,
+ jsonNIDs []int64,
+) error {
+ for _, nid := range jsonNIDs {
+ for index, associatedNID := range d.associations[serverName] {
+ if associatedNID == nid {
+ d.associations[serverName] = append(d.associations[serverName][:index], d.associations[serverName][index+1:]...)
+ }
+ }
+ }
+
+ return nil
+}
+
+func (d *InMemoryRelayDatabase) SelectQueueEntries(
+ ctx context.Context,
+ txn *sql.Tx, serverName gomatrixserverlib.ServerName,
+ limit int,
+) ([]int64, error) {
+ results := []int64{}
+ resultCount := limit
+ if limit > len(d.associations[serverName]) {
+ resultCount = len(d.associations[serverName])
+ }
+ if resultCount > 0 {
+ for i := 0; i < resultCount; i++ {
+ results = append(results, d.associations[serverName][i])
+ }
+ }
+
+ return results, nil
+}
+
+func (d *InMemoryRelayDatabase) SelectQueueEntryCount(
+ ctx context.Context,
+ txn *sql.Tx,
+ serverName gomatrixserverlib.ServerName,
+) (int64, error) {
+ return int64(len(d.associations[serverName])), nil
+}
+
+func (d *InMemoryRelayDatabase) InsertQueueJSON(
+ ctx context.Context,
+ txn *sql.Tx,
+ json string,
+) (int64, error) {
+ d.nidMutex.Lock()
+ defer d.nidMutex.Unlock()
+
+ nid := d.nid
+ d.transactions[nid] = []byte(json)
+ d.nid++
+
+ return nid, nil
+}
+
+func (d *InMemoryRelayDatabase) DeleteQueueJSON(
+ ctx context.Context,
+ txn *sql.Tx,
+ nids []int64,
+) error {
+ for _, nid := range nids {
+ delete(d.transactions, nid)
+ }
+
+ return nil
+}
+
+func (d *InMemoryRelayDatabase) SelectQueueJSON(
+ ctx context.Context,
+ txn *sql.Tx,
+ jsonNIDs []int64,
+) (map[int64][]byte, error) {
+ result := make(map[int64][]byte)
+ for _, nid := range jsonNIDs {
+ if transaction, ok := d.transactions[nid]; ok {
+ result[nid] = transaction
+ }
+ }
+
+ return result, nil
+}
diff --git a/test/testrig/base.go b/test/testrig/base.go
index 9773da22..dfc0d8aa 100644
--- a/test/testrig/base.go
+++ b/test/testrig/base.go
@@ -67,9 +67,10 @@ func CreateBaseDendrite(t *testing.T, dbType test.DBType) (*base.BaseDendrite, f
case test.DBTypeSQLite:
cfg.Defaults(config.DefaultOpts{
Generate: true,
- Monolithic: false, // because we need a database per component
+ Monolithic: true,
})
cfg.Global.ServerName = "test"
+
// use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use
// the file system event with InMemory=true :(
cfg.Global.JetStream.TopicPrefix = fmt.Sprintf("Test_%d_", dbType)
@@ -83,6 +84,7 @@ func CreateBaseDendrite(t *testing.T, dbType test.DBType) (*base.BaseDendrite, f
cfg.RoomServer.Database.ConnectionString = config.DataSource(filepath.Join("file://", tempDir, "roomserver.db"))
cfg.SyncAPI.Database.ConnectionString = config.DataSource(filepath.Join("file://", tempDir, "syncapi.db"))
cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(filepath.Join("file://", tempDir, "userapi.db"))
+ cfg.RelayAPI.Database.ConnectionString = config.DataSource(filepath.Join("file://", tempDir, "relayapi.db"))
base := base.NewBaseDendrite(&cfg, "Test", base.DisableMetrics)
return base, func() {