aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/internal/history_visibility.go217
-rw-r--r--syncapi/routing/context.go99
-rw-r--r--syncapi/routing/messages.go100
-rw-r--r--syncapi/storage/interface.go4
-rw-r--r--syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go55
-rw-r--r--syncapi/storage/postgres/memberships_table.go28
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go10
-rw-r--r--syncapi/storage/postgres/syncserver.go15
-rw-r--r--syncapi/storage/shared/syncserver.go49
-rw-r--r--syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go55
-rw-r--r--syncapi/storage/sqlite3/memberships_table.go22
-rw-r--r--syncapi/storage/sqlite3/output_room_events_table.go10
-rw-r--r--syncapi/storage/sqlite3/syncserver.go19
-rw-r--r--syncapi/storage/storage_test.go20
-rw-r--r--syncapi/storage/tables/interface.go1
-rw-r--r--syncapi/streams/stream_pdu.go123
-rw-r--r--syncapi/syncapi_test.go186
17 files changed, 818 insertions, 195 deletions
diff --git a/syncapi/internal/history_visibility.go b/syncapi/internal/history_visibility.go
new file mode 100644
index 00000000..e73c004e
--- /dev/null
+++ b/syncapi/internal/history_visibility.go
@@ -0,0 +1,217 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "context"
+ "math"
+ "time"
+
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/tidwall/gjson"
+)
+
+func init() {
+ prometheus.MustRegister(calculateHistoryVisibilityDuration)
+}
+
+// calculateHistoryVisibilityDuration stores the time it takes to
+// calculate the history visibility. In polylith mode the roundtrip
+// to the roomserver is included in this time.
+var calculateHistoryVisibilityDuration = prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: "dendrite",
+ Subsystem: "syncapi",
+ Name: "calculateHistoryVisibility_duration_millis",
+ Help: "How long it takes to calculate the history visibility",
+ Buckets: []float64{ // milliseconds
+ 5, 10, 25, 50, 75, 100, 250, 500,
+ 1000, 2000, 3000, 4000, 5000, 6000,
+ 7000, 8000, 9000, 10000, 15000, 20000,
+ },
+ },
+ []string{"api"},
+)
+
+var historyVisibilityPriority = map[gomatrixserverlib.HistoryVisibility]uint8{
+ gomatrixserverlib.WorldReadable: 0,
+ gomatrixserverlib.HistoryVisibilityShared: 1,
+ gomatrixserverlib.HistoryVisibilityInvited: 2,
+ gomatrixserverlib.HistoryVisibilityJoined: 3,
+}
+
+// eventVisibility contains the history visibility and membership state at a given event
+type eventVisibility struct {
+ visibility gomatrixserverlib.HistoryVisibility
+ membershipAtEvent string
+ membershipCurrent string
+}
+
+// allowed checks the eventVisibility if the user is allowed to see the event.
+// Rules as defined by https://spec.matrix.org/v1.3/client-server-api/#server-behaviour-5
+func (ev eventVisibility) allowed() (allowed bool) {
+ switch ev.visibility {
+ case gomatrixserverlib.HistoryVisibilityWorldReadable:
+ // If the history_visibility was set to world_readable, allow.
+ return true
+ case gomatrixserverlib.HistoryVisibilityJoined:
+ // If the user’s membership was join, allow.
+ if ev.membershipAtEvent == gomatrixserverlib.Join {
+ return true
+ }
+ return false
+ case gomatrixserverlib.HistoryVisibilityShared:
+ // If the user’s membership was join, allow.
+ // If history_visibility was set to shared, and the user joined the room at any point after the event was sent, allow.
+ if ev.membershipAtEvent == gomatrixserverlib.Join || ev.membershipCurrent == gomatrixserverlib.Join {
+ return true
+ }
+ return false
+ case gomatrixserverlib.HistoryVisibilityInvited:
+ // If the user’s membership was join, allow.
+ if ev.membershipAtEvent == gomatrixserverlib.Join {
+ return true
+ }
+ if ev.membershipAtEvent == gomatrixserverlib.Invite {
+ return true
+ }
+ return false
+ default:
+ return false
+ }
+}
+
+// ApplyHistoryVisibilityFilter applies the room history visibility filter on gomatrixserverlib.HeaderedEvents.
+// Returns the filtered events and an error, if any.
+func ApplyHistoryVisibilityFilter(
+ ctx context.Context,
+ syncDB storage.Database,
+ rsAPI api.SyncRoomserverAPI,
+ events []*gomatrixserverlib.HeaderedEvent,
+ alwaysIncludeEventIDs map[string]struct{},
+ userID, endpoint string,
+) ([]*gomatrixserverlib.HeaderedEvent, error) {
+ if len(events) == 0 {
+ return events, nil
+ }
+ start := time.Now()
+
+ // try to get the current membership of the user
+ membershipCurrent, _, err := syncDB.SelectMembershipForUser(ctx, events[0].RoomID(), userID, math.MaxInt64)
+ if err != nil {
+ return nil, err
+ }
+
+ // Get the mapping from eventID -> eventVisibility
+ eventsFiltered := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events))
+ visibilities, err := visibilityForEvents(ctx, rsAPI, events, userID, events[0].RoomID())
+ if err != nil {
+ return eventsFiltered, err
+ }
+ for _, ev := range events {
+ evVis := visibilities[ev.EventID()]
+ evVis.membershipCurrent = membershipCurrent
+ // Always include specific state events for /sync responses
+ if alwaysIncludeEventIDs != nil {
+ if _, ok := alwaysIncludeEventIDs[ev.EventID()]; ok {
+ eventsFiltered = append(eventsFiltered, ev)
+ continue
+ }
+ }
+ // NOTSPEC: Always allow user to see their own membership events (spec contains more "rules")
+ if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(userID) {
+ eventsFiltered = append(eventsFiltered, ev)
+ continue
+ }
+ // Always allow history evVis events on boundaries. This is done
+ // by setting the effective evVis to the least restrictive
+ // of the old vs new.
+ // https://spec.matrix.org/v1.3/client-server-api/#server-behaviour-5
+ if hisVis, err := ev.HistoryVisibility(); err == nil {
+ prevHisVis := gjson.GetBytes(ev.Unsigned(), "prev_content.history_visibility").String()
+ oldPrio, ok := historyVisibilityPriority[gomatrixserverlib.HistoryVisibility(prevHisVis)]
+ // if we can't get the previous history visibility, default to shared.
+ if !ok {
+ oldPrio = historyVisibilityPriority[gomatrixserverlib.HistoryVisibilityShared]
+ }
+ // no OK check, since this should have been validated when setting the value
+ newPrio := historyVisibilityPriority[hisVis]
+ if oldPrio < newPrio {
+ evVis.visibility = gomatrixserverlib.HistoryVisibility(prevHisVis)
+ }
+ }
+ // do the actual check
+ allowed := evVis.allowed()
+ if allowed {
+ eventsFiltered = append(eventsFiltered, ev)
+ }
+ }
+ calculateHistoryVisibilityDuration.With(prometheus.Labels{"api": endpoint}).Observe(float64(time.Since(start).Milliseconds()))
+ return eventsFiltered, nil
+}
+
+// visibilityForEvents returns a map from eventID to eventVisibility containing the visibility and the membership
+// of `userID` at the given event.
+// Returns an error if the roomserver can't calculate the memberships.
+func visibilityForEvents(
+ ctx context.Context,
+ rsAPI api.SyncRoomserverAPI,
+ events []*gomatrixserverlib.HeaderedEvent,
+ userID, roomID string,
+) (map[string]eventVisibility, error) {
+ eventIDs := make([]string, len(events))
+ for i := range events {
+ eventIDs[i] = events[i].EventID()
+ }
+
+ result := make(map[string]eventVisibility, len(eventIDs))
+
+ // get the membership events for all eventIDs
+ membershipResp := &api.QueryMembershipAtEventResponse{}
+ err := rsAPI.QueryMembershipAtEvent(ctx, &api.QueryMembershipAtEventRequest{
+ RoomID: roomID,
+ EventIDs: eventIDs,
+ UserID: userID,
+ }, membershipResp)
+ if err != nil {
+ return result, err
+ }
+
+ // Create a map from eventID -> eventVisibility
+ for _, event := range events {
+ eventID := event.EventID()
+ vis := eventVisibility{
+ membershipAtEvent: gomatrixserverlib.Leave, // default to leave, to not expose events by accident
+ visibility: event.Visibility,
+ }
+ membershipEvs, ok := membershipResp.Memberships[eventID]
+ if !ok {
+ result[eventID] = vis
+ continue
+ }
+ for _, ev := range membershipEvs {
+ membership, err := ev.Membership()
+ if err != nil {
+ return result, err
+ }
+ vis.membershipAtEvent = membership
+ }
+ result[eventID] = vis
+ }
+ return result, nil
+}
diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go
index f6b4d15e..13c4e9d8 100644
--- a/syncapi/routing/context.go
+++ b/syncapi/routing/context.go
@@ -21,10 +21,12 @@ import (
"fmt"
"net/http"
"strconv"
+ "time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/caching"
roomserver "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
@@ -95,24 +97,6 @@ func Context(
ContainsURL: filter.ContainsURL,
}
- // TODO: Get the actual state at the last event returned by SelectContextAfterEvent
- state, _ := syncDB.CurrentState(ctx, roomID, &stateFilter, nil)
- // verify the user is allowed to see the context for this room/event
- for _, x := range state {
- var hisVis gomatrixserverlib.HistoryVisibility
- hisVis, err = x.HistoryVisibility()
- if err != nil {
- continue
- }
- allowed := hisVis == gomatrixserverlib.WorldReadable || membershipRes.Membership == gomatrixserverlib.Join
- if !allowed {
- return util.JSONResponse{
- Code: http.StatusForbidden,
- JSON: jsonerror.Forbidden("User is not allowed to query context"),
- }
- }
- }
-
id, requestedEvent, err := syncDB.SelectContextEvent(ctx, roomID, eventID)
if err != nil {
if err == sql.ErrNoRows {
@@ -125,6 +109,24 @@ func Context(
return jsonerror.InternalServerError()
}
+ // verify the user is allowed to see the context for this room/event
+ startTime := time.Now()
+ filteredEvents, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, rsAPI, []*gomatrixserverlib.HeaderedEvent{&requestedEvent}, nil, device.UserID, "context")
+ if err != nil {
+ logrus.WithError(err).Error("unable to apply history visibility filter")
+ return jsonerror.InternalServerError()
+ }
+ logrus.WithFields(logrus.Fields{
+ "duration": time.Since(startTime),
+ "room_id": roomID,
+ }).Debug("applied history visibility (context)")
+ if len(filteredEvents) == 0 {
+ return util.JSONResponse{
+ Code: http.StatusForbidden,
+ JSON: jsonerror.Forbidden("User is not allowed to query context"),
+ }
+ }
+
eventsBefore, err := syncDB.SelectContextBeforeEvent(ctx, id, roomID, filter)
if err != nil && err != sql.ErrNoRows {
logrus.WithError(err).Error("unable to fetch before events")
@@ -137,8 +139,27 @@ func Context(
return jsonerror.InternalServerError()
}
- eventsBeforeClient := gomatrixserverlib.HeaderedToClientEvents(eventsBefore, gomatrixserverlib.FormatAll)
- eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfter, gomatrixserverlib.FormatAll)
+ startTime = time.Now()
+ eventsBeforeFiltered, eventsAfterFiltered, err := applyHistoryVisibilityOnContextEvents(ctx, syncDB, rsAPI, eventsBefore, eventsAfter, device.UserID)
+ if err != nil {
+ logrus.WithError(err).Error("unable to apply history visibility filter")
+ return jsonerror.InternalServerError()
+ }
+
+ logrus.WithFields(logrus.Fields{
+ "duration": time.Since(startTime),
+ "room_id": roomID,
+ }).Debug("applied history visibility (context eventsBefore/eventsAfter)")
+
+ // TODO: Get the actual state at the last event returned by SelectContextAfterEvent
+ state, err := syncDB.CurrentState(ctx, roomID, &stateFilter, nil)
+ if err != nil {
+ logrus.WithError(err).Error("unable to fetch current room state")
+ return jsonerror.InternalServerError()
+ }
+
+ eventsBeforeClient := gomatrixserverlib.HeaderedToClientEvents(eventsBeforeFiltered, gomatrixserverlib.FormatAll)
+ eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfterFiltered, gomatrixserverlib.FormatAll)
newState := applyLazyLoadMembers(device, filter, eventsAfterClient, eventsBeforeClient, state, lazyLoadCache)
response := ContextRespsonse{
@@ -162,6 +183,44 @@ func Context(
}
}
+// applyHistoryVisibilityOnContextEvents is a helper function to avoid roundtrips to the roomserver
+// by combining the events before and after the context event. Returns the filtered events,
+// and an error, if any.
+func applyHistoryVisibilityOnContextEvents(
+ ctx context.Context, syncDB storage.Database, rsAPI roomserver.SyncRoomserverAPI,
+ eventsBefore, eventsAfter []*gomatrixserverlib.HeaderedEvent,
+ userID string,
+) (filteredBefore, filteredAfter []*gomatrixserverlib.HeaderedEvent, err error) {
+ eventIDsBefore := make(map[string]struct{}, len(eventsBefore))
+ eventIDsAfter := make(map[string]struct{}, len(eventsAfter))
+
+ // Remember before/after eventIDs, so we can restore them
+ // after applying history visibility checks
+ for _, ev := range eventsBefore {
+ eventIDsBefore[ev.EventID()] = struct{}{}
+ }
+ for _, ev := range eventsAfter {
+ eventIDsAfter[ev.EventID()] = struct{}{}
+ }
+
+ allEvents := append(eventsBefore, eventsAfter...)
+ filteredEvents, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, rsAPI, allEvents, nil, userID, "context")
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // "Restore" events in the correct context
+ for _, ev := range filteredEvents {
+ if _, ok := eventIDsBefore[ev.EventID()]; ok {
+ filteredBefore = append(filteredBefore, ev)
+ }
+ if _, ok := eventIDsAfter[ev.EventID()]; ok {
+ filteredAfter = append(filteredAfter, ev)
+ }
+ }
+ return filteredBefore, filteredAfter, nil
+}
+
func getStartEnd(ctx context.Context, syncDB storage.Database, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) {
if len(startEvents) > 0 {
start, err = syncDB.EventPositionInTopology(ctx, startEvents[0].EventID())
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index b4c9a542..9db3d8e1 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -19,6 +19,7 @@ import (
"fmt"
"net/http"
"sort"
+ "time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@@ -28,6 +29,7 @@ import (
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -324,6 +326,9 @@ func (r *messagesReq) retrieveEvents() (
// reliable way to define it), it would be easier and less troublesome to
// only have to change it in one place, i.e. the database.
start, end, err = r.getStartEnd(events)
+ if err != nil {
+ return []gomatrixserverlib.ClientEvent{}, *r.from, *r.to, err
+ }
// Sort the events to ensure we send them in the right order.
if r.backwardOrdering {
@@ -337,97 +342,18 @@ func (r *messagesReq) retrieveEvents() (
}
events = reversed(events)
}
- events = r.filterHistoryVisible(events)
if len(events) == 0 {
return []gomatrixserverlib.ClientEvent{}, *r.from, *r.to, nil
}
- // Convert all of the events into client events.
- clientEvents = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatAll)
- return clientEvents, start, end, err
-}
-
-func (r *messagesReq) filterHistoryVisible(events []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
- // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
- // user shouldn't see, we check the recent events and remove any prior to the join event of the user
- // which is equiv to history_visibility: joined
- joinEventIndex := -1
- for i, ev := range events {
- if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(r.device.UserID) {
- membership, _ := ev.Membership()
- if membership == "join" {
- joinEventIndex = i
- break
- }
- }
- }
-
- var result []*gomatrixserverlib.HeaderedEvent
- var eventsToCheck []*gomatrixserverlib.HeaderedEvent
- if joinEventIndex != -1 {
- if r.backwardOrdering {
- result = events[:joinEventIndex+1]
- eventsToCheck = append(eventsToCheck, result[0])
- } else {
- result = events[joinEventIndex:]
- eventsToCheck = append(eventsToCheck, result[len(result)-1])
- }
- } else {
- eventsToCheck = []*gomatrixserverlib.HeaderedEvent{events[0], events[len(events)-1]}
- result = events
- }
- // make sure the user was in the room for both the earliest and latest events, we need this because
- // some backpagination results will not have the join event (e.g if they hit /messages at the join event itself)
- wasJoined := true
- for _, ev := range eventsToCheck {
- var queryRes api.QueryStateAfterEventsResponse
- err := r.rsAPI.QueryStateAfterEvents(r.ctx, &api.QueryStateAfterEventsRequest{
- RoomID: ev.RoomID(),
- PrevEventIDs: ev.PrevEventIDs(),
- StateToFetch: []gomatrixserverlib.StateKeyTuple{
- {EventType: gomatrixserverlib.MRoomMember, StateKey: r.device.UserID},
- {EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""},
- },
- }, &queryRes)
- if err != nil {
- wasJoined = false
- break
- }
- var hisVisEvent, membershipEvent *gomatrixserverlib.HeaderedEvent
- for i := range queryRes.StateEvents {
- switch queryRes.StateEvents[i].Type() {
- case gomatrixserverlib.MRoomMember:
- membershipEvent = queryRes.StateEvents[i]
- case gomatrixserverlib.MRoomHistoryVisibility:
- hisVisEvent = queryRes.StateEvents[i]
- }
- }
- if hisVisEvent == nil {
- return events // apply no filtering as it defaults to Shared.
- }
- hisVis, _ := hisVisEvent.HistoryVisibility()
- if hisVis == "shared" || hisVis == "world_readable" {
- return events // apply no filtering
- }
- if membershipEvent == nil {
- wasJoined = false
- break
- }
- membership, err := membershipEvent.Membership()
- if err != nil {
- wasJoined = false
- break
- }
- if membership != "join" {
- wasJoined = false
- break
- }
- }
- if !wasJoined {
- util.GetLogger(r.ctx).WithField("num_events", len(events)).Warnf("%s was not joined to room during these events, omitting them", r.device.UserID)
- return []*gomatrixserverlib.HeaderedEvent{}
- }
- return result
+ // Apply room history visibility filter
+ startTime := time.Now()
+ filteredEvents, err := internal.ApplyHistoryVisibilityFilter(r.ctx, r.db, r.rsAPI, events, nil, r.device.UserID, "messages")
+ logrus.WithFields(logrus.Fields{
+ "duration": time.Since(startTime),
+ "room_id": r.roomID,
+ }).Debug("applied history visibility (messages)")
+ return gomatrixserverlib.HeaderedToClientEvents(filteredEvents, gomatrixserverlib.FormatAll), start, end, err
}
func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) {
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index 9751670b..43a75da9 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -161,6 +161,10 @@ type Database interface {
IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error)
UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error
+ // SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found
+ // returns "leave", the topological position and no error. If an error occurs, other than sql.ErrNoRows, returns that and an empty
+ // string as the membership.
+ SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error)
}
type Presence interface {
diff --git a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go
index 29008ade..d68ed8d5 100644
--- a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go
+++ b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go
@@ -17,7 +17,10 @@ package deltas
import (
"context"
"database/sql"
+ "encoding/json"
"fmt"
+
+ "github.com/matrix-org/gomatrixserverlib"
)
func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.Tx) error {
@@ -31,6 +34,27 @@ func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.T
return nil
}
+// UpSetHistoryVisibility sets the history visibility for already stored events.
+// Requires current_room_state and output_room_events to be created.
+func UpSetHistoryVisibility(ctx context.Context, tx *sql.Tx) error {
+ // get the current room history visibilities
+ historyVisibilities, err := currentHistoryVisibilities(ctx, tx)
+ if err != nil {
+ return err
+ }
+
+ // update the history visibility
+ for roomID, hisVis := range historyVisibilities {
+ _, err = tx.ExecContext(ctx, `UPDATE syncapi_output_room_events SET history_visibility = $1
+ WHERE type IN ('m.room.message', 'm.room.encrypted') AND room_id = $2 AND history_visibility <> $1`, hisVis, roomID)
+ if err != nil {
+ return fmt.Errorf("failed to update history visibility: %w", err)
+ }
+ }
+
+ return nil
+}
+
func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE syncapi_current_room_state ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2;
@@ -39,9 +63,40 @@ func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.T
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
+
return nil
}
+// currentHistoryVisibilities returns a map from roomID to current history visibility.
+// If the history visibility was changed after room creation, defaults to joined.
+func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gomatrixserverlib.HistoryVisibility, error) {
+ rows, err := tx.QueryContext(ctx, `SELECT DISTINCT room_id, headered_event_json FROM syncapi_current_room_state
+ WHERE type = 'm.room.history_visibility' AND state_key = '';
+`)
+ if err != nil {
+ return nil, fmt.Errorf("failed to query current room state: %w", err)
+ }
+ defer rows.Close() // nolint: errcheck
+ var eventBytes []byte
+ var roomID string
+ var event gomatrixserverlib.HeaderedEvent
+ var hisVis gomatrixserverlib.HistoryVisibility
+ historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility)
+ for rows.Next() {
+ if err = rows.Scan(&roomID, &eventBytes); err != nil {
+ return nil, fmt.Errorf("failed to scan row: %w", err)
+ }
+ if err = json.Unmarshal(eventBytes, &event); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal event: %w", err)
+ }
+ historyVisibilities[roomID] = gomatrixserverlib.HistoryVisibilityJoined
+ if hisVis, err = event.HistoryVisibility(); err == nil && event.Depth() < 10 {
+ historyVisibilities[roomID] = hisVis
+ }
+ }
+ return historyVisibilities, nil
+}
+
func DownAddHistoryVisibilityColumn(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE syncapi_output_room_events DROP COLUMN IF EXISTS history_visibility;
diff --git a/syncapi/storage/postgres/memberships_table.go b/syncapi/storage/postgres/memberships_table.go
index 00223c57..939d6b3f 100644
--- a/syncapi/storage/postgres/memberships_table.go
+++ b/syncapi/storage/postgres/memberships_table.go
@@ -66,10 +66,14 @@ const selectMembershipCountSQL = "" +
const selectHeroesSQL = "" +
"SELECT DISTINCT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership = ANY($3) LIMIT 5"
+const selectMembershipBeforeSQL = "" +
+ "SELECT membership, topological_pos FROM syncapi_memberships WHERE room_id = $1 and user_id = $2 AND topological_pos <= $3 ORDER BY topological_pos DESC LIMIT 1"
+
type membershipsStatements struct {
- upsertMembershipStmt *sql.Stmt
- selectMembershipCountStmt *sql.Stmt
- selectHeroesStmt *sql.Stmt
+ upsertMembershipStmt *sql.Stmt
+ selectMembershipCountStmt *sql.Stmt
+ selectHeroesStmt *sql.Stmt
+ selectMembershipForUserStmt *sql.Stmt
}
func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
@@ -82,6 +86,7 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
{&s.upsertMembershipStmt, upsertMembershipSQL},
{&s.selectMembershipCountStmt, selectMembershipCountSQL},
{&s.selectHeroesStmt, selectHeroesSQL},
+ {&s.selectMembershipForUserStmt, selectMembershipBeforeSQL},
}.Prepare(db)
}
@@ -132,3 +137,20 @@ func (s *membershipsStatements) SelectHeroes(
}
return heroes, rows.Err()
}
+
+// SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found
+// returns "leave", the topological position and no error. If an error occurs, other than sql.ErrNoRows, returns that and an empty
+// string as the membership.
+func (s *membershipsStatements) SelectMembershipForUser(
+ ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64,
+) (membership string, topologyPos int, err error) {
+ stmt := sqlutil.TxStmt(txn, s.selectMembershipForUserStmt)
+ err = stmt.QueryRowContext(ctx, roomID, userID, pos).Scan(&membership, &topologyPos)
+ if err != nil {
+ if err == sql.ErrNoRows {
+ return "leave", 0, nil
+ }
+ return "", 0, err
+ }
+ return membership, topologyPos, nil
+}
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 34ff6700..8f633640 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -191,10 +191,12 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
}
m := sqlutil.NewMigrator(db)
- m.AddMigrations(sqlutil.Migration{
- Version: "syncapi: add history visibility column (output_room_events)",
- Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents,
- })
+ m.AddMigrations(
+ sqlutil.Migration{
+ Version: "syncapi: add history visibility column (output_room_events)",
+ Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents,
+ },
+ )
err = m.Up(context.Background())
if err != nil {
return nil, err
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
index a044716c..979ff664 100644
--- a/syncapi/storage/postgres/syncserver.go
+++ b/syncapi/storage/postgres/syncserver.go
@@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
)
@@ -97,6 +98,20 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
if err != nil {
return nil, err
}
+
+ // apply migrations which need multiple tables
+ m := sqlutil.NewMigrator(d.db)
+ m.AddMigrations(
+ sqlutil.Migration{
+ Version: "syncapi: set history visibility for existing events",
+ Up: deltas.UpSetHistoryVisibility, // Requires current_room_state and output_room_events to be created.
+ },
+ )
+ err = m.Up(base.Context())
+ if err != nil {
+ return nil, err
+ }
+
d.Database = shared.Database{
DB: d.db,
Writer: d.writer,
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index 932bda16..a46e5525 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -231,7 +231,7 @@ func (d *Database) AddPeek(
return
}
-// DeletePeeks tracks the fact that a user has stopped peeking from the specified
+// DeletePeek tracks the fact that a user has stopped peeking from the specified
// device. If the peeks was successfully deleted this returns the stream ID it was
// stored at. Returns an error if there was a problem communicating with the database.
func (d *Database) DeletePeek(
@@ -372,6 +372,7 @@ func (d *Database) WriteEvent(
) (pduPosition types.StreamPosition, returnErr error) {
returnErr = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
var err error
+ ev.Visibility = historyVisibility
pos, err := d.OutputEvents.InsertEvent(
ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, historyVisibility,
)
@@ -563,7 +564,7 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda
return err
}
-// Retrieve the backward topology position, i.e. the position of the
+// GetBackwardTopologyPos retrieves the backward topology position, i.e. the position of the
// oldest event in the room's topology.
func (d *Database) GetBackwardTopologyPos(
ctx context.Context,
@@ -674,7 +675,7 @@ func (d *Database) fetchMissingStateEvents(
return events, nil
}
-// getStateDeltas returns the state deltas between fromPos and toPos,
+// GetStateDeltas returns the state deltas between fromPos and toPos,
// exclusive of oldPos, inclusive of newPos, for the rooms in which
// the user has new membership events.
// A list of joined room IDs is also returned in case the caller needs it.
@@ -812,7 +813,7 @@ func (d *Database) GetStateDeltas(
return deltas, joinedRoomIDs, nil
}
-// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync
+// GetStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync
// requests with full_state=true.
// Fetches full state for all joined rooms and uses selectStateInRange to get
// updates for other rooms.
@@ -1039,37 +1040,41 @@ func (d *Database) GetUserUnreadNotificationCounts(ctx context.Context, userID s
return d.NotificationData.SelectUserUnreadCounts(ctx, userID, from, to)
}
-func (s *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) {
- return s.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID)
+func (d *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) {
+ return d.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID)
}
-func (s *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) {
- return s.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, filter)
+func (d *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) {
+ return d.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, filter)
}
-func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) {
- return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
+func (d *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) {
+ return d.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
}
-func (s *Database) IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) {
- return s.Ignores.SelectIgnores(ctx, userID)
+func (d *Database) IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) {
+ return d.Ignores.SelectIgnores(ctx, userID)
}
-func (s *Database) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error {
- return s.Ignores.UpsertIgnores(ctx, userID, ignores)
+func (d *Database) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error {
+ return d.Ignores.UpsertIgnores(ctx, userID, ignores)
}
-func (s *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
- return s.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync)
+func (d *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
+ return d.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync)
}
-func (s *Database) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
- return s.Presence.GetPresenceForUser(ctx, nil, userID)
+func (d *Database) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
+ return d.Presence.GetPresenceForUser(ctx, nil, userID)
}
-func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) {
- return s.Presence.GetPresenceAfter(ctx, nil, after, filter)
+func (d *Database) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) {
+ return d.Presence.GetPresenceAfter(ctx, nil, after, filter)
}
-func (s *Database) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) {
- return s.Presence.GetMaxPresenceID(ctx, nil)
+func (d *Database) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) {
+ return d.Presence.GetMaxPresenceID(ctx, nil)
+}
+
+func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) {
+ return d.Memberships.SelectMembershipForUser(ctx, nil, roomID, userID, pos)
}
diff --git a/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go
index 07917721..d23f0756 100644
--- a/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go
+++ b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go
@@ -17,7 +17,10 @@ package deltas
import (
"context"
"database/sql"
+ "encoding/json"
"fmt"
+
+ "github.com/matrix-org/gomatrixserverlib"
)
func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.Tx) error {
@@ -37,6 +40,27 @@ func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.T
return nil
}
+// UpSetHistoryVisibility sets the history visibility for already stored events.
+// Requires current_room_state and output_room_events to be created.
+func UpSetHistoryVisibility(ctx context.Context, tx *sql.Tx) error {
+ // get the current room history visibilities
+ historyVisibilities, err := currentHistoryVisibilities(ctx, tx)
+ if err != nil {
+ return err
+ }
+
+ // update the history visibility
+ for roomID, hisVis := range historyVisibilities {
+ _, err = tx.ExecContext(ctx, `UPDATE syncapi_output_room_events SET history_visibility = $1
+ WHERE type IN ('m.room.message', 'm.room.encrypted') AND room_id = $2 AND history_visibility <> $1`, hisVis, roomID)
+ if err != nil {
+ return fmt.Errorf("failed to update history visibility: %w", err)
+ }
+ }
+
+ return nil
+}
+
func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.Tx) error {
// SQLite doesn't have "if exists", so check if the column exists. If the query doesn't return an error, it already exists.
// Required for unit tests, as otherwise a duplicate column error will show up.
@@ -51,9 +75,40 @@ func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.T
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
+
return nil
}
+// currentHistoryVisibilities returns a map from roomID to current history visibility.
+// If the history visibility was changed after room creation, defaults to joined.
+func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gomatrixserverlib.HistoryVisibility, error) {
+ rows, err := tx.QueryContext(ctx, `SELECT DISTINCT room_id, headered_event_json FROM syncapi_current_room_state
+ WHERE type = 'm.room.history_visibility' AND state_key = '';
+`)
+ if err != nil {
+ return nil, fmt.Errorf("failed to query current room state: %w", err)
+ }
+ defer rows.Close() // nolint: errcheck
+ var eventBytes []byte
+ var roomID string
+ var event gomatrixserverlib.HeaderedEvent
+ var hisVis gomatrixserverlib.HistoryVisibility
+ historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility)
+ for rows.Next() {
+ if err = rows.Scan(&roomID, &eventBytes); err != nil {
+ return nil, fmt.Errorf("failed to scan row: %w", err)
+ }
+ if err = json.Unmarshal(eventBytes, &event); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal event: %w", err)
+ }
+ historyVisibilities[roomID] = gomatrixserverlib.HistoryVisibilityJoined
+ if hisVis, err = event.HistoryVisibility(); err == nil && event.Depth() < 10 {
+ historyVisibilities[roomID] = hisVis
+ }
+ }
+ return historyVisibilities, nil
+}
+
func DownAddHistoryVisibilityColumn(ctx context.Context, tx *sql.Tx) error {
// SQLite doesn't have "if exists", so check if the column exists.
_, err := tx.QueryContext(ctx, "SELECT history_visibility FROM syncapi_output_room_events LIMIT 1")
diff --git a/syncapi/storage/sqlite3/memberships_table.go b/syncapi/storage/sqlite3/memberships_table.go
index e4daa99c..0c966fca 100644
--- a/syncapi/storage/sqlite3/memberships_table.go
+++ b/syncapi/storage/sqlite3/memberships_table.go
@@ -66,11 +66,15 @@ const selectMembershipCountSQL = "" +
const selectHeroesSQL = "" +
"SELECT DISTINCT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership IN ($3) LIMIT 5"
+const selectMembershipBeforeSQL = "" +
+ "SELECT membership, topological_pos FROM syncapi_memberships WHERE room_id = $1 and user_id = $2 AND topological_pos <= $3 ORDER BY topological_pos DESC LIMIT 1"
+
type membershipsStatements struct {
db *sql.DB
upsertMembershipStmt *sql.Stmt
selectMembershipCountStmt *sql.Stmt
//selectHeroesStmt *sql.Stmt - prepared at runtime due to variadic
+ selectMembershipForUserStmt *sql.Stmt
}
func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
@@ -84,6 +88,7 @@ func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
return s, sqlutil.StatementList{
{&s.upsertMembershipStmt, upsertMembershipSQL},
{&s.selectMembershipCountStmt, selectMembershipCountSQL},
+ {&s.selectMembershipForUserStmt, selectMembershipBeforeSQL},
// {&s.selectHeroesStmt, selectHeroesSQL}, - prepared at runtime due to variadic
}.Prepare(db)
}
@@ -148,3 +153,20 @@ func (s *membershipsStatements) SelectHeroes(
}
return heroes, rows.Err()
}
+
+// SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found
+// returns "leave", the topological position and no error. If an error occurs, other than sql.ErrNoRows, returns that and an empty
+// string as the membership.
+func (s *membershipsStatements) SelectMembershipForUser(
+ ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64,
+) (membership string, topologyPos int, err error) {
+ stmt := sqlutil.TxStmt(txn, s.selectMembershipForUserStmt)
+ err = stmt.QueryRowContext(ctx, roomID, userID, pos).Scan(&membership, &topologyPos)
+ if err != nil {
+ if err == sql.ErrNoRows {
+ return "leave", 0, nil
+ }
+ return "", 0, err
+ }
+ return membership, topologyPos, nil
+}
diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go
index de389fa9..91fd35b5 100644
--- a/syncapi/storage/sqlite3/output_room_events_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_table.go
@@ -139,10 +139,12 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even
}
m := sqlutil.NewMigrator(db)
- m.AddMigrations(sqlutil.Migration{
- Version: "syncapi: add history visibility column (output_room_events)",
- Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents,
- })
+ m.AddMigrations(
+ sqlutil.Migration{
+ Version: "syncapi: add history visibility column (output_room_events)",
+ Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents,
+ },
+ )
err = m.Up(context.Background())
if err != nil {
return nil, err
diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go
index 5c5eb0f5..a84e2bd1 100644
--- a/syncapi/storage/sqlite3/syncserver.go
+++ b/syncapi/storage/sqlite3/syncserver.go
@@ -16,12 +16,14 @@
package sqlite3
import (
+ "context"
"database/sql"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
+ "github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
)
// SyncServerDatasource represents a sync server datasource which manages
@@ -41,13 +43,13 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()); err != nil {
return nil, err
}
- if err = d.prepare(); err != nil {
+ if err = d.prepare(base.Context()); err != nil {
return nil, err
}
return &d, nil
}
-func (d *SyncServerDatasource) prepare() (err error) {
+func (d *SyncServerDatasource) prepare(ctx context.Context) (err error) {
if err = d.streamID.Prepare(d.db); err != nil {
return err
}
@@ -107,6 +109,19 @@ func (d *SyncServerDatasource) prepare() (err error) {
if err != nil {
return err
}
+
+ // apply migrations which need multiple tables
+ m := sqlutil.NewMigrator(d.db)
+ m.AddMigrations(
+ sqlutil.Migration{
+ Version: "syncapi: set history visibility for existing events",
+ Up: deltas.UpSetHistoryVisibility, // Requires current_room_state and output_room_events to be created.
+ },
+ )
+ err = m.Up(ctx)
+ if err != nil {
+ return err
+ }
d.Database = shared.Database{
DB: d.db,
Writer: d.writer,
diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go
index eda5ef3e..a62818e9 100644
--- a/syncapi/storage/storage_test.go
+++ b/syncapi/storage/storage_test.go
@@ -12,20 +12,22 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/test"
+ "github.com/matrix-org/dendrite/test/testrig"
"github.com/matrix-org/gomatrixserverlib"
)
var ctx = context.Background()
-func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
+func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func(), func()) {
connStr, close := test.PrepareDBConnectionString(t, dbType)
- db, err := storage.NewSyncServerDatasource(nil, &config.DatabaseOptions{
+ base, closeBase := testrig.CreateBaseDendrite(t, dbType)
+ db, err := storage.NewSyncServerDatasource(base, &config.DatabaseOptions{
ConnectionString: config.DataSource(connStr),
})
if err != nil {
t.Fatalf("NewSyncServerDatasource returned %s", err)
}
- return db, close
+ return db, close, closeBase
}
func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (positions []types.StreamPosition) {
@@ -51,8 +53,9 @@ func TestWriteEvents(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
alice := test.NewUser(t)
r := test.NewRoom(t, alice)
- db, close := MustCreateDatabase(t, dbType)
+ db, close, closeBase := MustCreateDatabase(t, dbType)
defer close()
+ defer closeBase()
MustWriteEvents(t, db, r.Events())
})
}
@@ -60,8 +63,9 @@ func TestWriteEvents(t *testing.T) {
// These tests assert basic functionality of RecentEvents for PDUs
func TestRecentEventsPDU(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
- db, close := MustCreateDatabase(t, dbType)
+ db, close, closeBase := MustCreateDatabase(t, dbType)
defer close()
+ defer closeBase()
alice := test.NewUser(t)
// dummy room to make sure SQL queries are filtering on room ID
MustWriteEvents(t, db, test.NewRoom(t, alice).Events())
@@ -163,8 +167,9 @@ func TestRecentEventsPDU(t *testing.T) {
// The purpose of this test is to ensure that backfill does indeed go backwards, using a topology token
func TestGetEventsInRangeWithTopologyToken(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
- db, close := MustCreateDatabase(t, dbType)
+ db, close, closeBase := MustCreateDatabase(t, dbType)
defer close()
+ defer closeBase()
alice := test.NewUser(t)
r := test.NewRoom(t, alice)
for i := 0; i < 10; i++ {
@@ -404,8 +409,9 @@ func TestSendToDeviceBehaviour(t *testing.T) {
bob := test.NewUser(t)
deviceID := "one"
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
- db, close := MustCreateDatabase(t, dbType)
+ db, close, closeBase := MustCreateDatabase(t, dbType)
defer close()
+ defer closeBase()
// At this point there should be no messages. We haven't sent anything
// yet.
_, events, err := db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, 100)
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index d68351d4..468d26ac 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -185,6 +185,7 @@ type Memberships interface {
UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string, pos types.StreamPosition) (count int, err error)
SelectHeroes(ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string) (heroes []string, err error)
+ SelectMembershipForUser(ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64) (membership string, topologicalPos int, err error)
}
type NotificationData interface {
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 1ad3adc4..136cbea5 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -10,10 +10,13 @@ import (
"github.com/matrix-org/dendrite/internal/caching"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/syncapi/internal"
+ "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"go.uber.org/atomic"
@@ -123,7 +126,7 @@ func (p *PDUStreamProvider) CompleteSync(
defer reqWaitGroup.Done()
jr, jerr := p.getJoinResponseForCompleteSync(
- ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device,
+ ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false,
)
if jerr != nil {
req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed")
@@ -149,7 +152,7 @@ func (p *PDUStreamProvider) CompleteSync(
if !peek.Deleted {
var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync(
- ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device,
+ ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, true,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@@ -281,12 +284,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
}
}
- if len(recentEvents) > 0 {
- updateLatestPosition(recentEvents[len(recentEvents)-1].EventID())
- }
- if len(delta.StateEvents) > 0 {
- updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID())
- }
if stateFilter.LazyLoadMembers {
delta.StateEvents, err = p.lazyLoadMembers(
@@ -306,6 +303,19 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
}
+ // Applies the history visibility rules
+ events, err := applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, delta.RoomID, device.UserID, eventFilter.Limit, recentEvents)
+ if err != nil {
+ logrus.WithError(err).Error("unable to apply history visibility filter")
+ }
+
+ if len(events) > 0 {
+ updateLatestPosition(events[len(events)-1].EventID())
+ }
+ if len(delta.StateEvents) > 0 {
+ updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID())
+ }
+
switch delta.Membership {
case gomatrixserverlib.Join:
jr := types.NewJoinResponse()
@@ -313,14 +323,17 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
p.addRoomSummary(ctx, jr, delta.RoomID, device.UserID, latestPosition)
}
jr.Timeline.PrevBatch = &prevBatch
- jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- jr.Timeline.Limited = limited
+ jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync)
+ // If we are limited by the filter AND the history visibility filter
+ // didn't "remove" events, return that the response is limited.
+ jr.Timeline.Limited = limited && len(events) == len(recentEvents)
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.RoomID] = *jr
case gomatrixserverlib.Peek:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
+ // TODO: Apply history visibility on peeked rooms
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
@@ -330,12 +343,12 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
fallthrough // transitions to leave are the same as ban
case gomatrixserverlib.Ban:
- // TODO: recentEvents may contain events that this user is not allowed to see because they are
- // no longer in the room.
lr := types.NewLeaveResponse()
lr.Timeline.PrevBatch = &prevBatch
- lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
+ lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync)
+ // If we are limited by the filter AND the history visibility filter
+ // didn't "remove" events, return that the response is limited.
+ lr.Timeline.Limited = limited && len(events) == len(recentEvents)
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.RoomID] = *lr
}
@@ -343,6 +356,41 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
return latestPosition, nil
}
+// applyHistoryVisibilityFilter gets the current room state and supplies it to ApplyHistoryVisibilityFilter, to make
+// sure we always return the required events in the timeline.
+func applyHistoryVisibilityFilter(
+ ctx context.Context,
+ db storage.Database,
+ rsAPI roomserverAPI.SyncRoomserverAPI,
+ roomID, userID string,
+ limit int,
+ recentEvents []*gomatrixserverlib.HeaderedEvent,
+) ([]*gomatrixserverlib.HeaderedEvent, error) {
+ // We need to make sure we always include the latest states events, if they are in the timeline.
+ // We grep at least limit * 2 events, to ensure we really get the needed events.
+ stateEvents, err := db.CurrentState(ctx, roomID, &gomatrixserverlib.StateFilter{Limit: limit * 2}, nil)
+ if err != nil {
+ // Not a fatal error, we can continue without the stateEvents,
+ // they are only needed if there are state events in the timeline.
+ logrus.WithError(err).Warnf("failed to get current room state")
+ }
+ alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents))
+ for _, ev := range stateEvents {
+ alwaysIncludeIDs[ev.EventID()] = struct{}{}
+ }
+ startTime := time.Now()
+ events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync")
+ if err != nil {
+
+ return nil, err
+ }
+ logrus.WithFields(logrus.Fields{
+ "duration": time.Since(startTime),
+ "room_id": roomID,
+ }).Debug("applied history visibility (sync)")
+ return events, nil
+}
+
func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) {
// Work out how many members are in the room.
joinedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Join, latestPosition)
@@ -390,6 +438,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
eventFilter *gomatrixserverlib.RoomEventFilter,
wantFullState bool,
device *userapi.Device,
+ isPeek bool,
) (jr *types.JoinResponse, err error) {
jr = types.NewJoinResponse()
// TODO: When filters are added, we may need to call this multiple times to get enough events.
@@ -404,33 +453,6 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
return
}
- // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
- // user shouldn't see, we check the recent events and remove any prior to the join event of the user
- // which is equiv to history_visibility: joined
- joinEventIndex := -1
- for i := len(recentStreamEvents) - 1; i >= 0; i-- {
- ev := recentStreamEvents[i]
- if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) {
- membership, _ := ev.Membership()
- if membership == "join" {
- joinEventIndex = i
- if i > 0 {
- // the create event happens before the first join, so we should cut it at that point instead
- if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") {
- joinEventIndex = i - 1
- break
- }
- }
- break
- }
- }
- }
- if joinEventIndex != -1 {
- // cut all events earlier than the join (but not the join itself)
- recentStreamEvents = recentStreamEvents[joinEventIndex:]
- limited = false // so clients know not to try to backpaginate
- }
-
// Work our way through the timeline events and pick out the event IDs
// of any state events that appear in the timeline. We'll specifically
// exclude them at the next step, so that we don't get duplicate state
@@ -474,6 +496,19 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
+ events := recentEvents
+ // Only apply history visibility checks if the response is for joined rooms
+ if !isPeek {
+ events, err = applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, roomID, device.UserID, eventFilter.Limit, recentEvents)
+ if err != nil {
+ logrus.WithError(err).Error("unable to apply history visibility filter")
+ }
+ }
+
+ // If we are limited by the filter AND the history visibility filter
+ // didn't "remove" events, return that the response is limited.
+ limited = limited && len(events) == len(recentEvents)
+
if stateFilter.LazyLoadMembers {
if err != nil {
return nil, err
@@ -488,8 +523,10 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
}
jr.Timeline.PrevBatch = prevBatch
- jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- jr.Timeline.Limited = limited
+ jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync)
+ // If we are limited by the filter AND the history visibility filter
+ // didn't "remove" events, return that the response is limited.
+ jr.Timeline.Limited = limited && len(events) == len(recentEvents)
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
return jr, nil
}
diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go
index 931fef88..dc073a16 100644
--- a/syncapi/syncapi_test.go
+++ b/syncapi/syncapi_test.go
@@ -12,6 +12,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/producers"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/roomserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
@@ -54,6 +55,16 @@ func (s *syncRoomserverAPI) QueryBulkStateContent(ctx context.Context, req *rsap
return nil
}
+func (s *syncRoomserverAPI) QueryMembershipForUser(ctx context.Context, req *rsapi.QueryMembershipForUserRequest, res *rsapi.QueryMembershipForUserResponse) error {
+ res.IsRoomForgotten = false
+ res.RoomExists = true
+ return nil
+}
+
+func (s *syncRoomserverAPI) QueryMembershipAtEvent(ctx context.Context, req *rsapi.QueryMembershipAtEventRequest, res *rsapi.QueryMembershipAtEventResponse) error {
+ return nil
+}
+
type syncUserAPI struct {
userapi.SyncUserAPI
accounts []userapi.Device
@@ -107,7 +118,7 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
- msgs := toNATSMsgs(t, base, room.Events())
+ msgs := toNATSMsgs(t, base, room.Events()...)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, &syncKeyAPI{})
testrig.MustPublishMsgs(t, jsctx, msgs...)
@@ -200,7 +211,7 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
// m.room.power_levels
// m.room.join_rules
// m.room.history_visibility
- msgs := toNATSMsgs(t, base, room.Events())
+ msgs := toNATSMsgs(t, base, room.Events()...)
sinceTokens := make([]string, len(msgs))
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, &syncKeyAPI{})
for i, msg := range msgs {
@@ -315,6 +326,174 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) {
}
+// This is mainly what Sytest is doing in "test_history_visibility"
+func TestMessageHistoryVisibility(t *testing.T) {
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ testHistoryVisibility(t, dbType)
+ })
+}
+
+func testHistoryVisibility(t *testing.T, dbType test.DBType) {
+ type result struct {
+ seeWithoutJoin bool
+ seeBeforeJoin bool
+ seeAfterInvite bool
+ }
+
+ // create the users
+ alice := test.NewUser(t)
+ bob := test.NewUser(t)
+
+ bobDev := userapi.Device{
+ ID: "BOBID",
+ UserID: bob.ID,
+ AccessToken: "BOD_BEARER_TOKEN",
+ DisplayName: "BOB",
+ }
+
+ ctx := context.Background()
+ // check guest and normal user accounts
+ for _, accType := range []userapi.AccountType{userapi.AccountTypeGuest, userapi.AccountTypeUser} {
+ testCases := []struct {
+ historyVisibility gomatrixserverlib.HistoryVisibility
+ wantResult result
+ }{
+ {
+ historyVisibility: gomatrixserverlib.HistoryVisibilityWorldReadable,
+ wantResult: result{
+ seeWithoutJoin: true,
+ seeBeforeJoin: true,
+ seeAfterInvite: true,
+ },
+ },
+ {
+ historyVisibility: gomatrixserverlib.HistoryVisibilityShared,
+ wantResult: result{
+ seeWithoutJoin: false,
+ seeBeforeJoin: true,
+ seeAfterInvite: true,
+ },
+ },
+ {
+ historyVisibility: gomatrixserverlib.HistoryVisibilityInvited,
+ wantResult: result{
+ seeWithoutJoin: false,
+ seeBeforeJoin: false,
+ seeAfterInvite: true,
+ },
+ },
+ {
+ historyVisibility: gomatrixserverlib.HistoryVisibilityJoined,
+ wantResult: result{
+ seeWithoutJoin: false,
+ seeBeforeJoin: false,
+ seeAfterInvite: false,
+ },
+ },
+ }
+
+ bobDev.AccountType = accType
+ userType := "guest"
+ if accType == userapi.AccountTypeUser {
+ userType = "real user"
+ }
+
+ base, close := testrig.CreateBaseDendrite(t, dbType)
+ defer close()
+
+ jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
+
+ // Use the actual internal roomserver API
+ rsAPI := roomserver.NewInternalAPI(base)
+ rsAPI.SetFederationAPI(nil, nil)
+
+ AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{bobDev}}, rsAPI, &syncKeyAPI{})
+
+ for _, tc := range testCases {
+ testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
+ t.Run(testname, func(t *testing.T) {
+ // create a room with the given visibility
+ room := test.NewRoom(t, alice, test.RoomHistoryVisibility(tc.historyVisibility))
+
+ // send the events/messages to NATS to create the rooms
+ beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)})
+ eventsToSend := append(room.Events(), beforeJoinEv)
+ if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil {
+ t.Fatalf("failed to send events: %v", err)
+ }
+
+ // There is only one event, we expect only to be able to see this, if the room is world_readable
+ w := httptest.NewRecorder()
+ base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
+ "access_token": bobDev.AccessToken,
+ "dir": "b",
+ })))
+ if w.Code != 200 {
+ t.Logf("%s", w.Body.String())
+ t.Fatalf("got HTTP %d want %d", w.Code, 200)
+ }
+ // We only care about the returned events at this point
+ var res struct {
+ Chunk []gomatrixserverlib.ClientEvent `json:"chunk"`
+ }
+ if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
+ t.Errorf("failed to decode response body: %s", err)
+ }
+
+ verifyEventVisible(t, tc.wantResult.seeWithoutJoin, beforeJoinEv, res.Chunk)
+
+ // Create invite, a message, join the room and create another message.
+ inviteEv := room.CreateAndInsert(t, alice, "m.room.member", map[string]interface{}{"membership": "invite"}, test.WithStateKey(bob.ID))
+ afterInviteEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After invite in a %s room", tc.historyVisibility)})
+ joinEv := room.CreateAndInsert(t, bob, "m.room.member", map[string]interface{}{"membership": "join"}, test.WithStateKey(bob.ID))
+ msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After join in a %s room", tc.historyVisibility)})
+
+ eventsToSend = append([]*gomatrixserverlib.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv)
+
+ if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil {
+ t.Fatalf("failed to send events: %v", err)
+ }
+
+ // Verify the messages after/before invite are visible or not
+ w = httptest.NewRecorder()
+ base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
+ "access_token": bobDev.AccessToken,
+ "dir": "b",
+ })))
+ if w.Code != 200 {
+ t.Logf("%s", w.Body.String())
+ t.Fatalf("got HTTP %d want %d", w.Code, 200)
+ }
+ if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
+ t.Errorf("failed to decode response body: %s", err)
+ }
+ // verify results
+ verifyEventVisible(t, tc.wantResult.seeBeforeJoin, beforeJoinEv, res.Chunk)
+ verifyEventVisible(t, tc.wantResult.seeAfterInvite, afterInviteEv, res.Chunk)
+ })
+ }
+ }
+}
+
+func verifyEventVisible(t *testing.T, wantVisible bool, wantVisibleEvent *gomatrixserverlib.HeaderedEvent, chunk []gomatrixserverlib.ClientEvent) {
+ t.Helper()
+ if wantVisible {
+ for _, ev := range chunk {
+ if ev.EventID == wantVisibleEvent.EventID() {
+ return
+ }
+ }
+ t.Fatalf("expected to see event %s but didn't: %+v", wantVisibleEvent.EventID(), chunk)
+ } else {
+ for _, ev := range chunk {
+ if ev.EventID == wantVisibleEvent.EventID() {
+ t.Fatalf("expected not to see event %s: %+v", wantVisibleEvent.EventID(), string(ev.Content))
+ }
+ }
+ }
+}
+
func TestSendToDevice(t *testing.T) {
test.WithAllDatabases(t, testSendToDevice)
}
@@ -448,7 +627,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
}
}
-func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input []*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
+func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
result := make([]*nats.Msg, len(input))
for i, ev := range input {
var addsStateIDs []string
@@ -460,6 +639,7 @@ func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input []*gomatrixserverli
NewRoomEvent: &rsapi.OutputNewRoomEvent{
Event: ev,
AddsStateEventIDs: addsStateIDs,
+ HistoryVisibility: ev.Visibility,
},
})
}