diff options
author | Till <2353100+S7evinK@users.noreply.github.com> | 2022-08-09 11:15:58 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-08-09 11:15:58 +0200 |
commit | 240ae257deb74b7be8a17500b77d5e1bca56e8f5 (patch) | |
tree | aff298447bf98b601cc6bfb7b14f1ada006db83a /federationapi/storage/sqlite3 | |
parent | e930959e4911453ec51bba0b1469f4b15c594f32 (diff) |
Add housekeeping function to delete old/expired EDUs (#2399)
* Add housekeeping function to delete old/expired EDUs
* Add migrations
* Evict EDUs from cache
* Fix queries
* Fix upgrade
* Use map[string]time.Duration to specify different expiry times
* Fix copy & paste mistake
* Set expires_at to tomorrow
* Don't allow NULL
* Add comment
* Add tests
* Use new testrig package
* Fix migrations
* Never expire m.direct_to_device
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Co-authored-by: kegsay <kegan@matrix.org>
Diffstat (limited to 'federationapi/storage/sqlite3')
-rw-r--r-- | federationapi/storage/sqlite3/deltas/2022042812473400_addexpiresat.go | 68 | ||||
-rw-r--r-- | federationapi/storage/sqlite3/queue_edus_table.go | 100 | ||||
-rw-r--r-- | federationapi/storage/sqlite3/storage.go | 3 |
3 files changed, 148 insertions, 23 deletions
diff --git a/federationapi/storage/sqlite3/deltas/2022042812473400_addexpiresat.go b/federationapi/storage/sqlite3/deltas/2022042812473400_addexpiresat.go new file mode 100644 index 00000000..c5030163 --- /dev/null +++ b/federationapi/storage/sqlite3/deltas/2022042812473400_addexpiresat.go @@ -0,0 +1,68 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/matrix-org/gomatrixserverlib" +) + +func UpAddexpiresat(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, "ALTER TABLE federationsender_queue_edus RENAME TO federationsender_queue_edus_old;") + if err != nil { + return fmt.Errorf("failed to rename table: %w", err) + } + + _, err = tx.ExecContext(ctx, ` +CREATE TABLE IF NOT EXISTS federationsender_queue_edus ( + edu_type TEXT NOT NULL, + server_name TEXT NOT NULL, + json_nid BIGINT NOT NULL, + expires_at BIGINT NOT NULL DEFAULT 0 +); + +CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_edus_json_nid_idx + ON federationsender_queue_edus (json_nid, server_name); +`) + if err != nil { + return fmt.Errorf("failed to create new table: %w", err) + } + _, err = tx.ExecContext(ctx, ` +INSERT + INTO federationsender_queue_edus ( + edu_type, server_name, json_nid, expires_at + ) SELECT edu_type, server_name, json_nid, 0 FROM federationsender_queue_edus_old; +`) + if err != nil { + return fmt.Errorf("failed to update queue_edus: %w", err) + } + _, err = tx.ExecContext(ctx, "UPDATE federationsender_queue_edus SET expires_at = $1 WHERE edu_type != 'm.direct_to_device'", gomatrixserverlib.AsTimestamp(time.Now().Add(time.Hour*24))) + if err != nil { + return fmt.Errorf("failed to update queue_edus: %w", err) + } + return nil +} + +func DownAddexpiresat(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, "ALTER TABLE federationsender_queue_edus DROP COLUMN expires_at;") + if err != nil { + return fmt.Errorf("failed to rename table: %w", err) + } + return nil +} diff --git a/federationapi/storage/sqlite3/queue_edus_table.go b/federationapi/storage/sqlite3/queue_edus_table.go index f4c84f09..8e7e7901 100644 --- a/federationapi/storage/sqlite3/queue_edus_table.go +++ b/federationapi/storage/sqlite3/queue_edus_table.go @@ -20,9 +20,11 @@ import ( "fmt" "strings" + "github.com/matrix-org/gomatrixserverlib" + + "github.com/matrix-org/dendrite/federationapi/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/gomatrixserverlib" ) const queueEDUsSchema = ` @@ -32,7 +34,9 @@ CREATE TABLE IF NOT EXISTS federationsender_queue_edus ( -- The domain part of the user ID the EDU event is for. server_name TEXT NOT NULL, -- The JSON NID from the federationsender_queue_edus_json table. - json_nid BIGINT NOT NULL + json_nid BIGINT NOT NULL, + -- The expiry time of this edu, if any. + expires_at BIGINT NOT NULL DEFAULT 0 ); CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_edus_json_nid_idx @@ -44,8 +48,8 @@ CREATE INDEX IF NOT EXISTS federationsender_queue_edus_server_name_idx ` const insertQueueEDUSQL = "" + - "INSERT INTO federationsender_queue_edus (edu_type, server_name, json_nid)" + - " VALUES ($1, $2, $3)" + "INSERT INTO federationsender_queue_edus (edu_type, server_name, json_nid, expires_at)" + + " VALUES ($1, $2, $3, $4)" const deleteQueueEDUsSQL = "" + "DELETE FROM federationsender_queue_edus WHERE server_name = $1 AND json_nid IN ($2)" @@ -66,13 +70,22 @@ const selectQueueEDUCountSQL = "" + const selectQueueServerNamesSQL = "" + "SELECT DISTINCT server_name FROM federationsender_queue_edus" +const selectExpiredEDUsSQL = "" + + "SELECT DISTINCT json_nid FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1" + +const deleteExpiredEDUsSQL = "" + + "DELETE FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1" + type queueEDUsStatements struct { - db *sql.DB - insertQueueEDUStmt *sql.Stmt + db *sql.DB + insertQueueEDUStmt *sql.Stmt + // deleteQueueEDUStmt *sql.Stmt - prepared at runtime due to variadic selectQueueEDUStmt *sql.Stmt selectQueueEDUReferenceJSONCountStmt *sql.Stmt selectQueueEDUCountStmt *sql.Stmt selectQueueEDUServerNamesStmt *sql.Stmt + selectExpiredEDUsStmt *sql.Stmt + deleteExpiredEDUsStmt *sql.Stmt } func NewSQLiteQueueEDUsTable(db *sql.DB) (s *queueEDUsStatements, err error) { @@ -81,24 +94,33 @@ func NewSQLiteQueueEDUsTable(db *sql.DB) (s *queueEDUsStatements, err error) { } _, err = db.Exec(queueEDUsSchema) if err != nil { - return - } - if s.insertQueueEDUStmt, err = db.Prepare(insertQueueEDUSQL); err != nil { - return - } - if s.selectQueueEDUStmt, err = db.Prepare(selectQueueEDUSQL); err != nil { - return - } - if s.selectQueueEDUReferenceJSONCountStmt, err = db.Prepare(selectQueueEDUReferenceJSONCountSQL); err != nil { - return - } - if s.selectQueueEDUCountStmt, err = db.Prepare(selectQueueEDUCountSQL); err != nil { - return + return s, err } - if s.selectQueueEDUServerNamesStmt, err = db.Prepare(selectQueueServerNamesSQL); err != nil { - return + + m := sqlutil.NewMigrator(db) + m.AddMigrations( + sqlutil.Migration{ + Version: "federationapi: add expiresat column", + Up: deltas.UpAddexpiresat, + }, + ) + if err := m.Up(context.Background()); err != nil { + return s, err } - return + + return s, nil +} + +func (s *queueEDUsStatements) Prepare() error { + return sqlutil.StatementList{ + {&s.insertQueueEDUStmt, insertQueueEDUSQL}, + {&s.selectQueueEDUStmt, selectQueueEDUSQL}, + {&s.selectQueueEDUReferenceJSONCountStmt, selectQueueEDUReferenceJSONCountSQL}, + {&s.selectQueueEDUCountStmt, selectQueueEDUCountSQL}, + {&s.selectQueueEDUServerNamesStmt, selectQueueServerNamesSQL}, + {&s.selectExpiredEDUsStmt, selectExpiredEDUsSQL}, + {&s.deleteExpiredEDUsStmt, deleteExpiredEDUsSQL}, + }.Prepare(s.db) } func (s *queueEDUsStatements) InsertQueueEDU( @@ -107,6 +129,7 @@ func (s *queueEDUsStatements) InsertQueueEDU( eduType string, serverName gomatrixserverlib.ServerName, nid int64, + expiresAt gomatrixserverlib.Timestamp, ) error { stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt) _, err := stmt.ExecContext( @@ -114,6 +137,7 @@ func (s *queueEDUsStatements) InsertQueueEDU( eduType, // the EDU type serverName, // destination server name nid, // JSON blob NID + expiresAt, // timestamp of expiry ) return err } @@ -159,7 +183,7 @@ func (s *queueEDUsStatements) SelectQueueEDUs( } result = append(result, nid) } - return result, nil + return result, rows.Err() } func (s *queueEDUsStatements) SelectQueueEDUReferenceJSONCount( @@ -209,3 +233,33 @@ func (s *queueEDUsStatements) SelectQueueEDUServerNames( return result, rows.Err() } + +func (s *queueEDUsStatements) SelectExpiredEDUs( + ctx context.Context, txn *sql.Tx, + expiredBefore gomatrixserverlib.Timestamp, +) ([]int64, error) { + stmt := sqlutil.TxStmt(txn, s.selectExpiredEDUsStmt) + rows, err := stmt.QueryContext(ctx, expiredBefore) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectExpiredEDUs: rows.close() failed") + var result []int64 + var nid int64 + for rows.Next() { + if err = rows.Scan(&nid); err != nil { + return nil, err + } + result = append(result, nid) + } + return result, rows.Err() +} + +func (s *queueEDUsStatements) DeleteExpiredEDUs( + ctx context.Context, txn *sql.Tx, + expiredBefore gomatrixserverlib.Timestamp, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteExpiredEDUsStmt) + _, err := stmt.ExecContext(ctx, expiredBefore) + return err +} diff --git a/federationapi/storage/sqlite3/storage.go b/federationapi/storage/sqlite3/storage.go index 9594aaec..c89cb6be 100644 --- a/federationapi/storage/sqlite3/storage.go +++ b/federationapi/storage/sqlite3/storage.go @@ -90,6 +90,9 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, if err != nil { return nil, err } + if err = queueEDUs.Prepare(); err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, ServerName: serverName, |