diff options
author | Till <2353100+S7evinK@users.noreply.github.com> | 2022-04-19 10:46:45 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-19 09:46:45 +0100 |
commit | 57e3622b85fd4d80d9826404135f09e91ed47973 (patch) | |
tree | ecedc835dfd4b6eb21ff3b028df8f43cfb65a01f /syncapi | |
parent | 3ddbffd59ece5f74d951d6209882d9d954db4bc3 (diff) |
Implement lazy loading on `/sync` (#2346)
* Initial work on lazyloading
* Partially implement lazy loading on /sync
* Rename methods
* Make missing tests pass
* Preallocate slice, even if it will end up with fewer values
* Let the cache handle the user mapping
* Linter
* Cap cache growth
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/streams/stream_pdu.go | 100 | ||||
-rw-r--r-- | syncapi/streams/streams.go | 4 | ||||
-rw-r--r-- | syncapi/syncapi.go | 6 |
3 files changed, 104 insertions, 6 deletions
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index bcaf6ca3..ddc2f55c 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -26,7 +27,8 @@ type PDUStreamProvider struct { tasks chan func() workers atomic.Int32 - userAPI userapi.UserInternalAPI + // userID+deviceID -> lazy loading cache + lazyLoadCache *caching.LazyLoadCache } func (p *PDUStreamProvider) worker() { @@ -188,7 +190,7 @@ func (p *PDUStreamProvider) IncrementalSync( newPos = from for _, delta := range stateDeltas { var pos types.StreamPosition - if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil { + if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, &stateFilter, req.Response); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") return to } @@ -209,6 +211,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( r types.Range, delta types.StateDelta, eventFilter *gomatrixserverlib.RoomEventFilter, + stateFilter *gomatrixserverlib.StateFilter, res *types.Response, ) (types.StreamPosition, error) { if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave { @@ -247,7 +250,8 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( // room that were returned. latestPosition := r.To updateLatestPosition := func(mostRecentEventID string) { - if _, pos, err := p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil { + var pos types.StreamPosition + if _, pos, err = p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil { switch { case r.Backwards && pos > latestPosition: fallthrough @@ -263,6 +267,19 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID()) } + if stateFilter.LazyLoadMembers { + if err != nil { + return r.From, err + } + delta.StateEvents, err = p.lazyLoadMembers( + ctx, delta.RoomID, true, limited, stateFilter.IncludeRedundantMembers, + device, recentEvents, delta.StateEvents, + ) + if err != nil { + return r.From, err + } + } + hasMembershipChange := false for _, recentEvent := range recentStreamEvents { if recentEvent.Type() == gomatrixserverlib.MRoomMember && recentEvent.StateKey() != nil { @@ -402,6 +419,20 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( // "Can sync a room with a message with a transaction id" - which does a complete sync to check. recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) + + if stateFilter.LazyLoadMembers { + if err != nil { + return nil, err + } + stateEvents, err = p.lazyLoadMembers(ctx, roomID, + false, limited, stateFilter.IncludeRedundantMembers, + device, recentEvents, stateEvents, + ) + if err != nil { + return nil, err + } + } + jr = types.NewJoinResponse() jr.Summary.JoinedMemberCount = &joinedCount jr.Summary.InvitedMemberCount = &invitedCount @@ -412,6 +443,69 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( return jr, nil } +func (p *PDUStreamProvider) lazyLoadMembers( + ctx context.Context, roomID string, + incremental, limited, includeRedundant bool, + device *userapi.Device, + timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent, +) ([]*gomatrixserverlib.HeaderedEvent, error) { + if len(timelineEvents) == 0 { + return stateEvents, nil + } + // Work out which memberships to include + timelineUsers := make(map[string]struct{}) + if !incremental { + timelineUsers[device.UserID] = struct{}{} + } + // Add all users the client doesn't know about yet to a list + for _, event := range timelineEvents { + // Membership is not yet cached, add it to the list + if _, ok := p.lazyLoadCache.IsLazyLoadedUserCached(device, roomID, event.Sender()); !ok { + timelineUsers[event.Sender()] = struct{}{} + } + } + // Preallocate with the same amount, even if it will end up with fewer values + newStateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEvents)) + // Remove existing membership events we don't care about, e.g. users not in the timeline.events + for _, event := range stateEvents { + if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil { + // If this is a gapped incremental sync, we still want this membership + isGappedIncremental := limited && incremental + // We want this users membership event, keep it in the list + _, ok := timelineUsers[event.Sender()] + wantMembership := ok || isGappedIncremental + if wantMembership { + newStateEvents = append(newStateEvents, event) + if !includeRedundant { + p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, event.Sender(), event.EventID()) + } + delete(timelineUsers, event.Sender()) + } + } else { + newStateEvents = append(newStateEvents, event) + } + } + wantUsers := make([]string, 0, len(timelineUsers)) + for userID := range timelineUsers { + wantUsers = append(wantUsers, userID) + } + // Query missing membership events + memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &gomatrixserverlib.StateFilter{ + Limit: 100, + Senders: &wantUsers, + Types: &[]string{gomatrixserverlib.MRoomMember}, + }) + if err != nil { + return stateEvents, err + } + // cache the membership events + for _, membership := range memberships { + p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, membership.Sender(), membership.EventID()) + } + stateEvents = append(newStateEvents, memberships...) + return stateEvents, nil +} + // addIgnoredUsersToFilter adds ignored users to the eventfilter and // the syncreq itself for further use in streams. func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error { diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index c7d06a29..d3195b78 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -27,12 +27,12 @@ type Streams struct { func NewSyncStreamProviders( d storage.Database, userAPI userapi.UserInternalAPI, rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI, - eduCache *caching.EDUCache, notifier *notifier.Notifier, + eduCache *caching.EDUCache, lazyLoadCache *caching.LazyLoadCache, notifier *notifier.Notifier, ) *Streams { streams := &Streams{ PDUStreamProvider: &PDUStreamProvider{ StreamProvider: StreamProvider{DB: d}, - userAPI: userAPI, + lazyLoadCache: lazyLoadCache, }, TypingStreamProvider: &TypingStreamProvider{ StreamProvider: StreamProvider{DB: d}, diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 384121a8..2f9165d9 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -57,8 +57,12 @@ func AddPublicRoutes( } eduCache := caching.NewTypingCache() + lazyLoadCache, err := caching.NewLazyLoadCache() + if err != nil { + logrus.WithError(err).Panicf("failed to create lazy loading cache") + } notifier := notifier.NewNotifier() - streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, notifier) + streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, lazyLoadCache, notifier) notifier.SetCurrentPosition(streams.Latest(context.Background())) if err = notifier.Load(context.Background(), syncDB); err != nil { logrus.WithError(err).Panicf("failed to load notifier ") |