aboutsummaryrefslogtreecommitdiff
path: root/userapi
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-03-03 16:45:06 +0000
committerGitHub <noreply@github.com>2022-03-03 16:45:06 +0000
commit5592322e13d0bf741130425079e37979f7637564 (patch)
tree10f52efd3a33f520085d643c1498c9407f9b969f /userapi
parentc44029f269d0b17102dbbf31e50669c7c93fe50e (diff)
Clean old notifications regularly (#2244)
* Clean old notifications regularly We'll keep highlights for a month and non-highlights for a day, to stop the `userapi_notifications` table from growing indefinitely. We'll also allow storing events even if no pushers are present, because apparently Element Web expects to work that way. * Fix the milliseconds * Use process context * Update sytest lists * Fix build issue
Diffstat (limited to 'userapi')
-rw-r--r--userapi/consumers/syncapi_streamevent.go3
-rw-r--r--userapi/storage/interface.go1
-rw-r--r--userapi/storage/postgres/notifications_table.go28
-rw-r--r--userapi/storage/shared/storage.go4
-rw-r--r--userapi/storage/sqlite3/notifications_table.go28
-rw-r--r--userapi/storage/tables/interface.go1
-rw-r--r--userapi/userapi.go12
7 files changed, 62 insertions, 15 deletions
diff --git a/userapi/consumers/syncapi_streamevent.go b/userapi/consumers/syncapi_streamevent.go
index d86078cb..11081327 100644
--- a/userapi/consumers/syncapi_streamevent.go
+++ b/userapi/consumers/syncapi_streamevent.go
@@ -139,9 +139,6 @@ func (s *OutputStreamEventConsumer) processMessage(ctx context.Context, event *g
// removing it means we can send all notifications to
// e.g. Element's Push gateway in one go.
for _, mem := range members {
- if p, err := s.db.GetPushers(ctx, mem.Localpart); err != nil || len(p) == 0 {
- continue
- }
if err := s.notifyLocal(ctx, event, pos, mem, roomSize, roomName); err != nil {
log.WithFields(log.Fields{
"localpart": mem.Localpart,
diff --git a/userapi/storage/interface.go b/userapi/storage/interface.go
index 6d22fea9..77706710 100644
--- a/userapi/storage/interface.go
+++ b/userapi/storage/interface.go
@@ -97,6 +97,7 @@ type Database interface {
GetNotifications(ctx context.Context, localpart string, fromID int64, limit int, filter tables.NotificationFilter) ([]*api.Notification, int64, error)
GetNotificationCount(ctx context.Context, localpart string, filter tables.NotificationFilter) (int64, error)
GetRoomNotificationCounts(ctx context.Context, localpart, roomID string) (total int64, highlight int64, _ error)
+ DeleteOldNotifications(ctx context.Context) error
UpsertPusher(ctx context.Context, p api.Pusher, localpart string) error
GetPushers(ctx context.Context, localpart string) ([]api.Pusher, error)
diff --git a/userapi/storage/postgres/notifications_table.go b/userapi/storage/postgres/notifications_table.go
index 7bcc0f9c..a27c1125 100644
--- a/userapi/storage/postgres/notifications_table.go
+++ b/userapi/storage/postgres/notifications_table.go
@@ -18,6 +18,7 @@ import (
"context"
"database/sql"
"encoding/json"
+ "time"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
@@ -28,12 +29,13 @@ import (
)
type notificationsStatements struct {
- insertStmt *sql.Stmt
- deleteUpToStmt *sql.Stmt
- updateReadStmt *sql.Stmt
- selectStmt *sql.Stmt
- selectCountStmt *sql.Stmt
- selectRoomCountsStmt *sql.Stmt
+ insertStmt *sql.Stmt
+ deleteUpToStmt *sql.Stmt
+ updateReadStmt *sql.Stmt
+ selectStmt *sql.Stmt
+ selectCountStmt *sql.Stmt
+ selectRoomCountsStmt *sql.Stmt
+ cleanNotificationsStmt *sql.Stmt
}
const notificationSchema = `
@@ -77,6 +79,10 @@ const selectRoomNotificationCountsSQL = "" +
"SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " +
"WHERE localpart = $1 AND room_id = $2 AND NOT read"
+const cleanNotificationsSQL = "" +
+ "DELETE FROM userapi_notifications WHERE" +
+ " (highlight = FALSE AND ts_ms < $1) OR (highlight = TRUE AND ts_ms < $2)"
+
func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error) {
s := &notificationsStatements{}
_, err := db.Exec(notificationSchema)
@@ -90,9 +96,19 @@ func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error)
{&s.selectStmt, selectNotificationSQL},
{&s.selectCountStmt, selectNotificationCountSQL},
{&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL},
+ {&s.cleanNotificationsStmt, cleanNotificationsSQL},
}.Prepare(db)
}
+func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error {
+ _, err := sqlutil.TxStmt(txn, s.cleanNotificationsStmt).ExecContext(
+ ctx,
+ time.Now().AddDate(0, 0, -1).UnixNano()/int64(time.Millisecond), // keep non-highlights for a day
+ time.Now().AddDate(0, -1, 0).UnixNano()/int64(time.Millisecond), // keep highlights for a month
+ )
+ return err
+}
+
// Insert inserts a notification into the database.
func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error {
roomID, tsMS := n.RoomID, n.TS
diff --git a/userapi/storage/shared/storage.go b/userapi/storage/shared/storage.go
index a58974b4..febf0322 100644
--- a/userapi/storage/shared/storage.go
+++ b/userapi/storage/shared/storage.go
@@ -705,6 +705,10 @@ func (d *Database) GetRoomNotificationCounts(ctx context.Context, localpart, roo
return d.Notifications.SelectRoomCounts(ctx, nil, localpart, roomID)
}
+func (d *Database) DeleteOldNotifications(ctx context.Context) error {
+ return d.Notifications.Clean(ctx, nil)
+}
+
func (d *Database) UpsertPusher(
ctx context.Context, p api.Pusher, localpart string,
) error {
diff --git a/userapi/storage/sqlite3/notifications_table.go b/userapi/storage/sqlite3/notifications_table.go
index fcfb1aad..df826025 100644
--- a/userapi/storage/sqlite3/notifications_table.go
+++ b/userapi/storage/sqlite3/notifications_table.go
@@ -18,6 +18,7 @@ import (
"context"
"database/sql"
"encoding/json"
+ "time"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
@@ -28,12 +29,13 @@ import (
)
type notificationsStatements struct {
- insertStmt *sql.Stmt
- deleteUpToStmt *sql.Stmt
- updateReadStmt *sql.Stmt
- selectStmt *sql.Stmt
- selectCountStmt *sql.Stmt
- selectRoomCountsStmt *sql.Stmt
+ insertStmt *sql.Stmt
+ deleteUpToStmt *sql.Stmt
+ updateReadStmt *sql.Stmt
+ selectStmt *sql.Stmt
+ selectCountStmt *sql.Stmt
+ selectRoomCountsStmt *sql.Stmt
+ cleanNotificationsStmt *sql.Stmt
}
const notificationSchema = `
@@ -77,6 +79,10 @@ const selectRoomNotificationCountsSQL = "" +
"SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " +
"WHERE localpart = $1 AND room_id = $2 AND NOT read"
+const cleanNotificationsSQL = "" +
+ "DELETE FROM userapi_notifications WHERE" +
+ " (highlight = FALSE AND ts_ms < $1) OR (highlight = TRUE AND ts_ms < $2)"
+
func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) {
s := &notificationsStatements{}
_, err := db.Exec(notificationSchema)
@@ -90,9 +96,19 @@ func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) {
{&s.selectStmt, selectNotificationSQL},
{&s.selectCountStmt, selectNotificationCountSQL},
{&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL},
+ {&s.cleanNotificationsStmt, cleanNotificationsSQL},
}.Prepare(db)
}
+func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error {
+ _, err := sqlutil.TxStmt(txn, s.cleanNotificationsStmt).ExecContext(
+ ctx,
+ time.Now().AddDate(0, 0, -1).UnixNano()/int64(time.Millisecond), // keep non-highlights for a day
+ time.Now().AddDate(0, -1, 0).UnixNano()/int64(time.Millisecond), // keep highlights for a month
+ )
+ return err
+}
+
// Insert inserts a notification into the database.
func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error {
roomID, tsMS := n.RoomID, n.TS
diff --git a/userapi/storage/tables/interface.go b/userapi/storage/tables/interface.go
index 815e5119..99c907b8 100644
--- a/userapi/storage/tables/interface.go
+++ b/userapi/storage/tables/interface.go
@@ -103,6 +103,7 @@ type PusherTable interface {
}
type NotificationTable interface {
+ Clean(ctx context.Context, txn *sql.Tx) error
Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error
DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error)
UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error)
diff --git a/userapi/userapi.go b/userapi/userapi.go
index 8dbc095f..251a4eda 100644
--- a/userapi/userapi.go
+++ b/userapi/userapi.go
@@ -15,6 +15,8 @@
package userapi
import (
+ "time"
+
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal/pushgateway"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
@@ -79,5 +81,15 @@ func NewInternalAPI(
logrus.WithError(err).Panic("failed to start user API streamed event consumer")
}
+ var cleanOldNotifs func()
+ cleanOldNotifs = func() {
+ logrus.Infof("Cleaning old notifications")
+ if err := db.DeleteOldNotifications(base.Context()); err != nil {
+ logrus.WithError(err).Error("Failed to clean old notifications")
+ }
+ time.AfterFunc(time.Hour, cleanOldNotifs)
+ }
+ time.AfterFunc(time.Minute, cleanOldNotifs)
+
return userAPI
}