aboutsummaryrefslogtreecommitdiff
path: root/federationsender/storage
diff options
context:
space:
mode:
authorruben <code@rbn.im>2019-05-21 22:56:55 +0200
committerBrendan Abolivier <babolivier@matrix.org>2019-05-21 21:56:55 +0100
commit74827428bd3e11faab65f12204449c1b9469b0ae (patch)
tree0decafa542436a0667ed2d3e3cfd4df0f03de1e5 /federationsender/storage
parent4d588f7008afe5600219ac0930c2eee2de5c447b (diff)
use go module for dependencies (#594)
Diffstat (limited to 'federationsender/storage')
-rw-r--r--federationsender/storage/joined_hosts_table.go135
-rw-r--r--federationsender/storage/room_table.go100
-rw-r--r--federationsender/storage/storage.go121
3 files changed, 356 insertions, 0 deletions
diff --git a/federationsender/storage/joined_hosts_table.go b/federationsender/storage/joined_hosts_table.go
new file mode 100644
index 00000000..5d652a1a
--- /dev/null
+++ b/federationsender/storage/joined_hosts_table.go
@@ -0,0 +1,135 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const joinedHostsSchema = `
+-- The joined_hosts table stores a list of m.room.member event ids in the
+-- current state for each room where the membership is "join".
+-- There will be an entry for every user that is joined to the room.
+CREATE TABLE IF NOT EXISTS federationsender_joined_hosts (
+ -- The string ID of the room.
+ room_id TEXT NOT NULL,
+ -- The event ID of the m.room.member join event.
+ event_id TEXT NOT NULL,
+ -- The domain part of the user ID the m.room.member event is for.
+ server_name TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS federatonsender_joined_hosts_event_id_idx
+ ON federationsender_joined_hosts (event_id);
+
+CREATE INDEX IF NOT EXISTS federatonsender_joined_hosts_room_id_idx
+ ON federationsender_joined_hosts (room_id)
+`
+
+const insertJoinedHostsSQL = "" +
+ "INSERT INTO federationsender_joined_hosts (room_id, event_id, server_name)" +
+ " VALUES ($1, $2, $3)"
+
+const deleteJoinedHostsSQL = "" +
+ "DELETE FROM federationsender_joined_hosts WHERE event_id = ANY($1)"
+
+const selectJoinedHostsSQL = "" +
+ "SELECT event_id, server_name FROM federationsender_joined_hosts" +
+ " WHERE room_id = $1"
+
+type joinedHostsStatements struct {
+ insertJoinedHostsStmt *sql.Stmt
+ deleteJoinedHostsStmt *sql.Stmt
+ selectJoinedHostsStmt *sql.Stmt
+}
+
+func (s *joinedHostsStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(joinedHostsSchema)
+ if err != nil {
+ return
+ }
+ if s.insertJoinedHostsStmt, err = db.Prepare(insertJoinedHostsSQL); err != nil {
+ return
+ }
+ if s.deleteJoinedHostsStmt, err = db.Prepare(deleteJoinedHostsSQL); err != nil {
+ return
+ }
+ if s.selectJoinedHostsStmt, err = db.Prepare(selectJoinedHostsSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *joinedHostsStatements) insertJoinedHosts(
+ ctx context.Context,
+ txn *sql.Tx,
+ roomID, eventID string,
+ serverName gomatrixserverlib.ServerName,
+) error {
+ stmt := common.TxStmt(txn, s.insertJoinedHostsStmt)
+ _, err := stmt.ExecContext(ctx, roomID, eventID, serverName)
+ return err
+}
+
+func (s *joinedHostsStatements) deleteJoinedHosts(
+ ctx context.Context, txn *sql.Tx, eventIDs []string,
+) error {
+ stmt := common.TxStmt(txn, s.deleteJoinedHostsStmt)
+ _, err := stmt.ExecContext(ctx, pq.StringArray(eventIDs))
+ return err
+}
+
+func (s *joinedHostsStatements) selectJoinedHostsWithTx(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) ([]types.JoinedHost, error) {
+ stmt := common.TxStmt(txn, s.selectJoinedHostsStmt)
+ return joinedHostsFromStmt(ctx, stmt, roomID)
+}
+
+func (s *joinedHostsStatements) selectJoinedHosts(
+ ctx context.Context, roomID string,
+) ([]types.JoinedHost, error) {
+ return joinedHostsFromStmt(ctx, s.selectJoinedHostsStmt, roomID)
+}
+
+func joinedHostsFromStmt(
+ ctx context.Context, stmt *sql.Stmt, roomID string,
+) ([]types.JoinedHost, error) {
+ rows, err := stmt.QueryContext(ctx, roomID)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+
+ var result []types.JoinedHost
+ for rows.Next() {
+ var eventID, serverName string
+ if err = rows.Scan(&eventID, &serverName); err != nil {
+ return nil, err
+ }
+ result = append(result, types.JoinedHost{
+ MemberEventID: eventID,
+ ServerName: gomatrixserverlib.ServerName(serverName),
+ })
+ }
+
+ return result, nil
+}
diff --git a/federationsender/storage/room_table.go b/federationsender/storage/room_table.go
new file mode 100644
index 00000000..bb52b707
--- /dev/null
+++ b/federationsender/storage/room_table.go
@@ -0,0 +1,100 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/common"
+)
+
+const roomSchema = `
+CREATE TABLE IF NOT EXISTS federationsender_rooms (
+ -- The string ID of the room
+ room_id TEXT PRIMARY KEY,
+ -- The most recent event state by the room server.
+ -- We can use this to tell if our view of the room state has become
+ -- desynchronised.
+ last_event_id TEXT NOT NULL
+);`
+
+const insertRoomSQL = "" +
+ "INSERT INTO federationsender_rooms (room_id, last_event_id) VALUES ($1, '')" +
+ " ON CONFLICT DO NOTHING"
+
+const selectRoomForUpdateSQL = "" +
+ "SELECT last_event_id FROM federationsender_rooms WHERE room_id = $1 FOR UPDATE"
+
+const updateRoomSQL = "" +
+ "UPDATE federationsender_rooms SET last_event_id = $2 WHERE room_id = $1"
+
+type roomStatements struct {
+ insertRoomStmt *sql.Stmt
+ selectRoomForUpdateStmt *sql.Stmt
+ updateRoomStmt *sql.Stmt
+}
+
+func (s *roomStatements) prepare(db *sql.DB) (err error) {
+ _, err = db.Exec(roomSchema)
+ if err != nil {
+ return
+ }
+
+ if s.insertRoomStmt, err = db.Prepare(insertRoomSQL); err != nil {
+ return
+ }
+ if s.selectRoomForUpdateStmt, err = db.Prepare(selectRoomForUpdateSQL); err != nil {
+ return
+ }
+ if s.updateRoomStmt, err = db.Prepare(updateRoomSQL); err != nil {
+ return
+ }
+ return
+}
+
+// insertRoom inserts the room if it didn't already exist.
+// If the room didn't exist then last_event_id is set to the empty string.
+func (s *roomStatements) insertRoom(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) error {
+ _, err := common.TxStmt(txn, s.insertRoomStmt).ExecContext(ctx, roomID)
+ return err
+}
+
+// selectRoomForUpdate locks the row for the room and returns the last_event_id.
+// The row must already exist in the table. Callers can ensure that the row
+// exists by calling insertRoom first.
+func (s *roomStatements) selectRoomForUpdate(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (string, error) {
+ var lastEventID string
+ stmt := common.TxStmt(txn, s.selectRoomForUpdateStmt)
+ err := stmt.QueryRowContext(ctx, roomID).Scan(&lastEventID)
+ if err != nil {
+ return "", err
+ }
+ return lastEventID, nil
+}
+
+// updateRoom updates the last_event_id for the room. selectRoomForUpdate should
+// have already been called earlier within the transaction.
+func (s *roomStatements) updateRoom(
+ ctx context.Context, txn *sql.Tx, roomID, lastEventID string,
+) error {
+ stmt := common.TxStmt(txn, s.updateRoomStmt)
+ _, err := stmt.ExecContext(ctx, roomID, lastEventID)
+ return err
+}
diff --git a/federationsender/storage/storage.go b/federationsender/storage/storage.go
new file mode 100644
index 00000000..3a0f8775
--- /dev/null
+++ b/federationsender/storage/storage.go
@@ -0,0 +1,121 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/federationsender/types"
+)
+
+// Database stores information needed by the federation sender
+type Database struct {
+ joinedHostsStatements
+ roomStatements
+ common.PartitionOffsetStatements
+ db *sql.DB
+}
+
+// NewDatabase opens a new database
+func NewDatabase(dataSourceName string) (*Database, error) {
+ var result Database
+ var err error
+ if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
+ return nil, err
+ }
+ if err = result.prepare(); err != nil {
+ return nil, err
+ }
+ return &result, nil
+}
+
+func (d *Database) prepare() error {
+ var err error
+
+ if err = d.joinedHostsStatements.prepare(d.db); err != nil {
+ return err
+ }
+
+ if err = d.roomStatements.prepare(d.db); err != nil {
+ return err
+ }
+
+ return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
+}
+
+// UpdateRoom updates the joined hosts for a room and returns what the joined
+// hosts were before the update, or nil if this was a duplicate message.
+// This is called when we receive a message from kafka, so we pass in
+// oldEventID and newEventID to check that we haven't missed any messages or
+// this isn't a duplicate message.
+func (d *Database) UpdateRoom(
+ ctx context.Context,
+ roomID, oldEventID, newEventID string,
+ addHosts []types.JoinedHost,
+ removeHosts []string,
+) (joinedHosts []types.JoinedHost, err error) {
+ err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
+ err = d.insertRoom(ctx, txn, roomID)
+ if err != nil {
+ return err
+ }
+
+ lastSentEventID, err := d.selectRoomForUpdate(ctx, txn, roomID)
+ if err != nil {
+ return err
+ }
+
+ if lastSentEventID == newEventID {
+ // We've handled this message before, so let's just ignore it.
+ // We can only get a duplicate for the last message we processed,
+ // so its enough just to compare the newEventID with lastSentEventID
+ return nil
+ }
+
+ if lastSentEventID != oldEventID {
+ return types.EventIDMismatchError{
+ DatabaseID: lastSentEventID, RoomServerID: oldEventID,
+ }
+ }
+
+ joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID)
+ if err != nil {
+ return err
+ }
+
+ for _, add := range addHosts {
+ err = d.insertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName)
+ if err != nil {
+ return err
+ }
+ }
+ if err = d.deleteJoinedHosts(ctx, txn, removeHosts); err != nil {
+ return err
+ }
+ return d.updateRoom(ctx, txn, roomID, newEventID)
+ })
+ return
+}
+
+// GetJoinedHosts returns the currently joined hosts for room,
+// as known to federationserver.
+// Returns an error if something goes wrong.
+func (d *Database) GetJoinedHosts(
+ ctx context.Context, roomID string,
+) ([]types.JoinedHost, error) {
+ return d.selectJoinedHosts(ctx, roomID)
+}