aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-08-11 18:23:35 +0200
committerGitHub <noreply@github.com>2022-08-11 18:23:35 +0200
commit05cafbd197c99c0e116c9b61447e70ba5af992a3 (patch)
treef96dbf70e30b2a255f2b19574188115dda8e6145 /syncapi/storage/postgres
parent371336c6b5ffd510802d06b193a48b01a5e78d0c (diff)
Implement history visibility on `/messages`, `/context`, `/sync` (#2511)
* Add possibility to set history_visibility and user AccountType * Add new DB queries * Add actual history_visibility changes for /messages * Add passing tests * Extract check function * Cleanup * Cleanup * Fix build on 386 * Move ApplyHistoryVisibilityFilter to internal * Move queries to topology table * Add filtering to /sync and /context Some cleanup * Add passing tests; Remove failing tests :( * Re-add passing tests * Move filtering to own function to avoid duplication * Re-add passing test * Use newly added GMSL HistoryVisibility * Update gomatrixserverlib * Set the visibility when creating events * Default to shared history visibility * Remove unused query * Update history visibility checks to use gmsl Update tests * Remove unused statement * Update migrations to set "correct" history visibility * Add method to fetch the membership at a given event * Tweaks and logging * Use actual internal rsAPI, default to shared visibility in tests * Revert "Move queries to topology table" This reverts commit 4f0d41be9c194a46379796435ce73e79203edbd6. * Remove noise/unneeded code * More cleanup * Try to optimize database requests * Fix imports * PR peview fixes/changes * Move setting history visibility to own migration, be more restrictive * Fix unit tests * Lint * Fix missing entries * Tweaks for incremental syncs * Adapt generic changes Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com> Co-authored-by: kegsay <kegan@matrix.org>
Diffstat (limited to 'syncapi/storage/postgres')
-rw-r--r--syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go55
-rw-r--r--syncapi/storage/postgres/memberships_table.go28
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go10
-rw-r--r--syncapi/storage/postgres/syncserver.go15
4 files changed, 101 insertions, 7 deletions
diff --git a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go
index 29008ade..d68ed8d5 100644
--- a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go
+++ b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go
@@ -17,7 +17,10 @@ package deltas
import (
"context"
"database/sql"
+ "encoding/json"
"fmt"
+
+ "github.com/matrix-org/gomatrixserverlib"
)
func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.Tx) error {
@@ -31,6 +34,27 @@ func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.T
return nil
}
+// UpSetHistoryVisibility sets the history visibility for already stored events.
+// Requires current_room_state and output_room_events to be created.
+func UpSetHistoryVisibility(ctx context.Context, tx *sql.Tx) error {
+ // get the current room history visibilities
+ historyVisibilities, err := currentHistoryVisibilities(ctx, tx)
+ if err != nil {
+ return err
+ }
+
+ // update the history visibility
+ for roomID, hisVis := range historyVisibilities {
+ _, err = tx.ExecContext(ctx, `UPDATE syncapi_output_room_events SET history_visibility = $1
+ WHERE type IN ('m.room.message', 'm.room.encrypted') AND room_id = $2 AND history_visibility <> $1`, hisVis, roomID)
+ if err != nil {
+ return fmt.Errorf("failed to update history visibility: %w", err)
+ }
+ }
+
+ return nil
+}
+
func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE syncapi_current_room_state ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2;
@@ -39,9 +63,40 @@ func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.T
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
+
return nil
}
+// currentHistoryVisibilities returns a map from roomID to current history visibility.
+// If the history visibility was changed after room creation, defaults to joined.
+func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gomatrixserverlib.HistoryVisibility, error) {
+ rows, err := tx.QueryContext(ctx, `SELECT DISTINCT room_id, headered_event_json FROM syncapi_current_room_state
+ WHERE type = 'm.room.history_visibility' AND state_key = '';
+`)
+ if err != nil {
+ return nil, fmt.Errorf("failed to query current room state: %w", err)
+ }
+ defer rows.Close() // nolint: errcheck
+ var eventBytes []byte
+ var roomID string
+ var event gomatrixserverlib.HeaderedEvent
+ var hisVis gomatrixserverlib.HistoryVisibility
+ historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility)
+ for rows.Next() {
+ if err = rows.Scan(&roomID, &eventBytes); err != nil {
+ return nil, fmt.Errorf("failed to scan row: %w", err)
+ }
+ if err = json.Unmarshal(eventBytes, &event); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal event: %w", err)
+ }
+ historyVisibilities[roomID] = gomatrixserverlib.HistoryVisibilityJoined
+ if hisVis, err = event.HistoryVisibility(); err == nil && event.Depth() < 10 {
+ historyVisibilities[roomID] = hisVis
+ }
+ }
+ return historyVisibilities, nil
+}
+
func DownAddHistoryVisibilityColumn(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE syncapi_output_room_events DROP COLUMN IF EXISTS history_visibility;
diff --git a/syncapi/storage/postgres/memberships_table.go b/syncapi/storage/postgres/memberships_table.go
index 00223c57..939d6b3f 100644
--- a/syncapi/storage/postgres/memberships_table.go
+++ b/syncapi/storage/postgres/memberships_table.go
@@ -66,10 +66,14 @@ const selectMembershipCountSQL = "" +
const selectHeroesSQL = "" +
"SELECT DISTINCT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership = ANY($3) LIMIT 5"
+const selectMembershipBeforeSQL = "" +
+ "SELECT membership, topological_pos FROM syncapi_memberships WHERE room_id = $1 and user_id = $2 AND topological_pos <= $3 ORDER BY topological_pos DESC LIMIT 1"
+
type membershipsStatements struct {
- upsertMembershipStmt *sql.Stmt
- selectMembershipCountStmt *sql.Stmt
- selectHeroesStmt *sql.Stmt
+ upsertMembershipStmt *sql.Stmt
+ selectMembershipCountStmt *sql.Stmt
+ selectHeroesStmt *sql.Stmt
+ selectMembershipForUserStmt *sql.Stmt
}
func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
@@ -82,6 +86,7 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
{&s.upsertMembershipStmt, upsertMembershipSQL},
{&s.selectMembershipCountStmt, selectMembershipCountSQL},
{&s.selectHeroesStmt, selectHeroesSQL},
+ {&s.selectMembershipForUserStmt, selectMembershipBeforeSQL},
}.Prepare(db)
}
@@ -132,3 +137,20 @@ func (s *membershipsStatements) SelectHeroes(
}
return heroes, rows.Err()
}
+
+// SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found
+// returns "leave", the topological position and no error. If an error occurs, other than sql.ErrNoRows, returns that and an empty
+// string as the membership.
+func (s *membershipsStatements) SelectMembershipForUser(
+ ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64,
+) (membership string, topologyPos int, err error) {
+ stmt := sqlutil.TxStmt(txn, s.selectMembershipForUserStmt)
+ err = stmt.QueryRowContext(ctx, roomID, userID, pos).Scan(&membership, &topologyPos)
+ if err != nil {
+ if err == sql.ErrNoRows {
+ return "leave", 0, nil
+ }
+ return "", 0, err
+ }
+ return membership, topologyPos, nil
+}
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 34ff6700..8f633640 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -191,10 +191,12 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
}
m := sqlutil.NewMigrator(db)
- m.AddMigrations(sqlutil.Migration{
- Version: "syncapi: add history visibility column (output_room_events)",
- Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents,
- })
+ m.AddMigrations(
+ sqlutil.Migration{
+ Version: "syncapi: add history visibility column (output_room_events)",
+ Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents,
+ },
+ )
err = m.Up(context.Background())
if err != nil {
return nil, err
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
index a044716c..979ff664 100644
--- a/syncapi/storage/postgres/syncserver.go
+++ b/syncapi/storage/postgres/syncserver.go
@@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
)
@@ -97,6 +98,20 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
if err != nil {
return nil, err
}
+
+ // apply migrations which need multiple tables
+ m := sqlutil.NewMigrator(d.db)
+ m.AddMigrations(
+ sqlutil.Migration{
+ Version: "syncapi: set history visibility for existing events",
+ Up: deltas.UpSetHistoryVisibility, // Requires current_room_state and output_room_events to be created.
+ },
+ )
+ err = m.Up(base.Context())
+ if err != nil {
+ return nil, err
+ }
+
d.Database = shared.Database{
DB: d.db,
Writer: d.writer,