diff options
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/internal/history_visibility.go | 217 | ||||
-rw-r--r-- | syncapi/routing/context.go | 99 | ||||
-rw-r--r-- | syncapi/routing/messages.go | 100 | ||||
-rw-r--r-- | syncapi/storage/interface.go | 4 | ||||
-rw-r--r-- | syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go | 55 | ||||
-rw-r--r-- | syncapi/storage/postgres/memberships_table.go | 28 | ||||
-rw-r--r-- | syncapi/storage/postgres/output_room_events_table.go | 10 | ||||
-rw-r--r-- | syncapi/storage/postgres/syncserver.go | 15 | ||||
-rw-r--r-- | syncapi/storage/shared/syncserver.go | 49 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go | 55 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/memberships_table.go | 22 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/output_room_events_table.go | 10 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/syncserver.go | 19 | ||||
-rw-r--r-- | syncapi/storage/storage_test.go | 20 | ||||
-rw-r--r-- | syncapi/storage/tables/interface.go | 1 | ||||
-rw-r--r-- | syncapi/streams/stream_pdu.go | 123 | ||||
-rw-r--r-- | syncapi/syncapi_test.go | 186 |
17 files changed, 818 insertions, 195 deletions
diff --git a/syncapi/internal/history_visibility.go b/syncapi/internal/history_visibility.go new file mode 100644 index 00000000..e73c004e --- /dev/null +++ b/syncapi/internal/history_visibility.go @@ -0,0 +1,217 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "math" + "time" + + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/gomatrixserverlib" + "github.com/prometheus/client_golang/prometheus" + "github.com/tidwall/gjson" +) + +func init() { + prometheus.MustRegister(calculateHistoryVisibilityDuration) +} + +// calculateHistoryVisibilityDuration stores the time it takes to +// calculate the history visibility. In polylith mode the roundtrip +// to the roomserver is included in this time. +var calculateHistoryVisibilityDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "dendrite", + Subsystem: "syncapi", + Name: "calculateHistoryVisibility_duration_millis", + Help: "How long it takes to calculate the history visibility", + Buckets: []float64{ // milliseconds + 5, 10, 25, 50, 75, 100, 250, 500, + 1000, 2000, 3000, 4000, 5000, 6000, + 7000, 8000, 9000, 10000, 15000, 20000, + }, + }, + []string{"api"}, +) + +var historyVisibilityPriority = map[gomatrixserverlib.HistoryVisibility]uint8{ + gomatrixserverlib.WorldReadable: 0, + gomatrixserverlib.HistoryVisibilityShared: 1, + gomatrixserverlib.HistoryVisibilityInvited: 2, + gomatrixserverlib.HistoryVisibilityJoined: 3, +} + +// eventVisibility contains the history visibility and membership state at a given event +type eventVisibility struct { + visibility gomatrixserverlib.HistoryVisibility + membershipAtEvent string + membershipCurrent string +} + +// allowed checks the eventVisibility if the user is allowed to see the event. +// Rules as defined by https://spec.matrix.org/v1.3/client-server-api/#server-behaviour-5 +func (ev eventVisibility) allowed() (allowed bool) { + switch ev.visibility { + case gomatrixserverlib.HistoryVisibilityWorldReadable: + // If the history_visibility was set to world_readable, allow. + return true + case gomatrixserverlib.HistoryVisibilityJoined: + // If the user’s membership was join, allow. + if ev.membershipAtEvent == gomatrixserverlib.Join { + return true + } + return false + case gomatrixserverlib.HistoryVisibilityShared: + // If the user’s membership was join, allow. + // If history_visibility was set to shared, and the user joined the room at any point after the event was sent, allow. + if ev.membershipAtEvent == gomatrixserverlib.Join || ev.membershipCurrent == gomatrixserverlib.Join { + return true + } + return false + case gomatrixserverlib.HistoryVisibilityInvited: + // If the user’s membership was join, allow. + if ev.membershipAtEvent == gomatrixserverlib.Join { + return true + } + if ev.membershipAtEvent == gomatrixserverlib.Invite { + return true + } + return false + default: + return false + } +} + +// ApplyHistoryVisibilityFilter applies the room history visibility filter on gomatrixserverlib.HeaderedEvents. +// Returns the filtered events and an error, if any. +func ApplyHistoryVisibilityFilter( + ctx context.Context, + syncDB storage.Database, + rsAPI api.SyncRoomserverAPI, + events []*gomatrixserverlib.HeaderedEvent, + alwaysIncludeEventIDs map[string]struct{}, + userID, endpoint string, +) ([]*gomatrixserverlib.HeaderedEvent, error) { + if len(events) == 0 { + return events, nil + } + start := time.Now() + + // try to get the current membership of the user + membershipCurrent, _, err := syncDB.SelectMembershipForUser(ctx, events[0].RoomID(), userID, math.MaxInt64) + if err != nil { + return nil, err + } + + // Get the mapping from eventID -> eventVisibility + eventsFiltered := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events)) + visibilities, err := visibilityForEvents(ctx, rsAPI, events, userID, events[0].RoomID()) + if err != nil { + return eventsFiltered, err + } + for _, ev := range events { + evVis := visibilities[ev.EventID()] + evVis.membershipCurrent = membershipCurrent + // Always include specific state events for /sync responses + if alwaysIncludeEventIDs != nil { + if _, ok := alwaysIncludeEventIDs[ev.EventID()]; ok { + eventsFiltered = append(eventsFiltered, ev) + continue + } + } + // NOTSPEC: Always allow user to see their own membership events (spec contains more "rules") + if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(userID) { + eventsFiltered = append(eventsFiltered, ev) + continue + } + // Always allow history evVis events on boundaries. This is done + // by setting the effective evVis to the least restrictive + // of the old vs new. + // https://spec.matrix.org/v1.3/client-server-api/#server-behaviour-5 + if hisVis, err := ev.HistoryVisibility(); err == nil { + prevHisVis := gjson.GetBytes(ev.Unsigned(), "prev_content.history_visibility").String() + oldPrio, ok := historyVisibilityPriority[gomatrixserverlib.HistoryVisibility(prevHisVis)] + // if we can't get the previous history visibility, default to shared. + if !ok { + oldPrio = historyVisibilityPriority[gomatrixserverlib.HistoryVisibilityShared] + } + // no OK check, since this should have been validated when setting the value + newPrio := historyVisibilityPriority[hisVis] + if oldPrio < newPrio { + evVis.visibility = gomatrixserverlib.HistoryVisibility(prevHisVis) + } + } + // do the actual check + allowed := evVis.allowed() + if allowed { + eventsFiltered = append(eventsFiltered, ev) + } + } + calculateHistoryVisibilityDuration.With(prometheus.Labels{"api": endpoint}).Observe(float64(time.Since(start).Milliseconds())) + return eventsFiltered, nil +} + +// visibilityForEvents returns a map from eventID to eventVisibility containing the visibility and the membership +// of `userID` at the given event. +// Returns an error if the roomserver can't calculate the memberships. +func visibilityForEvents( + ctx context.Context, + rsAPI api.SyncRoomserverAPI, + events []*gomatrixserverlib.HeaderedEvent, + userID, roomID string, +) (map[string]eventVisibility, error) { + eventIDs := make([]string, len(events)) + for i := range events { + eventIDs[i] = events[i].EventID() + } + + result := make(map[string]eventVisibility, len(eventIDs)) + + // get the membership events for all eventIDs + membershipResp := &api.QueryMembershipAtEventResponse{} + err := rsAPI.QueryMembershipAtEvent(ctx, &api.QueryMembershipAtEventRequest{ + RoomID: roomID, + EventIDs: eventIDs, + UserID: userID, + }, membershipResp) + if err != nil { + return result, err + } + + // Create a map from eventID -> eventVisibility + for _, event := range events { + eventID := event.EventID() + vis := eventVisibility{ + membershipAtEvent: gomatrixserverlib.Leave, // default to leave, to not expose events by accident + visibility: event.Visibility, + } + membershipEvs, ok := membershipResp.Memberships[eventID] + if !ok { + result[eventID] = vis + continue + } + for _, ev := range membershipEvs { + membership, err := ev.Membership() + if err != nil { + return result, err + } + vis.membershipAtEvent = membership + } + result[eventID] = vis + } + return result, nil +} diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go index f6b4d15e..13c4e9d8 100644 --- a/syncapi/routing/context.go +++ b/syncapi/routing/context.go @@ -21,10 +21,12 @@ import ( "fmt" "net/http" "strconv" + "time" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/internal/caching" roomserver "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -95,24 +97,6 @@ func Context( ContainsURL: filter.ContainsURL, } - // TODO: Get the actual state at the last event returned by SelectContextAfterEvent - state, _ := syncDB.CurrentState(ctx, roomID, &stateFilter, nil) - // verify the user is allowed to see the context for this room/event - for _, x := range state { - var hisVis gomatrixserverlib.HistoryVisibility - hisVis, err = x.HistoryVisibility() - if err != nil { - continue - } - allowed := hisVis == gomatrixserverlib.WorldReadable || membershipRes.Membership == gomatrixserverlib.Join - if !allowed { - return util.JSONResponse{ - Code: http.StatusForbidden, - JSON: jsonerror.Forbidden("User is not allowed to query context"), - } - } - } - id, requestedEvent, err := syncDB.SelectContextEvent(ctx, roomID, eventID) if err != nil { if err == sql.ErrNoRows { @@ -125,6 +109,24 @@ func Context( return jsonerror.InternalServerError() } + // verify the user is allowed to see the context for this room/event + startTime := time.Now() + filteredEvents, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, rsAPI, []*gomatrixserverlib.HeaderedEvent{&requestedEvent}, nil, device.UserID, "context") + if err != nil { + logrus.WithError(err).Error("unable to apply history visibility filter") + return jsonerror.InternalServerError() + } + logrus.WithFields(logrus.Fields{ + "duration": time.Since(startTime), + "room_id": roomID, + }).Debug("applied history visibility (context)") + if len(filteredEvents) == 0 { + return util.JSONResponse{ + Code: http.StatusForbidden, + JSON: jsonerror.Forbidden("User is not allowed to query context"), + } + } + eventsBefore, err := syncDB.SelectContextBeforeEvent(ctx, id, roomID, filter) if err != nil && err != sql.ErrNoRows { logrus.WithError(err).Error("unable to fetch before events") @@ -137,8 +139,27 @@ func Context( return jsonerror.InternalServerError() } - eventsBeforeClient := gomatrixserverlib.HeaderedToClientEvents(eventsBefore, gomatrixserverlib.FormatAll) - eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfter, gomatrixserverlib.FormatAll) + startTime = time.Now() + eventsBeforeFiltered, eventsAfterFiltered, err := applyHistoryVisibilityOnContextEvents(ctx, syncDB, rsAPI, eventsBefore, eventsAfter, device.UserID) + if err != nil { + logrus.WithError(err).Error("unable to apply history visibility filter") + return jsonerror.InternalServerError() + } + + logrus.WithFields(logrus.Fields{ + "duration": time.Since(startTime), + "room_id": roomID, + }).Debug("applied history visibility (context eventsBefore/eventsAfter)") + + // TODO: Get the actual state at the last event returned by SelectContextAfterEvent + state, err := syncDB.CurrentState(ctx, roomID, &stateFilter, nil) + if err != nil { + logrus.WithError(err).Error("unable to fetch current room state") + return jsonerror.InternalServerError() + } + + eventsBeforeClient := gomatrixserverlib.HeaderedToClientEvents(eventsBeforeFiltered, gomatrixserverlib.FormatAll) + eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfterFiltered, gomatrixserverlib.FormatAll) newState := applyLazyLoadMembers(device, filter, eventsAfterClient, eventsBeforeClient, state, lazyLoadCache) response := ContextRespsonse{ @@ -162,6 +183,44 @@ func Context( } } +// applyHistoryVisibilityOnContextEvents is a helper function to avoid roundtrips to the roomserver +// by combining the events before and after the context event. Returns the filtered events, +// and an error, if any. +func applyHistoryVisibilityOnContextEvents( + ctx context.Context, syncDB storage.Database, rsAPI roomserver.SyncRoomserverAPI, + eventsBefore, eventsAfter []*gomatrixserverlib.HeaderedEvent, + userID string, +) (filteredBefore, filteredAfter []*gomatrixserverlib.HeaderedEvent, err error) { + eventIDsBefore := make(map[string]struct{}, len(eventsBefore)) + eventIDsAfter := make(map[string]struct{}, len(eventsAfter)) + + // Remember before/after eventIDs, so we can restore them + // after applying history visibility checks + for _, ev := range eventsBefore { + eventIDsBefore[ev.EventID()] = struct{}{} + } + for _, ev := range eventsAfter { + eventIDsAfter[ev.EventID()] = struct{}{} + } + + allEvents := append(eventsBefore, eventsAfter...) + filteredEvents, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, rsAPI, allEvents, nil, userID, "context") + if err != nil { + return nil, nil, err + } + + // "Restore" events in the correct context + for _, ev := range filteredEvents { + if _, ok := eventIDsBefore[ev.EventID()]; ok { + filteredBefore = append(filteredBefore, ev) + } + if _, ok := eventIDsAfter[ev.EventID()]; ok { + filteredAfter = append(filteredAfter, ev) + } + } + return filteredBefore, filteredAfter, nil +} + func getStartEnd(ctx context.Context, syncDB storage.Database, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) { if len(startEvents) > 0 { start, err = syncDB.EventPositionInTopology(ctx, startEvents[0].EventID()) diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index b4c9a542..9db3d8e1 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -19,6 +19,7 @@ import ( "fmt" "net/http" "sort" + "time" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -28,6 +29,7 @@ import ( "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" @@ -324,6 +326,9 @@ func (r *messagesReq) retrieveEvents() ( // reliable way to define it), it would be easier and less troublesome to // only have to change it in one place, i.e. the database. start, end, err = r.getStartEnd(events) + if err != nil { + return []gomatrixserverlib.ClientEvent{}, *r.from, *r.to, err + } // Sort the events to ensure we send them in the right order. if r.backwardOrdering { @@ -337,97 +342,18 @@ func (r *messagesReq) retrieveEvents() ( } events = reversed(events) } - events = r.filterHistoryVisible(events) if len(events) == 0 { return []gomatrixserverlib.ClientEvent{}, *r.from, *r.to, nil } - // Convert all of the events into client events. - clientEvents = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatAll) - return clientEvents, start, end, err -} - -func (r *messagesReq) filterHistoryVisible(events []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent { - // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the - // user shouldn't see, we check the recent events and remove any prior to the join event of the user - // which is equiv to history_visibility: joined - joinEventIndex := -1 - for i, ev := range events { - if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(r.device.UserID) { - membership, _ := ev.Membership() - if membership == "join" { - joinEventIndex = i - break - } - } - } - - var result []*gomatrixserverlib.HeaderedEvent - var eventsToCheck []*gomatrixserverlib.HeaderedEvent - if joinEventIndex != -1 { - if r.backwardOrdering { - result = events[:joinEventIndex+1] - eventsToCheck = append(eventsToCheck, result[0]) - } else { - result = events[joinEventIndex:] - eventsToCheck = append(eventsToCheck, result[len(result)-1]) - } - } else { - eventsToCheck = []*gomatrixserverlib.HeaderedEvent{events[0], events[len(events)-1]} - result = events - } - // make sure the user was in the room for both the earliest and latest events, we need this because - // some backpagination results will not have the join event (e.g if they hit /messages at the join event itself) - wasJoined := true - for _, ev := range eventsToCheck { - var queryRes api.QueryStateAfterEventsResponse - err := r.rsAPI.QueryStateAfterEvents(r.ctx, &api.QueryStateAfterEventsRequest{ - RoomID: ev.RoomID(), - PrevEventIDs: ev.PrevEventIDs(), - StateToFetch: []gomatrixserverlib.StateKeyTuple{ - {EventType: gomatrixserverlib.MRoomMember, StateKey: r.device.UserID}, - {EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""}, - }, - }, &queryRes) - if err != nil { - wasJoined = false - break - } - var hisVisEvent, membershipEvent *gomatrixserverlib.HeaderedEvent - for i := range queryRes.StateEvents { - switch queryRes.StateEvents[i].Type() { - case gomatrixserverlib.MRoomMember: - membershipEvent = queryRes.StateEvents[i] - case gomatrixserverlib.MRoomHistoryVisibility: - hisVisEvent = queryRes.StateEvents[i] - } - } - if hisVisEvent == nil { - return events // apply no filtering as it defaults to Shared. - } - hisVis, _ := hisVisEvent.HistoryVisibility() - if hisVis == "shared" || hisVis == "world_readable" { - return events // apply no filtering - } - if membershipEvent == nil { - wasJoined = false - break - } - membership, err := membershipEvent.Membership() - if err != nil { - wasJoined = false - break - } - if membership != "join" { - wasJoined = false - break - } - } - if !wasJoined { - util.GetLogger(r.ctx).WithField("num_events", len(events)).Warnf("%s was not joined to room during these events, omitting them", r.device.UserID) - return []*gomatrixserverlib.HeaderedEvent{} - } - return result + // Apply room history visibility filter + startTime := time.Now() + filteredEvents, err := internal.ApplyHistoryVisibilityFilter(r.ctx, r.db, r.rsAPI, events, nil, r.device.UserID, "messages") + logrus.WithFields(logrus.Fields{ + "duration": time.Since(startTime), + "room_id": r.roomID, + }).Debug("applied history visibility (messages)") + return gomatrixserverlib.HeaderedToClientEvents(filteredEvents, gomatrixserverlib.FormatAll), start, end, err } func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) { diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 9751670b..43a75da9 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -161,6 +161,10 @@ type Database interface { IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error + // SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found + // returns "leave", the topological position and no error. If an error occurs, other than sql.ErrNoRows, returns that and an empty + // string as the membership. + SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) } type Presence interface { diff --git a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go index 29008ade..d68ed8d5 100644 --- a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go +++ b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go @@ -17,7 +17,10 @@ package deltas import ( "context" "database/sql" + "encoding/json" "fmt" + + "github.com/matrix-org/gomatrixserverlib" ) func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.Tx) error { @@ -31,6 +34,27 @@ func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.T return nil } +// UpSetHistoryVisibility sets the history visibility for already stored events. +// Requires current_room_state and output_room_events to be created. +func UpSetHistoryVisibility(ctx context.Context, tx *sql.Tx) error { + // get the current room history visibilities + historyVisibilities, err := currentHistoryVisibilities(ctx, tx) + if err != nil { + return err + } + + // update the history visibility + for roomID, hisVis := range historyVisibilities { + _, err = tx.ExecContext(ctx, `UPDATE syncapi_output_room_events SET history_visibility = $1 + WHERE type IN ('m.room.message', 'm.room.encrypted') AND room_id = $2 AND history_visibility <> $1`, hisVis, roomID) + if err != nil { + return fmt.Errorf("failed to update history visibility: %w", err) + } + } + + return nil +} + func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.Tx) error { _, err := tx.ExecContext(ctx, ` ALTER TABLE syncapi_current_room_state ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2; @@ -39,9 +63,40 @@ func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.T if err != nil { return fmt.Errorf("failed to execute upgrade: %w", err) } + return nil } +// currentHistoryVisibilities returns a map from roomID to current history visibility. +// If the history visibility was changed after room creation, defaults to joined. +func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gomatrixserverlib.HistoryVisibility, error) { + rows, err := tx.QueryContext(ctx, `SELECT DISTINCT room_id, headered_event_json FROM syncapi_current_room_state + WHERE type = 'm.room.history_visibility' AND state_key = ''; +`) + if err != nil { + return nil, fmt.Errorf("failed to query current room state: %w", err) + } + defer rows.Close() // nolint: errcheck + var eventBytes []byte + var roomID string + var event gomatrixserverlib.HeaderedEvent + var hisVis gomatrixserverlib.HistoryVisibility + historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility) + for rows.Next() { + if err = rows.Scan(&roomID, &eventBytes); err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + if err = json.Unmarshal(eventBytes, &event); err != nil { + return nil, fmt.Errorf("failed to unmarshal event: %w", err) + } + historyVisibilities[roomID] = gomatrixserverlib.HistoryVisibilityJoined + if hisVis, err = event.HistoryVisibility(); err == nil && event.Depth() < 10 { + historyVisibilities[roomID] = hisVis + } + } + return historyVisibilities, nil +} + func DownAddHistoryVisibilityColumn(ctx context.Context, tx *sql.Tx) error { _, err := tx.ExecContext(ctx, ` ALTER TABLE syncapi_output_room_events DROP COLUMN IF EXISTS history_visibility; diff --git a/syncapi/storage/postgres/memberships_table.go b/syncapi/storage/postgres/memberships_table.go index 00223c57..939d6b3f 100644 --- a/syncapi/storage/postgres/memberships_table.go +++ b/syncapi/storage/postgres/memberships_table.go @@ -66,10 +66,14 @@ const selectMembershipCountSQL = "" + const selectHeroesSQL = "" + "SELECT DISTINCT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership = ANY($3) LIMIT 5" +const selectMembershipBeforeSQL = "" + + "SELECT membership, topological_pos FROM syncapi_memberships WHERE room_id = $1 and user_id = $2 AND topological_pos <= $3 ORDER BY topological_pos DESC LIMIT 1" + type membershipsStatements struct { - upsertMembershipStmt *sql.Stmt - selectMembershipCountStmt *sql.Stmt - selectHeroesStmt *sql.Stmt + upsertMembershipStmt *sql.Stmt + selectMembershipCountStmt *sql.Stmt + selectHeroesStmt *sql.Stmt + selectMembershipForUserStmt *sql.Stmt } func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) { @@ -82,6 +86,7 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) { {&s.upsertMembershipStmt, upsertMembershipSQL}, {&s.selectMembershipCountStmt, selectMembershipCountSQL}, {&s.selectHeroesStmt, selectHeroesSQL}, + {&s.selectMembershipForUserStmt, selectMembershipBeforeSQL}, }.Prepare(db) } @@ -132,3 +137,20 @@ func (s *membershipsStatements) SelectHeroes( } return heroes, rows.Err() } + +// SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found +// returns "leave", the topological position and no error. If an error occurs, other than sql.ErrNoRows, returns that and an empty +// string as the membership. +func (s *membershipsStatements) SelectMembershipForUser( + ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64, +) (membership string, topologyPos int, err error) { + stmt := sqlutil.TxStmt(txn, s.selectMembershipForUserStmt) + err = stmt.QueryRowContext(ctx, roomID, userID, pos).Scan(&membership, &topologyPos) + if err != nil { + if err == sql.ErrNoRows { + return "leave", 0, nil + } + return "", 0, err + } + return membership, topologyPos, nil +} diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 34ff6700..8f633640 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -191,10 +191,12 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { } m := sqlutil.NewMigrator(db) - m.AddMigrations(sqlutil.Migration{ - Version: "syncapi: add history visibility column (output_room_events)", - Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents, - }) + m.AddMigrations( + sqlutil.Migration{ + Version: "syncapi: add history visibility column (output_room_events)", + Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents, + }, + ) err = m.Up(context.Background()) if err != nil { return nil, err diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index a044716c..979ff664 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/syncapi/storage/shared" ) @@ -97,6 +98,20 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) if err != nil { return nil, err } + + // apply migrations which need multiple tables + m := sqlutil.NewMigrator(d.db) + m.AddMigrations( + sqlutil.Migration{ + Version: "syncapi: set history visibility for existing events", + Up: deltas.UpSetHistoryVisibility, // Requires current_room_state and output_room_events to be created. + }, + ) + err = m.Up(base.Context()) + if err != nil { + return nil, err + } + d.Database = shared.Database{ DB: d.db, Writer: d.writer, diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 932bda16..a46e5525 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -231,7 +231,7 @@ func (d *Database) AddPeek( return } -// DeletePeeks tracks the fact that a user has stopped peeking from the specified +// DeletePeek tracks the fact that a user has stopped peeking from the specified // device. If the peeks was successfully deleted this returns the stream ID it was // stored at. Returns an error if there was a problem communicating with the database. func (d *Database) DeletePeek( @@ -372,6 +372,7 @@ func (d *Database) WriteEvent( ) (pduPosition types.StreamPosition, returnErr error) { returnErr = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { var err error + ev.Visibility = historyVisibility pos, err := d.OutputEvents.InsertEvent( ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, historyVisibility, ) @@ -563,7 +564,7 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda return err } -// Retrieve the backward topology position, i.e. the position of the +// GetBackwardTopologyPos retrieves the backward topology position, i.e. the position of the // oldest event in the room's topology. func (d *Database) GetBackwardTopologyPos( ctx context.Context, @@ -674,7 +675,7 @@ func (d *Database) fetchMissingStateEvents( return events, nil } -// getStateDeltas returns the state deltas between fromPos and toPos, +// GetStateDeltas returns the state deltas between fromPos and toPos, // exclusive of oldPos, inclusive of newPos, for the rooms in which // the user has new membership events. // A list of joined room IDs is also returned in case the caller needs it. @@ -812,7 +813,7 @@ func (d *Database) GetStateDeltas( return deltas, joinedRoomIDs, nil } -// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync +// GetStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync // requests with full_state=true. // Fetches full state for all joined rooms and uses selectStateInRange to get // updates for other rooms. @@ -1039,37 +1040,41 @@ func (d *Database) GetUserUnreadNotificationCounts(ctx context.Context, userID s return d.NotificationData.SelectUserUnreadCounts(ctx, userID, from, to) } -func (s *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) { - return s.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID) +func (d *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) { + return d.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID) } -func (s *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) { - return s.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, filter) +func (d *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) { + return d.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, filter) } -func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) { - return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter) +func (d *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) { + return d.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter) } -func (s *Database) IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) { - return s.Ignores.SelectIgnores(ctx, userID) +func (d *Database) IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) { + return d.Ignores.SelectIgnores(ctx, userID) } -func (s *Database) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error { - return s.Ignores.UpsertIgnores(ctx, userID, ignores) +func (d *Database) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error { + return d.Ignores.UpsertIgnores(ctx, userID, ignores) } -func (s *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) { - return s.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync) +func (d *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) { + return d.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync) } -func (s *Database) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) { - return s.Presence.GetPresenceForUser(ctx, nil, userID) +func (d *Database) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) { + return d.Presence.GetPresenceForUser(ctx, nil, userID) } -func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) { - return s.Presence.GetPresenceAfter(ctx, nil, after, filter) +func (d *Database) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) { + return d.Presence.GetPresenceAfter(ctx, nil, after, filter) } -func (s *Database) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) { - return s.Presence.GetMaxPresenceID(ctx, nil) +func (d *Database) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) { + return d.Presence.GetMaxPresenceID(ctx, nil) +} + +func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) { + return d.Memberships.SelectMembershipForUser(ctx, nil, roomID, userID, pos) } diff --git a/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go index 07917721..d23f0756 100644 --- a/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go +++ b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go @@ -17,7 +17,10 @@ package deltas import ( "context" "database/sql" + "encoding/json" "fmt" + + "github.com/matrix-org/gomatrixserverlib" ) func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.Tx) error { @@ -37,6 +40,27 @@ func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.T return nil } +// UpSetHistoryVisibility sets the history visibility for already stored events. +// Requires current_room_state and output_room_events to be created. +func UpSetHistoryVisibility(ctx context.Context, tx *sql.Tx) error { + // get the current room history visibilities + historyVisibilities, err := currentHistoryVisibilities(ctx, tx) + if err != nil { + return err + } + + // update the history visibility + for roomID, hisVis := range historyVisibilities { + _, err = tx.ExecContext(ctx, `UPDATE syncapi_output_room_events SET history_visibility = $1 + WHERE type IN ('m.room.message', 'm.room.encrypted') AND room_id = $2 AND history_visibility <> $1`, hisVis, roomID) + if err != nil { + return fmt.Errorf("failed to update history visibility: %w", err) + } + } + + return nil +} + func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.Tx) error { // SQLite doesn't have "if exists", so check if the column exists. If the query doesn't return an error, it already exists. // Required for unit tests, as otherwise a duplicate column error will show up. @@ -51,9 +75,40 @@ func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.T if err != nil { return fmt.Errorf("failed to execute upgrade: %w", err) } + return nil } +// currentHistoryVisibilities returns a map from roomID to current history visibility. +// If the history visibility was changed after room creation, defaults to joined. +func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gomatrixserverlib.HistoryVisibility, error) { + rows, err := tx.QueryContext(ctx, `SELECT DISTINCT room_id, headered_event_json FROM syncapi_current_room_state + WHERE type = 'm.room.history_visibility' AND state_key = ''; +`) + if err != nil { + return nil, fmt.Errorf("failed to query current room state: %w", err) + } + defer rows.Close() // nolint: errcheck + var eventBytes []byte + var roomID string + var event gomatrixserverlib.HeaderedEvent + var hisVis gomatrixserverlib.HistoryVisibility + historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility) + for rows.Next() { + if err = rows.Scan(&roomID, &eventBytes); err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + if err = json.Unmarshal(eventBytes, &event); err != nil { + return nil, fmt.Errorf("failed to unmarshal event: %w", err) + } + historyVisibilities[roomID] = gomatrixserverlib.HistoryVisibilityJoined + if hisVis, err = event.HistoryVisibility(); err == nil && event.Depth() < 10 { + historyVisibilities[roomID] = hisVis + } + } + return historyVisibilities, nil +} + func DownAddHistoryVisibilityColumn(ctx context.Context, tx *sql.Tx) error { // SQLite doesn't have "if exists", so check if the column exists. _, err := tx.QueryContext(ctx, "SELECT history_visibility FROM syncapi_output_room_events LIMIT 1") diff --git a/syncapi/storage/sqlite3/memberships_table.go b/syncapi/storage/sqlite3/memberships_table.go index e4daa99c..0c966fca 100644 --- a/syncapi/storage/sqlite3/memberships_table.go +++ b/syncapi/storage/sqlite3/memberships_table.go @@ -66,11 +66,15 @@ const selectMembershipCountSQL = "" + const selectHeroesSQL = "" + "SELECT DISTINCT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership IN ($3) LIMIT 5" +const selectMembershipBeforeSQL = "" + + "SELECT membership, topological_pos FROM syncapi_memberships WHERE room_id = $1 and user_id = $2 AND topological_pos <= $3 ORDER BY topological_pos DESC LIMIT 1" + type membershipsStatements struct { db *sql.DB upsertMembershipStmt *sql.Stmt selectMembershipCountStmt *sql.Stmt //selectHeroesStmt *sql.Stmt - prepared at runtime due to variadic + selectMembershipForUserStmt *sql.Stmt } func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) { @@ -84,6 +88,7 @@ func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) { return s, sqlutil.StatementList{ {&s.upsertMembershipStmt, upsertMembershipSQL}, {&s.selectMembershipCountStmt, selectMembershipCountSQL}, + {&s.selectMembershipForUserStmt, selectMembershipBeforeSQL}, // {&s.selectHeroesStmt, selectHeroesSQL}, - prepared at runtime due to variadic }.Prepare(db) } @@ -148,3 +153,20 @@ func (s *membershipsStatements) SelectHeroes( } return heroes, rows.Err() } + +// SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found +// returns "leave", the topological position and no error. If an error occurs, other than sql.ErrNoRows, returns that and an empty +// string as the membership. +func (s *membershipsStatements) SelectMembershipForUser( + ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64, +) (membership string, topologyPos int, err error) { + stmt := sqlutil.TxStmt(txn, s.selectMembershipForUserStmt) + err = stmt.QueryRowContext(ctx, roomID, userID, pos).Scan(&membership, &topologyPos) + if err != nil { + if err == sql.ErrNoRows { + return "leave", 0, nil + } + return "", 0, err + } + return membership, topologyPos, nil +} diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index de389fa9..91fd35b5 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -139,10 +139,12 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even } m := sqlutil.NewMigrator(db) - m.AddMigrations(sqlutil.Migration{ - Version: "syncapi: add history visibility column (output_room_events)", - Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents, - }) + m.AddMigrations( + sqlutil.Migration{ + Version: "syncapi: add history visibility column (output_room_events)", + Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents, + }, + ) err = m.Up(context.Background()) if err != nil { return nil, err diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 5c5eb0f5..a84e2bd1 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -16,12 +16,14 @@ package sqlite3 import ( + "context" "database/sql" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/shared" + "github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas" ) // SyncServerDatasource represents a sync server datasource which manages @@ -41,13 +43,13 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()); err != nil { return nil, err } - if err = d.prepare(); err != nil { + if err = d.prepare(base.Context()); err != nil { return nil, err } return &d, nil } -func (d *SyncServerDatasource) prepare() (err error) { +func (d *SyncServerDatasource) prepare(ctx context.Context) (err error) { if err = d.streamID.Prepare(d.db); err != nil { return err } @@ -107,6 +109,19 @@ func (d *SyncServerDatasource) prepare() (err error) { if err != nil { return err } + + // apply migrations which need multiple tables + m := sqlutil.NewMigrator(d.db) + m.AddMigrations( + sqlutil.Migration{ + Version: "syncapi: set history visibility for existing events", + Up: deltas.UpSetHistoryVisibility, // Requires current_room_state and output_room_events to be created. + }, + ) + err = m.Up(ctx) + if err != nil { + return err + } d.Database = shared.Database{ DB: d.db, Writer: d.writer, diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index eda5ef3e..a62818e9 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -12,20 +12,22 @@ import ( "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/test" + "github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/gomatrixserverlib" ) var ctx = context.Background() -func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { +func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func(), func()) { connStr, close := test.PrepareDBConnectionString(t, dbType) - db, err := storage.NewSyncServerDatasource(nil, &config.DatabaseOptions{ + base, closeBase := testrig.CreateBaseDendrite(t, dbType) + db, err := storage.NewSyncServerDatasource(base, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), }) if err != nil { t.Fatalf("NewSyncServerDatasource returned %s", err) } - return db, close + return db, close, closeBase } func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (positions []types.StreamPosition) { @@ -51,8 +53,9 @@ func TestWriteEvents(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { alice := test.NewUser(t) r := test.NewRoom(t, alice) - db, close := MustCreateDatabase(t, dbType) + db, close, closeBase := MustCreateDatabase(t, dbType) defer close() + defer closeBase() MustWriteEvents(t, db, r.Events()) }) } @@ -60,8 +63,9 @@ func TestWriteEvents(t *testing.T) { // These tests assert basic functionality of RecentEvents for PDUs func TestRecentEventsPDU(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { - db, close := MustCreateDatabase(t, dbType) + db, close, closeBase := MustCreateDatabase(t, dbType) defer close() + defer closeBase() alice := test.NewUser(t) // dummy room to make sure SQL queries are filtering on room ID MustWriteEvents(t, db, test.NewRoom(t, alice).Events()) @@ -163,8 +167,9 @@ func TestRecentEventsPDU(t *testing.T) { // The purpose of this test is to ensure that backfill does indeed go backwards, using a topology token func TestGetEventsInRangeWithTopologyToken(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { - db, close := MustCreateDatabase(t, dbType) + db, close, closeBase := MustCreateDatabase(t, dbType) defer close() + defer closeBase() alice := test.NewUser(t) r := test.NewRoom(t, alice) for i := 0; i < 10; i++ { @@ -404,8 +409,9 @@ func TestSendToDeviceBehaviour(t *testing.T) { bob := test.NewUser(t) deviceID := "one" test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { - db, close := MustCreateDatabase(t, dbType) + db, close, closeBase := MustCreateDatabase(t, dbType) defer close() + defer closeBase() // At this point there should be no messages. We haven't sent anything // yet. _, events, err := db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, 100) diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index d68351d4..468d26ac 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -185,6 +185,7 @@ type Memberships interface { UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string, pos types.StreamPosition) (count int, err error) SelectHeroes(ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string) (heroes []string, err error) + SelectMembershipForUser(ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) } type NotificationData interface { diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 1ad3adc4..136cbea5 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -10,10 +10,13 @@ import ( "github.com/matrix-org/dendrite/internal/caching" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/internal" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" "github.com/tidwall/gjson" "go.uber.org/atomic" @@ -123,7 +126,7 @@ func (p *PDUStreamProvider) CompleteSync( defer reqWaitGroup.Done() jr, jerr := p.getJoinResponseForCompleteSync( - ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, + ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false, ) if jerr != nil { req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed") @@ -149,7 +152,7 @@ func (p *PDUStreamProvider) CompleteSync( if !peek.Deleted { var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( - ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, + ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, true, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") @@ -281,12 +284,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( } } } - if len(recentEvents) > 0 { - updateLatestPosition(recentEvents[len(recentEvents)-1].EventID()) - } - if len(delta.StateEvents) > 0 { - updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID()) - } if stateFilter.LazyLoadMembers { delta.StateEvents, err = p.lazyLoadMembers( @@ -306,6 +303,19 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( } } + // Applies the history visibility rules + events, err := applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, delta.RoomID, device.UserID, eventFilter.Limit, recentEvents) + if err != nil { + logrus.WithError(err).Error("unable to apply history visibility filter") + } + + if len(events) > 0 { + updateLatestPosition(events[len(events)-1].EventID()) + } + if len(delta.StateEvents) > 0 { + updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID()) + } + switch delta.Membership { case gomatrixserverlib.Join: jr := types.NewJoinResponse() @@ -313,14 +323,17 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( p.addRoomSummary(ctx, jr, delta.RoomID, device.UserID, latestPosition) } jr.Timeline.PrevBatch = &prevBatch - jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = limited + jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync) + // If we are limited by the filter AND the history visibility filter + // didn't "remove" events, return that the response is limited. + jr.Timeline.Limited = limited && len(events) == len(recentEvents) jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[delta.RoomID] = *jr case gomatrixserverlib.Peek: jr := types.NewJoinResponse() jr.Timeline.PrevBatch = &prevBatch + // TODO: Apply history visibility on peeked rooms jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = limited jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) @@ -330,12 +343,12 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( fallthrough // transitions to leave are the same as ban case gomatrixserverlib.Ban: - // TODO: recentEvents may contain events that this user is not allowed to see because they are - // no longer in the room. lr := types.NewLeaveResponse() lr.Timeline.PrevBatch = &prevBatch - lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync) + // If we are limited by the filter AND the history visibility filter + // didn't "remove" events, return that the response is limited. + lr.Timeline.Limited = limited && len(events) == len(recentEvents) lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) res.Rooms.Leave[delta.RoomID] = *lr } @@ -343,6 +356,41 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( return latestPosition, nil } +// applyHistoryVisibilityFilter gets the current room state and supplies it to ApplyHistoryVisibilityFilter, to make +// sure we always return the required events in the timeline. +func applyHistoryVisibilityFilter( + ctx context.Context, + db storage.Database, + rsAPI roomserverAPI.SyncRoomserverAPI, + roomID, userID string, + limit int, + recentEvents []*gomatrixserverlib.HeaderedEvent, +) ([]*gomatrixserverlib.HeaderedEvent, error) { + // We need to make sure we always include the latest states events, if they are in the timeline. + // We grep at least limit * 2 events, to ensure we really get the needed events. + stateEvents, err := db.CurrentState(ctx, roomID, &gomatrixserverlib.StateFilter{Limit: limit * 2}, nil) + if err != nil { + // Not a fatal error, we can continue without the stateEvents, + // they are only needed if there are state events in the timeline. + logrus.WithError(err).Warnf("failed to get current room state") + } + alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents)) + for _, ev := range stateEvents { + alwaysIncludeIDs[ev.EventID()] = struct{}{} + } + startTime := time.Now() + events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync") + if err != nil { + + return nil, err + } + logrus.WithFields(logrus.Fields{ + "duration": time.Since(startTime), + "room_id": roomID, + }).Debug("applied history visibility (sync)") + return events, nil +} + func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) { // Work out how many members are in the room. joinedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Join, latestPosition) @@ -390,6 +438,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( eventFilter *gomatrixserverlib.RoomEventFilter, wantFullState bool, device *userapi.Device, + isPeek bool, ) (jr *types.JoinResponse, err error) { jr = types.NewJoinResponse() // TODO: When filters are added, we may need to call this multiple times to get enough events. @@ -404,33 +453,6 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( return } - // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the - // user shouldn't see, we check the recent events and remove any prior to the join event of the user - // which is equiv to history_visibility: joined - joinEventIndex := -1 - for i := len(recentStreamEvents) - 1; i >= 0; i-- { - ev := recentStreamEvents[i] - if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) { - membership, _ := ev.Membership() - if membership == "join" { - joinEventIndex = i - if i > 0 { - // the create event happens before the first join, so we should cut it at that point instead - if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") { - joinEventIndex = i - 1 - break - } - } - break - } - } - } - if joinEventIndex != -1 { - // cut all events earlier than the join (but not the join itself) - recentStreamEvents = recentStreamEvents[joinEventIndex:] - limited = false // so clients know not to try to backpaginate - } - // Work our way through the timeline events and pick out the event IDs // of any state events that appear in the timeline. We'll specifically // exclude them at the next step, so that we don't get duplicate state @@ -474,6 +496,19 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) + events := recentEvents + // Only apply history visibility checks if the response is for joined rooms + if !isPeek { + events, err = applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, roomID, device.UserID, eventFilter.Limit, recentEvents) + if err != nil { + logrus.WithError(err).Error("unable to apply history visibility filter") + } + } + + // If we are limited by the filter AND the history visibility filter + // didn't "remove" events, return that the response is limited. + limited = limited && len(events) == len(recentEvents) + if stateFilter.LazyLoadMembers { if err != nil { return nil, err @@ -488,8 +523,10 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( } jr.Timeline.PrevBatch = prevBatch - jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = limited + jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync) + // If we are limited by the filter AND the history visibility filter + // didn't "remove" events, return that the response is limited. + jr.Timeline.Limited = limited && len(events) == len(recentEvents) jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync) return jr, nil } diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go index 931fef88..dc073a16 100644 --- a/syncapi/syncapi_test.go +++ b/syncapi/syncapi_test.go @@ -12,6 +12,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/producers" keyapi "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/base" @@ -54,6 +55,16 @@ func (s *syncRoomserverAPI) QueryBulkStateContent(ctx context.Context, req *rsap return nil } +func (s *syncRoomserverAPI) QueryMembershipForUser(ctx context.Context, req *rsapi.QueryMembershipForUserRequest, res *rsapi.QueryMembershipForUserResponse) error { + res.IsRoomForgotten = false + res.RoomExists = true + return nil +} + +func (s *syncRoomserverAPI) QueryMembershipAtEvent(ctx context.Context, req *rsapi.QueryMembershipAtEventRequest, res *rsapi.QueryMembershipAtEventResponse) error { + return nil +} + type syncUserAPI struct { userapi.SyncUserAPI accounts []userapi.Device @@ -107,7 +118,7 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) { jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) - msgs := toNATSMsgs(t, base, room.Events()) + msgs := toNATSMsgs(t, base, room.Events()...) AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, &syncKeyAPI{}) testrig.MustPublishMsgs(t, jsctx, msgs...) @@ -200,7 +211,7 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) { // m.room.power_levels // m.room.join_rules // m.room.history_visibility - msgs := toNATSMsgs(t, base, room.Events()) + msgs := toNATSMsgs(t, base, room.Events()...) sinceTokens := make([]string, len(msgs)) AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, &syncKeyAPI{}) for i, msg := range msgs { @@ -315,6 +326,174 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) { } +// This is mainly what Sytest is doing in "test_history_visibility" +func TestMessageHistoryVisibility(t *testing.T) { + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + testHistoryVisibility(t, dbType) + }) +} + +func testHistoryVisibility(t *testing.T, dbType test.DBType) { + type result struct { + seeWithoutJoin bool + seeBeforeJoin bool + seeAfterInvite bool + } + + // create the users + alice := test.NewUser(t) + bob := test.NewUser(t) + + bobDev := userapi.Device{ + ID: "BOBID", + UserID: bob.ID, + AccessToken: "BOD_BEARER_TOKEN", + DisplayName: "BOB", + } + + ctx := context.Background() + // check guest and normal user accounts + for _, accType := range []userapi.AccountType{userapi.AccountTypeGuest, userapi.AccountTypeUser} { + testCases := []struct { + historyVisibility gomatrixserverlib.HistoryVisibility + wantResult result + }{ + { + historyVisibility: gomatrixserverlib.HistoryVisibilityWorldReadable, + wantResult: result{ + seeWithoutJoin: true, + seeBeforeJoin: true, + seeAfterInvite: true, + }, + }, + { + historyVisibility: gomatrixserverlib.HistoryVisibilityShared, + wantResult: result{ + seeWithoutJoin: false, + seeBeforeJoin: true, + seeAfterInvite: true, + }, + }, + { + historyVisibility: gomatrixserverlib.HistoryVisibilityInvited, + wantResult: result{ + seeWithoutJoin: false, + seeBeforeJoin: false, + seeAfterInvite: true, + }, + }, + { + historyVisibility: gomatrixserverlib.HistoryVisibilityJoined, + wantResult: result{ + seeWithoutJoin: false, + seeBeforeJoin: false, + seeAfterInvite: false, + }, + }, + } + + bobDev.AccountType = accType + userType := "guest" + if accType == userapi.AccountTypeUser { + userType = "real user" + } + + base, close := testrig.CreateBaseDendrite(t, dbType) + defer close() + + jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) + + // Use the actual internal roomserver API + rsAPI := roomserver.NewInternalAPI(base) + rsAPI.SetFederationAPI(nil, nil) + + AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{bobDev}}, rsAPI, &syncKeyAPI{}) + + for _, tc := range testCases { + testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType) + t.Run(testname, func(t *testing.T) { + // create a room with the given visibility + room := test.NewRoom(t, alice, test.RoomHistoryVisibility(tc.historyVisibility)) + + // send the events/messages to NATS to create the rooms + beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)}) + eventsToSend := append(room.Events(), beforeJoinEv) + if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil { + t.Fatalf("failed to send events: %v", err) + } + + // There is only one event, we expect only to be able to see this, if the room is world_readable + w := httptest.NewRecorder() + base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{ + "access_token": bobDev.AccessToken, + "dir": "b", + }))) + if w.Code != 200 { + t.Logf("%s", w.Body.String()) + t.Fatalf("got HTTP %d want %d", w.Code, 200) + } + // We only care about the returned events at this point + var res struct { + Chunk []gomatrixserverlib.ClientEvent `json:"chunk"` + } + if err := json.NewDecoder(w.Body).Decode(&res); err != nil { + t.Errorf("failed to decode response body: %s", err) + } + + verifyEventVisible(t, tc.wantResult.seeWithoutJoin, beforeJoinEv, res.Chunk) + + // Create invite, a message, join the room and create another message. + inviteEv := room.CreateAndInsert(t, alice, "m.room.member", map[string]interface{}{"membership": "invite"}, test.WithStateKey(bob.ID)) + afterInviteEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After invite in a %s room", tc.historyVisibility)}) + joinEv := room.CreateAndInsert(t, bob, "m.room.member", map[string]interface{}{"membership": "join"}, test.WithStateKey(bob.ID)) + msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After join in a %s room", tc.historyVisibility)}) + + eventsToSend = append([]*gomatrixserverlib.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv) + + if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil { + t.Fatalf("failed to send events: %v", err) + } + + // Verify the messages after/before invite are visible or not + w = httptest.NewRecorder() + base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{ + "access_token": bobDev.AccessToken, + "dir": "b", + }))) + if w.Code != 200 { + t.Logf("%s", w.Body.String()) + t.Fatalf("got HTTP %d want %d", w.Code, 200) + } + if err := json.NewDecoder(w.Body).Decode(&res); err != nil { + t.Errorf("failed to decode response body: %s", err) + } + // verify results + verifyEventVisible(t, tc.wantResult.seeBeforeJoin, beforeJoinEv, res.Chunk) + verifyEventVisible(t, tc.wantResult.seeAfterInvite, afterInviteEv, res.Chunk) + }) + } + } +} + +func verifyEventVisible(t *testing.T, wantVisible bool, wantVisibleEvent *gomatrixserverlib.HeaderedEvent, chunk []gomatrixserverlib.ClientEvent) { + t.Helper() + if wantVisible { + for _, ev := range chunk { + if ev.EventID == wantVisibleEvent.EventID() { + return + } + } + t.Fatalf("expected to see event %s but didn't: %+v", wantVisibleEvent.EventID(), chunk) + } else { + for _, ev := range chunk { + if ev.EventID == wantVisibleEvent.EventID() { + t.Fatalf("expected not to see event %s: %+v", wantVisibleEvent.EventID(), string(ev.Content)) + } + } + } +} + func TestSendToDevice(t *testing.T) { test.WithAllDatabases(t, testSendToDevice) } @@ -448,7 +627,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) { } } -func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input []*gomatrixserverlib.HeaderedEvent) []*nats.Msg { +func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg { result := make([]*nats.Msg, len(input)) for i, ev := range input { var addsStateIDs []string @@ -460,6 +639,7 @@ func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input []*gomatrixserverli NewRoomEvent: &rsapi.OutputNewRoomEvent{ Event: ev, AddsStateEventIDs: addsStateIDs, + HistoryVisibility: ev.Visibility, }, }) } |