aboutsummaryrefslogtreecommitdiff
path: root/federationsender
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-02-13 17:27:33 +0000
committerGitHub <noreply@github.com>2020-02-13 17:27:33 +0000
commitb6ea1bc67ab51667b9e139dd05e0778aca025501 (patch)
tree18569c317fd28544144c320ce844d93a8ff8ec5e /federationsender
parent6942ee1de0250235164cf0ce45570b7fc919669d (diff)
Support sqlite in addition to postgres (#869)
* Move current work into single branch * Initial massaging of clientapi etc (not working yet) * Interfaces for accounts/devices databases * Duplicate postgres package for sqlite3 (no changes made to it yet) * Some keydb, accountdb, devicedb, common partition fixes, some more syncapi tweaking * Fix accounts DB, device DB * Update naffka dependency for SQLite * Naffka SQLite * Update naffka to latest master * SQLite support for federationsender * Mostly not-bad support for SQLite in syncapi (although there are problems where lots of events get classed incorrectly as backward extremities, probably because of IN/ANY clauses that are badly supported) * Update Dockerfile -> Go 1.13.7, add build-base (as gcc and friends are needed for SQLite) * Implement GET endpoints for account_data in clientapi * Nuke filtering for now... * Revert "Implement GET endpoints for account_data in clientapi" This reverts commit 4d80dff4583d278620d9b3ed437e9fcd8d4674ee. * Implement GET endpoints for account_data in clientapi (#861) * Implement GET endpoints for account_data in clientapi * Fix accountDB parameter * Remove fmt.Println * Fix insertAccountData SQLite query * Fix accountDB storage interfaces * Add empty push rules into account data on account creation (#862) * Put SaveAccountData into the right function this time * Not sure if roomserver is better or worse now * sqlite work * Allow empty last sent ID for the first event * sqlite: room creation works * Support sending messages * Nuke fmt.println * Move QueryVariadic etc into common, other device fixes * Fix some linter issues * Fix bugs * Fix some linting errors * Fix errcheck lint errors * Make naffka use postgres as fallback, fix couple of compile errors * What on earth happened to the /rooms/{roomID}/send/{eventType} routing Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'federationsender')
-rw-r--r--federationsender/storage/postgres/storage.go2
-rw-r--r--federationsender/storage/sqlite3/joined_hosts_table.go139
-rw-r--r--federationsender/storage/sqlite3/room_table.go101
-rw-r--r--federationsender/storage/sqlite3/storage.go124
-rw-r--r--federationsender/storage/storage.go3
5 files changed, 368 insertions, 1 deletions
diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go
index c60f6dc5..d97b5d29 100644
--- a/federationsender/storage/postgres/storage.go
+++ b/federationsender/storage/postgres/storage.go
@@ -87,7 +87,7 @@ func (d *Database) UpdateRoom(
return nil
}
- if lastSentEventID != oldEventID {
+ if lastSentEventID != "" && lastSentEventID != oldEventID {
return types.EventIDMismatchError{
DatabaseID: lastSentEventID, RoomServerID: oldEventID,
}
diff --git a/federationsender/storage/sqlite3/joined_hosts_table.go b/federationsender/storage/sqlite3/joined_hosts_table.go
new file mode 100644
index 00000000..1437a062
--- /dev/null
+++ b/federationsender/storage/sqlite3/joined_hosts_table.go
@@ -0,0 +1,139 @@
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const joinedHostsSchema = `
+-- The joined_hosts table stores a list of m.room.member event ids in the
+-- current state for each room where the membership is "join".
+-- There will be an entry for every user that is joined to the room.
+CREATE TABLE IF NOT EXISTS federationsender_joined_hosts (
+ -- The string ID of the room.
+ room_id TEXT NOT NULL,
+ -- The event ID of the m.room.member join event.
+ event_id TEXT NOT NULL,
+ -- The domain part of the user ID the m.room.member event is for.
+ server_name TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS federatonsender_joined_hosts_event_id_idx
+ ON federationsender_joined_hosts (event_id);
+
+CREATE INDEX IF NOT EXISTS federatonsender_joined_hosts_room_id_idx
+ ON federationsender_joined_hosts (room_id)
+`
+
+const insertJoinedHostsSQL = "" +
+ "INSERT INTO federationsender_joined_hosts (room_id, event_id, server_name)" +
+ " VALUES ($1, $2, $3)"
+
+const deleteJoinedHostsSQL = "" +
+ "DELETE FROM federationsender_joined_hosts WHERE event_id = $1"
+
+const selectJoinedHostsSQL = "" +
+ "SELECT event_id, server_name FROM federationsender_joined_hosts" +
+ " WHERE room_id = $1"
+
+type joinedHostsStatements struct {
+ insertJoinedHostsStmt *sql.Stmt
+ deleteJoinedHostsStmt *sql.Stmt
+ selectJoinedHostsStmt *sql.Stmt
+}
+
+func (s *joinedHostsStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(joinedHostsSchema)
+ if err != nil {
+ return
+ }
+ if s.insertJoinedHostsStmt, err = db.Prepare(insertJoinedHostsSQL); err != nil {
+ return
+ }
+ if s.deleteJoinedHostsStmt, err = db.Prepare(deleteJoinedHostsSQL); err != nil {
+ return
+ }
+ if s.selectJoinedHostsStmt, err = db.Prepare(selectJoinedHostsSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *joinedHostsStatements) insertJoinedHosts(
+ ctx context.Context,
+ txn *sql.Tx,
+ roomID, eventID string,
+ serverName gomatrixserverlib.ServerName,
+) error {
+ stmt := common.TxStmt(txn, s.insertJoinedHostsStmt)
+ _, err := stmt.ExecContext(ctx, roomID, eventID, serverName)
+ return err
+}
+
+func (s *joinedHostsStatements) deleteJoinedHosts(
+ ctx context.Context, txn *sql.Tx, eventIDs []string,
+) error {
+ for _, eventID := range eventIDs {
+ stmt := common.TxStmt(txn, s.deleteJoinedHostsStmt)
+ if _, err := stmt.ExecContext(ctx, eventID); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (s *joinedHostsStatements) selectJoinedHostsWithTx(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) ([]types.JoinedHost, error) {
+ stmt := common.TxStmt(txn, s.selectJoinedHostsStmt)
+ return joinedHostsFromStmt(ctx, stmt, roomID)
+}
+
+func (s *joinedHostsStatements) selectJoinedHosts(
+ ctx context.Context, roomID string,
+) ([]types.JoinedHost, error) {
+ return joinedHostsFromStmt(ctx, s.selectJoinedHostsStmt, roomID)
+}
+
+func joinedHostsFromStmt(
+ ctx context.Context, stmt *sql.Stmt, roomID string,
+) ([]types.JoinedHost, error) {
+ rows, err := stmt.QueryContext(ctx, roomID)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+
+ var result []types.JoinedHost
+ for rows.Next() {
+ var eventID, serverName string
+ if err = rows.Scan(&eventID, &serverName); err != nil {
+ return nil, err
+ }
+ result = append(result, types.JoinedHost{
+ MemberEventID: eventID,
+ ServerName: gomatrixserverlib.ServerName(serverName),
+ })
+ }
+
+ return result, nil
+}
diff --git a/federationsender/storage/sqlite3/room_table.go b/federationsender/storage/sqlite3/room_table.go
new file mode 100644
index 00000000..6361400d
--- /dev/null
+++ b/federationsender/storage/sqlite3/room_table.go
@@ -0,0 +1,101 @@
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/common"
+)
+
+const roomSchema = `
+CREATE TABLE IF NOT EXISTS federationsender_rooms (
+ -- The string ID of the room
+ room_id TEXT PRIMARY KEY,
+ -- The most recent event state by the room server.
+ -- We can use this to tell if our view of the room state has become
+ -- desynchronised.
+ last_event_id TEXT NOT NULL
+);`
+
+const insertRoomSQL = "" +
+ "INSERT INTO federationsender_rooms (room_id, last_event_id) VALUES ($1, '')" +
+ " ON CONFLICT DO NOTHING"
+
+const selectRoomForUpdateSQL = "" +
+ "SELECT last_event_id FROM federationsender_rooms WHERE room_id = $1"
+
+const updateRoomSQL = "" +
+ "UPDATE federationsender_rooms SET last_event_id = $2 WHERE room_id = $1"
+
+type roomStatements struct {
+ insertRoomStmt *sql.Stmt
+ selectRoomForUpdateStmt *sql.Stmt
+ updateRoomStmt *sql.Stmt
+}
+
+func (s *roomStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(roomSchema)
+ if err != nil {
+ return
+ }
+
+ if s.insertRoomStmt, err = db.Prepare(insertRoomSQL); err != nil {
+ return
+ }
+ if s.selectRoomForUpdateStmt, err = db.Prepare(selectRoomForUpdateSQL); err != nil {
+ return
+ }
+ if s.updateRoomStmt, err = db.Prepare(updateRoomSQL); err != nil {
+ return
+ }
+ return
+}
+
+// insertRoom inserts the room if it didn't already exist.
+// If the room didn't exist then last_event_id is set to the empty string.
+func (s *roomStatements) insertRoom(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) error {
+ _, err := common.TxStmt(txn, s.insertRoomStmt).ExecContext(ctx, roomID)
+ return err
+}
+
+// selectRoomForUpdate locks the row for the room and returns the last_event_id.
+// The row must already exist in the table. Callers can ensure that the row
+// exists by calling insertRoom first.
+func (s *roomStatements) selectRoomForUpdate(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (string, error) {
+ var lastEventID string
+ stmt := common.TxStmt(txn, s.selectRoomForUpdateStmt)
+ err := stmt.QueryRowContext(ctx, roomID).Scan(&lastEventID)
+ if err != nil {
+ return "", err
+ }
+ return lastEventID, nil
+}
+
+// updateRoom updates the last_event_id for the room. selectRoomForUpdate should
+// have already been called earlier within the transaction.
+func (s *roomStatements) updateRoom(
+ ctx context.Context, txn *sql.Tx, roomID, lastEventID string,
+) error {
+ stmt := common.TxStmt(txn, s.updateRoomStmt)
+ _, err := stmt.ExecContext(ctx, roomID, lastEventID)
+ return err
+}
diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go
new file mode 100644
index 00000000..f9cfaa99
--- /dev/null
+++ b/federationsender/storage/sqlite3/storage.go
@@ -0,0 +1,124 @@
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+
+ _ "github.com/mattn/go-sqlite3"
+
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/federationsender/types"
+)
+
+// Database stores information needed by the federation sender
+type Database struct {
+ joinedHostsStatements
+ roomStatements
+ common.PartitionOffsetStatements
+ db *sql.DB
+}
+
+// NewDatabase opens a new database
+func NewDatabase(dataSourceName string) (*Database, error) {
+ var result Database
+ var err error
+ if result.db, err = sql.Open("sqlite3", dataSourceName); err != nil {
+ return nil, err
+ }
+ if err = result.prepare(); err != nil {
+ return nil, err
+ }
+ return &result, nil
+}
+
+func (d *Database) prepare() error {
+ var err error
+
+ if err = d.joinedHostsStatements.prepare(d.db); err != nil {
+ return err
+ }
+
+ if err = d.roomStatements.prepare(d.db); err != nil {
+ return err
+ }
+
+ return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
+}
+
+// UpdateRoom updates the joined hosts for a room and returns what the joined
+// hosts were before the update, or nil if this was a duplicate message.
+// This is called when we receive a message from kafka, so we pass in
+// oldEventID and newEventID to check that we haven't missed any messages or
+// this isn't a duplicate message.
+func (d *Database) UpdateRoom(
+ ctx context.Context,
+ roomID, oldEventID, newEventID string,
+ addHosts []types.JoinedHost,
+ removeHosts []string,
+) (joinedHosts []types.JoinedHost, err error) {
+ err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
+ err = d.insertRoom(ctx, txn, roomID)
+ if err != nil {
+ return err
+ }
+
+ lastSentEventID, err := d.selectRoomForUpdate(ctx, txn, roomID)
+ if err != nil {
+ return err
+ }
+
+ if lastSentEventID == newEventID {
+ // We've handled this message before, so let's just ignore it.
+ // We can only get a duplicate for the last message we processed,
+ // so its enough just to compare the newEventID with lastSentEventID
+ return nil
+ }
+
+ if lastSentEventID != "" && lastSentEventID != oldEventID {
+ return types.EventIDMismatchError{
+ DatabaseID: lastSentEventID, RoomServerID: oldEventID,
+ }
+ }
+
+ joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID)
+ if err != nil {
+ return err
+ }
+
+ for _, add := range addHosts {
+ err = d.insertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName)
+ if err != nil {
+ return err
+ }
+ }
+ if err = d.deleteJoinedHosts(ctx, txn, removeHosts); err != nil {
+ return err
+ }
+ return d.updateRoom(ctx, txn, roomID, newEventID)
+ })
+ return
+}
+
+// GetJoinedHosts returns the currently joined hosts for room,
+// as known to federationserver.
+// Returns an error if something goes wrong.
+func (d *Database) GetJoinedHosts(
+ ctx context.Context, roomID string,
+) ([]types.JoinedHost, error) {
+ return d.selectJoinedHosts(ctx, roomID)
+}
diff --git a/federationsender/storage/storage.go b/federationsender/storage/storage.go
index 4ce151c7..e83c1e9d 100644
--- a/federationsender/storage/storage.go
+++ b/federationsender/storage/storage.go
@@ -20,6 +20,7 @@ import (
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/federationsender/storage/postgres"
+ "github.com/matrix-org/dendrite/federationsender/storage/sqlite3"
"github.com/matrix-org/dendrite/federationsender/types"
)
@@ -36,6 +37,8 @@ func NewDatabase(dataSourceName string) (Database, error) {
return postgres.NewDatabase(dataSourceName)
}
switch uri.Scheme {
+ case "file":
+ return sqlite3.NewDatabase(dataSourceName)
case "postgres":
return postgres.NewDatabase(dataSourceName)
default: