aboutsummaryrefslogtreecommitdiff
path: root/syncapi/streams/stream_pdu.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/streams/stream_pdu.go')
-rw-r--r--syncapi/streams/stream_pdu.go123
1 files changed, 80 insertions, 43 deletions
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 1ad3adc4..136cbea5 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -10,10 +10,13 @@ import (
"github.com/matrix-org/dendrite/internal/caching"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/syncapi/internal"
+ "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"go.uber.org/atomic"
@@ -123,7 +126,7 @@ func (p *PDUStreamProvider) CompleteSync(
defer reqWaitGroup.Done()
jr, jerr := p.getJoinResponseForCompleteSync(
- ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device,
+ ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false,
)
if jerr != nil {
req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed")
@@ -149,7 +152,7 @@ func (p *PDUStreamProvider) CompleteSync(
if !peek.Deleted {
var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync(
- ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device,
+ ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, true,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@@ -281,12 +284,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
}
}
- if len(recentEvents) > 0 {
- updateLatestPosition(recentEvents[len(recentEvents)-1].EventID())
- }
- if len(delta.StateEvents) > 0 {
- updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID())
- }
if stateFilter.LazyLoadMembers {
delta.StateEvents, err = p.lazyLoadMembers(
@@ -306,6 +303,19 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
}
+ // Applies the history visibility rules
+ events, err := applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, delta.RoomID, device.UserID, eventFilter.Limit, recentEvents)
+ if err != nil {
+ logrus.WithError(err).Error("unable to apply history visibility filter")
+ }
+
+ if len(events) > 0 {
+ updateLatestPosition(events[len(events)-1].EventID())
+ }
+ if len(delta.StateEvents) > 0 {
+ updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID())
+ }
+
switch delta.Membership {
case gomatrixserverlib.Join:
jr := types.NewJoinResponse()
@@ -313,14 +323,17 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
p.addRoomSummary(ctx, jr, delta.RoomID, device.UserID, latestPosition)
}
jr.Timeline.PrevBatch = &prevBatch
- jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- jr.Timeline.Limited = limited
+ jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync)
+ // If we are limited by the filter AND the history visibility filter
+ // didn't "remove" events, return that the response is limited.
+ jr.Timeline.Limited = limited && len(events) == len(recentEvents)
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.RoomID] = *jr
case gomatrixserverlib.Peek:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
+ // TODO: Apply history visibility on peeked rooms
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
@@ -330,12 +343,12 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
fallthrough // transitions to leave are the same as ban
case gomatrixserverlib.Ban:
- // TODO: recentEvents may contain events that this user is not allowed to see because they are
- // no longer in the room.
lr := types.NewLeaveResponse()
lr.Timeline.PrevBatch = &prevBatch
- lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
+ lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync)
+ // If we are limited by the filter AND the history visibility filter
+ // didn't "remove" events, return that the response is limited.
+ lr.Timeline.Limited = limited && len(events) == len(recentEvents)
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.RoomID] = *lr
}
@@ -343,6 +356,41 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
return latestPosition, nil
}
+// applyHistoryVisibilityFilter gets the current room state and supplies it to ApplyHistoryVisibilityFilter, to make
+// sure we always return the required events in the timeline.
+func applyHistoryVisibilityFilter(
+ ctx context.Context,
+ db storage.Database,
+ rsAPI roomserverAPI.SyncRoomserverAPI,
+ roomID, userID string,
+ limit int,
+ recentEvents []*gomatrixserverlib.HeaderedEvent,
+) ([]*gomatrixserverlib.HeaderedEvent, error) {
+ // We need to make sure we always include the latest states events, if they are in the timeline.
+ // We grep at least limit * 2 events, to ensure we really get the needed events.
+ stateEvents, err := db.CurrentState(ctx, roomID, &gomatrixserverlib.StateFilter{Limit: limit * 2}, nil)
+ if err != nil {
+ // Not a fatal error, we can continue without the stateEvents,
+ // they are only needed if there are state events in the timeline.
+ logrus.WithError(err).Warnf("failed to get current room state")
+ }
+ alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents))
+ for _, ev := range stateEvents {
+ alwaysIncludeIDs[ev.EventID()] = struct{}{}
+ }
+ startTime := time.Now()
+ events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync")
+ if err != nil {
+
+ return nil, err
+ }
+ logrus.WithFields(logrus.Fields{
+ "duration": time.Since(startTime),
+ "room_id": roomID,
+ }).Debug("applied history visibility (sync)")
+ return events, nil
+}
+
func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) {
// Work out how many members are in the room.
joinedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Join, latestPosition)
@@ -390,6 +438,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
eventFilter *gomatrixserverlib.RoomEventFilter,
wantFullState bool,
device *userapi.Device,
+ isPeek bool,
) (jr *types.JoinResponse, err error) {
jr = types.NewJoinResponse()
// TODO: When filters are added, we may need to call this multiple times to get enough events.
@@ -404,33 +453,6 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
return
}
- // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
- // user shouldn't see, we check the recent events and remove any prior to the join event of the user
- // which is equiv to history_visibility: joined
- joinEventIndex := -1
- for i := len(recentStreamEvents) - 1; i >= 0; i-- {
- ev := recentStreamEvents[i]
- if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) {
- membership, _ := ev.Membership()
- if membership == "join" {
- joinEventIndex = i
- if i > 0 {
- // the create event happens before the first join, so we should cut it at that point instead
- if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") {
- joinEventIndex = i - 1
- break
- }
- }
- break
- }
- }
- }
- if joinEventIndex != -1 {
- // cut all events earlier than the join (but not the join itself)
- recentStreamEvents = recentStreamEvents[joinEventIndex:]
- limited = false // so clients know not to try to backpaginate
- }
-
// Work our way through the timeline events and pick out the event IDs
// of any state events that appear in the timeline. We'll specifically
// exclude them at the next step, so that we don't get duplicate state
@@ -474,6 +496,19 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
+ events := recentEvents
+ // Only apply history visibility checks if the response is for joined rooms
+ if !isPeek {
+ events, err = applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, roomID, device.UserID, eventFilter.Limit, recentEvents)
+ if err != nil {
+ logrus.WithError(err).Error("unable to apply history visibility filter")
+ }
+ }
+
+ // If we are limited by the filter AND the history visibility filter
+ // didn't "remove" events, return that the response is limited.
+ limited = limited && len(events) == len(recentEvents)
+
if stateFilter.LazyLoadMembers {
if err != nil {
return nil, err
@@ -488,8 +523,10 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
}
jr.Timeline.PrevBatch = prevBatch
- jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- jr.Timeline.Limited = limited
+ jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync)
+ // If we are limited by the filter AND the history visibility filter
+ // didn't "remove" events, return that the response is limited.
+ jr.Timeline.Limited = limited && len(events) == len(recentEvents)
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
return jr, nil
}