aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/clientapi.go4
-rw-r--r--syncapi/consumers/eduserver_sendtodevice.go4
-rw-r--r--syncapi/consumers/eduserver_typing.go4
-rw-r--r--syncapi/consumers/roomserver.go4
-rw-r--r--syncapi/routing/messages.go4
-rw-r--r--syncapi/routing/routing.go2
-rw-r--r--syncapi/storage/postgres/syncserver.go5
-rw-r--r--syncapi/storage/sqlite3/syncserver.go10
-rw-r--r--syncapi/storage/storage.go22
-rw-r--r--syncapi/storage/storage_test.go5
-rw-r--r--syncapi/storage/storage_wasm.go24
-rw-r--r--syncapi/syncapi.go6
12 files changed, 42 insertions, 52 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index f7cf96d9..ceaa735a 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -37,14 +37,14 @@ type OutputClientDataConsumer struct {
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
func NewOutputClientDataConsumer(
- cfg *config.Dendrite,
+ cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store storage.Database,
) *OutputClientDataConsumer {
consumer := internal.ContinualConsumer{
- Topic: string(cfg.Kafka.Topics.OutputClientData),
+ Topic: string(cfg.Matrix.Kafka.Topics.OutputClientData),
Consumer: kafkaConsumer,
PartitionStore: store,
}
diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go
index 06a8928d..20dd1756 100644
--- a/syncapi/consumers/eduserver_sendtodevice.go
+++ b/syncapi/consumers/eduserver_sendtodevice.go
@@ -41,14 +41,14 @@ type OutputSendToDeviceEventConsumer struct {
// NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer.
// Call Start() to begin consuming from the EDU server.
func NewOutputSendToDeviceEventConsumer(
- cfg *config.Dendrite,
+ cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store storage.Database,
) *OutputSendToDeviceEventConsumer {
consumer := internal.ContinualConsumer{
- Topic: string(cfg.Kafka.Topics.OutputSendToDeviceEvent),
+ Topic: string(cfg.Matrix.Kafka.Topics.OutputSendToDeviceEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}
diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go
index 0a9a9c0c..fc5703d3 100644
--- a/syncapi/consumers/eduserver_typing.go
+++ b/syncapi/consumers/eduserver_typing.go
@@ -37,14 +37,14 @@ type OutputTypingEventConsumer struct {
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
// Call Start() to begin consuming from the EDU server.
func NewOutputTypingEventConsumer(
- cfg *config.Dendrite,
+ cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store storage.Database,
) *OutputTypingEventConsumer {
consumer := internal.ContinualConsumer{
- Topic: string(cfg.Kafka.Topics.OutputTypingEvent),
+ Topic: string(cfg.Matrix.Kafka.Topics.OutputTypingEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index f8cdcd5c..06c904c3 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -40,7 +40,7 @@ type OutputRoomEventConsumer struct {
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
- cfg *config.Dendrite,
+ cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store storage.Database,
@@ -49,7 +49,7 @@ func NewOutputRoomEventConsumer(
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
- Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
+ Topic: string(cfg.Matrix.Kafka.Topics.OutputRoomEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index 15add1b4..0999d3e8 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -36,7 +36,7 @@ type messagesReq struct {
db storage.Database
rsAPI api.RoomserverInternalAPI
federation *gomatrixserverlib.FederationClient
- cfg *config.Dendrite
+ cfg *config.SyncAPI
roomID string
from *types.TopologyToken
to *types.TopologyToken
@@ -61,7 +61,7 @@ func OnIncomingMessagesRequest(
req *http.Request, db storage.Database, roomID string,
federation *gomatrixserverlib.FederationClient,
rsAPI api.RoomserverInternalAPI,
- cfg *config.Dendrite,
+ cfg *config.SyncAPI,
) util.JSONResponse {
var err error
diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go
index ed0f872e..ec21c1b4 100644
--- a/syncapi/routing/routing.go
+++ b/syncapi/routing/routing.go
@@ -39,7 +39,7 @@ func Setup(
publicAPIMux *mux.Router, srp *sync.RequestPool, syncDB storage.Database,
userAPI userapi.UserInternalAPI, federation *gomatrixserverlib.FederationClient,
rsAPI api.RoomserverInternalAPI,
- cfg *config.Dendrite,
+ cfg *config.SyncAPI,
) {
r0mux := publicAPIMux.PathPrefix(pathPrefixR0).Subrouter()
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
index 10c1b37c..26ef082f 100644
--- a/syncapi/storage/postgres/syncserver.go
+++ b/syncapi/storage/postgres/syncserver.go
@@ -21,6 +21,7 @@ import (
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/eduserver/cache"
+ "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
)
@@ -34,10 +35,10 @@ type SyncServerDatasource struct {
}
// NewDatabase creates a new sync server database
-func NewDatabase(dbDataSourceName string, dbProperties sqlutil.DbProperties) (*SyncServerDatasource, error) {
+func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
var d SyncServerDatasource
var err error
- if d.db, err = sqlutil.Open("postgres", dbDataSourceName, dbProperties); err != nil {
+ if d.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err
}
if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil {
diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go
index c85db5a4..9564a23a 100644
--- a/syncapi/storage/sqlite3/syncserver.go
+++ b/syncapi/storage/sqlite3/syncserver.go
@@ -22,6 +22,7 @@ import (
_ "github.com/mattn/go-sqlite3"
"github.com/matrix-org/dendrite/eduserver/cache"
+ "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
)
@@ -37,13 +38,10 @@ type SyncServerDatasource struct {
// NewDatabase creates a new sync server database
// nolint: gocyclo
-func NewDatabase(dataSourceName string) (*SyncServerDatasource, error) {
+func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
var d SyncServerDatasource
- cs, err := sqlutil.ParseFileURI(dataSourceName)
- if err != nil {
- return nil, err
- }
- if d.db, err = sqlutil.Open(sqlutil.SQLiteDriverName(), cs, nil); err != nil {
+ var err error
+ if d.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err
}
if err = d.prepare(); err != nil {
diff --git a/syncapi/storage/storage.go b/syncapi/storage/storage.go
index ea69da3b..c16dcd81 100644
--- a/syncapi/storage/storage.go
+++ b/syncapi/storage/storage.go
@@ -17,25 +17,21 @@
package storage
import (
- "net/url"
+ "fmt"
- "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/syncapi/storage/postgres"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3"
)
// NewSyncServerDatasource opens a database connection.
-func NewSyncServerDatasource(dataSourceName string, dbProperties sqlutil.DbProperties) (Database, error) {
- uri, err := url.Parse(dataSourceName)
- if err != nil {
- return postgres.NewDatabase(dataSourceName, dbProperties)
- }
- switch uri.Scheme {
- case "postgres":
- return postgres.NewDatabase(dataSourceName, dbProperties)
- case "file":
- return sqlite3.NewDatabase(dataSourceName)
+func NewSyncServerDatasource(dbProperties *config.DatabaseOptions) (Database, error) {
+ switch {
+ case dbProperties.ConnectionString.IsSQLite():
+ return sqlite3.NewDatabase(dbProperties)
+ case dbProperties.ConnectionString.IsPostgres():
+ return postgres.NewDatabase(dbProperties)
default:
- return postgres.NewDatabase(dataSourceName, dbProperties)
+ return nil, fmt.Errorf("unexpected database type")
}
}
diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go
index 1f679def..0e827c95 100644
--- a/syncapi/storage/storage_test.go
+++ b/syncapi/storage/storage_test.go
@@ -9,6 +9,7 @@ import (
"testing"
"time"
+ "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -59,7 +60,9 @@ func MustCreateDatabase(t *testing.T) storage.Database {
t.Fatalf("tried to delete stale test database but failed: %s", err)
}
}
- db, err := sqlite3.NewDatabase(fmt.Sprintf("file:%s", dbname))
+ db, err := sqlite3.NewDatabase(&config.DatabaseOptions{
+ ConnectionString: config.DataSource(fmt.Sprintf("file:%s", dbname)),
+ })
if err != nil {
t.Fatalf("NewSyncServerDatasource returned %s", err)
}
diff --git a/syncapi/storage/storage_wasm.go b/syncapi/storage/storage_wasm.go
index 0886b8c2..43b7bbea 100644
--- a/syncapi/storage/storage_wasm.go
+++ b/syncapi/storage/storage_wasm.go
@@ -16,27 +16,19 @@ package storage
import (
"fmt"
- "net/url"
- "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3"
)
// NewPublicRoomsServerDatabase opens a database connection.
-func NewSyncServerDatasource(
- dataSourceName string,
- dbProperties sqlutil.DbProperties, // nolint:unparam
-) (Database, error) {
- uri, err := url.Parse(dataSourceName)
- if err != nil {
- return nil, fmt.Errorf("Cannot use postgres implementation")
- }
- switch uri.Scheme {
- case "postgres":
- return nil, fmt.Errorf("Cannot use postgres implementation")
- case "file":
- return sqlite3.NewDatabase(dataSourceName)
+func NewSyncServerDatasource(dbProperties *config.DatabaseOptions) (Database, error) {
+ switch {
+ case dbProperties.ConnectionString.IsSQLite():
+ return sqlite3.NewDatabase(dbProperties)
+ case dbProperties.ConnectionString.IsPostgres():
+ return nil, fmt.Errorf("can't use Postgres implementation")
default:
- return nil, fmt.Errorf("Cannot use postgres implementation")
+ return nil, fmt.Errorf("unexpected database type")
}
}
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 5198d59b..9caed7be 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -44,9 +44,9 @@ func AddPublicRoutes(
keyAPI keyapi.KeyInternalAPI,
currentStateAPI currentstateapi.CurrentStateInternalAPI,
federation *gomatrixserverlib.FederationClient,
- cfg *config.Dendrite,
+ cfg *config.SyncAPI,
) {
- syncDB, err := storage.NewSyncServerDatasource(string(cfg.Database.SyncAPI), cfg.DbProperties())
+ syncDB, err := storage.NewSyncServerDatasource(&cfg.Database)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to sync db")
}
@@ -65,7 +65,7 @@ func AddPublicRoutes(
requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI)
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
- cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent),
+ cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.Topics.OutputKeyChangeEvent),
consumer, notifier, keyAPI, currentStateAPI, syncDB,
)
if err = keyChangeConsumer.Start(); err != nil {