aboutsummaryrefslogtreecommitdiff
path: root/roomserver
diff options
context:
space:
mode:
Diffstat (limited to 'roomserver')
-rw-r--r--roomserver/api/api.go1
-rw-r--r--roomserver/api/api_trace.go10
-rw-r--r--roomserver/api/output.go8
-rw-r--r--roomserver/api/perform.go8
-rw-r--r--roomserver/internal/perform/perform_admin.go37
-rw-r--r--roomserver/inthttp/client.go12
-rw-r--r--roomserver/inthttp/server.go5
-rw-r--r--roomserver/roomserver_test.go165
-rw-r--r--roomserver/storage/interface.go1
-rw-r--r--roomserver/storage/postgres/purge_statements.go133
-rw-r--r--roomserver/storage/postgres/rooms_table.go14
-rw-r--r--roomserver/storage/postgres/storage.go5
-rw-r--r--roomserver/storage/shared/storage.go16
-rw-r--r--roomserver/storage/sqlite3/purge_statements.go153
-rw-r--r--roomserver/storage/sqlite3/rooms_table.go14
-rw-r--r--roomserver/storage/sqlite3/state_block_table.go3
-rw-r--r--roomserver/storage/sqlite3/state_snapshot_table.go33
-rw-r--r--roomserver/storage/sqlite3/storage.go6
-rw-r--r--roomserver/storage/tables/interface.go7
19 files changed, 628 insertions, 3 deletions
diff --git a/roomserver/api/api.go b/roomserver/api/api.go
index 420ef278..a8228ae8 100644
--- a/roomserver/api/api.go
+++ b/roomserver/api/api.go
@@ -151,6 +151,7 @@ type ClientRoomserverAPI interface {
PerformRoomUpgrade(ctx context.Context, req *PerformRoomUpgradeRequest, resp *PerformRoomUpgradeResponse) error
PerformAdminEvacuateRoom(ctx context.Context, req *PerformAdminEvacuateRoomRequest, res *PerformAdminEvacuateRoomResponse) error
PerformAdminEvacuateUser(ctx context.Context, req *PerformAdminEvacuateUserRequest, res *PerformAdminEvacuateUserResponse) error
+ PerformAdminPurgeRoom(ctx context.Context, req *PerformAdminPurgeRoomRequest, res *PerformAdminPurgeRoomResponse) error
PerformAdminDownloadState(ctx context.Context, req *PerformAdminDownloadStateRequest, res *PerformAdminDownloadStateResponse) error
PerformPeek(ctx context.Context, req *PerformPeekRequest, res *PerformPeekResponse) error
PerformUnpeek(ctx context.Context, req *PerformUnpeekRequest, res *PerformUnpeekResponse) error
diff --git a/roomserver/api/api_trace.go b/roomserver/api/api_trace.go
index b23263d1..166b651a 100644
--- a/roomserver/api/api_trace.go
+++ b/roomserver/api/api_trace.go
@@ -137,6 +137,16 @@ func (t *RoomserverInternalAPITrace) PerformAdminEvacuateUser(
return err
}
+func (t *RoomserverInternalAPITrace) PerformAdminPurgeRoom(
+ ctx context.Context,
+ req *PerformAdminPurgeRoomRequest,
+ res *PerformAdminPurgeRoomResponse,
+) error {
+ err := t.Impl.PerformAdminPurgeRoom(ctx, req, res)
+ util.GetLogger(ctx).WithError(err).Infof("PerformAdminPurgeRoom req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
func (t *RoomserverInternalAPITrace) PerformAdminDownloadState(
ctx context.Context,
req *PerformAdminDownloadStateRequest,
diff --git a/roomserver/api/output.go b/roomserver/api/output.go
index 36d0625c..0c0f52c4 100644
--- a/roomserver/api/output.go
+++ b/roomserver/api/output.go
@@ -55,6 +55,8 @@ const (
OutputTypeNewInboundPeek OutputType = "new_inbound_peek"
// OutputTypeRetirePeek indicates that the kafka event is an OutputRetirePeek
OutputTypeRetirePeek OutputType = "retire_peek"
+ // OutputTypePurgeRoom indicates the event is an OutputPurgeRoom
+ OutputTypePurgeRoom OutputType = "purge_room"
)
// An OutputEvent is an entry in the roomserver output kafka log.
@@ -78,6 +80,8 @@ type OutputEvent struct {
NewInboundPeek *OutputNewInboundPeek `json:"new_inbound_peek,omitempty"`
// The content of event with type OutputTypeRetirePeek
RetirePeek *OutputRetirePeek `json:"retire_peek,omitempty"`
+ // The content of the event with type OutputPurgeRoom
+ PurgeRoom *OutputPurgeRoom `json:"purge_room,omitempty"`
}
// Type of the OutputNewRoomEvent.
@@ -257,3 +261,7 @@ type OutputRetirePeek struct {
UserID string
DeviceID string
}
+
+type OutputPurgeRoom struct {
+ RoomID string
+}
diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go
index e789b956..83cb0460 100644
--- a/roomserver/api/perform.go
+++ b/roomserver/api/perform.go
@@ -241,6 +241,14 @@ type PerformAdminEvacuateUserResponse struct {
Error *PerformError
}
+type PerformAdminPurgeRoomRequest struct {
+ RoomID string `json:"room_id"`
+}
+
+type PerformAdminPurgeRoomResponse struct {
+ Error *PerformError `json:"error,omitempty"`
+}
+
type PerformAdminDownloadStateRequest struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
diff --git a/roomserver/internal/perform/perform_admin.go b/roomserver/internal/perform/perform_admin.go
index d42f4e45..3256162b 100644
--- a/roomserver/internal/perform/perform_admin.go
+++ b/roomserver/internal/perform/perform_admin.go
@@ -28,6 +28,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/sirupsen/logrus"
)
type Admin struct {
@@ -242,6 +243,42 @@ func (r *Admin) PerformAdminEvacuateUser(
return nil
}
+func (r *Admin) PerformAdminPurgeRoom(
+ ctx context.Context,
+ req *api.PerformAdminPurgeRoomRequest,
+ res *api.PerformAdminPurgeRoomResponse,
+) error {
+ // Validate we actually got a room ID and nothing else
+ if _, _, err := gomatrixserverlib.SplitID('!', req.RoomID); err != nil {
+ res.Error = &api.PerformError{
+ Code: api.PerformErrorBadRequest,
+ Msg: fmt.Sprintf("Malformed room ID: %s", err),
+ }
+ return nil
+ }
+
+ logrus.WithField("room_id", req.RoomID).Warn("Purging room from roomserver")
+ if err := r.DB.PurgeRoom(ctx, req.RoomID); err != nil {
+ logrus.WithField("room_id", req.RoomID).WithError(err).Warn("Failed to purge room from roomserver")
+ res.Error = &api.PerformError{
+ Code: api.PerformErrorBadRequest,
+ Msg: err.Error(),
+ }
+ return nil
+ }
+
+ logrus.WithField("room_id", req.RoomID).Warn("Room purged from roomserver")
+
+ return r.Inputer.OutputProducer.ProduceRoomEvents(req.RoomID, []api.OutputEvent{
+ {
+ Type: api.OutputTypePurgeRoom,
+ PurgeRoom: &api.OutputPurgeRoom{
+ RoomID: req.RoomID,
+ },
+ },
+ })
+}
+
func (r *Admin) PerformAdminDownloadState(
ctx context.Context,
req *api.PerformAdminDownloadStateRequest,
diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go
index 8a2e0a03..556a137b 100644
--- a/roomserver/inthttp/client.go
+++ b/roomserver/inthttp/client.go
@@ -40,6 +40,7 @@ const (
RoomserverPerformAdminEvacuateRoomPath = "/roomserver/performAdminEvacuateRoom"
RoomserverPerformAdminEvacuateUserPath = "/roomserver/performAdminEvacuateUser"
RoomserverPerformAdminDownloadStatePath = "/roomserver/performAdminDownloadState"
+ RoomserverPerformAdminPurgeRoomPath = "/roomserver/performAdminPurgeRoom"
// Query operations
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"
@@ -285,6 +286,17 @@ func (h *httpRoomserverInternalAPI) PerformAdminEvacuateUser(
)
}
+func (h *httpRoomserverInternalAPI) PerformAdminPurgeRoom(
+ ctx context.Context,
+ request *api.PerformAdminPurgeRoomRequest,
+ response *api.PerformAdminPurgeRoomResponse,
+) error {
+ return httputil.CallInternalRPCAPI(
+ "PerformAdminPurgeRoom", h.roomserverURL+RoomserverPerformAdminPurgeRoomPath,
+ h.httpClient, ctx, request, response,
+ )
+}
+
// QueryLatestEventsAndState implements RoomserverQueryAPI
func (h *httpRoomserverInternalAPI) QueryLatestEventsAndState(
ctx context.Context,
diff --git a/roomserver/inthttp/server.go b/roomserver/inthttp/server.go
index 4d21909b..f3a51b0b 100644
--- a/roomserver/inthttp/server.go
+++ b/roomserver/inthttp/server.go
@@ -66,6 +66,11 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router, enableMe
)
internalAPIMux.Handle(
+ RoomserverPerformAdminPurgeRoomPath,
+ httputil.MakeInternalRPCAPI("RoomserverPerformAdminPurgeRoom", enableMetrics, r.PerformAdminPurgeRoom),
+ )
+
+ internalAPIMux.Handle(
RoomserverPerformAdminDownloadStatePath,
httputil.MakeInternalRPCAPI("RoomserverPerformAdminDownloadState", enableMetrics, r.PerformAdminDownloadState),
)
diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go
index 595ceb52..3ec2560d 100644
--- a/roomserver/roomserver_test.go
+++ b/roomserver/roomserver_test.go
@@ -14,6 +14,10 @@ import (
userAPI "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/dendrite/federationapi"
+ "github.com/matrix-org/dendrite/keyserver"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/syncapi"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/roomserver"
@@ -223,3 +227,164 @@ func Test_QueryLeftUsers(t *testing.T) {
})
}
+
+func TestPurgeRoom(t *testing.T) {
+ alice := test.NewUser(t)
+ bob := test.NewUser(t)
+ room := test.NewRoom(t, alice, test.RoomPreset(test.PresetTrustedPrivateChat))
+
+ // Invite Bob
+ inviteEvent := room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomMember, map[string]interface{}{
+ "membership": "invite",
+ }, test.WithStateKey(bob.ID))
+
+ ctx := context.Background()
+
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ base, db, close := mustCreateDatabase(t, dbType)
+ defer close()
+
+ jsCtx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsCtx, &base.Cfg.Global.JetStream)
+
+ fedClient := base.CreateFederationClient()
+ rsAPI := roomserver.NewInternalAPI(base)
+ keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fedClient, rsAPI)
+ userAPI := userapi.NewInternalAPI(base, &base.Cfg.UserAPI, nil, keyAPI, rsAPI, nil)
+
+ // this starts the JetStream consumers
+ syncapi.AddPublicRoutes(base, userAPI, rsAPI, keyAPI)
+ federationapi.NewInternalAPI(base, fedClient, rsAPI, base.Caches, nil, true)
+ rsAPI.SetFederationAPI(nil, nil)
+
+ // Create the room
+ if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
+ t.Fatalf("failed to send events: %v", err)
+ }
+
+ // some dummy entries to validate after purging
+ publishResp := &api.PerformPublishResponse{}
+ if err := rsAPI.PerformPublish(ctx, &api.PerformPublishRequest{RoomID: room.ID, Visibility: "public"}, publishResp); err != nil {
+ t.Fatal(err)
+ }
+ if publishResp.Error != nil {
+ t.Fatal(publishResp.Error)
+ }
+
+ isPublished, err := db.GetPublishedRoom(ctx, room.ID)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !isPublished {
+ t.Fatalf("room should be published before purging")
+ }
+
+ aliasResp := &api.SetRoomAliasResponse{}
+ if err = rsAPI.SetRoomAlias(ctx, &api.SetRoomAliasRequest{RoomID: room.ID, Alias: "myalias", UserID: alice.ID}, aliasResp); err != nil {
+ t.Fatal(err)
+ }
+ // check the alias is actually there
+ aliasesResp := &api.GetAliasesForRoomIDResponse{}
+ if err = rsAPI.GetAliasesForRoomID(ctx, &api.GetAliasesForRoomIDRequest{RoomID: room.ID}, aliasesResp); err != nil {
+ t.Fatal(err)
+ }
+ wantAliases := 1
+ if gotAliases := len(aliasesResp.Aliases); gotAliases != wantAliases {
+ t.Fatalf("expected %d aliases, got %d", wantAliases, gotAliases)
+ }
+
+ // validate the room exists before purging
+ roomInfo, err := db.RoomInfo(ctx, room.ID)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if roomInfo == nil {
+ t.Fatalf("room does not exist")
+ }
+ // remember the roomInfo before purging
+ existingRoomInfo := roomInfo
+
+ // validate there is an invite for bob
+ nids, err := db.EventStateKeyNIDs(ctx, []string{bob.ID})
+ if err != nil {
+ t.Fatal(err)
+ }
+ bobNID, ok := nids[bob.ID]
+ if !ok {
+ t.Fatalf("%s does not exist", bob.ID)
+ }
+
+ _, inviteEventIDs, _, err := db.GetInvitesForUser(ctx, roomInfo.RoomNID, bobNID)
+ if err != nil {
+ t.Fatal(err)
+ }
+ wantInviteCount := 1
+ if inviteCount := len(inviteEventIDs); inviteCount != wantInviteCount {
+ t.Fatalf("expected there to be only %d invite events, got %d", wantInviteCount, inviteCount)
+ }
+ if inviteEventIDs[0] != inviteEvent.EventID() {
+ t.Fatalf("expected invite event ID %s, got %s", inviteEvent.EventID(), inviteEventIDs[0])
+ }
+
+ // purge the room from the database
+ purgeResp := &api.PerformAdminPurgeRoomResponse{}
+ if err = rsAPI.PerformAdminPurgeRoom(ctx, &api.PerformAdminPurgeRoomRequest{RoomID: room.ID}, purgeResp); err != nil {
+ t.Fatal(err)
+ }
+
+ // wait for all consumers to process the purge event
+ var sum = 1
+ timeout := time.Second * 5
+ deadline, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+ for sum > 0 {
+ if deadline.Err() != nil {
+ t.Fatalf("test timed out after %s", timeout)
+ }
+ sum = 0
+ consumerCh := jsCtx.Consumers(base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent))
+ for x := range consumerCh {
+ sum += x.NumAckPending
+ }
+ time.Sleep(time.Millisecond)
+ }
+
+ roomInfo, err = db.RoomInfo(ctx, room.ID)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if roomInfo != nil {
+ t.Fatalf("room should not exist after purging: %+v", roomInfo)
+ }
+
+ // validation below
+
+ // There should be no invite left
+ _, inviteEventIDs, _, err = db.GetInvitesForUser(ctx, existingRoomInfo.RoomNID, bobNID)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if inviteCount := len(inviteEventIDs); inviteCount > 0 {
+ t.Fatalf("expected there to be only %d invite events, got %d", wantInviteCount, inviteCount)
+ }
+
+ // aliases should be deleted
+ aliases, err := db.GetAliasesForRoomID(ctx, room.ID)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if aliasCount := len(aliases); aliasCount > 0 {
+ t.Fatalf("expected there to be only %d invite events, got %d", 0, aliasCount)
+ }
+
+ // published room should be deleted
+ isPublished, err = db.GetPublishedRoom(ctx, room.ID)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if isPublished {
+ t.Fatalf("room should not be published after purging")
+ }
+ })
+}
diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go
index 92bc2e66..e0b9c56b 100644
--- a/roomserver/storage/interface.go
+++ b/roomserver/storage/interface.go
@@ -173,5 +173,6 @@ type Database interface {
GetHistoryVisibilityState(ctx context.Context, roomInfo *types.RoomInfo, eventID string, domain string) ([]*gomatrixserverlib.Event, error)
GetLeftUsers(ctx context.Context, userIDs []string) ([]string, error)
+ PurgeRoom(ctx context.Context, roomID string) error
UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error
}
diff --git a/roomserver/storage/postgres/purge_statements.go b/roomserver/storage/postgres/purge_statements.go
new file mode 100644
index 00000000..efba439b
--- /dev/null
+++ b/roomserver/storage/postgres/purge_statements.go
@@ -0,0 +1,133 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/roomserver/types"
+)
+
+const purgeEventJSONSQL = "" +
+ "DELETE FROM roomserver_event_json WHERE event_nid = ANY(" +
+ " SELECT event_nid FROM roomserver_events WHERE room_nid = $1" +
+ ")"
+
+const purgeEventsSQL = "" +
+ "DELETE FROM roomserver_events WHERE room_nid = $1"
+
+const purgeInvitesSQL = "" +
+ "DELETE FROM roomserver_invites WHERE room_nid = $1"
+
+const purgeMembershipsSQL = "" +
+ "DELETE FROM roomserver_membership WHERE room_nid = $1"
+
+const purgePreviousEventsSQL = "" +
+ "DELETE FROM roomserver_previous_events WHERE event_nids && ANY(" +
+ " SELECT ARRAY_AGG(event_nid) FROM roomserver_events WHERE room_nid = $1" +
+ ")"
+
+const purgePublishedSQL = "" +
+ "DELETE FROM roomserver_published WHERE room_id = $1"
+
+const purgeRedactionsSQL = "" +
+ "DELETE FROM roomserver_redactions WHERE redaction_event_id = ANY(" +
+ " SELECT event_id FROM roomserver_events WHERE room_nid = $1" +
+ ")"
+
+const purgeRoomAliasesSQL = "" +
+ "DELETE FROM roomserver_room_aliases WHERE room_id = $1"
+
+const purgeRoomSQL = "" +
+ "DELETE FROM roomserver_rooms WHERE room_nid = $1"
+
+const purgeStateBlockEntriesSQL = "" +
+ "DELETE FROM roomserver_state_block WHERE state_block_nid = ANY(" +
+ " SELECT DISTINCT UNNEST(state_block_nids) FROM roomserver_state_snapshots WHERE room_nid = $1" +
+ ")"
+
+const purgeStateSnapshotEntriesSQL = "" +
+ "DELETE FROM roomserver_state_snapshots WHERE room_nid = $1"
+
+type purgeStatements struct {
+ purgeEventJSONStmt *sql.Stmt
+ purgeEventsStmt *sql.Stmt
+ purgeInvitesStmt *sql.Stmt
+ purgeMembershipsStmt *sql.Stmt
+ purgePreviousEventsStmt *sql.Stmt
+ purgePublishedStmt *sql.Stmt
+ purgeRedactionStmt *sql.Stmt
+ purgeRoomAliasesStmt *sql.Stmt
+ purgeRoomStmt *sql.Stmt
+ purgeStateBlockEntriesStmt *sql.Stmt
+ purgeStateSnapshotEntriesStmt *sql.Stmt
+}
+
+func PreparePurgeStatements(db *sql.DB) (*purgeStatements, error) {
+ s := &purgeStatements{}
+
+ return s, sqlutil.StatementList{
+ {&s.purgeEventJSONStmt, purgeEventJSONSQL},
+ {&s.purgeEventsStmt, purgeEventsSQL},
+ {&s.purgeInvitesStmt, purgeInvitesSQL},
+ {&s.purgeMembershipsStmt, purgeMembershipsSQL},
+ {&s.purgePublishedStmt, purgePublishedSQL},
+ {&s.purgePreviousEventsStmt, purgePreviousEventsSQL},
+ {&s.purgeRedactionStmt, purgeRedactionsSQL},
+ {&s.purgeRoomAliasesStmt, purgeRoomAliasesSQL},
+ {&s.purgeRoomStmt, purgeRoomSQL},
+ {&s.purgeStateBlockEntriesStmt, purgeStateBlockEntriesSQL},
+ {&s.purgeStateSnapshotEntriesStmt, purgeStateSnapshotEntriesSQL},
+ }.Prepare(db)
+}
+
+func (s *purgeStatements) PurgeRoom(
+ ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, roomID string,
+) error {
+
+ // purge by roomID
+ purgeByRoomID := []*sql.Stmt{
+ s.purgeRoomAliasesStmt,
+ s.purgePublishedStmt,
+ }
+ for _, stmt := range purgeByRoomID {
+ _, err := sqlutil.TxStmt(txn, stmt).ExecContext(ctx, roomID)
+ if err != nil {
+ return err
+ }
+ }
+
+ // purge by roomNID
+ purgeByRoomNID := []*sql.Stmt{
+ s.purgeStateBlockEntriesStmt,
+ s.purgeStateSnapshotEntriesStmt,
+ s.purgeInvitesStmt,
+ s.purgeMembershipsStmt,
+ s.purgePreviousEventsStmt,
+ s.purgeEventJSONStmt,
+ s.purgeRedactionStmt,
+ s.purgeEventsStmt,
+ s.purgeRoomStmt,
+ }
+ for _, stmt := range purgeByRoomNID {
+ _, err := sqlutil.TxStmt(txn, stmt).ExecContext(ctx, roomNID)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/roomserver/storage/postgres/rooms_table.go b/roomserver/storage/postgres/rooms_table.go
index 99439953..c8346733 100644
--- a/roomserver/storage/postgres/rooms_table.go
+++ b/roomserver/storage/postgres/rooms_table.go
@@ -58,6 +58,9 @@ const insertRoomNIDSQL = "" +
const selectRoomNIDSQL = "" +
"SELECT room_nid FROM roomserver_rooms WHERE room_id = $1"
+const selectRoomNIDForUpdateSQL = "" +
+ "SELECT room_nid FROM roomserver_rooms WHERE room_id = $1 FOR UPDATE"
+
const selectLatestEventNIDsSQL = "" +
"SELECT latest_event_nids, state_snapshot_nid FROM roomserver_rooms WHERE room_nid = $1"
@@ -85,6 +88,7 @@ const bulkSelectRoomNIDsSQL = "" +
type roomStatements struct {
insertRoomNIDStmt *sql.Stmt
selectRoomNIDStmt *sql.Stmt
+ selectRoomNIDForUpdateStmt *sql.Stmt
selectLatestEventNIDsStmt *sql.Stmt
selectLatestEventNIDsForUpdateStmt *sql.Stmt
updateLatestEventNIDsStmt *sql.Stmt
@@ -106,6 +110,7 @@ func PrepareRoomsTable(db *sql.DB) (tables.Rooms, error) {
return s, sqlutil.StatementList{
{&s.insertRoomNIDStmt, insertRoomNIDSQL},
{&s.selectRoomNIDStmt, selectRoomNIDSQL},
+ {&s.selectRoomNIDForUpdateStmt, selectRoomNIDForUpdateSQL},
{&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL},
{&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL},
{&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL},
@@ -169,6 +174,15 @@ func (s *roomStatements) SelectRoomNID(
return types.RoomNID(roomNID), err
}
+func (s *roomStatements) SelectRoomNIDForUpdate(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (types.RoomNID, error) {
+ var roomNID int64
+ stmt := sqlutil.TxStmt(txn, s.selectRoomNIDForUpdateStmt)
+ err := stmt.QueryRowContext(ctx, roomID).Scan(&roomNID)
+ return types.RoomNID(roomNID), err
+}
+
func (s *roomStatements) SelectLatestEventNIDs(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) ([]types.EventNID, types.StateSnapshotNID, error) {
diff --git a/roomserver/storage/postgres/storage.go b/roomserver/storage/postgres/storage.go
index 23a5f79e..87208438 100644
--- a/roomserver/storage/postgres/storage.go
+++ b/roomserver/storage/postgres/storage.go
@@ -189,6 +189,10 @@ func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.Room
if err != nil {
return err
}
+ purge, err := PreparePurgeStatements(db)
+ if err != nil {
+ return err
+ }
d.Database = shared.Database{
DB: db,
Cache: cache,
@@ -206,6 +210,7 @@ func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.Room
MembershipTable: membership,
PublishedTable: published,
RedactionsTable: redactions,
+ Purge: purge,
}
return nil
}
diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go
index 725cc5bc..654b078d 100644
--- a/roomserver/storage/shared/storage.go
+++ b/roomserver/storage/shared/storage.go
@@ -43,6 +43,7 @@ type Database struct {
MembershipTable tables.Membership
PublishedTable tables.Published
RedactionsTable tables.Redactions
+ Purge tables.Purge
GetRoomUpdaterFn func(ctx context.Context, roomInfo *types.RoomInfo) (*RoomUpdater, error)
}
@@ -1445,6 +1446,21 @@ func (d *Database) ForgetRoom(ctx context.Context, userID, roomID string, forget
})
}
+// PurgeRoom removes all information about a given room from the roomserver.
+// For large rooms this operation may take a considerable amount of time.
+func (d *Database) PurgeRoom(ctx context.Context, roomID string) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ roomNID, err := d.RoomsTable.SelectRoomNIDForUpdate(ctx, txn, roomID)
+ if err != nil {
+ if err == sql.ErrNoRows {
+ return fmt.Errorf("room %s does not exist", roomID)
+ }
+ return fmt.Errorf("failed to lock the room: %w", err)
+ }
+ return d.Purge.PurgeRoom(ctx, txn, roomNID, roomID)
+ })
+}
+
func (d *Database) UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
diff --git a/roomserver/storage/sqlite3/purge_statements.go b/roomserver/storage/sqlite3/purge_statements.go
new file mode 100644
index 00000000..c7b4d27a
--- /dev/null
+++ b/roomserver/storage/sqlite3/purge_statements.go
@@ -0,0 +1,153 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/roomserver/types"
+)
+
+const purgeEventJSONSQL = "" +
+ "DELETE FROM roomserver_event_json WHERE event_nid IN (" +
+ " SELECT event_nid FROM roomserver_events WHERE room_nid = $1" +
+ ")"
+
+const purgeEventsSQL = "" +
+ "DELETE FROM roomserver_events WHERE room_nid = $1"
+
+const purgeInvitesSQL = "" +
+ "DELETE FROM roomserver_invites WHERE room_nid = $1"
+
+const purgeMembershipsSQL = "" +
+ "DELETE FROM roomserver_membership WHERE room_nid = $1"
+
+const purgePreviousEventsSQL = "" +
+ "DELETE FROM roomserver_previous_events WHERE event_nids IN(" +
+ " SELECT event_nid FROM roomserver_events WHERE room_nid = $1" +
+ ")"
+
+const purgePublishedSQL = "" +
+ "DELETE FROM roomserver_published WHERE room_id = $1"
+
+const purgeRedactionsSQL = "" +
+ "DELETE FROM roomserver_redactions WHERE redaction_event_id IN(" +
+ " SELECT event_id FROM roomserver_events WHERE room_nid = $1" +
+ ")"
+
+const purgeRoomAliasesSQL = "" +
+ "DELETE FROM roomserver_room_aliases WHERE room_id = $1"
+
+const purgeRoomSQL = "" +
+ "DELETE FROM roomserver_rooms WHERE room_nid = $1"
+
+const purgeStateSnapshotEntriesSQL = "" +
+ "DELETE FROM roomserver_state_snapshots WHERE room_nid = $1"
+
+type purgeStatements struct {
+ purgeEventJSONStmt *sql.Stmt
+ purgeEventsStmt *sql.Stmt
+ purgeInvitesStmt *sql.Stmt
+ purgeMembershipsStmt *sql.Stmt
+ purgePreviousEventsStmt *sql.Stmt
+ purgePublishedStmt *sql.Stmt
+ purgeRedactionStmt *sql.Stmt
+ purgeRoomAliasesStmt *sql.Stmt
+ purgeRoomStmt *sql.Stmt
+ purgeStateSnapshotEntriesStmt *sql.Stmt
+ stateSnapshot *stateSnapshotStatements
+}
+
+func PreparePurgeStatements(db *sql.DB, stateSnapshot *stateSnapshotStatements) (*purgeStatements, error) {
+ s := &purgeStatements{stateSnapshot: stateSnapshot}
+ return s, sqlutil.StatementList{
+ {&s.purgeEventJSONStmt, purgeEventJSONSQL},
+ {&s.purgeEventsStmt, purgeEventsSQL},
+ {&s.purgeInvitesStmt, purgeInvitesSQL},
+ {&s.purgeMembershipsStmt, purgeMembershipsSQL},
+ {&s.purgePublishedStmt, purgePublishedSQL},
+ {&s.purgePreviousEventsStmt, purgePreviousEventsSQL},
+ {&s.purgeRedactionStmt, purgeRedactionsSQL},
+ {&s.purgeRoomAliasesStmt, purgeRoomAliasesSQL},
+ {&s.purgeRoomStmt, purgeRoomSQL},
+ //{&s.purgeStateBlockEntriesStmt, purgeStateBlockEntriesSQL},
+ {&s.purgeStateSnapshotEntriesStmt, purgeStateSnapshotEntriesSQL},
+ }.Prepare(db)
+}
+
+func (s *purgeStatements) PurgeRoom(
+ ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, roomID string,
+) error {
+
+ // purge by roomID
+ purgeByRoomID := []*sql.Stmt{
+ s.purgeRoomAliasesStmt,
+ s.purgePublishedStmt,
+ }
+ for _, stmt := range purgeByRoomID {
+ _, err := sqlutil.TxStmt(txn, stmt).ExecContext(ctx, roomID)
+ if err != nil {
+ return err
+ }
+ }
+
+ // purge by roomNID
+ if err := s.purgeStateBlocks(ctx, txn, roomNID); err != nil {
+ return err
+ }
+
+ purgeByRoomNID := []*sql.Stmt{
+ s.purgeStateSnapshotEntriesStmt,
+ s.purgeInvitesStmt,
+ s.purgeMembershipsStmt,
+ s.purgePreviousEventsStmt,
+ s.purgeEventJSONStmt,
+ s.purgeRedactionStmt,
+ s.purgeEventsStmt,
+ s.purgeRoomStmt,
+ }
+ for _, stmt := range purgeByRoomNID {
+ _, err := sqlutil.TxStmt(txn, stmt).ExecContext(ctx, roomNID)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (s *purgeStatements) purgeStateBlocks(
+ ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
+) error {
+ // Get all stateBlockNIDs
+ stateBlockNIDs, err := s.stateSnapshot.selectStateBlockNIDsForRoomNID(ctx, txn, roomNID)
+ if err != nil {
+ return err
+ }
+ params := make([]interface{}, len(stateBlockNIDs))
+ seenNIDs := make(map[types.StateBlockNID]struct{}, len(stateBlockNIDs))
+ // dedupe NIDs
+ for k, v := range stateBlockNIDs {
+ if _, ok := seenNIDs[v]; ok {
+ continue
+ }
+ params[k] = v
+ seenNIDs[v] = struct{}{}
+ }
+
+ query := "DELETE FROM roomserver_state_block WHERE state_block_nid IN($1)"
+ return sqlutil.RunLimitedVariablesExec(ctx, query, txn, params, sqlutil.SQLite3MaxVariables)
+}
diff --git a/roomserver/storage/sqlite3/rooms_table.go b/roomserver/storage/sqlite3/rooms_table.go
index 25b611b3..7556b346 100644
--- a/roomserver/storage/sqlite3/rooms_table.go
+++ b/roomserver/storage/sqlite3/rooms_table.go
@@ -74,10 +74,14 @@ const bulkSelectRoomIDsSQL = "" +
const bulkSelectRoomNIDsSQL = "" +
"SELECT room_nid FROM roomserver_rooms WHERE room_id IN ($1)"
+const selectRoomNIDForUpdateSQL = "" +
+ "SELECT room_nid FROM roomserver_rooms WHERE room_id = $1"
+
type roomStatements struct {
db *sql.DB
insertRoomNIDStmt *sql.Stmt
selectRoomNIDStmt *sql.Stmt
+ selectRoomNIDForUpdateStmt *sql.Stmt
selectLatestEventNIDsStmt *sql.Stmt
selectLatestEventNIDsForUpdateStmt *sql.Stmt
updateLatestEventNIDsStmt *sql.Stmt
@@ -105,6 +109,7 @@ func PrepareRoomsTable(db *sql.DB) (tables.Rooms, error) {
//{&s.selectRoomVersionForRoomNIDsStmt, selectRoomVersionForRoomNIDsSQL},
{&s.selectRoomInfoStmt, selectRoomInfoSQL},
{&s.selectRoomIDsStmt, selectRoomIDsSQL},
+ {&s.selectRoomNIDForUpdateStmt, selectRoomNIDForUpdateSQL},
}.Prepare(db)
}
@@ -169,6 +174,15 @@ func (s *roomStatements) SelectRoomNID(
return types.RoomNID(roomNID), err
}
+func (s *roomStatements) SelectRoomNIDForUpdate(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (types.RoomNID, error) {
+ var roomNID int64
+ stmt := sqlutil.TxStmt(txn, s.selectRoomNIDForUpdateStmt)
+ err := stmt.QueryRowContext(ctx, roomID).Scan(&roomNID)
+ return types.RoomNID(roomNID), err
+}
+
func (s *roomStatements) SelectLatestEventNIDs(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) ([]types.EventNID, types.StateSnapshotNID, error) {
diff --git a/roomserver/storage/sqlite3/state_block_table.go b/roomserver/storage/sqlite3/state_block_table.go
index 4e67d4da..ae8181cf 100644
--- a/roomserver/storage/sqlite3/state_block_table.go
+++ b/roomserver/storage/sqlite3/state_block_table.go
@@ -24,7 +24,6 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
- "github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/util"
)
@@ -68,7 +67,7 @@ func CreateStateBlockTable(db *sql.DB) error {
return err
}
-func PrepareStateBlockTable(db *sql.DB) (tables.StateBlock, error) {
+func PrepareStateBlockTable(db *sql.DB) (*stateBlockStatements, error) {
s := &stateBlockStatements{
db: db,
}
diff --git a/roomserver/storage/sqlite3/state_snapshot_table.go b/roomserver/storage/sqlite3/state_snapshot_table.go
index 73827522..930ad14d 100644
--- a/roomserver/storage/sqlite3/state_snapshot_table.go
+++ b/roomserver/storage/sqlite3/state_snapshot_table.go
@@ -62,10 +62,14 @@ const bulkSelectStateBlockNIDsSQL = "" +
"SELECT state_snapshot_nid, state_block_nids FROM roomserver_state_snapshots" +
" WHERE state_snapshot_nid IN ($1) ORDER BY state_snapshot_nid ASC"
+const selectStateBlockNIDsForRoomNID = "" +
+ "SELECT state_block_nids FROM roomserver_state_snapshots WHERE room_nid = $1"
+
type stateSnapshotStatements struct {
db *sql.DB
insertStateStmt *sql.Stmt
bulkSelectStateBlockNIDsStmt *sql.Stmt
+ selectStateBlockNIDsStmt *sql.Stmt
}
func CreateStateSnapshotTable(db *sql.DB) error {
@@ -73,7 +77,7 @@ func CreateStateSnapshotTable(db *sql.DB) error {
return err
}
-func PrepareStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
+func PrepareStateSnapshotTable(db *sql.DB) (*stateSnapshotStatements, error) {
s := &stateSnapshotStatements{
db: db,
}
@@ -81,6 +85,7 @@ func PrepareStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
return s, sqlutil.StatementList{
{&s.insertStateStmt, insertStateSQL},
{&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL},
+ {&s.selectStateBlockNIDsStmt, selectStateBlockNIDsForRoomNID},
}.Prepare(db)
}
@@ -146,3 +151,29 @@ func (s *stateSnapshotStatements) BulkSelectStateForHistoryVisibility(
) ([]types.EventNID, error) {
return nil, tables.OptimisationNotSupportedError
}
+
+func (s *stateSnapshotStatements) selectStateBlockNIDsForRoomNID(
+ ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
+) ([]types.StateBlockNID, error) {
+ var res []types.StateBlockNID
+ rows, err := sqlutil.TxStmt(txn, s.selectStateBlockNIDsStmt).QueryContext(ctx, roomNID)
+ if err != nil {
+ return res, nil
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "selectStateBlockNIDsForRoomNID: rows.close() failed")
+
+ var stateBlockNIDs []types.StateBlockNID
+ var stateBlockNIDsJSON string
+ for rows.Next() {
+ if err = rows.Scan(&stateBlockNIDsJSON); err != nil {
+ return nil, err
+ }
+ if err = json.Unmarshal([]byte(stateBlockNIDsJSON), &stateBlockNIDs); err != nil {
+ return nil, err
+ }
+
+ res = append(res, stateBlockNIDs...)
+ }
+
+ return res, rows.Err()
+}
diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go
index 01c3f879..392edd28 100644
--- a/roomserver/storage/sqlite3/storage.go
+++ b/roomserver/storage/sqlite3/storage.go
@@ -197,6 +197,11 @@ func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.Room
if err != nil {
return err
}
+ purge, err := PreparePurgeStatements(db, stateSnapshot)
+ if err != nil {
+ return err
+ }
+
d.Database = shared.Database{
DB: db,
Cache: cache,
@@ -215,6 +220,7 @@ func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.Room
PublishedTable: published,
RedactionsTable: redactions,
GetRoomUpdaterFn: d.GetRoomUpdater,
+ Purge: purge,
}
return nil
}
diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go
index 80fcf72d..64145f83 100644
--- a/roomserver/storage/tables/interface.go
+++ b/roomserver/storage/tables/interface.go
@@ -73,6 +73,7 @@ type Events interface {
type Rooms interface {
InsertRoomNID(ctx context.Context, txn *sql.Tx, roomID string, roomVersion gomatrixserverlib.RoomVersion) (types.RoomNID, error)
SelectRoomNID(ctx context.Context, txn *sql.Tx, roomID string) (types.RoomNID, error)
+ SelectRoomNIDForUpdate(ctx context.Context, txn *sql.Tx, roomID string) (types.RoomNID, error)
SelectLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.StateSnapshotNID, error)
SelectLatestEventsNIDsForUpdate(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.EventNID, types.StateSnapshotNID, error)
UpdateLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID, lastEventSentNID types.EventNID, stateSnapshotNID types.StateSnapshotNID) error
@@ -173,6 +174,12 @@ type Redactions interface {
MarkRedactionValidated(ctx context.Context, txn *sql.Tx, redactionEventID string, validated bool) error
}
+type Purge interface {
+ PurgeRoom(
+ ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, roomID string,
+ ) error
+}
+
// StrippedEvent represents a stripped event for returning extracted content values.
type StrippedEvent struct {
RoomID string