aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres/output_room_events_topology_table.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/postgres/output_room_events_topology_table.go')
-rw-r--r--syncapi/storage/postgres/output_room_events_topology_table.go42
1 files changed, 22 insertions, 20 deletions
diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go
index d0e99f26..2382fca5 100644
--- a/syncapi/storage/postgres/output_room_events_topology_table.go
+++ b/syncapi/storage/postgres/output_room_events_topology_table.go
@@ -18,11 +18,12 @@ import (
"context"
"database/sql"
+ "github.com/matrix-org/gomatrixserverlib"
+
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
)
const outputRoomEventsTopologySchema = `
@@ -71,6 +72,9 @@ const selectStreamToTopologicalPositionAscSQL = "" +
const selectStreamToTopologicalPositionDescSQL = "" +
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;"
+const purgeEventsTopologySQL = "" +
+ "DELETE FROM syncapi_output_room_events_topology WHERE room_id = $1"
+
type outputRoomEventsTopologyStatements struct {
insertEventInTopologyStmt *sql.Stmt
selectEventIDsInRangeASCStmt *sql.Stmt
@@ -78,6 +82,7 @@ type outputRoomEventsTopologyStatements struct {
selectPositionInTopologyStmt *sql.Stmt
selectStreamToTopologicalPositionAscStmt *sql.Stmt
selectStreamToTopologicalPositionDescStmt *sql.Stmt
+ purgeEventsTopologyStmt *sql.Stmt
}
func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
@@ -86,25 +91,15 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
if err != nil {
return nil, err
}
- if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil {
- return nil, err
- }
- if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil {
- return nil, err
- }
- if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil {
- return nil, err
- }
- if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil {
- return nil, err
- }
- if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil {
- return nil, err
- }
- if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil {
- return nil, err
- }
- return s, nil
+ return s, sqlutil.StatementList{
+ {&s.insertEventInTopologyStmt, insertEventInTopologySQL},
+ {&s.selectEventIDsInRangeASCStmt, selectEventIDsInRangeASCSQL},
+ {&s.selectEventIDsInRangeDESCStmt, selectEventIDsInRangeDESCSQL},
+ {&s.selectPositionInTopologyStmt, selectPositionInTopologySQL},
+ {&s.selectStreamToTopologicalPositionAscStmt, selectStreamToTopologicalPositionAscSQL},
+ {&s.selectStreamToTopologicalPositionDescStmt, selectStreamToTopologicalPositionDescSQL},
+ {&s.purgeEventsTopologyStmt, purgeEventsTopologySQL},
+ }.Prepare(db)
}
// InsertEventInTopology inserts the given event in the room's topology, based
@@ -177,3 +172,10 @@ func (s *outputRoomEventsTopologyStatements) SelectStreamToTopologicalPosition(
}
return
}
+
+func (s *outputRoomEventsTopologyStatements) PurgeEventsTopology(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) error {
+ _, err := sqlutil.TxStmt(txn, s.purgeEventsTopologyStmt).ExecContext(ctx, roomID)
+ return err
+}