aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-04-19 10:46:45 +0200
committerGitHub <noreply@github.com>2022-04-19 09:46:45 +0100
commit57e3622b85fd4d80d9826404135f09e91ed47973 (patch)
treeecedc835dfd4b6eb21ff3b028df8f43cfb65a01f /syncapi
parent3ddbffd59ece5f74d951d6209882d9d954db4bc3 (diff)
Implement lazy loading on `/sync` (#2346)
* Initial work on lazyloading * Partially implement lazy loading on /sync * Rename methods * Make missing tests pass * Preallocate slice, even if it will end up with fewer values * Let the cache handle the user mapping * Linter * Cap cache growth
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/streams/stream_pdu.go100
-rw-r--r--syncapi/streams/streams.go4
-rw-r--r--syncapi/syncapi.go6
3 files changed, 104 insertions, 6 deletions
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index bcaf6ca3..ddc2f55c 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -6,6 +6,7 @@ import (
"sync"
"time"
+ "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@@ -26,7 +27,8 @@ type PDUStreamProvider struct {
tasks chan func()
workers atomic.Int32
- userAPI userapi.UserInternalAPI
+ // userID+deviceID -> lazy loading cache
+ lazyLoadCache *caching.LazyLoadCache
}
func (p *PDUStreamProvider) worker() {
@@ -188,7 +190,7 @@ func (p *PDUStreamProvider) IncrementalSync(
newPos = from
for _, delta := range stateDeltas {
var pos types.StreamPosition
- if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil {
+ if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, &stateFilter, req.Response); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
return to
}
@@ -209,6 +211,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
r types.Range,
delta types.StateDelta,
eventFilter *gomatrixserverlib.RoomEventFilter,
+ stateFilter *gomatrixserverlib.StateFilter,
res *types.Response,
) (types.StreamPosition, error) {
if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
@@ -247,7 +250,8 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// room that were returned.
latestPosition := r.To
updateLatestPosition := func(mostRecentEventID string) {
- if _, pos, err := p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil {
+ var pos types.StreamPosition
+ if _, pos, err = p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil {
switch {
case r.Backwards && pos > latestPosition:
fallthrough
@@ -263,6 +267,19 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID())
}
+ if stateFilter.LazyLoadMembers {
+ if err != nil {
+ return r.From, err
+ }
+ delta.StateEvents, err = p.lazyLoadMembers(
+ ctx, delta.RoomID, true, limited, stateFilter.IncludeRedundantMembers,
+ device, recentEvents, delta.StateEvents,
+ )
+ if err != nil {
+ return r.From, err
+ }
+ }
+
hasMembershipChange := false
for _, recentEvent := range recentStreamEvents {
if recentEvent.Type() == gomatrixserverlib.MRoomMember && recentEvent.StateKey() != nil {
@@ -402,6 +419,20 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
+
+ if stateFilter.LazyLoadMembers {
+ if err != nil {
+ return nil, err
+ }
+ stateEvents, err = p.lazyLoadMembers(ctx, roomID,
+ false, limited, stateFilter.IncludeRedundantMembers,
+ device, recentEvents, stateEvents,
+ )
+ if err != nil {
+ return nil, err
+ }
+ }
+
jr = types.NewJoinResponse()
jr.Summary.JoinedMemberCount = &joinedCount
jr.Summary.InvitedMemberCount = &invitedCount
@@ -412,6 +443,69 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
return jr, nil
}
+func (p *PDUStreamProvider) lazyLoadMembers(
+ ctx context.Context, roomID string,
+ incremental, limited, includeRedundant bool,
+ device *userapi.Device,
+ timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent,
+) ([]*gomatrixserverlib.HeaderedEvent, error) {
+ if len(timelineEvents) == 0 {
+ return stateEvents, nil
+ }
+ // Work out which memberships to include
+ timelineUsers := make(map[string]struct{})
+ if !incremental {
+ timelineUsers[device.UserID] = struct{}{}
+ }
+ // Add all users the client doesn't know about yet to a list
+ for _, event := range timelineEvents {
+ // Membership is not yet cached, add it to the list
+ if _, ok := p.lazyLoadCache.IsLazyLoadedUserCached(device, roomID, event.Sender()); !ok {
+ timelineUsers[event.Sender()] = struct{}{}
+ }
+ }
+ // Preallocate with the same amount, even if it will end up with fewer values
+ newStateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEvents))
+ // Remove existing membership events we don't care about, e.g. users not in the timeline.events
+ for _, event := range stateEvents {
+ if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil {
+ // If this is a gapped incremental sync, we still want this membership
+ isGappedIncremental := limited && incremental
+ // We want this users membership event, keep it in the list
+ _, ok := timelineUsers[event.Sender()]
+ wantMembership := ok || isGappedIncremental
+ if wantMembership {
+ newStateEvents = append(newStateEvents, event)
+ if !includeRedundant {
+ p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, event.Sender(), event.EventID())
+ }
+ delete(timelineUsers, event.Sender())
+ }
+ } else {
+ newStateEvents = append(newStateEvents, event)
+ }
+ }
+ wantUsers := make([]string, 0, len(timelineUsers))
+ for userID := range timelineUsers {
+ wantUsers = append(wantUsers, userID)
+ }
+ // Query missing membership events
+ memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &gomatrixserverlib.StateFilter{
+ Limit: 100,
+ Senders: &wantUsers,
+ Types: &[]string{gomatrixserverlib.MRoomMember},
+ })
+ if err != nil {
+ return stateEvents, err
+ }
+ // cache the membership events
+ for _, membership := range memberships {
+ p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, membership.Sender(), membership.EventID())
+ }
+ stateEvents = append(newStateEvents, memberships...)
+ return stateEvents, nil
+}
+
// addIgnoredUsersToFilter adds ignored users to the eventfilter and
// the syncreq itself for further use in streams.
func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error {
diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go
index c7d06a29..d3195b78 100644
--- a/syncapi/streams/streams.go
+++ b/syncapi/streams/streams.go
@@ -27,12 +27,12 @@ type Streams struct {
func NewSyncStreamProviders(
d storage.Database, userAPI userapi.UserInternalAPI,
rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI,
- eduCache *caching.EDUCache, notifier *notifier.Notifier,
+ eduCache *caching.EDUCache, lazyLoadCache *caching.LazyLoadCache, notifier *notifier.Notifier,
) *Streams {
streams := &Streams{
PDUStreamProvider: &PDUStreamProvider{
StreamProvider: StreamProvider{DB: d},
- userAPI: userAPI,
+ lazyLoadCache: lazyLoadCache,
},
TypingStreamProvider: &TypingStreamProvider{
StreamProvider: StreamProvider{DB: d},
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 384121a8..2f9165d9 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -57,8 +57,12 @@ func AddPublicRoutes(
}
eduCache := caching.NewTypingCache()
+ lazyLoadCache, err := caching.NewLazyLoadCache()
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to create lazy loading cache")
+ }
notifier := notifier.NewNotifier()
- streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, notifier)
+ streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, lazyLoadCache, notifier)
notifier.SetCurrentPosition(streams.Latest(context.Background()))
if err = notifier.Load(context.Background(), syncDB); err != nil {
logrus.WithError(err).Panicf("failed to load notifier ")