diff options
Diffstat (limited to 'federationsender/storage')
-rw-r--r-- | federationsender/storage/interface.go | 11 | ||||
-rw-r--r-- | federationsender/storage/postgres/inbound_peeks_table.go | 176 | ||||
-rw-r--r-- | federationsender/storage/postgres/outbound_peeks_table.go | 176 | ||||
-rw-r--r-- | federationsender/storage/postgres/storage.go | 28 | ||||
-rw-r--r-- | federationsender/storage/shared/storage.go | 60 | ||||
-rw-r--r-- | federationsender/storage/sqlite3/inbound_peeks_table.go | 176 | ||||
-rw-r--r-- | federationsender/storage/sqlite3/outbound_peeks_table.go | 176 | ||||
-rw-r--r-- | federationsender/storage/sqlite3/storage.go | 28 | ||||
-rw-r--r-- | federationsender/storage/tables/interface.go | 18 |
9 files changed, 822 insertions, 27 deletions
diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go index 03d616f1..b8361304 100644 --- a/federationsender/storage/interface.go +++ b/federationsender/storage/interface.go @@ -51,7 +51,18 @@ type Database interface { GetPendingPDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error) GetPendingEDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error) + // these don't have contexts passed in as we want things to happen regardless of the request context AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) + + AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error + RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error + GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error) + GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) + + AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error + RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error + GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error) + GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error) } diff --git a/federationsender/storage/postgres/inbound_peeks_table.go b/federationsender/storage/postgres/inbound_peeks_table.go new file mode 100644 index 00000000..fe35ce44 --- /dev/null +++ b/federationsender/storage/postgres/inbound_peeks_table.go @@ -0,0 +1,176 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + "time" + + "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" +) + +const inboundPeeksSchema = ` +CREATE TABLE IF NOT EXISTS federationsender_inbound_peeks ( + room_id TEXT NOT NULL, + server_name TEXT NOT NULL, + peek_id TEXT NOT NULL, + creation_ts BIGINT NOT NULL, + renewed_ts BIGINT NOT NULL, + renewal_interval BIGINT NOT NULL, + UNIQUE (room_id, server_name, peek_id) +); +` + +const insertInboundPeekSQL = "" + + "INSERT INTO federationsender_inbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)" + +const selectInboundPeekSQL = "" + + "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3" + +const selectInboundPeeksSQL = "" + + "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1" + +const renewInboundPeekSQL = "" + + "UPDATE federationsender_inbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5" + +const deleteInboundPeekSQL = "" + + "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2" + +const deleteInboundPeeksSQL = "" + + "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1" + +type inboundPeeksStatements struct { + db *sql.DB + insertInboundPeekStmt *sql.Stmt + selectInboundPeekStmt *sql.Stmt + selectInboundPeeksStmt *sql.Stmt + renewInboundPeekStmt *sql.Stmt + deleteInboundPeekStmt *sql.Stmt + deleteInboundPeeksStmt *sql.Stmt +} + +func NewPostgresInboundPeeksTable(db *sql.DB) (s *inboundPeeksStatements, err error) { + s = &inboundPeeksStatements{ + db: db, + } + _, err = db.Exec(inboundPeeksSchema) + if err != nil { + return + } + + if s.insertInboundPeekStmt, err = db.Prepare(insertInboundPeekSQL); err != nil { + return + } + if s.selectInboundPeekStmt, err = db.Prepare(selectInboundPeekSQL); err != nil { + return + } + if s.selectInboundPeeksStmt, err = db.Prepare(selectInboundPeeksSQL); err != nil { + return + } + if s.renewInboundPeekStmt, err = db.Prepare(renewInboundPeekSQL); err != nil { + return + } + if s.deleteInboundPeeksStmt, err = db.Prepare(deleteInboundPeeksSQL); err != nil { + return + } + if s.deleteInboundPeekStmt, err = db.Prepare(deleteInboundPeekSQL); err != nil { + return + } + return +} + +func (s *inboundPeeksStatements) InsertInboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64, +) (err error) { + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + stmt := sqlutil.TxStmt(txn, s.insertInboundPeekStmt) + _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval) + return +} + +func (s *inboundPeeksStatements) RenewInboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64, +) (err error) { + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + _, err = sqlutil.TxStmt(txn, s.renewInboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID) + return +} + +func (s *inboundPeeksStatements) SelectInboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, +) (*types.InboundPeek, error) { + row := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryRowContext(ctx, roomID) + inboundPeek := types.InboundPeek{} + err := row.Scan( + &inboundPeek.RoomID, + &inboundPeek.ServerName, + &inboundPeek.PeekID, + &inboundPeek.CreationTimestamp, + &inboundPeek.RenewedTimestamp, + &inboundPeek.RenewalInterval, + ) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + return &inboundPeek, nil +} + +func (s *inboundPeeksStatements) SelectInboundPeeks( + ctx context.Context, txn *sql.Tx, roomID string, +) (inboundPeeks []types.InboundPeek, err error) { + rows, err := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryContext(ctx, roomID) + if err != nil { + return + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectInboundPeeks: rows.close() failed") + + for rows.Next() { + inboundPeek := types.InboundPeek{} + if err = rows.Scan( + &inboundPeek.RoomID, + &inboundPeek.ServerName, + &inboundPeek.PeekID, + &inboundPeek.CreationTimestamp, + &inboundPeek.RenewedTimestamp, + &inboundPeek.RenewalInterval, + ); err != nil { + return + } + inboundPeeks = append(inboundPeeks, inboundPeek) + } + + return inboundPeeks, rows.Err() +} + +func (s *inboundPeeksStatements) DeleteInboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, +) (err error) { + _, err = sqlutil.TxStmt(txn, s.deleteInboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID) + return +} + +func (s *inboundPeeksStatements) DeleteInboundPeeks( + ctx context.Context, txn *sql.Tx, roomID string, +) (err error) { + _, err = sqlutil.TxStmt(txn, s.deleteInboundPeeksStmt).ExecContext(ctx, roomID) + return +} diff --git a/federationsender/storage/postgres/outbound_peeks_table.go b/federationsender/storage/postgres/outbound_peeks_table.go new file mode 100644 index 00000000..596b4bcc --- /dev/null +++ b/federationsender/storage/postgres/outbound_peeks_table.go @@ -0,0 +1,176 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + "time" + + "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" +) + +const outboundPeeksSchema = ` +CREATE TABLE IF NOT EXISTS federationsender_outbound_peeks ( + room_id TEXT NOT NULL, + server_name TEXT NOT NULL, + peek_id TEXT NOT NULL, + creation_ts BIGINT NOT NULL, + renewed_ts BIGINT NOT NULL, + renewal_interval BIGINT NOT NULL, + UNIQUE (room_id, server_name, peek_id) +); +` + +const insertOutboundPeekSQL = "" + + "INSERT INTO federationsender_outbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)" + +const selectOutboundPeekSQL = "" + + "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3" + +const selectOutboundPeeksSQL = "" + + "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1" + +const renewOutboundPeekSQL = "" + + "UPDATE federationsender_outbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5" + +const deleteOutboundPeekSQL = "" + + "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2" + +const deleteOutboundPeeksSQL = "" + + "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1" + +type outboundPeeksStatements struct { + db *sql.DB + insertOutboundPeekStmt *sql.Stmt + selectOutboundPeekStmt *sql.Stmt + selectOutboundPeeksStmt *sql.Stmt + renewOutboundPeekStmt *sql.Stmt + deleteOutboundPeekStmt *sql.Stmt + deleteOutboundPeeksStmt *sql.Stmt +} + +func NewPostgresOutboundPeeksTable(db *sql.DB) (s *outboundPeeksStatements, err error) { + s = &outboundPeeksStatements{ + db: db, + } + _, err = db.Exec(outboundPeeksSchema) + if err != nil { + return + } + + if s.insertOutboundPeekStmt, err = db.Prepare(insertOutboundPeekSQL); err != nil { + return + } + if s.selectOutboundPeekStmt, err = db.Prepare(selectOutboundPeekSQL); err != nil { + return + } + if s.selectOutboundPeeksStmt, err = db.Prepare(selectOutboundPeeksSQL); err != nil { + return + } + if s.renewOutboundPeekStmt, err = db.Prepare(renewOutboundPeekSQL); err != nil { + return + } + if s.deleteOutboundPeeksStmt, err = db.Prepare(deleteOutboundPeeksSQL); err != nil { + return + } + if s.deleteOutboundPeekStmt, err = db.Prepare(deleteOutboundPeekSQL); err != nil { + return + } + return +} + +func (s *outboundPeeksStatements) InsertOutboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64, +) (err error) { + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + stmt := sqlutil.TxStmt(txn, s.insertOutboundPeekStmt) + _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval) + return +} + +func (s *outboundPeeksStatements) RenewOutboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64, +) (err error) { + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + _, err = sqlutil.TxStmt(txn, s.renewOutboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID) + return +} + +func (s *outboundPeeksStatements) SelectOutboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, +) (*types.OutboundPeek, error) { + row := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryRowContext(ctx, roomID) + outboundPeek := types.OutboundPeek{} + err := row.Scan( + &outboundPeek.RoomID, + &outboundPeek.ServerName, + &outboundPeek.PeekID, + &outboundPeek.CreationTimestamp, + &outboundPeek.RenewedTimestamp, + &outboundPeek.RenewalInterval, + ) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + return &outboundPeek, nil +} + +func (s *outboundPeeksStatements) SelectOutboundPeeks( + ctx context.Context, txn *sql.Tx, roomID string, +) (outboundPeeks []types.OutboundPeek, err error) { + rows, err := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryContext(ctx, roomID) + if err != nil { + return + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectOutboundPeeks: rows.close() failed") + + for rows.Next() { + outboundPeek := types.OutboundPeek{} + if err = rows.Scan( + &outboundPeek.RoomID, + &outboundPeek.ServerName, + &outboundPeek.PeekID, + &outboundPeek.CreationTimestamp, + &outboundPeek.RenewedTimestamp, + &outboundPeek.RenewalInterval, + ); err != nil { + return + } + outboundPeeks = append(outboundPeeks, outboundPeek) + } + + return outboundPeeks, rows.Err() +} + +func (s *outboundPeeksStatements) DeleteOutboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, +) (err error) { + _, err = sqlutil.TxStmt(txn, s.deleteOutboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID) + return +} + +func (s *outboundPeeksStatements) DeleteOutboundPeeks( + ctx context.Context, txn *sql.Tx, roomID string, +) (err error) { + _, err = sqlutil.TxStmt(txn, s.deleteOutboundPeeksStmt).ExecContext(ctx, roomID) + return +} diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go index 75b54bbc..b9827ca1 100644 --- a/federationsender/storage/postgres/storage.go +++ b/federationsender/storage/postgres/storage.go @@ -64,16 +64,26 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS if err != nil { return nil, err } + inboundPeeks, err := NewPostgresInboundPeeksTable(d.db) + if err != nil { + return nil, err + } + outboundPeeks, err := NewPostgresOutboundPeeksTable(d.db) + if err != nil { + return nil, err + } d.Database = shared.Database{ - DB: d.db, - Cache: cache, - Writer: d.writer, - FederationSenderJoinedHosts: joinedHosts, - FederationSenderQueuePDUs: queuePDUs, - FederationSenderQueueEDUs: queueEDUs, - FederationSenderQueueJSON: queueJSON, - FederationSenderRooms: rooms, - FederationSenderBlacklist: blacklist, + DB: d.db, + Cache: cache, + Writer: d.writer, + FederationSenderJoinedHosts: joinedHosts, + FederationSenderQueuePDUs: queuePDUs, + FederationSenderQueueEDUs: queueEDUs, + FederationSenderQueueJSON: queueJSON, + FederationSenderRooms: rooms, + FederationSenderBlacklist: blacklist, + FederationSenderInboundPeeks: inboundPeeks, + FederationSenderOutboundPeeks: outboundPeeks, } if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil { return nil, err diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go index fbf84c70..4c949042 100644 --- a/federationsender/storage/shared/storage.go +++ b/federationsender/storage/shared/storage.go @@ -27,15 +27,17 @@ import ( ) type Database struct { - DB *sql.DB - Cache caching.FederationSenderCache - Writer sqlutil.Writer - FederationSenderQueuePDUs tables.FederationSenderQueuePDUs - FederationSenderQueueEDUs tables.FederationSenderQueueEDUs - FederationSenderQueueJSON tables.FederationSenderQueueJSON - FederationSenderJoinedHosts tables.FederationSenderJoinedHosts - FederationSenderRooms tables.FederationSenderRooms - FederationSenderBlacklist tables.FederationSenderBlacklist + DB *sql.DB + Cache caching.FederationSenderCache + Writer sqlutil.Writer + FederationSenderQueuePDUs tables.FederationSenderQueuePDUs + FederationSenderQueueEDUs tables.FederationSenderQueueEDUs + FederationSenderQueueJSON tables.FederationSenderQueueJSON + FederationSenderJoinedHosts tables.FederationSenderJoinedHosts + FederationSenderRooms tables.FederationSenderRooms + FederationSenderBlacklist tables.FederationSenderBlacklist + FederationSenderOutboundPeeks tables.FederationSenderOutboundPeeks + FederationSenderInboundPeeks tables.FederationSenderInboundPeeks } // An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs. @@ -173,3 +175,43 @@ func (d *Database) RemoveServerFromBlacklist(serverName gomatrixserverlib.Server func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) { return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName) } + +func (d *Database) AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + return d.FederationSenderOutboundPeeks.InsertOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) + }) +} + +func (d *Database) RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + return d.FederationSenderOutboundPeeks.RenewOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) + }) +} + +func (d *Database) GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error) { + return d.FederationSenderOutboundPeeks.SelectOutboundPeek(ctx, nil, serverName, roomID, peekID) +} + +func (d *Database) GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) { + return d.FederationSenderOutboundPeeks.SelectOutboundPeeks(ctx, nil, roomID) +} + +func (d *Database) AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + return d.FederationSenderInboundPeeks.InsertInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) + }) +} + +func (d *Database) RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + return d.FederationSenderInboundPeeks.RenewInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) + }) +} + +func (d *Database) GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error) { + return d.FederationSenderInboundPeeks.SelectInboundPeek(ctx, nil, serverName, roomID, peekID) +} + +func (d *Database) GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error) { + return d.FederationSenderInboundPeeks.SelectInboundPeeks(ctx, nil, roomID) +} diff --git a/federationsender/storage/sqlite3/inbound_peeks_table.go b/federationsender/storage/sqlite3/inbound_peeks_table.go new file mode 100644 index 00000000..d5eacf9e --- /dev/null +++ b/federationsender/storage/sqlite3/inbound_peeks_table.go @@ -0,0 +1,176 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + "time" + + "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" +) + +const inboundPeeksSchema = ` +CREATE TABLE IF NOT EXISTS federationsender_inbound_peeks ( + room_id TEXT NOT NULL, + server_name TEXT NOT NULL, + peek_id TEXT NOT NULL, + creation_ts INTEGER NOT NULL, + renewed_ts INTEGER NOT NULL, + renewal_interval INTEGER NOT NULL, + UNIQUE (room_id, server_name, peek_id) +); +` + +const insertInboundPeekSQL = "" + + "INSERT INTO federationsender_inbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)" + +const selectInboundPeekSQL = "" + + "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3" + +const selectInboundPeeksSQL = "" + + "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1" + +const renewInboundPeekSQL = "" + + "UPDATE federationsender_inbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5" + +const deleteInboundPeekSQL = "" + + "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2" + +const deleteInboundPeeksSQL = "" + + "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1" + +type inboundPeeksStatements struct { + db *sql.DB + insertInboundPeekStmt *sql.Stmt + selectInboundPeekStmt *sql.Stmt + selectInboundPeeksStmt *sql.Stmt + renewInboundPeekStmt *sql.Stmt + deleteInboundPeekStmt *sql.Stmt + deleteInboundPeeksStmt *sql.Stmt +} + +func NewSQLiteInboundPeeksTable(db *sql.DB) (s *inboundPeeksStatements, err error) { + s = &inboundPeeksStatements{ + db: db, + } + _, err = db.Exec(inboundPeeksSchema) + if err != nil { + return + } + + if s.insertInboundPeekStmt, err = db.Prepare(insertInboundPeekSQL); err != nil { + return + } + if s.selectInboundPeekStmt, err = db.Prepare(selectInboundPeekSQL); err != nil { + return + } + if s.selectInboundPeeksStmt, err = db.Prepare(selectInboundPeeksSQL); err != nil { + return + } + if s.renewInboundPeekStmt, err = db.Prepare(renewInboundPeekSQL); err != nil { + return + } + if s.deleteInboundPeeksStmt, err = db.Prepare(deleteInboundPeeksSQL); err != nil { + return + } + if s.deleteInboundPeekStmt, err = db.Prepare(deleteInboundPeekSQL); err != nil { + return + } + return +} + +func (s *inboundPeeksStatements) InsertInboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64, +) (err error) { + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + stmt := sqlutil.TxStmt(txn, s.insertInboundPeekStmt) + _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval) + return +} + +func (s *inboundPeeksStatements) RenewInboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64, +) (err error) { + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + _, err = sqlutil.TxStmt(txn, s.renewInboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID) + return +} + +func (s *inboundPeeksStatements) SelectInboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, +) (*types.InboundPeek, error) { + row := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryRowContext(ctx, roomID) + inboundPeek := types.InboundPeek{} + err := row.Scan( + &inboundPeek.RoomID, + &inboundPeek.ServerName, + &inboundPeek.PeekID, + &inboundPeek.CreationTimestamp, + &inboundPeek.RenewedTimestamp, + &inboundPeek.RenewalInterval, + ) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + return &inboundPeek, nil +} + +func (s *inboundPeeksStatements) SelectInboundPeeks( + ctx context.Context, txn *sql.Tx, roomID string, +) (inboundPeeks []types.InboundPeek, err error) { + rows, err := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryContext(ctx, roomID) + if err != nil { + return + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectInboundPeeks: rows.close() failed") + + for rows.Next() { + inboundPeek := types.InboundPeek{} + if err = rows.Scan( + &inboundPeek.RoomID, + &inboundPeek.ServerName, + &inboundPeek.PeekID, + &inboundPeek.CreationTimestamp, + &inboundPeek.RenewedTimestamp, + &inboundPeek.RenewalInterval, + ); err != nil { + return + } + inboundPeeks = append(inboundPeeks, inboundPeek) + } + + return inboundPeeks, rows.Err() +} + +func (s *inboundPeeksStatements) DeleteInboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, +) (err error) { + _, err = sqlutil.TxStmt(txn, s.deleteInboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID) + return +} + +func (s *inboundPeeksStatements) DeleteInboundPeeks( + ctx context.Context, txn *sql.Tx, roomID string, +) (err error) { + _, err = sqlutil.TxStmt(txn, s.deleteInboundPeeksStmt).ExecContext(ctx, roomID) + return +} diff --git a/federationsender/storage/sqlite3/outbound_peeks_table.go b/federationsender/storage/sqlite3/outbound_peeks_table.go new file mode 100644 index 00000000..02aefce7 --- /dev/null +++ b/federationsender/storage/sqlite3/outbound_peeks_table.go @@ -0,0 +1,176 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + "time" + + "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" +) + +const outboundPeeksSchema = ` +CREATE TABLE IF NOT EXISTS federationsender_outbound_peeks ( + room_id TEXT NOT NULL, + server_name TEXT NOT NULL, + peek_id TEXT NOT NULL, + creation_ts INTEGER NOT NULL, + renewed_ts INTEGER NOT NULL, + renewal_interval INTEGER NOT NULL, + UNIQUE (room_id, server_name, peek_id) +); +` + +const insertOutboundPeekSQL = "" + + "INSERT INTO federationsender_outbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)" + +const selectOutboundPeekSQL = "" + + "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3" + +const selectOutboundPeeksSQL = "" + + "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1" + +const renewOutboundPeekSQL = "" + + "UPDATE federationsender_outbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5" + +const deleteOutboundPeekSQL = "" + + "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2" + +const deleteOutboundPeeksSQL = "" + + "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1" + +type outboundPeeksStatements struct { + db *sql.DB + insertOutboundPeekStmt *sql.Stmt + selectOutboundPeekStmt *sql.Stmt + selectOutboundPeeksStmt *sql.Stmt + renewOutboundPeekStmt *sql.Stmt + deleteOutboundPeekStmt *sql.Stmt + deleteOutboundPeeksStmt *sql.Stmt +} + +func NewSQLiteOutboundPeeksTable(db *sql.DB) (s *outboundPeeksStatements, err error) { + s = &outboundPeeksStatements{ + db: db, + } + _, err = db.Exec(outboundPeeksSchema) + if err != nil { + return + } + + if s.insertOutboundPeekStmt, err = db.Prepare(insertOutboundPeekSQL); err != nil { + return + } + if s.selectOutboundPeekStmt, err = db.Prepare(selectOutboundPeekSQL); err != nil { + return + } + if s.selectOutboundPeeksStmt, err = db.Prepare(selectOutboundPeeksSQL); err != nil { + return + } + if s.renewOutboundPeekStmt, err = db.Prepare(renewOutboundPeekSQL); err != nil { + return + } + if s.deleteOutboundPeeksStmt, err = db.Prepare(deleteOutboundPeeksSQL); err != nil { + return + } + if s.deleteOutboundPeekStmt, err = db.Prepare(deleteOutboundPeekSQL); err != nil { + return + } + return +} + +func (s *outboundPeeksStatements) InsertOutboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64, +) (err error) { + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + stmt := sqlutil.TxStmt(txn, s.insertOutboundPeekStmt) + _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval) + return +} + +func (s *outboundPeeksStatements) RenewOutboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64, +) (err error) { + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + _, err = sqlutil.TxStmt(txn, s.renewOutboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID) + return +} + +func (s *outboundPeeksStatements) SelectOutboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, +) (*types.OutboundPeek, error) { + row := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryRowContext(ctx, roomID) + outboundPeek := types.OutboundPeek{} + err := row.Scan( + &outboundPeek.RoomID, + &outboundPeek.ServerName, + &outboundPeek.PeekID, + &outboundPeek.CreationTimestamp, + &outboundPeek.RenewedTimestamp, + &outboundPeek.RenewalInterval, + ) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + return &outboundPeek, nil +} + +func (s *outboundPeeksStatements) SelectOutboundPeeks( + ctx context.Context, txn *sql.Tx, roomID string, +) (outboundPeeks []types.OutboundPeek, err error) { + rows, err := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryContext(ctx, roomID) + if err != nil { + return + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectOutboundPeeks: rows.close() failed") + + for rows.Next() { + outboundPeek := types.OutboundPeek{} + if err = rows.Scan( + &outboundPeek.RoomID, + &outboundPeek.ServerName, + &outboundPeek.PeekID, + &outboundPeek.CreationTimestamp, + &outboundPeek.RenewedTimestamp, + &outboundPeek.RenewalInterval, + ); err != nil { + return + } + outboundPeeks = append(outboundPeeks, outboundPeek) + } + + return outboundPeeks, rows.Err() +} + +func (s *outboundPeeksStatements) DeleteOutboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, +) (err error) { + _, err = sqlutil.TxStmt(txn, s.deleteOutboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID) + return +} + +func (s *outboundPeeksStatements) DeleteOutboundPeeks( + ctx context.Context, txn *sql.Tx, roomID string, +) (err error) { + _, err = sqlutil.TxStmt(txn, s.deleteOutboundPeeksStmt).ExecContext(ctx, roomID) + return +} diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index e66d7690..2b135858 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -66,16 +66,26 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS if err != nil { return nil, err } + outboundPeeks, err := NewSQLiteOutboundPeeksTable(d.db) + if err != nil { + return nil, err + } + inboundPeeks, err := NewSQLiteInboundPeeksTable(d.db) + if err != nil { + return nil, err + } d.Database = shared.Database{ - DB: d.db, - Cache: cache, - Writer: d.writer, - FederationSenderJoinedHosts: joinedHosts, - FederationSenderQueuePDUs: queuePDUs, - FederationSenderQueueEDUs: queueEDUs, - FederationSenderQueueJSON: queueJSON, - FederationSenderRooms: rooms, - FederationSenderBlacklist: blacklist, + DB: d.db, + Cache: cache, + Writer: d.writer, + FederationSenderJoinedHosts: joinedHosts, + FederationSenderQueuePDUs: queuePDUs, + FederationSenderQueueEDUs: queueEDUs, + FederationSenderQueueJSON: queueJSON, + FederationSenderRooms: rooms, + FederationSenderBlacklist: blacklist, + FederationSenderOutboundPeeks: outboundPeeks, + FederationSenderInboundPeeks: inboundPeeks, } if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil { return nil, err diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go index 69e952de..22fd5554 100644 --- a/federationsender/storage/tables/interface.go +++ b/federationsender/storage/tables/interface.go @@ -67,3 +67,21 @@ type FederationSenderBlacklist interface { SelectBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (bool, error) DeleteBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error } + +type FederationSenderOutboundPeeks interface { + InsertOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) + RenewOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) + SelectOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (outboundPeek *types.OutboundPeek, err error) + SelectOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (outboundPeeks []types.OutboundPeek, err error) + DeleteOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error) + DeleteOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error) +} + +type FederationSenderInboundPeeks interface { + InsertInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) + RenewInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) + SelectInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (inboundPeek *types.InboundPeek, err error) + SelectInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (inboundPeeks []types.InboundPeek, err error) + DeleteInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error) + DeleteInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error) +} |