aboutsummaryrefslogtreecommitdiff
path: root/federationsender/storage
diff options
context:
space:
mode:
Diffstat (limited to 'federationsender/storage')
-rw-r--r--federationsender/storage/interface.go11
-rw-r--r--federationsender/storage/postgres/inbound_peeks_table.go176
-rw-r--r--federationsender/storage/postgres/outbound_peeks_table.go176
-rw-r--r--federationsender/storage/postgres/storage.go28
-rw-r--r--federationsender/storage/shared/storage.go60
-rw-r--r--federationsender/storage/sqlite3/inbound_peeks_table.go176
-rw-r--r--federationsender/storage/sqlite3/outbound_peeks_table.go176
-rw-r--r--federationsender/storage/sqlite3/storage.go28
-rw-r--r--federationsender/storage/tables/interface.go18
9 files changed, 822 insertions, 27 deletions
diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go
index 03d616f1..b8361304 100644
--- a/federationsender/storage/interface.go
+++ b/federationsender/storage/interface.go
@@ -51,7 +51,18 @@ type Database interface {
GetPendingPDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
GetPendingEDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
+ // these don't have contexts passed in as we want things to happen regardless of the request context
AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error
RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error
IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error)
+
+ AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
+ RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
+ GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error)
+ GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error)
+
+ AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
+ RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
+ GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error)
+ GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error)
}
diff --git a/federationsender/storage/postgres/inbound_peeks_table.go b/federationsender/storage/postgres/inbound_peeks_table.go
new file mode 100644
index 00000000..fe35ce44
--- /dev/null
+++ b/federationsender/storage/postgres/inbound_peeks_table.go
@@ -0,0 +1,176 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+ "time"
+
+ "github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const inboundPeeksSchema = `
+CREATE TABLE IF NOT EXISTS federationsender_inbound_peeks (
+ room_id TEXT NOT NULL,
+ server_name TEXT NOT NULL,
+ peek_id TEXT NOT NULL,
+ creation_ts BIGINT NOT NULL,
+ renewed_ts BIGINT NOT NULL,
+ renewal_interval BIGINT NOT NULL,
+ UNIQUE (room_id, server_name, peek_id)
+);
+`
+
+const insertInboundPeekSQL = "" +
+ "INSERT INTO federationsender_inbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
+
+const selectInboundPeekSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
+
+const selectInboundPeeksSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1"
+
+const renewInboundPeekSQL = "" +
+ "UPDATE federationsender_inbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
+
+const deleteInboundPeekSQL = "" +
+ "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2"
+
+const deleteInboundPeeksSQL = "" +
+ "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1"
+
+type inboundPeeksStatements struct {
+ db *sql.DB
+ insertInboundPeekStmt *sql.Stmt
+ selectInboundPeekStmt *sql.Stmt
+ selectInboundPeeksStmt *sql.Stmt
+ renewInboundPeekStmt *sql.Stmt
+ deleteInboundPeekStmt *sql.Stmt
+ deleteInboundPeeksStmt *sql.Stmt
+}
+
+func NewPostgresInboundPeeksTable(db *sql.DB) (s *inboundPeeksStatements, err error) {
+ s = &inboundPeeksStatements{
+ db: db,
+ }
+ _, err = db.Exec(inboundPeeksSchema)
+ if err != nil {
+ return
+ }
+
+ if s.insertInboundPeekStmt, err = db.Prepare(insertInboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectInboundPeekStmt, err = db.Prepare(selectInboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectInboundPeeksStmt, err = db.Prepare(selectInboundPeeksSQL); err != nil {
+ return
+ }
+ if s.renewInboundPeekStmt, err = db.Prepare(renewInboundPeekSQL); err != nil {
+ return
+ }
+ if s.deleteInboundPeeksStmt, err = db.Prepare(deleteInboundPeeksSQL); err != nil {
+ return
+ }
+ if s.deleteInboundPeekStmt, err = db.Prepare(deleteInboundPeekSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *inboundPeeksStatements) InsertInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ stmt := sqlutil.TxStmt(txn, s.insertInboundPeekStmt)
+ _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
+ return
+}
+
+func (s *inboundPeeksStatements) RenewInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ _, err = sqlutil.TxStmt(txn, s.renewInboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
+ return
+}
+
+func (s *inboundPeeksStatements) SelectInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (*types.InboundPeek, error) {
+ row := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryRowContext(ctx, roomID)
+ inboundPeek := types.InboundPeek{}
+ err := row.Scan(
+ &inboundPeek.RoomID,
+ &inboundPeek.ServerName,
+ &inboundPeek.PeekID,
+ &inboundPeek.CreationTimestamp,
+ &inboundPeek.RenewedTimestamp,
+ &inboundPeek.RenewalInterval,
+ )
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ return &inboundPeek, nil
+}
+
+func (s *inboundPeeksStatements) SelectInboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (inboundPeeks []types.InboundPeek, err error) {
+ rows, err := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryContext(ctx, roomID)
+ if err != nil {
+ return
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectInboundPeeks: rows.close() failed")
+
+ for rows.Next() {
+ inboundPeek := types.InboundPeek{}
+ if err = rows.Scan(
+ &inboundPeek.RoomID,
+ &inboundPeek.ServerName,
+ &inboundPeek.PeekID,
+ &inboundPeek.CreationTimestamp,
+ &inboundPeek.RenewedTimestamp,
+ &inboundPeek.RenewalInterval,
+ ); err != nil {
+ return
+ }
+ inboundPeeks = append(inboundPeeks, inboundPeek)
+ }
+
+ return inboundPeeks, rows.Err()
+}
+
+func (s *inboundPeeksStatements) DeleteInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteInboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID)
+ return
+}
+
+func (s *inboundPeeksStatements) DeleteInboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteInboundPeeksStmt).ExecContext(ctx, roomID)
+ return
+}
diff --git a/federationsender/storage/postgres/outbound_peeks_table.go b/federationsender/storage/postgres/outbound_peeks_table.go
new file mode 100644
index 00000000..596b4bcc
--- /dev/null
+++ b/federationsender/storage/postgres/outbound_peeks_table.go
@@ -0,0 +1,176 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+ "time"
+
+ "github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const outboundPeeksSchema = `
+CREATE TABLE IF NOT EXISTS federationsender_outbound_peeks (
+ room_id TEXT NOT NULL,
+ server_name TEXT NOT NULL,
+ peek_id TEXT NOT NULL,
+ creation_ts BIGINT NOT NULL,
+ renewed_ts BIGINT NOT NULL,
+ renewal_interval BIGINT NOT NULL,
+ UNIQUE (room_id, server_name, peek_id)
+);
+`
+
+const insertOutboundPeekSQL = "" +
+ "INSERT INTO federationsender_outbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
+
+const selectOutboundPeekSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
+
+const selectOutboundPeeksSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1"
+
+const renewOutboundPeekSQL = "" +
+ "UPDATE federationsender_outbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
+
+const deleteOutboundPeekSQL = "" +
+ "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2"
+
+const deleteOutboundPeeksSQL = "" +
+ "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1"
+
+type outboundPeeksStatements struct {
+ db *sql.DB
+ insertOutboundPeekStmt *sql.Stmt
+ selectOutboundPeekStmt *sql.Stmt
+ selectOutboundPeeksStmt *sql.Stmt
+ renewOutboundPeekStmt *sql.Stmt
+ deleteOutboundPeekStmt *sql.Stmt
+ deleteOutboundPeeksStmt *sql.Stmt
+}
+
+func NewPostgresOutboundPeeksTable(db *sql.DB) (s *outboundPeeksStatements, err error) {
+ s = &outboundPeeksStatements{
+ db: db,
+ }
+ _, err = db.Exec(outboundPeeksSchema)
+ if err != nil {
+ return
+ }
+
+ if s.insertOutboundPeekStmt, err = db.Prepare(insertOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectOutboundPeekStmt, err = db.Prepare(selectOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectOutboundPeeksStmt, err = db.Prepare(selectOutboundPeeksSQL); err != nil {
+ return
+ }
+ if s.renewOutboundPeekStmt, err = db.Prepare(renewOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.deleteOutboundPeeksStmt, err = db.Prepare(deleteOutboundPeeksSQL); err != nil {
+ return
+ }
+ if s.deleteOutboundPeekStmt, err = db.Prepare(deleteOutboundPeekSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *outboundPeeksStatements) InsertOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ stmt := sqlutil.TxStmt(txn, s.insertOutboundPeekStmt)
+ _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
+ return
+}
+
+func (s *outboundPeeksStatements) RenewOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ _, err = sqlutil.TxStmt(txn, s.renewOutboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
+ return
+}
+
+func (s *outboundPeeksStatements) SelectOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (*types.OutboundPeek, error) {
+ row := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryRowContext(ctx, roomID)
+ outboundPeek := types.OutboundPeek{}
+ err := row.Scan(
+ &outboundPeek.RoomID,
+ &outboundPeek.ServerName,
+ &outboundPeek.PeekID,
+ &outboundPeek.CreationTimestamp,
+ &outboundPeek.RenewedTimestamp,
+ &outboundPeek.RenewalInterval,
+ )
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ return &outboundPeek, nil
+}
+
+func (s *outboundPeeksStatements) SelectOutboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (outboundPeeks []types.OutboundPeek, err error) {
+ rows, err := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryContext(ctx, roomID)
+ if err != nil {
+ return
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectOutboundPeeks: rows.close() failed")
+
+ for rows.Next() {
+ outboundPeek := types.OutboundPeek{}
+ if err = rows.Scan(
+ &outboundPeek.RoomID,
+ &outboundPeek.ServerName,
+ &outboundPeek.PeekID,
+ &outboundPeek.CreationTimestamp,
+ &outboundPeek.RenewedTimestamp,
+ &outboundPeek.RenewalInterval,
+ ); err != nil {
+ return
+ }
+ outboundPeeks = append(outboundPeeks, outboundPeek)
+ }
+
+ return outboundPeeks, rows.Err()
+}
+
+func (s *outboundPeeksStatements) DeleteOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteOutboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID)
+ return
+}
+
+func (s *outboundPeeksStatements) DeleteOutboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteOutboundPeeksStmt).ExecContext(ctx, roomID)
+ return
+}
diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go
index 75b54bbc..b9827ca1 100644
--- a/federationsender/storage/postgres/storage.go
+++ b/federationsender/storage/postgres/storage.go
@@ -64,16 +64,26 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
if err != nil {
return nil, err
}
+ inboundPeeks, err := NewPostgresInboundPeeksTable(d.db)
+ if err != nil {
+ return nil, err
+ }
+ outboundPeeks, err := NewPostgresOutboundPeeksTable(d.db)
+ if err != nil {
+ return nil, err
+ }
d.Database = shared.Database{
- DB: d.db,
- Cache: cache,
- Writer: d.writer,
- FederationSenderJoinedHosts: joinedHosts,
- FederationSenderQueuePDUs: queuePDUs,
- FederationSenderQueueEDUs: queueEDUs,
- FederationSenderQueueJSON: queueJSON,
- FederationSenderRooms: rooms,
- FederationSenderBlacklist: blacklist,
+ DB: d.db,
+ Cache: cache,
+ Writer: d.writer,
+ FederationSenderJoinedHosts: joinedHosts,
+ FederationSenderQueuePDUs: queuePDUs,
+ FederationSenderQueueEDUs: queueEDUs,
+ FederationSenderQueueJSON: queueJSON,
+ FederationSenderRooms: rooms,
+ FederationSenderBlacklist: blacklist,
+ FederationSenderInboundPeeks: inboundPeeks,
+ FederationSenderOutboundPeeks: outboundPeeks,
}
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil {
return nil, err
diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go
index fbf84c70..4c949042 100644
--- a/federationsender/storage/shared/storage.go
+++ b/federationsender/storage/shared/storage.go
@@ -27,15 +27,17 @@ import (
)
type Database struct {
- DB *sql.DB
- Cache caching.FederationSenderCache
- Writer sqlutil.Writer
- FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
- FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
- FederationSenderQueueJSON tables.FederationSenderQueueJSON
- FederationSenderJoinedHosts tables.FederationSenderJoinedHosts
- FederationSenderRooms tables.FederationSenderRooms
- FederationSenderBlacklist tables.FederationSenderBlacklist
+ DB *sql.DB
+ Cache caching.FederationSenderCache
+ Writer sqlutil.Writer
+ FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
+ FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
+ FederationSenderQueueJSON tables.FederationSenderQueueJSON
+ FederationSenderJoinedHosts tables.FederationSenderJoinedHosts
+ FederationSenderRooms tables.FederationSenderRooms
+ FederationSenderBlacklist tables.FederationSenderBlacklist
+ FederationSenderOutboundPeeks tables.FederationSenderOutboundPeeks
+ FederationSenderInboundPeeks tables.FederationSenderInboundPeeks
}
// An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs.
@@ -173,3 +175,43 @@ func (d *Database) RemoveServerFromBlacklist(serverName gomatrixserverlib.Server
func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) {
return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName)
}
+
+func (d *Database) AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ return d.FederationSenderOutboundPeeks.InsertOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
+ })
+}
+
+func (d *Database) RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ return d.FederationSenderOutboundPeeks.RenewOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
+ })
+}
+
+func (d *Database) GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error) {
+ return d.FederationSenderOutboundPeeks.SelectOutboundPeek(ctx, nil, serverName, roomID, peekID)
+}
+
+func (d *Database) GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) {
+ return d.FederationSenderOutboundPeeks.SelectOutboundPeeks(ctx, nil, roomID)
+}
+
+func (d *Database) AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ return d.FederationSenderInboundPeeks.InsertInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
+ })
+}
+
+func (d *Database) RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ return d.FederationSenderInboundPeeks.RenewInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
+ })
+}
+
+func (d *Database) GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error) {
+ return d.FederationSenderInboundPeeks.SelectInboundPeek(ctx, nil, serverName, roomID, peekID)
+}
+
+func (d *Database) GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error) {
+ return d.FederationSenderInboundPeeks.SelectInboundPeeks(ctx, nil, roomID)
+}
diff --git a/federationsender/storage/sqlite3/inbound_peeks_table.go b/federationsender/storage/sqlite3/inbound_peeks_table.go
new file mode 100644
index 00000000..d5eacf9e
--- /dev/null
+++ b/federationsender/storage/sqlite3/inbound_peeks_table.go
@@ -0,0 +1,176 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+ "time"
+
+ "github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const inboundPeeksSchema = `
+CREATE TABLE IF NOT EXISTS federationsender_inbound_peeks (
+ room_id TEXT NOT NULL,
+ server_name TEXT NOT NULL,
+ peek_id TEXT NOT NULL,
+ creation_ts INTEGER NOT NULL,
+ renewed_ts INTEGER NOT NULL,
+ renewal_interval INTEGER NOT NULL,
+ UNIQUE (room_id, server_name, peek_id)
+);
+`
+
+const insertInboundPeekSQL = "" +
+ "INSERT INTO federationsender_inbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
+
+const selectInboundPeekSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
+
+const selectInboundPeeksSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1"
+
+const renewInboundPeekSQL = "" +
+ "UPDATE federationsender_inbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
+
+const deleteInboundPeekSQL = "" +
+ "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2"
+
+const deleteInboundPeeksSQL = "" +
+ "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1"
+
+type inboundPeeksStatements struct {
+ db *sql.DB
+ insertInboundPeekStmt *sql.Stmt
+ selectInboundPeekStmt *sql.Stmt
+ selectInboundPeeksStmt *sql.Stmt
+ renewInboundPeekStmt *sql.Stmt
+ deleteInboundPeekStmt *sql.Stmt
+ deleteInboundPeeksStmt *sql.Stmt
+}
+
+func NewSQLiteInboundPeeksTable(db *sql.DB) (s *inboundPeeksStatements, err error) {
+ s = &inboundPeeksStatements{
+ db: db,
+ }
+ _, err = db.Exec(inboundPeeksSchema)
+ if err != nil {
+ return
+ }
+
+ if s.insertInboundPeekStmt, err = db.Prepare(insertInboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectInboundPeekStmt, err = db.Prepare(selectInboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectInboundPeeksStmt, err = db.Prepare(selectInboundPeeksSQL); err != nil {
+ return
+ }
+ if s.renewInboundPeekStmt, err = db.Prepare(renewInboundPeekSQL); err != nil {
+ return
+ }
+ if s.deleteInboundPeeksStmt, err = db.Prepare(deleteInboundPeeksSQL); err != nil {
+ return
+ }
+ if s.deleteInboundPeekStmt, err = db.Prepare(deleteInboundPeekSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *inboundPeeksStatements) InsertInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ stmt := sqlutil.TxStmt(txn, s.insertInboundPeekStmt)
+ _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
+ return
+}
+
+func (s *inboundPeeksStatements) RenewInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ _, err = sqlutil.TxStmt(txn, s.renewInboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
+ return
+}
+
+func (s *inboundPeeksStatements) SelectInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (*types.InboundPeek, error) {
+ row := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryRowContext(ctx, roomID)
+ inboundPeek := types.InboundPeek{}
+ err := row.Scan(
+ &inboundPeek.RoomID,
+ &inboundPeek.ServerName,
+ &inboundPeek.PeekID,
+ &inboundPeek.CreationTimestamp,
+ &inboundPeek.RenewedTimestamp,
+ &inboundPeek.RenewalInterval,
+ )
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ return &inboundPeek, nil
+}
+
+func (s *inboundPeeksStatements) SelectInboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (inboundPeeks []types.InboundPeek, err error) {
+ rows, err := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryContext(ctx, roomID)
+ if err != nil {
+ return
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectInboundPeeks: rows.close() failed")
+
+ for rows.Next() {
+ inboundPeek := types.InboundPeek{}
+ if err = rows.Scan(
+ &inboundPeek.RoomID,
+ &inboundPeek.ServerName,
+ &inboundPeek.PeekID,
+ &inboundPeek.CreationTimestamp,
+ &inboundPeek.RenewedTimestamp,
+ &inboundPeek.RenewalInterval,
+ ); err != nil {
+ return
+ }
+ inboundPeeks = append(inboundPeeks, inboundPeek)
+ }
+
+ return inboundPeeks, rows.Err()
+}
+
+func (s *inboundPeeksStatements) DeleteInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteInboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID)
+ return
+}
+
+func (s *inboundPeeksStatements) DeleteInboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteInboundPeeksStmt).ExecContext(ctx, roomID)
+ return
+}
diff --git a/federationsender/storage/sqlite3/outbound_peeks_table.go b/federationsender/storage/sqlite3/outbound_peeks_table.go
new file mode 100644
index 00000000..02aefce7
--- /dev/null
+++ b/federationsender/storage/sqlite3/outbound_peeks_table.go
@@ -0,0 +1,176 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+ "time"
+
+ "github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const outboundPeeksSchema = `
+CREATE TABLE IF NOT EXISTS federationsender_outbound_peeks (
+ room_id TEXT NOT NULL,
+ server_name TEXT NOT NULL,
+ peek_id TEXT NOT NULL,
+ creation_ts INTEGER NOT NULL,
+ renewed_ts INTEGER NOT NULL,
+ renewal_interval INTEGER NOT NULL,
+ UNIQUE (room_id, server_name, peek_id)
+);
+`
+
+const insertOutboundPeekSQL = "" +
+ "INSERT INTO federationsender_outbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
+
+const selectOutboundPeekSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
+
+const selectOutboundPeeksSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1"
+
+const renewOutboundPeekSQL = "" +
+ "UPDATE federationsender_outbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
+
+const deleteOutboundPeekSQL = "" +
+ "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2"
+
+const deleteOutboundPeeksSQL = "" +
+ "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1"
+
+type outboundPeeksStatements struct {
+ db *sql.DB
+ insertOutboundPeekStmt *sql.Stmt
+ selectOutboundPeekStmt *sql.Stmt
+ selectOutboundPeeksStmt *sql.Stmt
+ renewOutboundPeekStmt *sql.Stmt
+ deleteOutboundPeekStmt *sql.Stmt
+ deleteOutboundPeeksStmt *sql.Stmt
+}
+
+func NewSQLiteOutboundPeeksTable(db *sql.DB) (s *outboundPeeksStatements, err error) {
+ s = &outboundPeeksStatements{
+ db: db,
+ }
+ _, err = db.Exec(outboundPeeksSchema)
+ if err != nil {
+ return
+ }
+
+ if s.insertOutboundPeekStmt, err = db.Prepare(insertOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectOutboundPeekStmt, err = db.Prepare(selectOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectOutboundPeeksStmt, err = db.Prepare(selectOutboundPeeksSQL); err != nil {
+ return
+ }
+ if s.renewOutboundPeekStmt, err = db.Prepare(renewOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.deleteOutboundPeeksStmt, err = db.Prepare(deleteOutboundPeeksSQL); err != nil {
+ return
+ }
+ if s.deleteOutboundPeekStmt, err = db.Prepare(deleteOutboundPeekSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *outboundPeeksStatements) InsertOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ stmt := sqlutil.TxStmt(txn, s.insertOutboundPeekStmt)
+ _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
+ return
+}
+
+func (s *outboundPeeksStatements) RenewOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ _, err = sqlutil.TxStmt(txn, s.renewOutboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
+ return
+}
+
+func (s *outboundPeeksStatements) SelectOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (*types.OutboundPeek, error) {
+ row := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryRowContext(ctx, roomID)
+ outboundPeek := types.OutboundPeek{}
+ err := row.Scan(
+ &outboundPeek.RoomID,
+ &outboundPeek.ServerName,
+ &outboundPeek.PeekID,
+ &outboundPeek.CreationTimestamp,
+ &outboundPeek.RenewedTimestamp,
+ &outboundPeek.RenewalInterval,
+ )
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ return &outboundPeek, nil
+}
+
+func (s *outboundPeeksStatements) SelectOutboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (outboundPeeks []types.OutboundPeek, err error) {
+ rows, err := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryContext(ctx, roomID)
+ if err != nil {
+ return
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectOutboundPeeks: rows.close() failed")
+
+ for rows.Next() {
+ outboundPeek := types.OutboundPeek{}
+ if err = rows.Scan(
+ &outboundPeek.RoomID,
+ &outboundPeek.ServerName,
+ &outboundPeek.PeekID,
+ &outboundPeek.CreationTimestamp,
+ &outboundPeek.RenewedTimestamp,
+ &outboundPeek.RenewalInterval,
+ ); err != nil {
+ return
+ }
+ outboundPeeks = append(outboundPeeks, outboundPeek)
+ }
+
+ return outboundPeeks, rows.Err()
+}
+
+func (s *outboundPeeksStatements) DeleteOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteOutboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID)
+ return
+}
+
+func (s *outboundPeeksStatements) DeleteOutboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteOutboundPeeksStmt).ExecContext(ctx, roomID)
+ return
+}
diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go
index e66d7690..2b135858 100644
--- a/federationsender/storage/sqlite3/storage.go
+++ b/federationsender/storage/sqlite3/storage.go
@@ -66,16 +66,26 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
if err != nil {
return nil, err
}
+ outboundPeeks, err := NewSQLiteOutboundPeeksTable(d.db)
+ if err != nil {
+ return nil, err
+ }
+ inboundPeeks, err := NewSQLiteInboundPeeksTable(d.db)
+ if err != nil {
+ return nil, err
+ }
d.Database = shared.Database{
- DB: d.db,
- Cache: cache,
- Writer: d.writer,
- FederationSenderJoinedHosts: joinedHosts,
- FederationSenderQueuePDUs: queuePDUs,
- FederationSenderQueueEDUs: queueEDUs,
- FederationSenderQueueJSON: queueJSON,
- FederationSenderRooms: rooms,
- FederationSenderBlacklist: blacklist,
+ DB: d.db,
+ Cache: cache,
+ Writer: d.writer,
+ FederationSenderJoinedHosts: joinedHosts,
+ FederationSenderQueuePDUs: queuePDUs,
+ FederationSenderQueueEDUs: queueEDUs,
+ FederationSenderQueueJSON: queueJSON,
+ FederationSenderRooms: rooms,
+ FederationSenderBlacklist: blacklist,
+ FederationSenderOutboundPeeks: outboundPeeks,
+ FederationSenderInboundPeeks: inboundPeeks,
}
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil {
return nil, err
diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go
index 69e952de..22fd5554 100644
--- a/federationsender/storage/tables/interface.go
+++ b/federationsender/storage/tables/interface.go
@@ -67,3 +67,21 @@ type FederationSenderBlacklist interface {
SelectBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (bool, error)
DeleteBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error
}
+
+type FederationSenderOutboundPeeks interface {
+ InsertOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
+ RenewOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
+ SelectOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (outboundPeek *types.OutboundPeek, err error)
+ SelectOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (outboundPeeks []types.OutboundPeek, err error)
+ DeleteOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error)
+ DeleteOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
+}
+
+type FederationSenderInboundPeeks interface {
+ InsertInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
+ RenewInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
+ SelectInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (inboundPeek *types.InboundPeek, err error)
+ SelectInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (inboundPeeks []types.InboundPeek, err error)
+ DeleteInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error)
+ DeleteInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
+}