aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/postgres')
-rw-r--r--syncapi/storage/postgres/backwards_extremities_table.go27
-rw-r--r--syncapi/storage/postgres/invites_table.go31
-rw-r--r--syncapi/storage/postgres/memberships_table.go22
-rw-r--r--syncapi/storage/postgres/notification_data_table.go12
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go12
-rw-r--r--syncapi/storage/postgres/output_room_events_topology_table.go42
-rw-r--r--syncapi/storage/postgres/peeks_table.go39
-rw-r--r--syncapi/storage/postgres/receipt_table.go27
8 files changed, 135 insertions, 77 deletions
diff --git a/syncapi/storage/postgres/backwards_extremities_table.go b/syncapi/storage/postgres/backwards_extremities_table.go
index 8fc92091..c20d860a 100644
--- a/syncapi/storage/postgres/backwards_extremities_table.go
+++ b/syncapi/storage/postgres/backwards_extremities_table.go
@@ -47,10 +47,14 @@ const selectBackwardExtremitiesForRoomSQL = "" +
const deleteBackwardExtremitySQL = "" +
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
+const purgeBackwardExtremitiesSQL = "" +
+ "DELETE FROM syncapi_backward_extremities WHERE room_id = $1"
+
type backwardExtremitiesStatements struct {
insertBackwardExtremityStmt *sql.Stmt
selectBackwardExtremitiesForRoomStmt *sql.Stmt
deleteBackwardExtremityStmt *sql.Stmt
+ purgeBackwardExtremitiesStmt *sql.Stmt
}
func NewPostgresBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremities, error) {
@@ -59,16 +63,12 @@ func NewPostgresBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremiti
if err != nil {
return nil, err
}
- if s.insertBackwardExtremityStmt, err = db.Prepare(insertBackwardExtremitySQL); err != nil {
- return nil, err
- }
- if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil {
- return nil, err
- }
- if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
- return nil, err
- }
- return s, nil
+ return s, sqlutil.StatementList{
+ {&s.insertBackwardExtremityStmt, insertBackwardExtremitySQL},
+ {&s.selectBackwardExtremitiesForRoomStmt, selectBackwardExtremitiesForRoomSQL},
+ {&s.deleteBackwardExtremityStmt, deleteBackwardExtremitySQL},
+ {&s.purgeBackwardExtremitiesStmt, purgeBackwardExtremitiesSQL},
+ }.Prepare(db)
}
func (s *backwardExtremitiesStatements) InsertsBackwardExtremity(
@@ -106,3 +106,10 @@ func (s *backwardExtremitiesStatements) DeleteBackwardExtremity(
_, err = sqlutil.TxStmt(txn, s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
return
}
+
+func (s *backwardExtremitiesStatements) PurgeBackwardExtremities(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) error {
+ _, err := sqlutil.TxStmt(txn, s.purgeBackwardExtremitiesStmt).ExecContext(ctx, roomID)
+ return err
+}
diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go
index aada70d5..151bffa5 100644
--- a/syncapi/storage/postgres/invites_table.go
+++ b/syncapi/storage/postgres/invites_table.go
@@ -62,11 +62,15 @@ const selectInviteEventsInRangeSQL = "" +
const selectMaxInviteIDSQL = "" +
"SELECT MAX(id) FROM syncapi_invite_events"
+const purgeInvitesSQL = "" +
+ "DELETE FROM syncapi_invite_events WHERE room_id = $1"
+
type inviteEventsStatements struct {
insertInviteEventStmt *sql.Stmt
selectInviteEventsInRangeStmt *sql.Stmt
deleteInviteEventStmt *sql.Stmt
selectMaxInviteIDStmt *sql.Stmt
+ purgeInvitesStmt *sql.Stmt
}
func NewPostgresInvitesTable(db *sql.DB) (tables.Invites, error) {
@@ -75,19 +79,13 @@ func NewPostgresInvitesTable(db *sql.DB) (tables.Invites, error) {
if err != nil {
return nil, err
}
- if s.insertInviteEventStmt, err = db.Prepare(insertInviteEventSQL); err != nil {
- return nil, err
- }
- if s.selectInviteEventsInRangeStmt, err = db.Prepare(selectInviteEventsInRangeSQL); err != nil {
- return nil, err
- }
- if s.deleteInviteEventStmt, err = db.Prepare(deleteInviteEventSQL); err != nil {
- return nil, err
- }
- if s.selectMaxInviteIDStmt, err = db.Prepare(selectMaxInviteIDSQL); err != nil {
- return nil, err
- }
- return s, nil
+ return s, sqlutil.StatementList{
+ {&s.insertInviteEventStmt, insertInviteEventSQL},
+ {&s.selectInviteEventsInRangeStmt, selectInviteEventsInRangeSQL},
+ {&s.deleteInviteEventStmt, deleteInviteEventSQL},
+ {&s.selectMaxInviteIDStmt, selectMaxInviteIDSQL},
+ {&s.purgeInvitesStmt, purgeInvitesSQL},
+ }.Prepare(db)
}
func (s *inviteEventsStatements) InsertInviteEvent(
@@ -181,3 +179,10 @@ func (s *inviteEventsStatements) SelectMaxInviteID(
}
return
}
+
+func (s *inviteEventsStatements) PurgeInvites(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) error {
+ _, err := sqlutil.TxStmt(txn, s.purgeInvitesStmt).ExecContext(ctx, roomID)
+ return err
+}
diff --git a/syncapi/storage/postgres/memberships_table.go b/syncapi/storage/postgres/memberships_table.go
index ac44b235..47833893 100644
--- a/syncapi/storage/postgres/memberships_table.go
+++ b/syncapi/storage/postgres/memberships_table.go
@@ -65,18 +65,22 @@ const selectMembershipCountSQL = "" +
const selectMembershipBeforeSQL = "" +
"SELECT membership, topological_pos FROM syncapi_memberships WHERE room_id = $1 and user_id = $2 AND topological_pos <= $3 ORDER BY topological_pos DESC LIMIT 1"
+const purgeMembershipsSQL = "" +
+ "DELETE FROM syncapi_memberships WHERE room_id = $1"
+
const selectMembersSQL = `
-SELECT event_id FROM (
- SELECT DISTINCT ON (room_id, user_id) room_id, user_id, event_id, membership FROM syncapi_memberships WHERE room_id = $1 AND topological_pos <= $2 ORDER BY room_id, user_id, stream_pos DESC
-) t
-WHERE ($3::text IS NULL OR t.membership = $3)
- AND ($4::text IS NULL OR t.membership <> $4)
+ SELECT event_id FROM (
+ SELECT DISTINCT ON (room_id, user_id) room_id, user_id, event_id, membership FROM syncapi_memberships WHERE room_id = $1 AND topological_pos <= $2 ORDER BY room_id, user_id, stream_pos DESC
+ ) t
+ WHERE ($3::text IS NULL OR t.membership = $3)
+ AND ($4::text IS NULL OR t.membership <> $4)
`
type membershipsStatements struct {
upsertMembershipStmt *sql.Stmt
selectMembershipCountStmt *sql.Stmt
selectMembershipForUserStmt *sql.Stmt
+ purgeMembershipsStmt *sql.Stmt
selectMembersStmt *sql.Stmt
}
@@ -90,6 +94,7 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
{&s.upsertMembershipStmt, upsertMembershipSQL},
{&s.selectMembershipCountStmt, selectMembershipCountSQL},
{&s.selectMembershipForUserStmt, selectMembershipBeforeSQL},
+ {&s.purgeMembershipsStmt, purgeMembershipsSQL},
{&s.selectMembersStmt, selectMembersSQL},
}.Prepare(db)
}
@@ -139,6 +144,13 @@ func (s *membershipsStatements) SelectMembershipForUser(
return membership, topologyPos, nil
}
+func (s *membershipsStatements) PurgeMemberships(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) error {
+ _, err := sqlutil.TxStmt(txn, s.purgeMembershipsStmt).ExecContext(ctx, roomID)
+ return err
+}
+
func (s *membershipsStatements) SelectMemberships(
ctx context.Context, txn *sql.Tx,
roomID string, pos types.TopologyToken,
diff --git a/syncapi/storage/postgres/notification_data_table.go b/syncapi/storage/postgres/notification_data_table.go
index 2c7b2480..7edfd54a 100644
--- a/syncapi/storage/postgres/notification_data_table.go
+++ b/syncapi/storage/postgres/notification_data_table.go
@@ -37,6 +37,7 @@ func NewPostgresNotificationDataTable(db *sql.DB) (tables.NotificationData, erro
{&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL},
{&r.selectUserUnreadCountsForRooms, selectUserUnreadNotificationsForRooms},
{&r.selectMaxID, selectMaxNotificationIDSQL},
+ {&r.purgeNotificationData, purgeNotificationDataSQL},
}.Prepare(db)
}
@@ -44,6 +45,7 @@ type notificationDataStatements struct {
upsertRoomUnreadCounts *sql.Stmt
selectUserUnreadCountsForRooms *sql.Stmt
selectMaxID *sql.Stmt
+ purgeNotificationData *sql.Stmt
}
const notificationDataSchema = `
@@ -70,6 +72,9 @@ const selectUserUnreadNotificationsForRooms = `SELECT room_id, notification_coun
const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
+const purgeNotificationDataSQL = "" +
+ "DELETE FROM syncapi_notification_data WHERE room_id = $1"
+
func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, txn *sql.Tx, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
err = sqlutil.TxStmt(txn, r.upsertRoomUnreadCounts).QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos)
return
@@ -106,3 +111,10 @@ func (r *notificationDataStatements) SelectMaxID(ctx context.Context, txn *sql.T
err := sqlutil.TxStmt(txn, r.selectMaxID).QueryRowContext(ctx).Scan(&id)
return id, err
}
+
+func (s *notificationDataStatements) PurgeNotificationData(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) error {
+ _, err := sqlutil.TxStmt(txn, s.purgeNotificationData).ExecContext(ctx, roomID)
+ return err
+}
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 3b69b26f..0075fc8d 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -176,6 +176,9 @@ const selectContextAfterEventSQL = "" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id ASC LIMIT $3"
+const purgeEventsSQL = "" +
+ "DELETE FROM syncapi_output_room_events WHERE room_id = $1"
+
const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE id > $1 AND type = ANY($2) ORDER BY id ASC LIMIT $3"
type outputRoomEventsStatements struct {
@@ -193,6 +196,7 @@ type outputRoomEventsStatements struct {
selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt
+ purgeEventsStmt *sql.Stmt
selectSearchStmt *sql.Stmt
}
@@ -230,6 +234,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
{&s.selectContextEventStmt, selectContextEventSQL},
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
+ {&s.purgeEventsStmt, purgeEventsSQL},
{&s.selectSearchStmt, selectSearchSQL},
}.Prepare(db)
}
@@ -658,6 +663,13 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
return result, rows.Err()
}
+func (s *outputRoomEventsStatements) PurgeEvents(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) error {
+ _, err := sqlutil.TxStmt(txn, s.purgeEventsStmt).ExecContext(ctx, roomID)
+ return err
+}
+
func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, afterID, pq.StringArray(types), limit)
if err != nil {
diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go
index d0e99f26..2382fca5 100644
--- a/syncapi/storage/postgres/output_room_events_topology_table.go
+++ b/syncapi/storage/postgres/output_room_events_topology_table.go
@@ -18,11 +18,12 @@ import (
"context"
"database/sql"
+ "github.com/matrix-org/gomatrixserverlib"
+
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
)
const outputRoomEventsTopologySchema = `
@@ -71,6 +72,9 @@ const selectStreamToTopologicalPositionAscSQL = "" +
const selectStreamToTopologicalPositionDescSQL = "" +
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;"
+const purgeEventsTopologySQL = "" +
+ "DELETE FROM syncapi_output_room_events_topology WHERE room_id = $1"
+
type outputRoomEventsTopologyStatements struct {
insertEventInTopologyStmt *sql.Stmt
selectEventIDsInRangeASCStmt *sql.Stmt
@@ -78,6 +82,7 @@ type outputRoomEventsTopologyStatements struct {
selectPositionInTopologyStmt *sql.Stmt
selectStreamToTopologicalPositionAscStmt *sql.Stmt
selectStreamToTopologicalPositionDescStmt *sql.Stmt
+ purgeEventsTopologyStmt *sql.Stmt
}
func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
@@ -86,25 +91,15 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
if err != nil {
return nil, err
}
- if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil {
- return nil, err
- }
- if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil {
- return nil, err
- }
- if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil {
- return nil, err
- }
- if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil {
- return nil, err
- }
- if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil {
- return nil, err
- }
- if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil {
- return nil, err
- }
- return s, nil
+ return s, sqlutil.StatementList{
+ {&s.insertEventInTopologyStmt, insertEventInTopologySQL},
+ {&s.selectEventIDsInRangeASCStmt, selectEventIDsInRangeASCSQL},
+ {&s.selectEventIDsInRangeDESCStmt, selectEventIDsInRangeDESCSQL},
+ {&s.selectPositionInTopologyStmt, selectPositionInTopologySQL},
+ {&s.selectStreamToTopologicalPositionAscStmt, selectStreamToTopologicalPositionAscSQL},
+ {&s.selectStreamToTopologicalPositionDescStmt, selectStreamToTopologicalPositionDescSQL},
+ {&s.purgeEventsTopologyStmt, purgeEventsTopologySQL},
+ }.Prepare(db)
}
// InsertEventInTopology inserts the given event in the room's topology, based
@@ -177,3 +172,10 @@ func (s *outputRoomEventsTopologyStatements) SelectStreamToTopologicalPosition(
}
return
}
+
+func (s *outputRoomEventsTopologyStatements) PurgeEventsTopology(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) error {
+ _, err := sqlutil.TxStmt(txn, s.purgeEventsTopologyStmt).ExecContext(ctx, roomID)
+ return err
+}
diff --git a/syncapi/storage/postgres/peeks_table.go b/syncapi/storage/postgres/peeks_table.go
index e20a4882..64183073 100644
--- a/syncapi/storage/postgres/peeks_table.go
+++ b/syncapi/storage/postgres/peeks_table.go
@@ -65,6 +65,9 @@ const selectPeekingDevicesSQL = "" +
const selectMaxPeekIDSQL = "" +
"SELECT MAX(id) FROM syncapi_peeks"
+const purgePeeksSQL = "" +
+ "DELETE FROM syncapi_peeks WHERE room_id = $1"
+
type peekStatements struct {
db *sql.DB
insertPeekStmt *sql.Stmt
@@ -73,6 +76,7 @@ type peekStatements struct {
selectPeeksInRangeStmt *sql.Stmt
selectPeekingDevicesStmt *sql.Stmt
selectMaxPeekIDStmt *sql.Stmt
+ purgePeeksStmt *sql.Stmt
}
func NewPostgresPeeksTable(db *sql.DB) (tables.Peeks, error) {
@@ -83,25 +87,15 @@ func NewPostgresPeeksTable(db *sql.DB) (tables.Peeks, error) {
s := &peekStatements{
db: db,
}
- if s.insertPeekStmt, err = db.Prepare(insertPeekSQL); err != nil {
- return nil, err
- }
- if s.deletePeekStmt, err = db.Prepare(deletePeekSQL); err != nil {
- return nil, err
- }
- if s.deletePeeksStmt, err = db.Prepare(deletePeeksSQL); err != nil {
- return nil, err
- }
- if s.selectPeeksInRangeStmt, err = db.Prepare(selectPeeksInRangeSQL); err != nil {
- return nil, err
- }
- if s.selectPeekingDevicesStmt, err = db.Prepare(selectPeekingDevicesSQL); err != nil {
- return nil, err
- }
- if s.selectMaxPeekIDStmt, err = db.Prepare(selectMaxPeekIDSQL); err != nil {
- return nil, err
- }
- return s, nil
+ return s, sqlutil.StatementList{
+ {&s.insertPeekStmt, insertPeekSQL},
+ {&s.deletePeekStmt, deletePeekSQL},
+ {&s.deletePeeksStmt, deletePeeksSQL},
+ {&s.selectPeeksInRangeStmt, selectPeeksInRangeSQL},
+ {&s.selectPeekingDevicesStmt, selectPeekingDevicesSQL},
+ {&s.selectMaxPeekIDStmt, selectMaxPeekIDSQL},
+ {&s.purgePeeksStmt, purgePeeksSQL},
+ }.Prepare(db)
}
func (s *peekStatements) InsertPeek(
@@ -184,3 +178,10 @@ func (s *peekStatements) SelectMaxPeekID(
}
return
}
+
+func (s *peekStatements) PurgePeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) error {
+ _, err := sqlutil.TxStmt(txn, s.purgePeeksStmt).ExecContext(ctx, roomID)
+ return err
+}
diff --git a/syncapi/storage/postgres/receipt_table.go b/syncapi/storage/postgres/receipt_table.go
index 327a7a37..0fcbebfc 100644
--- a/syncapi/storage/postgres/receipt_table.go
+++ b/syncapi/storage/postgres/receipt_table.go
@@ -62,11 +62,15 @@ const selectRoomReceipts = "" +
const selectMaxReceiptIDSQL = "" +
"SELECT MAX(id) FROM syncapi_receipts"
+const purgeReceiptsSQL = "" +
+ "DELETE FROM syncapi_receipts WHERE room_id = $1"
+
type receiptStatements struct {
db *sql.DB
upsertReceipt *sql.Stmt
selectRoomReceipts *sql.Stmt
selectMaxReceiptID *sql.Stmt
+ purgeReceiptsStmt *sql.Stmt
}
func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
@@ -86,16 +90,12 @@ func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
r := &receiptStatements{
db: db,
}
- if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil {
- return nil, fmt.Errorf("unable to prepare upsertReceipt statement: %w", err)
- }
- if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil {
- return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
- }
- if r.selectMaxReceiptID, err = db.Prepare(selectMaxReceiptIDSQL); err != nil {
- return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
- }
- return r, nil
+ return r, sqlutil.StatementList{
+ {&r.upsertReceipt, upsertReceipt},
+ {&r.selectRoomReceipts, selectRoomReceipts},
+ {&r.selectMaxReceiptID, selectMaxReceiptIDSQL},
+ {&r.purgeReceiptsStmt, purgeReceiptsSQL},
+ }.Prepare(db)
}
func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
@@ -138,3 +138,10 @@ func (s *receiptStatements) SelectMaxReceiptID(
}
return
}
+
+func (s *receiptStatements) PurgeReceipts(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) error {
+ _, err := sqlutil.TxStmt(txn, s.purgeReceiptsStmt).ExecContext(ctx, roomID)
+ return err
+}