diff options
author | Till <2353100+S7evinK@users.noreply.github.com> | 2022-10-19 14:05:39 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-19 14:05:39 +0200 |
commit | e79bfd8fd55781783482cb45ae6d4e78062bb8ac (patch) | |
tree | 9b892d69efc71dd2259d04763037c23922cc68f3 /syncapi/streams | |
parent | 8cbe14bd6d985ceb2f7c098548a3fbeedfce2d55 (diff) |
Get state deltas without filters (#2810)
This makes the following changes:
- get state deltas without the user supplied filter, so we can actually
"calculate" state transitions
- closes `stmt` when using SQLite
- Adds presence for users who newly joined a room, even if the syncing
user already knows about the presence status (should fix
https://github.com/matrix-org/complement/pull/516)
Diffstat (limited to 'syncapi/streams')
-rw-r--r-- | syncapi/streams/stream_pdu.go | 16 | ||||
-rw-r--r-- | syncapi/streams/stream_presence.go | 3 |
2 files changed, 11 insertions, 8 deletions
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 613ac434..9ec2b61c 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -194,7 +194,7 @@ func (p *PDUStreamProvider) IncrementalSync( } } var pos types.StreamPosition - if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil { + if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone { return newPos @@ -225,7 +225,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( delta types.StateDelta, eventFilter *gomatrixserverlib.RoomEventFilter, stateFilter *gomatrixserverlib.StateFilter, - res *types.Response, + req *types.SyncRequest, ) (types.StreamPosition, error) { if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave { // make sure we don't leak recent events after the leave event. @@ -290,8 +290,10 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( hasMembershipChange := false for _, recentEvent := range recentStreamEvents { if recentEvent.Type() == gomatrixserverlib.MRoomMember && recentEvent.StateKey() != nil { + if membership, _ := recentEvent.Membership(); membership == gomatrixserverlib.Join { + req.MembershipChanges[*recentEvent.StateKey()] = struct{}{} + } hasMembershipChange = true - break } } @@ -318,9 +320,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync) // If we are limited by the filter AND the history visibility filter // didn't "remove" events, return that the response is limited. - jr.Timeline.Limited = limited && len(events) == len(recentEvents) + jr.Timeline.Limited = (limited && len(events) == len(recentEvents)) || delta.NewlyJoined jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Join[delta.RoomID] = jr + req.Response.Rooms.Join[delta.RoomID] = jr case gomatrixserverlib.Peek: jr := types.NewJoinResponse() @@ -329,7 +331,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = limited jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Peek[delta.RoomID] = jr + req.Response.Rooms.Peek[delta.RoomID] = jr case gomatrixserverlib.Leave: fallthrough // transitions to leave are the same as ban @@ -342,7 +344,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( // didn't "remove" events, return that the response is limited. lr.Timeline.Limited = limited && len(events) == len(recentEvents) lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Leave[delta.RoomID] = lr + req.Response.Rooms.Leave[delta.RoomID] = lr } return latestPosition, nil diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go index 8b87af45..030b7c5d 100644 --- a/syncapi/streams/stream_presence.go +++ b/syncapi/streams/stream_presence.go @@ -121,7 +121,8 @@ func (p *PresenceStreamProvider) IncrementalSync( prevPresence := pres.(*types.PresenceInternal) currentlyActive := prevPresence.CurrentlyActive() skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID - if skip { + _, membershipChange := req.MembershipChanges[presence.UserID] + if skip && !membershipChange { req.Log.Tracef("Skipping presence, no change (%s)", presence.UserID) continue } |