aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-07-18 14:46:15 +0200
committerGitHub <noreply@github.com>2022-07-18 14:46:15 +0200
commita7e92f8cb99105c2a579f73fbc972b7fb0b35678 (patch)
treed9df6aa7b9aa2321f9833fd964c108f2481153ff /syncapi/storage
parentc0c909d30642d03600fd48bd052a22040a67267b (diff)
History visibility database changes (#2533)
* Add new history_visibility column * Update SQL queries to include history_visibility * Store the history visibilty calculated by the roomserver * Update GMSL * Update migrations * Fix migration * Update GMSL * Fix `go.sum` * Update GMSL to use sql.Scanner & sql.Valuer * Re-order migration/table creation * Update gomatrixserverlib * Add history_visibility column to current_room_state * Fix migrations * Return error instead of Fatal log Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'syncapi/storage')
-rw-r--r--syncapi/storage/interface.go4
-rw-r--r--syncapi/storage/postgres/current_room_state_table.go12
-rw-r--r--syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go50
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go82
-rw-r--r--syncapi/storage/postgres/syncserver.go18
-rw-r--r--syncapi/storage/shared/syncserver.go7
-rw-r--r--syncapi/storage/sqlite3/current_room_state_table.go12
-rw-r--r--syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go82
-rw-r--r--syncapi/storage/sqlite3/output_room_events_table.go83
-rw-r--r--syncapi/storage/sqlite3/syncserver.go18
-rw-r--r--syncapi/storage/storage_test.go2
-rw-r--r--syncapi/storage/tables/interface.go9
-rw-r--r--syncapi/storage/tables/output_room_events_test.go4
13 files changed, 286 insertions, 97 deletions
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index 05542603..9751670b 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -69,7 +69,9 @@ type Database interface {
// when generating the sync stream position for this event. Returns the sync stream position for the inserted event.
// Returns an error if there was a problem inserting this event.
WriteEvent(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []*gomatrixserverlib.HeaderedEvent,
- addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool) (types.StreamPosition, error)
+ addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool,
+ historyVisibility gomatrixserverlib.HistoryVisibility,
+ ) (types.StreamPosition, error)
// PurgeRoomState completely purges room state from the sync API. This is done when
// receiving an output event that completely resets the state.
PurgeRoomState(ctx context.Context, roomID string) error
diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go
index c4667baf..aae2d8c3 100644
--- a/syncapi/storage/postgres/current_room_state_table.go
+++ b/syncapi/storage/postgres/current_room_state_table.go
@@ -51,6 +51,7 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
-- The serial ID of the output_room_events table when this event became
-- part of the current state of the room.
added_at BIGINT,
+ history_visibility SMALLINT NOT NULL DEFAULT 2,
-- Clobber based on 3-uple of room_id, type and state_key
CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key)
);
@@ -63,8 +64,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_current_room_state_eventid_idx ON sync
`
const upsertRoomStateSQL = "" +
- "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at)" +
- " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" +
+ "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at, history_visibility)" +
+ " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)" +
" ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" +
" DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, headered_event_json = $7, membership = $8, added_at = $9"
@@ -100,11 +101,11 @@ const selectStateEventSQL = "" +
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
const selectEventsWithEventIDsSQL = "" +
- // TODO: The session_id and transaction_id blanks are here because otherwise
- // the rowsToStreamEvents expects there to be exactly six columns. We need to
+ // TODO: The session_id and transaction_id blanks are here because
+ // the rowsToStreamEvents expects there to be exactly seven columns. We need to
// figure out if these really need to be in the DB, and if so, we need a
// better permanent fix for this. - neilalexander, 2 Jan 2020
- "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" +
+ "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, history_visibility" +
" FROM syncapi_current_room_state WHERE event_id = ANY($1)"
const selectSharedUsersSQL = "" +
@@ -336,6 +337,7 @@ func (s *currentRoomStateStatements) UpsertRoomState(
headeredJSON,
membership,
addedAt,
+ event.Visibility,
)
return err
}
diff --git a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go
new file mode 100644
index 00000000..2b4b6528
--- /dev/null
+++ b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go
@@ -0,0 +1,50 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package deltas
+
+import (
+ "database/sql"
+ "fmt"
+
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+)
+
+func LoadAddHistoryVisibilityColumn(m *sqlutil.Migrations) {
+ m.AddMigration(UpAddHistoryVisibilityColumn, DownAddHistoryVisibilityColumn)
+}
+
+func UpAddHistoryVisibilityColumn(tx *sql.Tx) error {
+ _, err := tx.Exec(`
+ ALTER TABLE syncapi_output_room_events ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2;
+ UPDATE syncapi_output_room_events SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted');
+ ALTER TABLE syncapi_current_room_state ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2;
+ UPDATE syncapi_current_room_state SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted');
+ `)
+ if err != nil {
+ return fmt.Errorf("failed to execute upgrade: %w", err)
+ }
+ return nil
+}
+
+func DownAddHistoryVisibilityColumn(tx *sql.Tx) error {
+ _, err := tx.Exec(`
+ ALTER TABLE syncapi_output_room_events DROP COLUMN IF EXISTS history_visibility;
+ ALTER TABLE syncapi_current_room_state DROP COLUMN IF EXISTS history_visibility;
+ `)
+ if err != nil {
+ return fmt.Errorf("failed to execute downgrade: %w", err)
+ }
+ return nil
+}
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index d84d0cfa..d0474d4c 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -67,7 +67,9 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
-- events retrieved through backfilling that have a position in the stream
-- that relates to the moment these were retrieved rather than the moment these
-- were emitted.
- exclude_from_sync BOOL DEFAULT FALSE
+ exclude_from_sync BOOL DEFAULT FALSE,
+ -- The history visibility before this event (1 - world_readable; 2 - shared; 3 - invited; 4 - joined)
+ history_visibility SMALLINT NOT NULL DEFAULT 2
);
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_type_idx ON syncapi_output_room_events (type);
@@ -78,16 +80,16 @@ CREATE INDEX IF NOT EXISTS syncapi_output_room_events_exclude_from_sync_idx ON s
const insertEventSQL = "" +
"INSERT INTO syncapi_output_room_events (" +
- "room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
- ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) " +
+ "room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" +
+ ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " +
"ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " +
"RETURNING id"
const selectEventsSQL = "" +
- "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)"
+ "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id = ANY($1)"
const selectEventsWithFilterSQL = "" +
- "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)" +
+ "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id = ANY($1)" +
" AND ( $2::text[] IS NULL OR sender = ANY($2) )" +
" AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" +
" AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" +
@@ -96,7 +98,7 @@ const selectEventsWithFilterSQL = "" +
" LIMIT $7"
const selectRecentEventsSQL = "" +
- "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
+ "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
@@ -105,7 +107,7 @@ const selectRecentEventsSQL = "" +
" ORDER BY id DESC LIMIT $8"
const selectRecentEventsForSyncSQL = "" +
- "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
+ "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
@@ -114,7 +116,7 @@ const selectRecentEventsForSyncSQL = "" +
" ORDER BY id DESC LIMIT $8"
const selectEarlyEventsSQL = "" +
- "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
+ "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
@@ -130,7 +132,7 @@ const updateEventJSONSQL = "" +
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" +
- "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
+ "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" +
" FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
" AND room_id = ANY($3)" +
@@ -146,10 +148,10 @@ const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
const selectContextEventSQL = "" +
- "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
+ "SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
const selectContextBeforeEventSQL = "" +
- "SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" +
+ "SELECT headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" +
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
@@ -157,7 +159,7 @@ const selectContextBeforeEventSQL = "" +
" ORDER BY id DESC LIMIT $3"
const selectContextAfterEventSQL = "" +
- "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" +
+ "SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" +
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
@@ -246,14 +248,15 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
for rows.Next() {
var (
- eventID string
- streamPos types.StreamPosition
- eventBytes []byte
- excludeFromSync bool
- addIDs pq.StringArray
- delIDs pq.StringArray
+ eventID string
+ streamPos types.StreamPosition
+ eventBytes []byte
+ excludeFromSync bool
+ addIDs pq.StringArray
+ delIDs pq.StringArray
+ historyVisibility gomatrixserverlib.HistoryVisibility
)
- if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil {
+ if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs, &historyVisibility); err != nil {
return nil, nil, err
}
// Sanity check for deleted state and whine if we see it. We don't need to do anything
@@ -283,6 +286,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
needSet[id] = true
}
stateNeeded[ev.RoomID()] = needSet
+ ev.Visibility = historyVisibility
eventIDToEvent[eventID] = types.StreamEvent{
HeaderedEvent: &ev,
@@ -314,7 +318,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
func (s *outputRoomEventsStatements) InsertEvent(
ctx context.Context, txn *sql.Tx,
event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
- transactionID *api.TransactionID, excludeFromSync bool,
+ transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility,
) (streamPos types.StreamPosition, err error) {
var txnID *string
var sessionID *int64
@@ -351,6 +355,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
sessionID,
txnID,
excludeFromSync,
+ historyVisibility,
).Scan(&streamPos)
return
}
@@ -504,13 +509,15 @@ func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
var eventAsString string
- if err = row.Scan(&id, &eventAsString); err != nil {
+ var historyVisibility gomatrixserverlib.HistoryVisibility
+ if err = row.Scan(&id, &eventAsString, &historyVisibility); err != nil {
return 0, evt, err
}
if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil {
return 0, evt, err
}
+ evt.Visibility = historyVisibility
return id, evt, nil
}
@@ -532,15 +539,17 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
for rows.Next() {
var (
- eventBytes []byte
- evt *gomatrixserverlib.HeaderedEvent
+ eventBytes []byte
+ evt *gomatrixserverlib.HeaderedEvent
+ historyVisibility gomatrixserverlib.HistoryVisibility
)
- if err = rows.Scan(&eventBytes); err != nil {
+ if err = rows.Scan(&eventBytes, &historyVisibility); err != nil {
return evts, err
}
if err = json.Unmarshal(eventBytes, &evt); err != nil {
return evts, err
}
+ evt.Visibility = historyVisibility
evts = append(evts, evt)
}
@@ -565,15 +574,17 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
for rows.Next() {
var (
- eventBytes []byte
- evt *gomatrixserverlib.HeaderedEvent
+ eventBytes []byte
+ evt *gomatrixserverlib.HeaderedEvent
+ historyVisibility gomatrixserverlib.HistoryVisibility
)
- if err = rows.Scan(&lastID, &eventBytes); err != nil {
+ if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil {
return 0, evts, err
}
if err = json.Unmarshal(eventBytes, &evt); err != nil {
return 0, evts, err
}
+ evt.Visibility = historyVisibility
evts = append(evts, evt)
}
@@ -584,15 +595,16 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
var result []types.StreamEvent
for rows.Next() {
var (
- eventID string
- streamPos types.StreamPosition
- eventBytes []byte
- excludeFromSync bool
- sessionID *int64
- txnID *string
- transactionID *api.TransactionID
+ eventID string
+ streamPos types.StreamPosition
+ eventBytes []byte
+ excludeFromSync bool
+ sessionID *int64
+ txnID *string
+ transactionID *api.TransactionID
+ historyVisibility gomatrixserverlib.HistoryVisibility
)
- if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil {
+ if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil {
return nil, err
}
// TODO: Handle redacted events
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
index 9cfe7c07..c6121e41 100644
--- a/syncapi/storage/postgres/syncserver.go
+++ b/syncapi/storage/postgres/syncserver.go
@@ -42,15 +42,13 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil {
return nil, err
}
- accountData, err := NewPostgresAccountDataTable(d.db)
- if err != nil {
+ if _, err = d.db.Exec(outputRoomEventsSchema); err != nil {
return nil, err
}
- events, err := NewPostgresEventsTable(d.db)
- if err != nil {
+ if _, err = d.db.Exec(currentRoomStateSchema); err != nil {
return nil, err
}
- currState, err := NewPostgresCurrentRoomStateTable(d.db)
+ accountData, err := NewPostgresAccountDataTable(d.db)
if err != nil {
return nil, err
}
@@ -101,9 +99,19 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
+ deltas.LoadAddHistoryVisibilityColumn(m)
if err = m.RunDeltas(d.db, dbProperties); err != nil {
return nil, err
}
+ // prepare statements after the migrations have run
+ events, err := NewPostgresEventsTable(d.db)
+ if err != nil {
+ return nil, err
+ }
+ currState, err := NewPostgresCurrentRoomStateTable(d.db)
+ if err != nil {
+ return nil, err
+ }
d.Database = shared.Database{
DB: d.db,
Writer: d.writer,
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index d1c5e2d1..932bda16 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -368,11 +368,12 @@ func (d *Database) WriteEvent(
addStateEvents []*gomatrixserverlib.HeaderedEvent,
addStateEventIDs, removeStateEventIDs []string,
transactionID *api.TransactionID, excludeFromSync bool,
+ historyVisibility gomatrixserverlib.HistoryVisibility,
) (pduPosition types.StreamPosition, returnErr error) {
returnErr = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
var err error
pos, err := d.OutputEvents.InsertEvent(
- ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync,
+ ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, historyVisibility,
)
if err != nil {
return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err)
@@ -391,7 +392,9 @@ func (d *Database) WriteEvent(
// Nothing to do, the event may have just been a message event.
return nil
}
-
+ for i := range addStateEvents {
+ addStateEvents[i].Visibility = historyVisibility
+ }
return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition, topoPosition)
})
diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go
index 376c3a3d..501d0ee9 100644
--- a/syncapi/storage/sqlite3/current_room_state_table.go
+++ b/syncapi/storage/sqlite3/current_room_state_table.go
@@ -41,6 +41,7 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
headered_event_json TEXT NOT NULL,
membership TEXT,
added_at BIGINT,
+ history_visibility SMALLINT NOT NULL DEFAULT 2, -- The history visibility before this event (1 - world_readable; 2 - shared; 3 - invited; 4 - joined)
UNIQUE (room_id, type, state_key)
);
-- for event deletion
@@ -52,8 +53,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_current_room_state_eventid_idx ON sync
`
const upsertRoomStateSQL = "" +
- "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at)" +
- " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" +
+ "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at, history_visibility)" +
+ " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)" +
" ON CONFLICT (room_id, type, state_key)" +
" DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, headered_event_json = $7, membership = $8, added_at = $9"
@@ -84,11 +85,11 @@ const selectStateEventSQL = "" +
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
const selectEventsWithEventIDsSQL = "" +
- // TODO: The session_id and transaction_id blanks are here because otherwise
- // the rowsToStreamEvents expects there to be exactly six columns. We need to
+ // TODO: The session_id and transaction_id blanks are here because
+ // the rowsToStreamEvents expects there to be exactly seven columns. We need to
// figure out if these really need to be in the DB, and if so, we need a
// better permanent fix for this. - neilalexander, 2 Jan 2020
- "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" +
+ "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, history_visibility" +
" FROM syncapi_current_room_state WHERE event_id IN ($1)"
const selectSharedUsersSQL = "" +
@@ -328,6 +329,7 @@ func (s *currentRoomStateStatements) UpsertRoomState(
headeredJSON,
membership,
addedAt,
+ event.Visibility,
)
return err
}
diff --git a/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go
new file mode 100644
index 00000000..f6bcaddf
--- /dev/null
+++ b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go
@@ -0,0 +1,82 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package deltas
+
+import (
+ "database/sql"
+ "fmt"
+
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+)
+
+func LoadAddHistoryVisibilityColumn(m *sqlutil.Migrations) {
+ m.AddMigration(UpAddHistoryVisibilityColumn, DownAddHistoryVisibilityColumn)
+}
+
+func UpAddHistoryVisibilityColumn(tx *sql.Tx) error {
+ // SQLite doesn't have "if exists", so check if the column exists. If the query doesn't return an error, it already exists.
+ // Required for unit tests, as otherwise a duplicate column error will show up.
+ _, err := tx.Query("SELECT history_visibility FROM syncapi_output_room_events LIMIT 1")
+ if err == nil {
+ return nil
+ }
+ _, err = tx.Exec(`
+ ALTER TABLE syncapi_output_room_events ADD COLUMN history_visibility SMALLINT NOT NULL DEFAULT 2;
+ UPDATE syncapi_output_room_events SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted');
+ `)
+ if err != nil {
+ return fmt.Errorf("failed to execute upgrade: %w", err)
+ }
+
+ _, err = tx.Query("SELECT history_visibility FROM syncapi_current_room_state LIMIT 1")
+ if err == nil {
+ return nil
+ }
+ _, err = tx.Exec(`
+ ALTER TABLE syncapi_current_room_state ADD COLUMN history_visibility SMALLINT NOT NULL DEFAULT 2;
+ UPDATE syncapi_current_room_state SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted');
+ `)
+ if err != nil {
+ return fmt.Errorf("failed to execute upgrade: %w", err)
+ }
+ return nil
+}
+
+func DownAddHistoryVisibilityColumn(tx *sql.Tx) error {
+ // SQLite doesn't have "if exists", so check if the column exists.
+ _, err := tx.Query("SELECT history_visibility FROM syncapi_output_room_events LIMIT 1")
+ if err != nil {
+ // The column probably doesn't exist
+ return nil
+ }
+ _, err = tx.Exec(`
+ ALTER TABLE syncapi_output_room_events DROP COLUMN history_visibility;
+ `)
+ if err != nil {
+ return fmt.Errorf("failed to execute downgrade: %w", err)
+ }
+ _, err = tx.Query("SELECT history_visibility FROM syncapi_current_room_state LIMIT 1")
+ if err != nil {
+ // The column probably doesn't exist
+ return nil
+ }
+ _, err = tx.Exec(`
+ ALTER TABLE syncapi_current_room_state DROP COLUMN history_visibility;
+ `)
+ if err != nil {
+ return fmt.Errorf("failed to execute downgrade: %w", err)
+ }
+ return nil
+}
diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go
index f9961a9d..b3dcb44c 100644
--- a/syncapi/storage/sqlite3/output_room_events_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_table.go
@@ -47,7 +47,8 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
remove_state_ids TEXT, -- JSON encoded string array
session_id BIGINT,
transaction_id TEXT,
- exclude_from_sync BOOL NOT NULL DEFAULT FALSE
+ exclude_from_sync BOOL NOT NULL DEFAULT FALSE,
+ history_visibility SMALLINT NOT NULL DEFAULT 2 -- The history visibility before this event (1 - world_readable; 2 - shared; 3 - invited; 4 - joined)
);
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_type_idx ON syncapi_output_room_events (type);
@@ -58,27 +59,27 @@ CREATE INDEX IF NOT EXISTS syncapi_output_room_events_exclude_from_sync_idx ON s
const insertEventSQL = "" +
"INSERT INTO syncapi_output_room_events (" +
- "id, room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
- ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " +
- "ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $13)"
+ "id, room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" +
+ ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) " +
+ "ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $14)"
const selectEventsSQL = "" +
- "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id IN ($1)"
+ "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id IN ($1)"
const selectRecentEventsSQL = "" +
- "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
+ "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectRecentEventsForSyncSQL = "" +
- "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
+ "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectEarlyEventsSQL = "" +
- "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
+ "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
@@ -90,7 +91,7 @@ const updateEventJSONSQL = "" +
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
const selectStateInRangeSQL = "" +
- "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
+ "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" +
" FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2)" +
" AND room_id IN ($3)" +
@@ -102,15 +103,15 @@ const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
const selectContextEventSQL = "" +
- "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
+ "SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
const selectContextBeforeEventSQL = "" +
- "SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2"
+ "SELECT headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectContextAfterEventSQL = "" +
- "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2"
+ "SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
@@ -196,14 +197,15 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
for rows.Next() {
var (
- eventID string
- streamPos types.StreamPosition
- eventBytes []byte
- excludeFromSync bool
- addIDsJSON string
- delIDsJSON string
+ eventID string
+ streamPos types.StreamPosition
+ eventBytes []byte
+ excludeFromSync bool
+ addIDsJSON string
+ delIDsJSON string
+ historyVisibility gomatrixserverlib.HistoryVisibility
)
- if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil {
+ if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON, &historyVisibility); err != nil {
return nil, nil, err
}
@@ -239,6 +241,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
needSet[id] = true
}
stateNeeded[ev.RoomID()] = needSet
+ ev.Visibility = historyVisibility
eventIDToEvent[eventID] = types.StreamEvent{
HeaderedEvent: &ev,
@@ -270,7 +273,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
func (s *outputRoomEventsStatements) InsertEvent(
ctx context.Context, txn *sql.Tx,
event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
- transactionID *api.TransactionID, excludeFromSync bool,
+ transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility,
) (types.StreamPosition, error) {
var txnID *string
var sessionID *int64
@@ -326,6 +329,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
sessionID,
txnID,
excludeFromSync,
+ historyVisibility,
excludeFromSync,
)
return streamPos, err
@@ -481,15 +485,16 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
var result []types.StreamEvent
for rows.Next() {
var (
- eventID string
- streamPos types.StreamPosition
- eventBytes []byte
- excludeFromSync bool
- sessionID *int64
- txnID *string
- transactionID *api.TransactionID
+ eventID string
+ streamPos types.StreamPosition
+ eventBytes []byte
+ excludeFromSync bool
+ sessionID *int64
+ txnID *string
+ transactionID *api.TransactionID
+ historyVisibility gomatrixserverlib.HistoryVisibility
)
- if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil {
+ if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil {
return nil, err
}
// TODO: Handle redacted events
@@ -505,6 +510,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
}
}
+ ev.Visibility = historyVisibility
+
result = append(result, types.StreamEvent{
HeaderedEvent: &ev,
StreamPosition: streamPos,
@@ -519,13 +526,15 @@ func (s *outputRoomEventsStatements) SelectContextEvent(
) (id int, evt gomatrixserverlib.HeaderedEvent, err error) {
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
var eventAsString string
- if err = row.Scan(&id, &eventAsString); err != nil {
+ var historyVisibility gomatrixserverlib.HistoryVisibility
+ if err = row.Scan(&id, &eventAsString, &historyVisibility); err != nil {
return 0, evt, err
}
if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil {
return 0, evt, err
}
+ evt.Visibility = historyVisibility
return id, evt, nil
}
@@ -550,15 +559,17 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
for rows.Next() {
var (
- eventBytes []byte
- evt *gomatrixserverlib.HeaderedEvent
+ eventBytes []byte
+ evt *gomatrixserverlib.HeaderedEvent
+ historyVisibility gomatrixserverlib.HistoryVisibility
)
- if err = rows.Scan(&eventBytes); err != nil {
+ if err = rows.Scan(&eventBytes, &historyVisibility); err != nil {
return evts, err
}
if err = json.Unmarshal(eventBytes, &evt); err != nil {
return evts, err
}
+ evt.Visibility = historyVisibility
evts = append(evts, evt)
}
@@ -586,15 +597,17 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
for rows.Next() {
var (
- eventBytes []byte
- evt *gomatrixserverlib.HeaderedEvent
+ eventBytes []byte
+ evt *gomatrixserverlib.HeaderedEvent
+ historyVisibility gomatrixserverlib.HistoryVisibility
)
- if err = rows.Scan(&lastID, &eventBytes); err != nil {
+ if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil {
return 0, evts, err
}
if err = json.Unmarshal(eventBytes, &evt); err != nil {
return 0, evts, err
}
+ evt.Visibility = historyVisibility
evts = append(evts, evt)
}
return lastID, evts, rows.Err()
diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go
index e08a0ba8..39ceec81 100644
--- a/syncapi/storage/sqlite3/syncserver.go
+++ b/syncapi/storage/sqlite3/syncserver.go
@@ -52,15 +52,13 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
if err = d.streamID.Prepare(d.db); err != nil {
return err
}
- accountData, err := NewSqliteAccountDataTable(d.db, &d.streamID)
- if err != nil {
+ if _, err = d.db.Exec(outputRoomEventsSchema); err != nil {
return err
}
- events, err := NewSqliteEventsTable(d.db, &d.streamID)
- if err != nil {
+ if _, err = d.db.Exec(currentRoomStateSchema); err != nil {
return err
}
- roomState, err := NewSqliteCurrentRoomStateTable(d.db, &d.streamID)
+ accountData, err := NewSqliteAccountDataTable(d.db, &d.streamID)
if err != nil {
return err
}
@@ -111,9 +109,19 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
+ deltas.LoadAddHistoryVisibilityColumn(m)
if err = m.RunDeltas(d.db, dbProperties); err != nil {
return err
}
+ // prepare statements after the migrations have run
+ events, err := NewSqliteEventsTable(d.db, &d.streamID)
+ if err != nil {
+ return err
+ }
+ roomState, err := NewSqliteCurrentRoomStateTable(d.db, &d.streamID)
+ if err != nil {
+ return err
+ }
d.Database = shared.Database{
DB: d.db,
Writer: d.writer,
diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go
index c7415170..df03a33c 100644
--- a/syncapi/storage/storage_test.go
+++ b/syncapi/storage/storage_test.go
@@ -37,7 +37,7 @@ func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserver
addStateEvents = append(addStateEvents, ev)
addStateEventIDs = append(addStateEventIDs, ev.EventID())
}
- pos, err := db.WriteEvent(ctx, ev, addStateEvents, addStateEventIDs, removeStateEventIDs, nil, false)
+ pos, err := db.WriteEvent(ctx, ev, addStateEvents, addStateEventIDs, removeStateEventIDs, nil, false, gomatrixserverlib.HistoryVisibilityShared)
if err != nil {
t.Fatalf("WriteEvent failed: %s", err)
}
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 08568d9a..d8fc8f50 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -52,7 +52,14 @@ type Peeks interface {
type Events interface {
SelectStateInRange(ctx context.Context, txn *sql.Tx, r types.Range, stateFilter *gomatrixserverlib.StateFilter, roomIDs []string) (map[string]map[string]bool, map[string]types.StreamEvent, error)
SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error)
- InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error)
+ InsertEvent(
+ ctx context.Context, txn *sql.Tx,
+ event *gomatrixserverlib.HeaderedEvent,
+ addState, removeState []string,
+ transactionID *api.TransactionID,
+ excludeFromSync bool,
+ historyVisibility gomatrixserverlib.HistoryVisibility,
+ ) (streamPos types.StreamPosition, err error)
// SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high.
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
// Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
diff --git a/syncapi/storage/tables/output_room_events_test.go b/syncapi/storage/tables/output_room_events_test.go
index 69bbd04c..bdb17ae2 100644
--- a/syncapi/storage/tables/output_room_events_test.go
+++ b/syncapi/storage/tables/output_room_events_test.go
@@ -53,7 +53,7 @@ func TestOutputRoomEventsTable(t *testing.T) {
events := room.Events()
err := sqlutil.WithTransaction(db, func(txn *sql.Tx) error {
for _, ev := range events {
- _, err := tab.InsertEvent(ctx, txn, ev, nil, nil, nil, false)
+ _, err := tab.InsertEvent(ctx, txn, ev, nil, nil, nil, false, gomatrixserverlib.HistoryVisibilityShared)
if err != nil {
return fmt.Errorf("failed to InsertEvent: %s", err)
}
@@ -79,7 +79,7 @@ func TestOutputRoomEventsTable(t *testing.T) {
"body": "test.txt",
"url": "mxc://test.txt",
})
- if _, err = tab.InsertEvent(ctx, txn, urlEv, nil, nil, nil, false); err != nil {
+ if _, err = tab.InsertEvent(ctx, txn, urlEv, nil, nil, nil, false, gomatrixserverlib.HistoryVisibilityShared); err != nil {
return fmt.Errorf("failed to InsertEvent: %s", err)
}
wantEventID := []string{urlEv.EventID()}