aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/caching/cache_lazy_load_members.go86
-rw-r--r--syncapi/streams/stream_pdu.go100
-rw-r--r--syncapi/streams/streams.go4
-rw-r--r--syncapi/syncapi.go6
-rw-r--r--sytest-whitelist11
5 files changed, 200 insertions, 7 deletions
diff --git a/internal/caching/cache_lazy_load_members.go b/internal/caching/cache_lazy_load_members.go
new file mode 100644
index 00000000..71a31762
--- /dev/null
+++ b/internal/caching/cache_lazy_load_members.go
@@ -0,0 +1,86 @@
+package caching
+
+import (
+ "fmt"
+ "time"
+
+ userapi "github.com/matrix-org/dendrite/userapi/api"
+)
+
+const (
+ LazyLoadCacheName = "lazy_load_members"
+ LazyLoadCacheMaxEntries = 128
+ LazyLoadCacheMaxUserEntries = 128
+ LazyLoadCacheMutable = true
+ LazyLoadCacheMaxAge = time.Minute * 30
+)
+
+type LazyLoadCache struct {
+ // InMemoryLRUCachePartition containing other InMemoryLRUCachePartitions
+ // with the actual cached members
+ userCaches *InMemoryLRUCachePartition
+}
+
+// NewLazyLoadCache creates a new LazyLoadCache.
+func NewLazyLoadCache() (*LazyLoadCache, error) {
+ cache, err := NewInMemoryLRUCachePartition(
+ LazyLoadCacheName,
+ LazyLoadCacheMutable,
+ LazyLoadCacheMaxEntries,
+ LazyLoadCacheMaxAge,
+ true,
+ )
+ if err != nil {
+ return nil, err
+ }
+ go cacheCleaner(cache)
+ return &LazyLoadCache{
+ userCaches: cache,
+ }, nil
+}
+
+func (c *LazyLoadCache) lazyLoadCacheForUser(device *userapi.Device) (*InMemoryLRUCachePartition, error) {
+ cacheName := fmt.Sprintf("%s/%s", device.UserID, device.ID)
+ userCache, ok := c.userCaches.Get(cacheName)
+ if ok && userCache != nil {
+ if cache, ok := userCache.(*InMemoryLRUCachePartition); ok {
+ return cache, nil
+ }
+ }
+ cache, err := NewInMemoryLRUCachePartition(
+ LazyLoadCacheName,
+ LazyLoadCacheMutable,
+ LazyLoadCacheMaxUserEntries,
+ LazyLoadCacheMaxAge,
+ false,
+ )
+ if err != nil {
+ return nil, err
+ }
+ c.userCaches.Set(cacheName, cache)
+ go cacheCleaner(cache)
+ return cache, nil
+}
+
+func (c *LazyLoadCache) StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) {
+ cache, err := c.lazyLoadCacheForUser(device)
+ if err != nil {
+ return
+ }
+ cacheKey := fmt.Sprintf("%s/%s/%s/%s", device.UserID, device.ID, roomID, userID)
+ cache.Set(cacheKey, eventID)
+}
+
+func (c *LazyLoadCache) IsLazyLoadedUserCached(device *userapi.Device, roomID, userID string) (string, bool) {
+ cache, err := c.lazyLoadCacheForUser(device)
+ if err != nil {
+ return "", false
+ }
+
+ cacheKey := fmt.Sprintf("%s/%s/%s/%s", device.UserID, device.ID, roomID, userID)
+ val, ok := cache.Get(cacheKey)
+ if !ok {
+ return "", ok
+ }
+ return val.(string), ok
+}
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index bcaf6ca3..ddc2f55c 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -6,6 +6,7 @@ import (
"sync"
"time"
+ "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@@ -26,7 +27,8 @@ type PDUStreamProvider struct {
tasks chan func()
workers atomic.Int32
- userAPI userapi.UserInternalAPI
+ // userID+deviceID -> lazy loading cache
+ lazyLoadCache *caching.LazyLoadCache
}
func (p *PDUStreamProvider) worker() {
@@ -188,7 +190,7 @@ func (p *PDUStreamProvider) IncrementalSync(
newPos = from
for _, delta := range stateDeltas {
var pos types.StreamPosition
- if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil {
+ if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, &stateFilter, req.Response); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
return to
}
@@ -209,6 +211,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
r types.Range,
delta types.StateDelta,
eventFilter *gomatrixserverlib.RoomEventFilter,
+ stateFilter *gomatrixserverlib.StateFilter,
res *types.Response,
) (types.StreamPosition, error) {
if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
@@ -247,7 +250,8 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// room that were returned.
latestPosition := r.To
updateLatestPosition := func(mostRecentEventID string) {
- if _, pos, err := p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil {
+ var pos types.StreamPosition
+ if _, pos, err = p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil {
switch {
case r.Backwards && pos > latestPosition:
fallthrough
@@ -263,6 +267,19 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID())
}
+ if stateFilter.LazyLoadMembers {
+ if err != nil {
+ return r.From, err
+ }
+ delta.StateEvents, err = p.lazyLoadMembers(
+ ctx, delta.RoomID, true, limited, stateFilter.IncludeRedundantMembers,
+ device, recentEvents, delta.StateEvents,
+ )
+ if err != nil {
+ return r.From, err
+ }
+ }
+
hasMembershipChange := false
for _, recentEvent := range recentStreamEvents {
if recentEvent.Type() == gomatrixserverlib.MRoomMember && recentEvent.StateKey() != nil {
@@ -402,6 +419,20 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
+
+ if stateFilter.LazyLoadMembers {
+ if err != nil {
+ return nil, err
+ }
+ stateEvents, err = p.lazyLoadMembers(ctx, roomID,
+ false, limited, stateFilter.IncludeRedundantMembers,
+ device, recentEvents, stateEvents,
+ )
+ if err != nil {
+ return nil, err
+ }
+ }
+
jr = types.NewJoinResponse()
jr.Summary.JoinedMemberCount = &joinedCount
jr.Summary.InvitedMemberCount = &invitedCount
@@ -412,6 +443,69 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
return jr, nil
}
+func (p *PDUStreamProvider) lazyLoadMembers(
+ ctx context.Context, roomID string,
+ incremental, limited, includeRedundant bool,
+ device *userapi.Device,
+ timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent,
+) ([]*gomatrixserverlib.HeaderedEvent, error) {
+ if len(timelineEvents) == 0 {
+ return stateEvents, nil
+ }
+ // Work out which memberships to include
+ timelineUsers := make(map[string]struct{})
+ if !incremental {
+ timelineUsers[device.UserID] = struct{}{}
+ }
+ // Add all users the client doesn't know about yet to a list
+ for _, event := range timelineEvents {
+ // Membership is not yet cached, add it to the list
+ if _, ok := p.lazyLoadCache.IsLazyLoadedUserCached(device, roomID, event.Sender()); !ok {
+ timelineUsers[event.Sender()] = struct{}{}
+ }
+ }
+ // Preallocate with the same amount, even if it will end up with fewer values
+ newStateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEvents))
+ // Remove existing membership events we don't care about, e.g. users not in the timeline.events
+ for _, event := range stateEvents {
+ if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil {
+ // If this is a gapped incremental sync, we still want this membership
+ isGappedIncremental := limited && incremental
+ // We want this users membership event, keep it in the list
+ _, ok := timelineUsers[event.Sender()]
+ wantMembership := ok || isGappedIncremental
+ if wantMembership {
+ newStateEvents = append(newStateEvents, event)
+ if !includeRedundant {
+ p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, event.Sender(), event.EventID())
+ }
+ delete(timelineUsers, event.Sender())
+ }
+ } else {
+ newStateEvents = append(newStateEvents, event)
+ }
+ }
+ wantUsers := make([]string, 0, len(timelineUsers))
+ for userID := range timelineUsers {
+ wantUsers = append(wantUsers, userID)
+ }
+ // Query missing membership events
+ memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &gomatrixserverlib.StateFilter{
+ Limit: 100,
+ Senders: &wantUsers,
+ Types: &[]string{gomatrixserverlib.MRoomMember},
+ })
+ if err != nil {
+ return stateEvents, err
+ }
+ // cache the membership events
+ for _, membership := range memberships {
+ p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, membership.Sender(), membership.EventID())
+ }
+ stateEvents = append(newStateEvents, memberships...)
+ return stateEvents, nil
+}
+
// addIgnoredUsersToFilter adds ignored users to the eventfilter and
// the syncreq itself for further use in streams.
func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error {
diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go
index c7d06a29..d3195b78 100644
--- a/syncapi/streams/streams.go
+++ b/syncapi/streams/streams.go
@@ -27,12 +27,12 @@ type Streams struct {
func NewSyncStreamProviders(
d storage.Database, userAPI userapi.UserInternalAPI,
rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI,
- eduCache *caching.EDUCache, notifier *notifier.Notifier,
+ eduCache *caching.EDUCache, lazyLoadCache *caching.LazyLoadCache, notifier *notifier.Notifier,
) *Streams {
streams := &Streams{
PDUStreamProvider: &PDUStreamProvider{
StreamProvider: StreamProvider{DB: d},
- userAPI: userAPI,
+ lazyLoadCache: lazyLoadCache,
},
TypingStreamProvider: &TypingStreamProvider{
StreamProvider: StreamProvider{DB: d},
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 384121a8..2f9165d9 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -57,8 +57,12 @@ func AddPublicRoutes(
}
eduCache := caching.NewTypingCache()
+ lazyLoadCache, err := caching.NewLazyLoadCache()
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to create lazy loading cache")
+ }
notifier := notifier.NewNotifier()
- streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, notifier)
+ streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, lazyLoadCache, notifier)
notifier.SetCurrentPosition(streams.Latest(context.Background()))
if err = notifier.Load(context.Background(), syncDB); err != nil {
logrus.WithError(err).Panicf("failed to load notifier ")
diff --git a/sytest-whitelist b/sytest-whitelist
index f63b96f5..c8dedd59 100644
--- a/sytest-whitelist
+++ b/sytest-whitelist
@@ -699,4 +699,13 @@ Ignore invite in full sync
Ignore invite in incremental sync
A filtered timeline reaches its limit
A change to displayname should not result in a full state sync
-Can fetch images in room \ No newline at end of file
+Can fetch images in room
+The only membership state included in an initial sync is for all the senders in the timeline
+The only membership state included in an incremental sync is for senders in the timeline
+Old members are included in gappy incr LL sync if they start speaking
+We do send redundant membership state across incremental syncs if asked
+Rejecting invite over federation doesn't break incremental /sync
+Gapped incremental syncs include all state changes
+Old leaves are present in gapped incremental syncs
+Leaves are present in non-gapped incremental syncs
+Members from the gap are included in gappy incr LL sync \ No newline at end of file