aboutsummaryrefslogtreecommitdiff
path: root/federationapi
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-08-09 11:15:58 +0200
committerGitHub <noreply@github.com>2022-08-09 11:15:58 +0200
commit240ae257deb74b7be8a17500b77d5e1bca56e8f5 (patch)
treeaff298447bf98b601cc6bfb7b14f1ada006db83a /federationapi
parente930959e4911453ec51bba0b1469f4b15c594f32 (diff)
Add housekeeping function to delete old/expired EDUs (#2399)
* Add housekeeping function to delete old/expired EDUs * Add migrations * Evict EDUs from cache * Fix queries * Fix upgrade * Use map[string]time.Duration to specify different expiry times * Fix copy & paste mistake * Set expires_at to tomorrow * Don't allow NULL * Add comment * Add tests * Use new testrig package * Fix migrations * Never expire m.direct_to_device Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com> Co-authored-by: kegsay <kegan@matrix.org>
Diffstat (limited to 'federationapi')
-rw-r--r--federationapi/federationapi.go13
-rw-r--r--federationapi/queue/destinationqueue.go1
-rw-r--r--federationapi/storage/interface.go5
-rw-r--r--federationapi/storage/postgres/deltas/2022042812473400_addexpiresat.go44
-rw-r--r--federationapi/storage/postgres/queue_edus_table.go99
-rw-r--r--federationapi/storage/postgres/storage.go3
-rw-r--r--federationapi/storage/shared/storage_edus.go49
-rw-r--r--federationapi/storage/sqlite3/deltas/2022042812473400_addexpiresat.go68
-rw-r--r--federationapi/storage/sqlite3/queue_edus_table.go100
-rw-r--r--federationapi/storage/sqlite3/storage.go3
-rw-r--r--federationapi/storage/storage_test.go81
-rw-r--r--federationapi/storage/tables/interface.go5
12 files changed, 422 insertions, 49 deletions
diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go
index 97bcc12a..ff01b195 100644
--- a/federationapi/federationapi.go
+++ b/federationapi/federationapi.go
@@ -15,6 +15,8 @@
package federationapi
import (
+ "time"
+
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/federationapi/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
@@ -167,5 +169,16 @@ func NewInternalAPI(
if err = presenceConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start presence consumer")
}
+
+ var cleanExpiredEDUs func()
+ cleanExpiredEDUs = func() {
+ logrus.Infof("Cleaning expired EDUs")
+ if err := federationDB.DeleteExpiredEDUs(base.Context()); err != nil {
+ logrus.WithError(err).Error("Failed to clean expired EDUs")
+ }
+ time.AfterFunc(time.Hour, cleanExpiredEDUs)
+ }
+ time.AfterFunc(time.Minute, cleanExpiredEDUs)
+
return internal.NewFederationInternalAPI(federationDB, cfg, rsAPI, federation, stats, caches, queues, keyRing)
}
diff --git a/federationapi/queue/destinationqueue.go b/federationapi/queue/destinationqueue.go
index b6edec5d..0d937ffa 100644
--- a/federationapi/queue/destinationqueue.go
+++ b/federationapi/queue/destinationqueue.go
@@ -127,6 +127,7 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
oq.destination, // the destination server name
receipt, // NIDs from federationapi_queue_json table
event.Type,
+ nil, // this will use the default expireEDUTypes map
); err != nil {
logrus.WithError(err).Errorf("failed to associate EDU with destination %q", oq.destination)
return
diff --git a/federationapi/storage/interface.go b/federationapi/storage/interface.go
index 29254948..b8109b43 100644
--- a/federationapi/storage/interface.go
+++ b/federationapi/storage/interface.go
@@ -16,6 +16,7 @@ package storage
import (
"context"
+ "time"
"github.com/matrix-org/dendrite/federationapi/storage/shared"
"github.com/matrix-org/dendrite/federationapi/types"
@@ -38,7 +39,7 @@ type Database interface {
GetPendingEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (edus map[*shared.Receipt]*gomatrixserverlib.EDU, err error)
AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error
- AssociateEDUWithDestination(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt, eduType string) error
+ AssociateEDUWithDestination(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt, eduType string, expireEDUTypes map[string]time.Duration) error
CleanPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error
CleanEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error
@@ -70,4 +71,6 @@ type Database interface {
// Query the notary for the server keys for the given server. If `optKeyIDs` is not empty, multiple server keys may be returned (between 1 - len(optKeyIDs))
// such that the combination of all server keys will include all the `optKeyIDs`.
GetNotaryKeys(ctx context.Context, serverName gomatrixserverlib.ServerName, optKeyIDs []gomatrixserverlib.KeyID) ([]gomatrixserverlib.ServerKeys, error)
+ // DeleteExpiredEDUs cleans up expired EDUs
+ DeleteExpiredEDUs(ctx context.Context) error
}
diff --git a/federationapi/storage/postgres/deltas/2022042812473400_addexpiresat.go b/federationapi/storage/postgres/deltas/2022042812473400_addexpiresat.go
new file mode 100644
index 00000000..53a7a025
--- /dev/null
+++ b/federationapi/storage/postgres/deltas/2022042812473400_addexpiresat.go
@@ -0,0 +1,44 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package deltas
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "time"
+
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+func UpAddexpiresat(ctx context.Context, tx *sql.Tx) error {
+ _, err := tx.ExecContext(ctx, "ALTER TABLE federationsender_queue_edus ADD COLUMN IF NOT EXISTS expires_at BIGINT NOT NULL DEFAULT 0;")
+ if err != nil {
+ return fmt.Errorf("failed to execute upgrade: %w", err)
+ }
+ _, err = tx.ExecContext(ctx, "UPDATE federationsender_queue_edus SET expires_at = $1 WHERE edu_type != 'm.direct_to_device'", gomatrixserverlib.AsTimestamp(time.Now().Add(time.Hour*24)))
+ if err != nil {
+ return fmt.Errorf("failed to update queue_edus: %w", err)
+ }
+ return nil
+}
+
+func DownAddexpiresat(ctx context.Context, tx *sql.Tx) error {
+ _, err := tx.ExecContext(ctx, "ALTER TABLE federationsender_queue_edus DROP COLUMN expires_at;")
+ if err != nil {
+ return fmt.Errorf("failed to execute downgrade: %w", err)
+ }
+ return nil
+}
diff --git a/federationapi/storage/postgres/queue_edus_table.go b/federationapi/storage/postgres/queue_edus_table.go
index 1fedf0ef..d6507e13 100644
--- a/federationapi/storage/postgres/queue_edus_table.go
+++ b/federationapi/storage/postgres/queue_edus_table.go
@@ -19,9 +19,11 @@ import (
"database/sql"
"github.com/lib/pq"
+ "github.com/matrix-org/gomatrixserverlib"
+
+ "github.com/matrix-org/dendrite/federationapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
- "github.com/matrix-org/gomatrixserverlib"
)
const queueEDUsSchema = `
@@ -31,7 +33,9 @@ CREATE TABLE IF NOT EXISTS federationsender_queue_edus (
-- The domain part of the user ID the EDU event is for.
server_name TEXT NOT NULL,
-- The JSON NID from the federationsender_queue_edus_json table.
- json_nid BIGINT NOT NULL
+ json_nid BIGINT NOT NULL,
+ -- The expiry time of this edu, if any.
+ expires_at BIGINT NOT NULL DEFAULT 0
);
CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_edus_json_nid_idx
@@ -43,8 +47,8 @@ CREATE INDEX IF NOT EXISTS federationsender_queue_edus_server_name_idx
`
const insertQueueEDUSQL = "" +
- "INSERT INTO federationsender_queue_edus (edu_type, server_name, json_nid)" +
- " VALUES ($1, $2, $3)"
+ "INSERT INTO federationsender_queue_edus (edu_type, server_name, json_nid, expires_at)" +
+ " VALUES ($1, $2, $3, $4)"
const deleteQueueEDUSQL = "" +
"DELETE FROM federationsender_queue_edus WHERE server_name = $1 AND json_nid = ANY($2)"
@@ -65,6 +69,12 @@ const selectQueueEDUCountSQL = "" +
const selectQueueServerNamesSQL = "" +
"SELECT DISTINCT server_name FROM federationsender_queue_edus"
+const selectExpiredEDUsSQL = "" +
+ "SELECT DISTINCT json_nid FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1"
+
+const deleteExpiredEDUsSQL = "" +
+ "DELETE FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1"
+
type queueEDUsStatements struct {
db *sql.DB
insertQueueEDUStmt *sql.Stmt
@@ -73,6 +83,8 @@ type queueEDUsStatements struct {
selectQueueEDUReferenceJSONCountStmt *sql.Stmt
selectQueueEDUCountStmt *sql.Stmt
selectQueueEDUServerNamesStmt *sql.Stmt
+ selectExpiredEDUsStmt *sql.Stmt
+ deleteExpiredEDUsStmt *sql.Stmt
}
func NewPostgresQueueEDUsTable(db *sql.DB) (s *queueEDUsStatements, err error) {
@@ -81,27 +93,34 @@ func NewPostgresQueueEDUsTable(db *sql.DB) (s *queueEDUsStatements, err error) {
}
_, err = s.db.Exec(queueEDUsSchema)
if err != nil {
- return
- }
- if s.insertQueueEDUStmt, err = s.db.Prepare(insertQueueEDUSQL); err != nil {
- return
- }
- if s.deleteQueueEDUStmt, err = s.db.Prepare(deleteQueueEDUSQL); err != nil {
- return
- }
- if s.selectQueueEDUStmt, err = s.db.Prepare(selectQueueEDUSQL); err != nil {
- return
- }
- if s.selectQueueEDUReferenceJSONCountStmt, err = s.db.Prepare(selectQueueEDUReferenceJSONCountSQL); err != nil {
- return
+ return s, err
}
- if s.selectQueueEDUCountStmt, err = s.db.Prepare(selectQueueEDUCountSQL); err != nil {
- return
- }
- if s.selectQueueEDUServerNamesStmt, err = s.db.Prepare(selectQueueServerNamesSQL); err != nil {
- return
+
+ m := sqlutil.NewMigrator(db)
+ m.AddMigrations(
+ sqlutil.Migration{
+ Version: "federationapi: add expiresat column",
+ Up: deltas.UpAddexpiresat,
+ },
+ )
+ if err := m.Up(context.Background()); err != nil {
+ return s, err
}
- return
+
+ return s, nil
+}
+
+func (s *queueEDUsStatements) Prepare() error {
+ return sqlutil.StatementList{
+ {&s.insertQueueEDUStmt, insertQueueEDUSQL},
+ {&s.deleteQueueEDUStmt, deleteQueueEDUSQL},
+ {&s.selectQueueEDUStmt, selectQueueEDUSQL},
+ {&s.selectQueueEDUReferenceJSONCountStmt, selectQueueEDUReferenceJSONCountSQL},
+ {&s.selectQueueEDUCountStmt, selectQueueEDUCountSQL},
+ {&s.selectQueueEDUServerNamesStmt, selectQueueServerNamesSQL},
+ {&s.selectExpiredEDUsStmt, selectExpiredEDUsSQL},
+ {&s.deleteExpiredEDUsStmt, deleteExpiredEDUsSQL},
+ }.Prepare(s.db)
}
func (s *queueEDUsStatements) InsertQueueEDU(
@@ -110,6 +129,7 @@ func (s *queueEDUsStatements) InsertQueueEDU(
eduType string,
serverName gomatrixserverlib.ServerName,
nid int64,
+ expiresAt gomatrixserverlib.Timestamp,
) error {
stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt)
_, err := stmt.ExecContext(
@@ -117,6 +137,7 @@ func (s *queueEDUsStatements) InsertQueueEDU(
eduType, // the EDU type
serverName, // destination server name
nid, // JSON blob NID
+ expiresAt, // timestamp of expiry
)
return err
}
@@ -150,7 +171,7 @@ func (s *queueEDUsStatements) SelectQueueEDUs(
}
result = append(result, nid)
}
- return result, nil
+ return result, rows.Err()
}
func (s *queueEDUsStatements) SelectQueueEDUReferenceJSONCount(
@@ -200,3 +221,33 @@ func (s *queueEDUsStatements) SelectQueueEDUServerNames(
return result, rows.Err()
}
+
+func (s *queueEDUsStatements) SelectExpiredEDUs(
+ ctx context.Context, txn *sql.Tx,
+ expiredBefore gomatrixserverlib.Timestamp,
+) ([]int64, error) {
+ stmt := sqlutil.TxStmt(txn, s.selectExpiredEDUsStmt)
+ rows, err := stmt.QueryContext(ctx, expiredBefore)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectExpiredEDUs: rows.close() failed")
+ var result []int64
+ var nid int64
+ for rows.Next() {
+ if err = rows.Scan(&nid); err != nil {
+ return nil, err
+ }
+ result = append(result, nid)
+ }
+ return result, rows.Err()
+}
+
+func (s *queueEDUsStatements) DeleteExpiredEDUs(
+ ctx context.Context, txn *sql.Tx,
+ expiredBefore gomatrixserverlib.Timestamp,
+) error {
+ stmt := sqlutil.TxStmt(txn, s.deleteExpiredEDUsStmt)
+ _, err := stmt.ExecContext(ctx, expiredBefore)
+ return err
+}
diff --git a/federationapi/storage/postgres/storage.go b/federationapi/storage/postgres/storage.go
index 7c2883c1..6e208d09 100644
--- a/federationapi/storage/postgres/storage.go
+++ b/federationapi/storage/postgres/storage.go
@@ -91,6 +91,9 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
if err != nil {
return nil, err
}
+ if err = queueEDUs.Prepare(); err != nil {
+ return nil, err
+ }
d.Database = shared.Database{
DB: d.db,
ServerName: serverName,
diff --git a/federationapi/storage/shared/storage_edus.go b/federationapi/storage/shared/storage_edus.go
index 02a23338..b62e5d9c 100644
--- a/federationapi/storage/shared/storage_edus.go
+++ b/federationapi/storage/shared/storage_edus.go
@@ -20,10 +20,21 @@ import (
"encoding/json"
"errors"
"fmt"
+ "time"
"github.com/matrix-org/gomatrixserverlib"
)
+// defaultExpiry for EDUs if not listed below
+var defaultExpiry = time.Hour * 24
+
+// defaultExpireEDUTypes contains EDUs which can/should be expired after a given time
+// if the target server isn't reachable for some reason.
+var defaultExpireEDUTypes = map[string]time.Duration{
+ gomatrixserverlib.MTyping: time.Minute,
+ gomatrixserverlib.MPresence: time.Minute * 10,
+}
+
// AssociateEDUWithDestination creates an association that the
// destination queues will use to determine which JSON blobs to send
// to which servers.
@@ -32,7 +43,21 @@ func (d *Database) AssociateEDUWithDestination(
serverName gomatrixserverlib.ServerName,
receipt *Receipt,
eduType string,
+ expireEDUTypes map[string]time.Duration,
) error {
+ if expireEDUTypes == nil {
+ expireEDUTypes = defaultExpireEDUTypes
+ }
+ expiresAt := gomatrixserverlib.AsTimestamp(time.Now().Add(defaultExpiry))
+ if duration, ok := expireEDUTypes[eduType]; ok {
+ // Keep EDUs for at least x minutes before deleting them
+ expiresAt = gomatrixserverlib.AsTimestamp(time.Now().Add(duration))
+ }
+ // We forcibly set m.direct_to_device events to 0, as we always want them
+ // to be delivered. (required for E2EE)
+ if eduType == gomatrixserverlib.MDirectToDevice {
+ expiresAt = 0
+ }
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
if err := d.FederationQueueEDUs.InsertQueueEDU(
ctx, // context
@@ -40,6 +65,7 @@ func (d *Database) AssociateEDUWithDestination(
eduType, // EDU type for coalescing
serverName, // destination server name
receipt.nid, // NID from the federationapi_queue_json table
+ expiresAt, // The timestamp this EDU will expire
); err != nil {
return fmt.Errorf("InsertQueueEDU: %w", err)
}
@@ -150,3 +176,26 @@ func (d *Database) GetPendingEDUServerNames(
) ([]gomatrixserverlib.ServerName, error) {
return d.FederationQueueEDUs.SelectQueueEDUServerNames(ctx, nil)
}
+
+// DeleteExpiredEDUs deletes expired EDUs
+func (d *Database) DeleteExpiredEDUs(ctx context.Context) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ expiredBefore := gomatrixserverlib.AsTimestamp(time.Now())
+ jsonNIDs, err := d.FederationQueueEDUs.SelectExpiredEDUs(ctx, txn, expiredBefore)
+ if err != nil {
+ return err
+ }
+ if len(jsonNIDs) == 0 {
+ return nil
+ }
+ for i := range jsonNIDs {
+ d.Cache.EvictFederationQueuedEDU(jsonNIDs[i])
+ }
+
+ if err = d.FederationQueueJSON.DeleteQueueJSON(ctx, txn, jsonNIDs); err != nil {
+ return err
+ }
+
+ return d.FederationQueueEDUs.DeleteExpiredEDUs(ctx, txn, expiredBefore)
+ })
+}
diff --git a/federationapi/storage/sqlite3/deltas/2022042812473400_addexpiresat.go b/federationapi/storage/sqlite3/deltas/2022042812473400_addexpiresat.go
new file mode 100644
index 00000000..c5030163
--- /dev/null
+++ b/federationapi/storage/sqlite3/deltas/2022042812473400_addexpiresat.go
@@ -0,0 +1,68 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package deltas
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "time"
+
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+func UpAddexpiresat(ctx context.Context, tx *sql.Tx) error {
+ _, err := tx.ExecContext(ctx, "ALTER TABLE federationsender_queue_edus RENAME TO federationsender_queue_edus_old;")
+ if err != nil {
+ return fmt.Errorf("failed to rename table: %w", err)
+ }
+
+ _, err = tx.ExecContext(ctx, `
+CREATE TABLE IF NOT EXISTS federationsender_queue_edus (
+ edu_type TEXT NOT NULL,
+ server_name TEXT NOT NULL,
+ json_nid BIGINT NOT NULL,
+ expires_at BIGINT NOT NULL DEFAULT 0
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_edus_json_nid_idx
+ ON federationsender_queue_edus (json_nid, server_name);
+`)
+ if err != nil {
+ return fmt.Errorf("failed to create new table: %w", err)
+ }
+ _, err = tx.ExecContext(ctx, `
+INSERT
+ INTO federationsender_queue_edus (
+ edu_type, server_name, json_nid, expires_at
+ ) SELECT edu_type, server_name, json_nid, 0 FROM federationsender_queue_edus_old;
+`)
+ if err != nil {
+ return fmt.Errorf("failed to update queue_edus: %w", err)
+ }
+ _, err = tx.ExecContext(ctx, "UPDATE federationsender_queue_edus SET expires_at = $1 WHERE edu_type != 'm.direct_to_device'", gomatrixserverlib.AsTimestamp(time.Now().Add(time.Hour*24)))
+ if err != nil {
+ return fmt.Errorf("failed to update queue_edus: %w", err)
+ }
+ return nil
+}
+
+func DownAddexpiresat(ctx context.Context, tx *sql.Tx) error {
+ _, err := tx.ExecContext(ctx, "ALTER TABLE federationsender_queue_edus DROP COLUMN expires_at;")
+ if err != nil {
+ return fmt.Errorf("failed to rename table: %w", err)
+ }
+ return nil
+}
diff --git a/federationapi/storage/sqlite3/queue_edus_table.go b/federationapi/storage/sqlite3/queue_edus_table.go
index f4c84f09..8e7e7901 100644
--- a/federationapi/storage/sqlite3/queue_edus_table.go
+++ b/federationapi/storage/sqlite3/queue_edus_table.go
@@ -20,9 +20,11 @@ import (
"fmt"
"strings"
+ "github.com/matrix-org/gomatrixserverlib"
+
+ "github.com/matrix-org/dendrite/federationapi/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
- "github.com/matrix-org/gomatrixserverlib"
)
const queueEDUsSchema = `
@@ -32,7 +34,9 @@ CREATE TABLE IF NOT EXISTS federationsender_queue_edus (
-- The domain part of the user ID the EDU event is for.
server_name TEXT NOT NULL,
-- The JSON NID from the federationsender_queue_edus_json table.
- json_nid BIGINT NOT NULL
+ json_nid BIGINT NOT NULL,
+ -- The expiry time of this edu, if any.
+ expires_at BIGINT NOT NULL DEFAULT 0
);
CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_edus_json_nid_idx
@@ -44,8 +48,8 @@ CREATE INDEX IF NOT EXISTS federationsender_queue_edus_server_name_idx
`
const insertQueueEDUSQL = "" +
- "INSERT INTO federationsender_queue_edus (edu_type, server_name, json_nid)" +
- " VALUES ($1, $2, $3)"
+ "INSERT INTO federationsender_queue_edus (edu_type, server_name, json_nid, expires_at)" +
+ " VALUES ($1, $2, $3, $4)"
const deleteQueueEDUsSQL = "" +
"DELETE FROM federationsender_queue_edus WHERE server_name = $1 AND json_nid IN ($2)"
@@ -66,13 +70,22 @@ const selectQueueEDUCountSQL = "" +
const selectQueueServerNamesSQL = "" +
"SELECT DISTINCT server_name FROM federationsender_queue_edus"
+const selectExpiredEDUsSQL = "" +
+ "SELECT DISTINCT json_nid FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1"
+
+const deleteExpiredEDUsSQL = "" +
+ "DELETE FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1"
+
type queueEDUsStatements struct {
- db *sql.DB
- insertQueueEDUStmt *sql.Stmt
+ db *sql.DB
+ insertQueueEDUStmt *sql.Stmt
+ // deleteQueueEDUStmt *sql.Stmt - prepared at runtime due to variadic
selectQueueEDUStmt *sql.Stmt
selectQueueEDUReferenceJSONCountStmt *sql.Stmt
selectQueueEDUCountStmt *sql.Stmt
selectQueueEDUServerNamesStmt *sql.Stmt
+ selectExpiredEDUsStmt *sql.Stmt
+ deleteExpiredEDUsStmt *sql.Stmt
}
func NewSQLiteQueueEDUsTable(db *sql.DB) (s *queueEDUsStatements, err error) {
@@ -81,24 +94,33 @@ func NewSQLiteQueueEDUsTable(db *sql.DB) (s *queueEDUsStatements, err error) {
}
_, err = db.Exec(queueEDUsSchema)
if err != nil {
- return
- }
- if s.insertQueueEDUStmt, err = db.Prepare(insertQueueEDUSQL); err != nil {
- return
- }
- if s.selectQueueEDUStmt, err = db.Prepare(selectQueueEDUSQL); err != nil {
- return
- }
- if s.selectQueueEDUReferenceJSONCountStmt, err = db.Prepare(selectQueueEDUReferenceJSONCountSQL); err != nil {
- return
- }
- if s.selectQueueEDUCountStmt, err = db.Prepare(selectQueueEDUCountSQL); err != nil {
- return
+ return s, err
}
- if s.selectQueueEDUServerNamesStmt, err = db.Prepare(selectQueueServerNamesSQL); err != nil {
- return
+
+ m := sqlutil.NewMigrator(db)
+ m.AddMigrations(
+ sqlutil.Migration{
+ Version: "federationapi: add expiresat column",
+ Up: deltas.UpAddexpiresat,
+ },
+ )
+ if err := m.Up(context.Background()); err != nil {
+ return s, err
}
- return
+
+ return s, nil
+}
+
+func (s *queueEDUsStatements) Prepare() error {
+ return sqlutil.StatementList{
+ {&s.insertQueueEDUStmt, insertQueueEDUSQL},
+ {&s.selectQueueEDUStmt, selectQueueEDUSQL},
+ {&s.selectQueueEDUReferenceJSONCountStmt, selectQueueEDUReferenceJSONCountSQL},
+ {&s.selectQueueEDUCountStmt, selectQueueEDUCountSQL},
+ {&s.selectQueueEDUServerNamesStmt, selectQueueServerNamesSQL},
+ {&s.selectExpiredEDUsStmt, selectExpiredEDUsSQL},
+ {&s.deleteExpiredEDUsStmt, deleteExpiredEDUsSQL},
+ }.Prepare(s.db)
}
func (s *queueEDUsStatements) InsertQueueEDU(
@@ -107,6 +129,7 @@ func (s *queueEDUsStatements) InsertQueueEDU(
eduType string,
serverName gomatrixserverlib.ServerName,
nid int64,
+ expiresAt gomatrixserverlib.Timestamp,
) error {
stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt)
_, err := stmt.ExecContext(
@@ -114,6 +137,7 @@ func (s *queueEDUsStatements) InsertQueueEDU(
eduType, // the EDU type
serverName, // destination server name
nid, // JSON blob NID
+ expiresAt, // timestamp of expiry
)
return err
}
@@ -159,7 +183,7 @@ func (s *queueEDUsStatements) SelectQueueEDUs(
}
result = append(result, nid)
}
- return result, nil
+ return result, rows.Err()
}
func (s *queueEDUsStatements) SelectQueueEDUReferenceJSONCount(
@@ -209,3 +233,33 @@ func (s *queueEDUsStatements) SelectQueueEDUServerNames(
return result, rows.Err()
}
+
+func (s *queueEDUsStatements) SelectExpiredEDUs(
+ ctx context.Context, txn *sql.Tx,
+ expiredBefore gomatrixserverlib.Timestamp,
+) ([]int64, error) {
+ stmt := sqlutil.TxStmt(txn, s.selectExpiredEDUsStmt)
+ rows, err := stmt.QueryContext(ctx, expiredBefore)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectExpiredEDUs: rows.close() failed")
+ var result []int64
+ var nid int64
+ for rows.Next() {
+ if err = rows.Scan(&nid); err != nil {
+ return nil, err
+ }
+ result = append(result, nid)
+ }
+ return result, rows.Err()
+}
+
+func (s *queueEDUsStatements) DeleteExpiredEDUs(
+ ctx context.Context, txn *sql.Tx,
+ expiredBefore gomatrixserverlib.Timestamp,
+) error {
+ stmt := sqlutil.TxStmt(txn, s.deleteExpiredEDUsStmt)
+ _, err := stmt.ExecContext(ctx, expiredBefore)
+ return err
+}
diff --git a/federationapi/storage/sqlite3/storage.go b/federationapi/storage/sqlite3/storage.go
index 9594aaec..c89cb6be 100644
--- a/federationapi/storage/sqlite3/storage.go
+++ b/federationapi/storage/sqlite3/storage.go
@@ -90,6 +90,9 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
if err != nil {
return nil, err
}
+ if err = queueEDUs.Prepare(); err != nil {
+ return nil, err
+ }
d.Database = shared.Database{
DB: d.db,
ServerName: serverName,
diff --git a/federationapi/storage/storage_test.go b/federationapi/storage/storage_test.go
new file mode 100644
index 00000000..7eba2cbe
--- /dev/null
+++ b/federationapi/storage/storage_test.go
@@ -0,0 +1,81 @@
+package storage_test
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/matrix-org/dendrite/federationapi/storage"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/test"
+ "github.com/matrix-org/dendrite/test/testrig"
+)
+
+func mustCreateFederationDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
+ b, baseClose := testrig.CreateBaseDendrite(t, dbType)
+ connStr, dbClose := test.PrepareDBConnectionString(t, dbType)
+ db, err := storage.NewDatabase(b, &config.DatabaseOptions{
+ ConnectionString: config.DataSource(connStr),
+ }, b.Caches, b.Cfg.Global.ServerName)
+ if err != nil {
+ t.Fatalf("NewDatabase returned %s", err)
+ }
+ return db, func() {
+ dbClose()
+ baseClose()
+ }
+}
+
+func TestExpireEDUs(t *testing.T) {
+ var expireEDUTypes = map[string]time.Duration{
+ gomatrixserverlib.MReceipt: time.Millisecond,
+ }
+
+ ctx := context.Background()
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ db, close := mustCreateFederationDatabase(t, dbType)
+ defer close()
+ // insert some data
+ for i := 0; i < 100; i++ {
+ receipt, err := db.StoreJSON(ctx, "{}")
+ assert.NoError(t, err)
+
+ err = db.AssociateEDUWithDestination(ctx, "localhost", receipt, gomatrixserverlib.MReceipt, expireEDUTypes)
+ assert.NoError(t, err)
+ }
+ // add data without expiry
+ receipt, err := db.StoreJSON(ctx, "{}")
+ assert.NoError(t, err)
+
+ // m.read_marker gets the default expiry of 24h, so won't be deleted further down in this test
+ err = db.AssociateEDUWithDestination(ctx, "localhost", receipt, "m.read_marker", expireEDUTypes)
+ assert.NoError(t, err)
+
+ // Delete expired EDUs
+ err = db.DeleteExpiredEDUs(ctx)
+ assert.NoError(t, err)
+
+ // verify the data is gone
+ data, err := db.GetPendingEDUs(ctx, "localhost", 100)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(data))
+
+ // check that m.direct_to_device is never expired
+ receipt, err = db.StoreJSON(ctx, "{}")
+ assert.NoError(t, err)
+
+ err = db.AssociateEDUWithDestination(ctx, "localhost", receipt, gomatrixserverlib.MDirectToDevice, expireEDUTypes)
+ assert.NoError(t, err)
+
+ err = db.DeleteExpiredEDUs(ctx)
+ assert.NoError(t, err)
+
+ // We should get two EDUs, the m.read_marker and the m.direct_to_device
+ data, err = db.GetPendingEDUs(ctx, "localhost", 100)
+ assert.NoError(t, err)
+ assert.Equal(t, 2, len(data))
+ })
+}
diff --git a/federationapi/storage/tables/interface.go b/federationapi/storage/tables/interface.go
index 19357393..3c116a1d 100644
--- a/federationapi/storage/tables/interface.go
+++ b/federationapi/storage/tables/interface.go
@@ -34,12 +34,15 @@ type FederationQueuePDUs interface {
}
type FederationQueueEDUs interface {
- InsertQueueEDU(ctx context.Context, txn *sql.Tx, eduType string, serverName gomatrixserverlib.ServerName, nid int64) error
+ InsertQueueEDU(ctx context.Context, txn *sql.Tx, eduType string, serverName gomatrixserverlib.ServerName, nid int64, expiresAt gomatrixserverlib.Timestamp) error
DeleteQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error
SelectQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]int64, error)
SelectQueueEDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID int64) (int64, error)
SelectQueueEDUCount(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (int64, error)
SelectQueueEDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error)
+ SelectExpiredEDUs(ctx context.Context, txn *sql.Tx, expiredBefore gomatrixserverlib.Timestamp) ([]int64, error)
+ DeleteExpiredEDUs(ctx context.Context, txn *sql.Tx, expiredBefore gomatrixserverlib.Timestamp) error
+ Prepare() error
}
type FederationQueueJSON interface {