diff options
author | Till <2353100+S7evinK@users.noreply.github.com> | 2022-04-19 10:46:45 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-19 09:46:45 +0100 |
commit | 57e3622b85fd4d80d9826404135f09e91ed47973 (patch) | |
tree | ecedc835dfd4b6eb21ff3b028df8f43cfb65a01f /syncapi/streams/stream_pdu.go | |
parent | 3ddbffd59ece5f74d951d6209882d9d954db4bc3 (diff) |
Implement lazy loading on `/sync` (#2346)
* Initial work on lazyloading
* Partially implement lazy loading on /sync
* Rename methods
* Make missing tests pass
* Preallocate slice, even if it will end up with fewer values
* Let the cache handle the user mapping
* Linter
* Cap cache growth
Diffstat (limited to 'syncapi/streams/stream_pdu.go')
-rw-r--r-- | syncapi/streams/stream_pdu.go | 100 |
1 files changed, 97 insertions, 3 deletions
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index bcaf6ca3..ddc2f55c 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -26,7 +27,8 @@ type PDUStreamProvider struct { tasks chan func() workers atomic.Int32 - userAPI userapi.UserInternalAPI + // userID+deviceID -> lazy loading cache + lazyLoadCache *caching.LazyLoadCache } func (p *PDUStreamProvider) worker() { @@ -188,7 +190,7 @@ func (p *PDUStreamProvider) IncrementalSync( newPos = from for _, delta := range stateDeltas { var pos types.StreamPosition - if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil { + if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, &stateFilter, req.Response); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") return to } @@ -209,6 +211,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( r types.Range, delta types.StateDelta, eventFilter *gomatrixserverlib.RoomEventFilter, + stateFilter *gomatrixserverlib.StateFilter, res *types.Response, ) (types.StreamPosition, error) { if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave { @@ -247,7 +250,8 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( // room that were returned. latestPosition := r.To updateLatestPosition := func(mostRecentEventID string) { - if _, pos, err := p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil { + var pos types.StreamPosition + if _, pos, err = p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil { switch { case r.Backwards && pos > latestPosition: fallthrough @@ -263,6 +267,19 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID()) } + if stateFilter.LazyLoadMembers { + if err != nil { + return r.From, err + } + delta.StateEvents, err = p.lazyLoadMembers( + ctx, delta.RoomID, true, limited, stateFilter.IncludeRedundantMembers, + device, recentEvents, delta.StateEvents, + ) + if err != nil { + return r.From, err + } + } + hasMembershipChange := false for _, recentEvent := range recentStreamEvents { if recentEvent.Type() == gomatrixserverlib.MRoomMember && recentEvent.StateKey() != nil { @@ -402,6 +419,20 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( // "Can sync a room with a message with a transaction id" - which does a complete sync to check. recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) + + if stateFilter.LazyLoadMembers { + if err != nil { + return nil, err + } + stateEvents, err = p.lazyLoadMembers(ctx, roomID, + false, limited, stateFilter.IncludeRedundantMembers, + device, recentEvents, stateEvents, + ) + if err != nil { + return nil, err + } + } + jr = types.NewJoinResponse() jr.Summary.JoinedMemberCount = &joinedCount jr.Summary.InvitedMemberCount = &invitedCount @@ -412,6 +443,69 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( return jr, nil } +func (p *PDUStreamProvider) lazyLoadMembers( + ctx context.Context, roomID string, + incremental, limited, includeRedundant bool, + device *userapi.Device, + timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent, +) ([]*gomatrixserverlib.HeaderedEvent, error) { + if len(timelineEvents) == 0 { + return stateEvents, nil + } + // Work out which memberships to include + timelineUsers := make(map[string]struct{}) + if !incremental { + timelineUsers[device.UserID] = struct{}{} + } + // Add all users the client doesn't know about yet to a list + for _, event := range timelineEvents { + // Membership is not yet cached, add it to the list + if _, ok := p.lazyLoadCache.IsLazyLoadedUserCached(device, roomID, event.Sender()); !ok { + timelineUsers[event.Sender()] = struct{}{} + } + } + // Preallocate with the same amount, even if it will end up with fewer values + newStateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEvents)) + // Remove existing membership events we don't care about, e.g. users not in the timeline.events + for _, event := range stateEvents { + if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil { + // If this is a gapped incremental sync, we still want this membership + isGappedIncremental := limited && incremental + // We want this users membership event, keep it in the list + _, ok := timelineUsers[event.Sender()] + wantMembership := ok || isGappedIncremental + if wantMembership { + newStateEvents = append(newStateEvents, event) + if !includeRedundant { + p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, event.Sender(), event.EventID()) + } + delete(timelineUsers, event.Sender()) + } + } else { + newStateEvents = append(newStateEvents, event) + } + } + wantUsers := make([]string, 0, len(timelineUsers)) + for userID := range timelineUsers { + wantUsers = append(wantUsers, userID) + } + // Query missing membership events + memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &gomatrixserverlib.StateFilter{ + Limit: 100, + Senders: &wantUsers, + Types: &[]string{gomatrixserverlib.MRoomMember}, + }) + if err != nil { + return stateEvents, err + } + // cache the membership events + for _, membership := range memberships { + p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, membership.Sender(), membership.EventID()) + } + stateEvents = append(newStateEvents, memberships...) + return stateEvents, nil +} + // addIgnoredUsersToFilter adds ignored users to the eventfilter and // the syncreq itself for further use in streams. func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error { |