aboutsummaryrefslogtreecommitdiff
path: root/syncapi/streams
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-10-19 14:05:39 +0200
committerGitHub <noreply@github.com>2022-10-19 14:05:39 +0200
commite79bfd8fd55781783482cb45ae6d4e78062bb8ac (patch)
tree9b892d69efc71dd2259d04763037c23922cc68f3 /syncapi/streams
parent8cbe14bd6d985ceb2f7c098548a3fbeedfce2d55 (diff)
Get state deltas without filters (#2810)
This makes the following changes: - get state deltas without the user supplied filter, so we can actually "calculate" state transitions - closes `stmt` when using SQLite - Adds presence for users who newly joined a room, even if the syncing user already knows about the presence status (should fix https://github.com/matrix-org/complement/pull/516)
Diffstat (limited to 'syncapi/streams')
-rw-r--r--syncapi/streams/stream_pdu.go16
-rw-r--r--syncapi/streams/stream_presence.go3
2 files changed, 11 insertions, 8 deletions
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 613ac434..9ec2b61c 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -194,7 +194,7 @@ func (p *PDUStreamProvider) IncrementalSync(
}
}
var pos types.StreamPosition
- if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil {
+ if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone {
return newPos
@@ -225,7 +225,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
delta types.StateDelta,
eventFilter *gomatrixserverlib.RoomEventFilter,
stateFilter *gomatrixserverlib.StateFilter,
- res *types.Response,
+ req *types.SyncRequest,
) (types.StreamPosition, error) {
if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
// make sure we don't leak recent events after the leave event.
@@ -290,8 +290,10 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
hasMembershipChange := false
for _, recentEvent := range recentStreamEvents {
if recentEvent.Type() == gomatrixserverlib.MRoomMember && recentEvent.StateKey() != nil {
+ if membership, _ := recentEvent.Membership(); membership == gomatrixserverlib.Join {
+ req.MembershipChanges[*recentEvent.StateKey()] = struct{}{}
+ }
hasMembershipChange = true
- break
}
}
@@ -318,9 +320,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync)
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
- jr.Timeline.Limited = limited && len(events) == len(recentEvents)
+ jr.Timeline.Limited = (limited && len(events) == len(recentEvents)) || delta.NewlyJoined
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
- res.Rooms.Join[delta.RoomID] = jr
+ req.Response.Rooms.Join[delta.RoomID] = jr
case gomatrixserverlib.Peek:
jr := types.NewJoinResponse()
@@ -329,7 +331,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
- res.Rooms.Peek[delta.RoomID] = jr
+ req.Response.Rooms.Peek[delta.RoomID] = jr
case gomatrixserverlib.Leave:
fallthrough // transitions to leave are the same as ban
@@ -342,7 +344,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// didn't "remove" events, return that the response is limited.
lr.Timeline.Limited = limited && len(events) == len(recentEvents)
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
- res.Rooms.Leave[delta.RoomID] = lr
+ req.Response.Rooms.Leave[delta.RoomID] = lr
}
return latestPosition, nil
diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go
index 8b87af45..030b7c5d 100644
--- a/syncapi/streams/stream_presence.go
+++ b/syncapi/streams/stream_presence.go
@@ -121,7 +121,8 @@ func (p *PresenceStreamProvider) IncrementalSync(
prevPresence := pres.(*types.PresenceInternal)
currentlyActive := prevPresence.CurrentlyActive()
skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID
- if skip {
+ _, membershipChange := req.MembershipChanges[presence.UserID]
+ if skip && !membershipChange {
req.Log.Tracef("Skipping presence, no change (%s)", presence.UserID)
continue
}