aboutsummaryrefslogtreecommitdiff
path: root/syncapi/streams/stream_pdu.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/streams/stream_pdu.go')
-rw-r--r--syncapi/streams/stream_pdu.go57
1 files changed, 22 insertions, 35 deletions
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 2818aad8..fa4c722c 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -209,11 +209,27 @@ func (p *PDUStreamProvider) IncrementalSync(
newPos = from
for _, delta := range stateDeltas {
+ newRange := r
+ // If this room was joined in this sync, try to fetch
+ // as much timeline events as allowed by the filter.
+ if delta.NewlyJoined {
+ // Reverse the range, so we get the most recent first.
+ // This will be limited by the eventFilter.
+ newRange = types.Range{
+ From: r.To,
+ To: 0,
+ Backwards: true,
+ }
+ }
var pos types.StreamPosition
- if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, &stateFilter, req.Response); err != nil {
+ if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
return to
}
+ // Reset the position, as it is only for the special case of newly joined rooms
+ if delta.NewlyJoined {
+ pos = newRange.From
+ }
switch {
case r.Backwards && pos < newPos:
fallthrough
@@ -222,37 +238,6 @@ func (p *PDUStreamProvider) IncrementalSync(
}
}
- // If we joined a new room in this sync, make sure we add enough information about it.
- // This does an "initial sync" for the newly joined rooms
- newlyJoinedRooms := joinedRooms(req.Response, req.Device.UserID)
- if len(newlyJoinedRooms) > 0 {
- // remove already added rooms, as we're doing an "initial sync"
- for _, x := range newlyJoinedRooms {
- delete(req.Response.Rooms.Join, x)
- }
- r = types.Range{
- From: to,
- To: 0,
- Backwards: true,
- }
- // We only care about the newly joined rooms, so update the stateFilter to reflect that
- stateFilter.Rooms = &newlyJoinedRooms
- if stateDeltas, _, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
- req.Log.WithError(err).Error("p.DB.GetStateDeltas failed")
- return newPos
- }
- for _, delta := range stateDeltas {
- // Ignore deltas for rooms we didn't newly join
- if _, ok := req.Response.Rooms.Join[delta.RoomID]; ok {
- continue
- }
- if _, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, &stateFilter, req.Response); err != nil {
- req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
- return newPos
- }
- }
- }
-
return newPos
}
@@ -340,12 +325,12 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
logrus.WithError(err).Error("unable to apply history visibility filter")
}
- if len(events) > 0 {
- updateLatestPosition(events[len(events)-1].EventID())
- }
if len(delta.StateEvents) > 0 {
updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID())
}
+ if len(events) > 0 {
+ updateLatestPosition(events[len(events)-1].EventID())
+ }
switch delta.Membership {
case gomatrixserverlib.Join:
@@ -418,6 +403,8 @@ func applyHistoryVisibilityFilter(
logrus.WithFields(logrus.Fields{
"duration": time.Since(startTime),
"room_id": roomID,
+ "before": len(recentEvents),
+ "after": len(events),
}).Debug("applied history visibility (sync)")
return events, nil
}