diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-03-03 16:45:06 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-03 16:45:06 +0000 |
commit | 5592322e13d0bf741130425079e37979f7637564 (patch) | |
tree | 10f52efd3a33f520085d643c1498c9407f9b969f /userapi | |
parent | c44029f269d0b17102dbbf31e50669c7c93fe50e (diff) |
Clean old notifications regularly (#2244)
* Clean old notifications regularly
We'll keep highlights for a month and non-highlights for a day, to stop the `userapi_notifications` table from growing indefinitely.
We'll also allow storing events even if no pushers are present, because apparently Element Web expects to work that way.
* Fix the milliseconds
* Use process context
* Update sytest lists
* Fix build issue
Diffstat (limited to 'userapi')
-rw-r--r-- | userapi/consumers/syncapi_streamevent.go | 3 | ||||
-rw-r--r-- | userapi/storage/interface.go | 1 | ||||
-rw-r--r-- | userapi/storage/postgres/notifications_table.go | 28 | ||||
-rw-r--r-- | userapi/storage/shared/storage.go | 4 | ||||
-rw-r--r-- | userapi/storage/sqlite3/notifications_table.go | 28 | ||||
-rw-r--r-- | userapi/storage/tables/interface.go | 1 | ||||
-rw-r--r-- | userapi/userapi.go | 12 |
7 files changed, 62 insertions, 15 deletions
diff --git a/userapi/consumers/syncapi_streamevent.go b/userapi/consumers/syncapi_streamevent.go index d86078cb..11081327 100644 --- a/userapi/consumers/syncapi_streamevent.go +++ b/userapi/consumers/syncapi_streamevent.go @@ -139,9 +139,6 @@ func (s *OutputStreamEventConsumer) processMessage(ctx context.Context, event *g // removing it means we can send all notifications to // e.g. Element's Push gateway in one go. for _, mem := range members { - if p, err := s.db.GetPushers(ctx, mem.Localpart); err != nil || len(p) == 0 { - continue - } if err := s.notifyLocal(ctx, event, pos, mem, roomSize, roomName); err != nil { log.WithFields(log.Fields{ "localpart": mem.Localpart, diff --git a/userapi/storage/interface.go b/userapi/storage/interface.go index 6d22fea9..77706710 100644 --- a/userapi/storage/interface.go +++ b/userapi/storage/interface.go @@ -97,6 +97,7 @@ type Database interface { GetNotifications(ctx context.Context, localpart string, fromID int64, limit int, filter tables.NotificationFilter) ([]*api.Notification, int64, error) GetNotificationCount(ctx context.Context, localpart string, filter tables.NotificationFilter) (int64, error) GetRoomNotificationCounts(ctx context.Context, localpart, roomID string) (total int64, highlight int64, _ error) + DeleteOldNotifications(ctx context.Context) error UpsertPusher(ctx context.Context, p api.Pusher, localpart string) error GetPushers(ctx context.Context, localpart string) ([]api.Pusher, error) diff --git a/userapi/storage/postgres/notifications_table.go b/userapi/storage/postgres/notifications_table.go index 7bcc0f9c..a27c1125 100644 --- a/userapi/storage/postgres/notifications_table.go +++ b/userapi/storage/postgres/notifications_table.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" "encoding/json" + "time" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -28,12 +29,13 @@ import ( ) type notificationsStatements struct { - insertStmt *sql.Stmt - deleteUpToStmt *sql.Stmt - updateReadStmt *sql.Stmt - selectStmt *sql.Stmt - selectCountStmt *sql.Stmt - selectRoomCountsStmt *sql.Stmt + insertStmt *sql.Stmt + deleteUpToStmt *sql.Stmt + updateReadStmt *sql.Stmt + selectStmt *sql.Stmt + selectCountStmt *sql.Stmt + selectRoomCountsStmt *sql.Stmt + cleanNotificationsStmt *sql.Stmt } const notificationSchema = ` @@ -77,6 +79,10 @@ const selectRoomNotificationCountsSQL = "" + "SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " + "WHERE localpart = $1 AND room_id = $2 AND NOT read" +const cleanNotificationsSQL = "" + + "DELETE FROM userapi_notifications WHERE" + + " (highlight = FALSE AND ts_ms < $1) OR (highlight = TRUE AND ts_ms < $2)" + func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error) { s := ¬ificationsStatements{} _, err := db.Exec(notificationSchema) @@ -90,9 +96,19 @@ func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error) {&s.selectStmt, selectNotificationSQL}, {&s.selectCountStmt, selectNotificationCountSQL}, {&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL}, + {&s.cleanNotificationsStmt, cleanNotificationsSQL}, }.Prepare(db) } +func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error { + _, err := sqlutil.TxStmt(txn, s.cleanNotificationsStmt).ExecContext( + ctx, + time.Now().AddDate(0, 0, -1).UnixNano()/int64(time.Millisecond), // keep non-highlights for a day + time.Now().AddDate(0, -1, 0).UnixNano()/int64(time.Millisecond), // keep highlights for a month + ) + return err +} + // Insert inserts a notification into the database. func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error { roomID, tsMS := n.RoomID, n.TS diff --git a/userapi/storage/shared/storage.go b/userapi/storage/shared/storage.go index a58974b4..febf0322 100644 --- a/userapi/storage/shared/storage.go +++ b/userapi/storage/shared/storage.go @@ -705,6 +705,10 @@ func (d *Database) GetRoomNotificationCounts(ctx context.Context, localpart, roo return d.Notifications.SelectRoomCounts(ctx, nil, localpart, roomID) } +func (d *Database) DeleteOldNotifications(ctx context.Context) error { + return d.Notifications.Clean(ctx, nil) +} + func (d *Database) UpsertPusher( ctx context.Context, p api.Pusher, localpart string, ) error { diff --git a/userapi/storage/sqlite3/notifications_table.go b/userapi/storage/sqlite3/notifications_table.go index fcfb1aad..df826025 100644 --- a/userapi/storage/sqlite3/notifications_table.go +++ b/userapi/storage/sqlite3/notifications_table.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" "encoding/json" + "time" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -28,12 +29,13 @@ import ( ) type notificationsStatements struct { - insertStmt *sql.Stmt - deleteUpToStmt *sql.Stmt - updateReadStmt *sql.Stmt - selectStmt *sql.Stmt - selectCountStmt *sql.Stmt - selectRoomCountsStmt *sql.Stmt + insertStmt *sql.Stmt + deleteUpToStmt *sql.Stmt + updateReadStmt *sql.Stmt + selectStmt *sql.Stmt + selectCountStmt *sql.Stmt + selectRoomCountsStmt *sql.Stmt + cleanNotificationsStmt *sql.Stmt } const notificationSchema = ` @@ -77,6 +79,10 @@ const selectRoomNotificationCountsSQL = "" + "SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " + "WHERE localpart = $1 AND room_id = $2 AND NOT read" +const cleanNotificationsSQL = "" + + "DELETE FROM userapi_notifications WHERE" + + " (highlight = FALSE AND ts_ms < $1) OR (highlight = TRUE AND ts_ms < $2)" + func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) { s := ¬ificationsStatements{} _, err := db.Exec(notificationSchema) @@ -90,9 +96,19 @@ func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) { {&s.selectStmt, selectNotificationSQL}, {&s.selectCountStmt, selectNotificationCountSQL}, {&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL}, + {&s.cleanNotificationsStmt, cleanNotificationsSQL}, }.Prepare(db) } +func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error { + _, err := sqlutil.TxStmt(txn, s.cleanNotificationsStmt).ExecContext( + ctx, + time.Now().AddDate(0, 0, -1).UnixNano()/int64(time.Millisecond), // keep non-highlights for a day + time.Now().AddDate(0, -1, 0).UnixNano()/int64(time.Millisecond), // keep highlights for a month + ) + return err +} + // Insert inserts a notification into the database. func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error { roomID, tsMS := n.RoomID, n.TS diff --git a/userapi/storage/tables/interface.go b/userapi/storage/tables/interface.go index 815e5119..99c907b8 100644 --- a/userapi/storage/tables/interface.go +++ b/userapi/storage/tables/interface.go @@ -103,6 +103,7 @@ type PusherTable interface { } type NotificationTable interface { + Clean(ctx context.Context, txn *sql.Tx) error Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error) diff --git a/userapi/userapi.go b/userapi/userapi.go index 8dbc095f..251a4eda 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -15,6 +15,8 @@ package userapi import ( + "time" + "github.com/gorilla/mux" "github.com/matrix-org/dendrite/internal/pushgateway" keyapi "github.com/matrix-org/dendrite/keyserver/api" @@ -79,5 +81,15 @@ func NewInternalAPI( logrus.WithError(err).Panic("failed to start user API streamed event consumer") } + var cleanOldNotifs func() + cleanOldNotifs = func() { + logrus.Infof("Cleaning old notifications") + if err := db.DeleteOldNotifications(base.Context()); err != nil { + logrus.WithError(err).Error("Failed to clean old notifications") + } + time.AfterFunc(time.Hour, cleanOldNotifs) + } + time.AfterFunc(time.Minute, cleanOldNotifs) + return userAPI } |