aboutsummaryrefslogtreecommitdiff
path: root/roomserver/storage
diff options
context:
space:
mode:
Diffstat (limited to 'roomserver/storage')
-rw-r--r--roomserver/storage/interface.go1
-rw-r--r--roomserver/storage/postgres/purge_statements.go133
-rw-r--r--roomserver/storage/postgres/rooms_table.go14
-rw-r--r--roomserver/storage/postgres/storage.go5
-rw-r--r--roomserver/storage/shared/storage.go16
-rw-r--r--roomserver/storage/sqlite3/purge_statements.go153
-rw-r--r--roomserver/storage/sqlite3/rooms_table.go14
-rw-r--r--roomserver/storage/sqlite3/state_block_table.go3
-rw-r--r--roomserver/storage/sqlite3/state_snapshot_table.go33
-rw-r--r--roomserver/storage/sqlite3/storage.go6
-rw-r--r--roomserver/storage/tables/interface.go7
11 files changed, 382 insertions, 3 deletions
diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go
index 92bc2e66..e0b9c56b 100644
--- a/roomserver/storage/interface.go
+++ b/roomserver/storage/interface.go
@@ -173,5 +173,6 @@ type Database interface {
GetHistoryVisibilityState(ctx context.Context, roomInfo *types.RoomInfo, eventID string, domain string) ([]*gomatrixserverlib.Event, error)
GetLeftUsers(ctx context.Context, userIDs []string) ([]string, error)
+ PurgeRoom(ctx context.Context, roomID string) error
UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error
}
diff --git a/roomserver/storage/postgres/purge_statements.go b/roomserver/storage/postgres/purge_statements.go
new file mode 100644
index 00000000..efba439b
--- /dev/null
+++ b/roomserver/storage/postgres/purge_statements.go
@@ -0,0 +1,133 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/roomserver/types"
+)
+
+const purgeEventJSONSQL = "" +
+ "DELETE FROM roomserver_event_json WHERE event_nid = ANY(" +
+ " SELECT event_nid FROM roomserver_events WHERE room_nid = $1" +
+ ")"
+
+const purgeEventsSQL = "" +
+ "DELETE FROM roomserver_events WHERE room_nid = $1"
+
+const purgeInvitesSQL = "" +
+ "DELETE FROM roomserver_invites WHERE room_nid = $1"
+
+const purgeMembershipsSQL = "" +
+ "DELETE FROM roomserver_membership WHERE room_nid = $1"
+
+const purgePreviousEventsSQL = "" +
+ "DELETE FROM roomserver_previous_events WHERE event_nids && ANY(" +
+ " SELECT ARRAY_AGG(event_nid) FROM roomserver_events WHERE room_nid = $1" +
+ ")"
+
+const purgePublishedSQL = "" +
+ "DELETE FROM roomserver_published WHERE room_id = $1"
+
+const purgeRedactionsSQL = "" +
+ "DELETE FROM roomserver_redactions WHERE redaction_event_id = ANY(" +
+ " SELECT event_id FROM roomserver_events WHERE room_nid = $1" +
+ ")"
+
+const purgeRoomAliasesSQL = "" +
+ "DELETE FROM roomserver_room_aliases WHERE room_id = $1"
+
+const purgeRoomSQL = "" +
+ "DELETE FROM roomserver_rooms WHERE room_nid = $1"
+
+const purgeStateBlockEntriesSQL = "" +
+ "DELETE FROM roomserver_state_block WHERE state_block_nid = ANY(" +
+ " SELECT DISTINCT UNNEST(state_block_nids) FROM roomserver_state_snapshots WHERE room_nid = $1" +
+ ")"
+
+const purgeStateSnapshotEntriesSQL = "" +
+ "DELETE FROM roomserver_state_snapshots WHERE room_nid = $1"
+
+type purgeStatements struct {
+ purgeEventJSONStmt *sql.Stmt
+ purgeEventsStmt *sql.Stmt
+ purgeInvitesStmt *sql.Stmt
+ purgeMembershipsStmt *sql.Stmt
+ purgePreviousEventsStmt *sql.Stmt
+ purgePublishedStmt *sql.Stmt
+ purgeRedactionStmt *sql.Stmt
+ purgeRoomAliasesStmt *sql.Stmt
+ purgeRoomStmt *sql.Stmt
+ purgeStateBlockEntriesStmt *sql.Stmt
+ purgeStateSnapshotEntriesStmt *sql.Stmt
+}
+
+func PreparePurgeStatements(db *sql.DB) (*purgeStatements, error) {
+ s := &purgeStatements{}
+
+ return s, sqlutil.StatementList{
+ {&s.purgeEventJSONStmt, purgeEventJSONSQL},
+ {&s.purgeEventsStmt, purgeEventsSQL},
+ {&s.purgeInvitesStmt, purgeInvitesSQL},
+ {&s.purgeMembershipsStmt, purgeMembershipsSQL},
+ {&s.purgePublishedStmt, purgePublishedSQL},
+ {&s.purgePreviousEventsStmt, purgePreviousEventsSQL},
+ {&s.purgeRedactionStmt, purgeRedactionsSQL},
+ {&s.purgeRoomAliasesStmt, purgeRoomAliasesSQL},
+ {&s.purgeRoomStmt, purgeRoomSQL},
+ {&s.purgeStateBlockEntriesStmt, purgeStateBlockEntriesSQL},
+ {&s.purgeStateSnapshotEntriesStmt, purgeStateSnapshotEntriesSQL},
+ }.Prepare(db)
+}
+
+func (s *purgeStatements) PurgeRoom(
+ ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, roomID string,
+) error {
+
+ // purge by roomID
+ purgeByRoomID := []*sql.Stmt{
+ s.purgeRoomAliasesStmt,
+ s.purgePublishedStmt,
+ }
+ for _, stmt := range purgeByRoomID {
+ _, err := sqlutil.TxStmt(txn, stmt).ExecContext(ctx, roomID)
+ if err != nil {
+ return err
+ }
+ }
+
+ // purge by roomNID
+ purgeByRoomNID := []*sql.Stmt{
+ s.purgeStateBlockEntriesStmt,
+ s.purgeStateSnapshotEntriesStmt,
+ s.purgeInvitesStmt,
+ s.purgeMembershipsStmt,
+ s.purgePreviousEventsStmt,
+ s.purgeEventJSONStmt,
+ s.purgeRedactionStmt,
+ s.purgeEventsStmt,
+ s.purgeRoomStmt,
+ }
+ for _, stmt := range purgeByRoomNID {
+ _, err := sqlutil.TxStmt(txn, stmt).ExecContext(ctx, roomNID)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/roomserver/storage/postgres/rooms_table.go b/roomserver/storage/postgres/rooms_table.go
index 99439953..c8346733 100644
--- a/roomserver/storage/postgres/rooms_table.go
+++ b/roomserver/storage/postgres/rooms_table.go
@@ -58,6 +58,9 @@ const insertRoomNIDSQL = "" +
const selectRoomNIDSQL = "" +
"SELECT room_nid FROM roomserver_rooms WHERE room_id = $1"
+const selectRoomNIDForUpdateSQL = "" +
+ "SELECT room_nid FROM roomserver_rooms WHERE room_id = $1 FOR UPDATE"
+
const selectLatestEventNIDsSQL = "" +
"SELECT latest_event_nids, state_snapshot_nid FROM roomserver_rooms WHERE room_nid = $1"
@@ -85,6 +88,7 @@ const bulkSelectRoomNIDsSQL = "" +
type roomStatements struct {
insertRoomNIDStmt *sql.Stmt
selectRoomNIDStmt *sql.Stmt
+ selectRoomNIDForUpdateStmt *sql.Stmt
selectLatestEventNIDsStmt *sql.Stmt
selectLatestEventNIDsForUpdateStmt *sql.Stmt
updateLatestEventNIDsStmt *sql.Stmt
@@ -106,6 +110,7 @@ func PrepareRoomsTable(db *sql.DB) (tables.Rooms, error) {
return s, sqlutil.StatementList{
{&s.insertRoomNIDStmt, insertRoomNIDSQL},
{&s.selectRoomNIDStmt, selectRoomNIDSQL},
+ {&s.selectRoomNIDForUpdateStmt, selectRoomNIDForUpdateSQL},
{&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL},
{&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL},
{&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL},
@@ -169,6 +174,15 @@ func (s *roomStatements) SelectRoomNID(
return types.RoomNID(roomNID), err
}
+func (s *roomStatements) SelectRoomNIDForUpdate(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (types.RoomNID, error) {
+ var roomNID int64
+ stmt := sqlutil.TxStmt(txn, s.selectRoomNIDForUpdateStmt)
+ err := stmt.QueryRowContext(ctx, roomID).Scan(&roomNID)
+ return types.RoomNID(roomNID), err
+}
+
func (s *roomStatements) SelectLatestEventNIDs(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) ([]types.EventNID, types.StateSnapshotNID, error) {
diff --git a/roomserver/storage/postgres/storage.go b/roomserver/storage/postgres/storage.go
index 23a5f79e..87208438 100644
--- a/roomserver/storage/postgres/storage.go
+++ b/roomserver/storage/postgres/storage.go
@@ -189,6 +189,10 @@ func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.Room
if err != nil {
return err
}
+ purge, err := PreparePurgeStatements(db)
+ if err != nil {
+ return err
+ }
d.Database = shared.Database{
DB: db,
Cache: cache,
@@ -206,6 +210,7 @@ func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.Room
MembershipTable: membership,
PublishedTable: published,
RedactionsTable: redactions,
+ Purge: purge,
}
return nil
}
diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go
index 725cc5bc..654b078d 100644
--- a/roomserver/storage/shared/storage.go
+++ b/roomserver/storage/shared/storage.go
@@ -43,6 +43,7 @@ type Database struct {
MembershipTable tables.Membership
PublishedTable tables.Published
RedactionsTable tables.Redactions
+ Purge tables.Purge
GetRoomUpdaterFn func(ctx context.Context, roomInfo *types.RoomInfo) (*RoomUpdater, error)
}
@@ -1445,6 +1446,21 @@ func (d *Database) ForgetRoom(ctx context.Context, userID, roomID string, forget
})
}
+// PurgeRoom removes all information about a given room from the roomserver.
+// For large rooms this operation may take a considerable amount of time.
+func (d *Database) PurgeRoom(ctx context.Context, roomID string) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ roomNID, err := d.RoomsTable.SelectRoomNIDForUpdate(ctx, txn, roomID)
+ if err != nil {
+ if err == sql.ErrNoRows {
+ return fmt.Errorf("room %s does not exist", roomID)
+ }
+ return fmt.Errorf("failed to lock the room: %w", err)
+ }
+ return d.Purge.PurgeRoom(ctx, txn, roomNID, roomID)
+ })
+}
+
func (d *Database) UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
diff --git a/roomserver/storage/sqlite3/purge_statements.go b/roomserver/storage/sqlite3/purge_statements.go
new file mode 100644
index 00000000..c7b4d27a
--- /dev/null
+++ b/roomserver/storage/sqlite3/purge_statements.go
@@ -0,0 +1,153 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/roomserver/types"
+)
+
+const purgeEventJSONSQL = "" +
+ "DELETE FROM roomserver_event_json WHERE event_nid IN (" +
+ " SELECT event_nid FROM roomserver_events WHERE room_nid = $1" +
+ ")"
+
+const purgeEventsSQL = "" +
+ "DELETE FROM roomserver_events WHERE room_nid = $1"
+
+const purgeInvitesSQL = "" +
+ "DELETE FROM roomserver_invites WHERE room_nid = $1"
+
+const purgeMembershipsSQL = "" +
+ "DELETE FROM roomserver_membership WHERE room_nid = $1"
+
+const purgePreviousEventsSQL = "" +
+ "DELETE FROM roomserver_previous_events WHERE event_nids IN(" +
+ " SELECT event_nid FROM roomserver_events WHERE room_nid = $1" +
+ ")"
+
+const purgePublishedSQL = "" +
+ "DELETE FROM roomserver_published WHERE room_id = $1"
+
+const purgeRedactionsSQL = "" +
+ "DELETE FROM roomserver_redactions WHERE redaction_event_id IN(" +
+ " SELECT event_id FROM roomserver_events WHERE room_nid = $1" +
+ ")"
+
+const purgeRoomAliasesSQL = "" +
+ "DELETE FROM roomserver_room_aliases WHERE room_id = $1"
+
+const purgeRoomSQL = "" +
+ "DELETE FROM roomserver_rooms WHERE room_nid = $1"
+
+const purgeStateSnapshotEntriesSQL = "" +
+ "DELETE FROM roomserver_state_snapshots WHERE room_nid = $1"
+
+type purgeStatements struct {
+ purgeEventJSONStmt *sql.Stmt
+ purgeEventsStmt *sql.Stmt
+ purgeInvitesStmt *sql.Stmt
+ purgeMembershipsStmt *sql.Stmt
+ purgePreviousEventsStmt *sql.Stmt
+ purgePublishedStmt *sql.Stmt
+ purgeRedactionStmt *sql.Stmt
+ purgeRoomAliasesStmt *sql.Stmt
+ purgeRoomStmt *sql.Stmt
+ purgeStateSnapshotEntriesStmt *sql.Stmt
+ stateSnapshot *stateSnapshotStatements
+}
+
+func PreparePurgeStatements(db *sql.DB, stateSnapshot *stateSnapshotStatements) (*purgeStatements, error) {
+ s := &purgeStatements{stateSnapshot: stateSnapshot}
+ return s, sqlutil.StatementList{
+ {&s.purgeEventJSONStmt, purgeEventJSONSQL},
+ {&s.purgeEventsStmt, purgeEventsSQL},
+ {&s.purgeInvitesStmt, purgeInvitesSQL},
+ {&s.purgeMembershipsStmt, purgeMembershipsSQL},
+ {&s.purgePublishedStmt, purgePublishedSQL},
+ {&s.purgePreviousEventsStmt, purgePreviousEventsSQL},
+ {&s.purgeRedactionStmt, purgeRedactionsSQL},
+ {&s.purgeRoomAliasesStmt, purgeRoomAliasesSQL},
+ {&s.purgeRoomStmt, purgeRoomSQL},
+ //{&s.purgeStateBlockEntriesStmt, purgeStateBlockEntriesSQL},
+ {&s.purgeStateSnapshotEntriesStmt, purgeStateSnapshotEntriesSQL},
+ }.Prepare(db)
+}
+
+func (s *purgeStatements) PurgeRoom(
+ ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, roomID string,
+) error {
+
+ // purge by roomID
+ purgeByRoomID := []*sql.Stmt{
+ s.purgeRoomAliasesStmt,
+ s.purgePublishedStmt,
+ }
+ for _, stmt := range purgeByRoomID {
+ _, err := sqlutil.TxStmt(txn, stmt).ExecContext(ctx, roomID)
+ if err != nil {
+ return err
+ }
+ }
+
+ // purge by roomNID
+ if err := s.purgeStateBlocks(ctx, txn, roomNID); err != nil {
+ return err
+ }
+
+ purgeByRoomNID := []*sql.Stmt{
+ s.purgeStateSnapshotEntriesStmt,
+ s.purgeInvitesStmt,
+ s.purgeMembershipsStmt,
+ s.purgePreviousEventsStmt,
+ s.purgeEventJSONStmt,
+ s.purgeRedactionStmt,
+ s.purgeEventsStmt,
+ s.purgeRoomStmt,
+ }
+ for _, stmt := range purgeByRoomNID {
+ _, err := sqlutil.TxStmt(txn, stmt).ExecContext(ctx, roomNID)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (s *purgeStatements) purgeStateBlocks(
+ ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
+) error {
+ // Get all stateBlockNIDs
+ stateBlockNIDs, err := s.stateSnapshot.selectStateBlockNIDsForRoomNID(ctx, txn, roomNID)
+ if err != nil {
+ return err
+ }
+ params := make([]interface{}, len(stateBlockNIDs))
+ seenNIDs := make(map[types.StateBlockNID]struct{}, len(stateBlockNIDs))
+ // dedupe NIDs
+ for k, v := range stateBlockNIDs {
+ if _, ok := seenNIDs[v]; ok {
+ continue
+ }
+ params[k] = v
+ seenNIDs[v] = struct{}{}
+ }
+
+ query := "DELETE FROM roomserver_state_block WHERE state_block_nid IN($1)"
+ return sqlutil.RunLimitedVariablesExec(ctx, query, txn, params, sqlutil.SQLite3MaxVariables)
+}
diff --git a/roomserver/storage/sqlite3/rooms_table.go b/roomserver/storage/sqlite3/rooms_table.go
index 25b611b3..7556b346 100644
--- a/roomserver/storage/sqlite3/rooms_table.go
+++ b/roomserver/storage/sqlite3/rooms_table.go
@@ -74,10 +74,14 @@ const bulkSelectRoomIDsSQL = "" +
const bulkSelectRoomNIDsSQL = "" +
"SELECT room_nid FROM roomserver_rooms WHERE room_id IN ($1)"
+const selectRoomNIDForUpdateSQL = "" +
+ "SELECT room_nid FROM roomserver_rooms WHERE room_id = $1"
+
type roomStatements struct {
db *sql.DB
insertRoomNIDStmt *sql.Stmt
selectRoomNIDStmt *sql.Stmt
+ selectRoomNIDForUpdateStmt *sql.Stmt
selectLatestEventNIDsStmt *sql.Stmt
selectLatestEventNIDsForUpdateStmt *sql.Stmt
updateLatestEventNIDsStmt *sql.Stmt
@@ -105,6 +109,7 @@ func PrepareRoomsTable(db *sql.DB) (tables.Rooms, error) {
//{&s.selectRoomVersionForRoomNIDsStmt, selectRoomVersionForRoomNIDsSQL},
{&s.selectRoomInfoStmt, selectRoomInfoSQL},
{&s.selectRoomIDsStmt, selectRoomIDsSQL},
+ {&s.selectRoomNIDForUpdateStmt, selectRoomNIDForUpdateSQL},
}.Prepare(db)
}
@@ -169,6 +174,15 @@ func (s *roomStatements) SelectRoomNID(
return types.RoomNID(roomNID), err
}
+func (s *roomStatements) SelectRoomNIDForUpdate(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (types.RoomNID, error) {
+ var roomNID int64
+ stmt := sqlutil.TxStmt(txn, s.selectRoomNIDForUpdateStmt)
+ err := stmt.QueryRowContext(ctx, roomID).Scan(&roomNID)
+ return types.RoomNID(roomNID), err
+}
+
func (s *roomStatements) SelectLatestEventNIDs(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) ([]types.EventNID, types.StateSnapshotNID, error) {
diff --git a/roomserver/storage/sqlite3/state_block_table.go b/roomserver/storage/sqlite3/state_block_table.go
index 4e67d4da..ae8181cf 100644
--- a/roomserver/storage/sqlite3/state_block_table.go
+++ b/roomserver/storage/sqlite3/state_block_table.go
@@ -24,7 +24,6 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
- "github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/util"
)
@@ -68,7 +67,7 @@ func CreateStateBlockTable(db *sql.DB) error {
return err
}
-func PrepareStateBlockTable(db *sql.DB) (tables.StateBlock, error) {
+func PrepareStateBlockTable(db *sql.DB) (*stateBlockStatements, error) {
s := &stateBlockStatements{
db: db,
}
diff --git a/roomserver/storage/sqlite3/state_snapshot_table.go b/roomserver/storage/sqlite3/state_snapshot_table.go
index 73827522..930ad14d 100644
--- a/roomserver/storage/sqlite3/state_snapshot_table.go
+++ b/roomserver/storage/sqlite3/state_snapshot_table.go
@@ -62,10 +62,14 @@ const bulkSelectStateBlockNIDsSQL = "" +
"SELECT state_snapshot_nid, state_block_nids FROM roomserver_state_snapshots" +
" WHERE state_snapshot_nid IN ($1) ORDER BY state_snapshot_nid ASC"
+const selectStateBlockNIDsForRoomNID = "" +
+ "SELECT state_block_nids FROM roomserver_state_snapshots WHERE room_nid = $1"
+
type stateSnapshotStatements struct {
db *sql.DB
insertStateStmt *sql.Stmt
bulkSelectStateBlockNIDsStmt *sql.Stmt
+ selectStateBlockNIDsStmt *sql.Stmt
}
func CreateStateSnapshotTable(db *sql.DB) error {
@@ -73,7 +77,7 @@ func CreateStateSnapshotTable(db *sql.DB) error {
return err
}
-func PrepareStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
+func PrepareStateSnapshotTable(db *sql.DB) (*stateSnapshotStatements, error) {
s := &stateSnapshotStatements{
db: db,
}
@@ -81,6 +85,7 @@ func PrepareStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
return s, sqlutil.StatementList{
{&s.insertStateStmt, insertStateSQL},
{&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL},
+ {&s.selectStateBlockNIDsStmt, selectStateBlockNIDsForRoomNID},
}.Prepare(db)
}
@@ -146,3 +151,29 @@ func (s *stateSnapshotStatements) BulkSelectStateForHistoryVisibility(
) ([]types.EventNID, error) {
return nil, tables.OptimisationNotSupportedError
}
+
+func (s *stateSnapshotStatements) selectStateBlockNIDsForRoomNID(
+ ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
+) ([]types.StateBlockNID, error) {
+ var res []types.StateBlockNID
+ rows, err := sqlutil.TxStmt(txn, s.selectStateBlockNIDsStmt).QueryContext(ctx, roomNID)
+ if err != nil {
+ return res, nil
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "selectStateBlockNIDsForRoomNID: rows.close() failed")
+
+ var stateBlockNIDs []types.StateBlockNID
+ var stateBlockNIDsJSON string
+ for rows.Next() {
+ if err = rows.Scan(&stateBlockNIDsJSON); err != nil {
+ return nil, err
+ }
+ if err = json.Unmarshal([]byte(stateBlockNIDsJSON), &stateBlockNIDs); err != nil {
+ return nil, err
+ }
+
+ res = append(res, stateBlockNIDs...)
+ }
+
+ return res, rows.Err()
+}
diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go
index 01c3f879..392edd28 100644
--- a/roomserver/storage/sqlite3/storage.go
+++ b/roomserver/storage/sqlite3/storage.go
@@ -197,6 +197,11 @@ func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.Room
if err != nil {
return err
}
+ purge, err := PreparePurgeStatements(db, stateSnapshot)
+ if err != nil {
+ return err
+ }
+
d.Database = shared.Database{
DB: db,
Cache: cache,
@@ -215,6 +220,7 @@ func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.Room
PublishedTable: published,
RedactionsTable: redactions,
GetRoomUpdaterFn: d.GetRoomUpdater,
+ Purge: purge,
}
return nil
}
diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go
index 80fcf72d..64145f83 100644
--- a/roomserver/storage/tables/interface.go
+++ b/roomserver/storage/tables/interface.go
@@ -73,6 +73,7 @@ type Events interface {
type Rooms interface {
InsertRoomNID(ctx context.Context, txn *sql.Tx, roomID string, roomVersion gomatrixserverlib.RoomVersion) (types.RoomNID, error)
SelectRoomNID(ctx context.Context, txn *sql.Tx, roomID string) (types.RoomNID, error)
+ SelectRoomNIDForUpdate(ctx context.Context, txn *sql.Tx, roomID string) (types.RoomNID, error)
SelectLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.StateSnapshotNID, error)
SelectLatestEventsNIDsForUpdate(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.EventNID, types.StateSnapshotNID, error)
UpdateLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID, lastEventSentNID types.EventNID, stateSnapshotNID types.StateSnapshotNID) error
@@ -173,6 +174,12 @@ type Redactions interface {
MarkRedactionValidated(ctx context.Context, txn *sql.Tx, redactionEventID string, validated bool) error
}
+type Purge interface {
+ PurgeRoom(
+ ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, roomID string,
+ ) error
+}
+
// StrippedEvent represents a stripped event for returning extracted content values.
type StrippedEvent struct {
RoomID string