diff options
author | Matthew Hodgson <matthew@matrix.org> | 2020-09-10 14:39:18 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-10 14:39:18 +0100 |
commit | 39507bacc3dbfc532e0d69b42957c87f27af4c77 (patch) | |
tree | 7ad845e1b25e03e7b7d7cd2d49278fe843c2ff86 /syncapi/sync | |
parent | 35564dd73c48b16b97cd1a972a9b9bc65ec6d7ef (diff) |
Peeking via MSC2753 (#1370)
Initial implementation of MSC2753, as tested by https://github.com/matrix-org/sytest/pull/944.
Doesn't yet handle unpeeks, peeked EDUs, or history viz changing during a peek - these will follow.
https://github.com/matrix-org/dendrite/pull/1370 has full details.
Diffstat (limited to 'syncapi/sync')
-rw-r--r-- | syncapi/sync/notifier.go | 114 |
1 files changed, 103 insertions, 11 deletions
diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go index df23a2f4..fcac3f16 100644 --- a/syncapi/sync/notifier.go +++ b/syncapi/sync/notifier.go @@ -33,6 +33,8 @@ import ( type Notifier struct { // A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine roomIDToJoinedUsers map[string]userIDSet + // A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine + roomIDToPeekingDevices map[string]peekingDeviceSet // Protects currPos and userStreams. streamLock *sync.Mutex // The latest sync position @@ -48,11 +50,12 @@ type Notifier struct { // the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase). func NewNotifier(pos types.StreamingToken) *Notifier { return &Notifier{ - currPos: pos, - roomIDToJoinedUsers: make(map[string]userIDSet), - userDeviceStreams: make(map[string]map[string]*UserDeviceStream), - streamLock: &sync.Mutex{}, - lastCleanUpTime: time.Now(), + currPos: pos, + roomIDToJoinedUsers: make(map[string]userIDSet), + roomIDToPeekingDevices: make(map[string]peekingDeviceSet), + userDeviceStreams: make(map[string]map[string]*UserDeviceStream), + streamLock: &sync.Mutex{}, + lastCleanUpTime: time.Now(), } } @@ -82,6 +85,8 @@ func (n *Notifier) OnNewEvent( if ev != nil { // Map this event's room_id to a list of joined users, and wake them up. usersToNotify := n.joinedUsers(ev.RoomID()) + // Map this event's room_id to a list of peeking devices, and wake them up. + peekingDevicesToNotify := n.PeekingDevices(ev.RoomID()) // If this is an invite, also add in the invitee to this list. if ev.Type() == "m.room.member" && ev.StateKey() != nil { targetUserID := *ev.StateKey() @@ -108,11 +113,11 @@ func (n *Notifier) OnNewEvent( } } - n.wakeupUsers(usersToNotify, latestPos) + n.wakeupUsers(usersToNotify, peekingDevicesToNotify, latestPos) } else if roomID != "" { - n.wakeupUsers(n.joinedUsers(roomID), latestPos) + n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), latestPos) } else if len(userIDs) > 0 { - n.wakeupUsers(userIDs, latestPos) + n.wakeupUsers(userIDs, nil, latestPos) } else { log.WithFields(log.Fields{ "posUpdate": posUpdate.String, @@ -120,6 +125,18 @@ func (n *Notifier) OnNewEvent( } } +func (n *Notifier) OnNewPeek( + roomID, userID, deviceID string, +) { + n.streamLock.Lock() + defer n.streamLock.Unlock() + + n.addPeekingDevice(roomID, userID, deviceID) + + // we don't wake up devices here given the roomserver consumer will do this shortly afterwards + // by calling OnNewEvent. +} + func (n *Notifier) OnNewSendToDevice( userID string, deviceIDs []string, posUpdate types.StreamingToken, @@ -139,7 +156,7 @@ func (n *Notifier) OnNewKeyChange( defer n.streamLock.Unlock() latestPos := n.currPos.WithUpdates(posUpdate) n.currPos = latestPos - n.wakeupUsers([]string{wakeUserID}, latestPos) + n.wakeupUsers([]string{wakeUserID}, nil, latestPos) } // GetListener returns a UserStreamListener that can be used to wait for @@ -169,6 +186,13 @@ func (n *Notifier) Load(ctx context.Context, db storage.Database) error { return err } n.setUsersJoinedToRooms(roomToUsers) + + roomToPeekingDevices, err := db.AllPeekingDevicesInRooms(ctx) + if err != nil { + return err + } + n.setPeekingDevices(roomToPeekingDevices) + return nil } @@ -195,9 +219,24 @@ func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) { } } +// setPeekingDevices marks the given devices as peeking in the given rooms, such that new events from +// these rooms will wake the given devices' /sync requests. This should be called prior to ANY calls to +// OnNewEvent (eg on startup) to prevent racing. +func (n *Notifier) setPeekingDevices(roomIDToPeekingDevices map[string][]types.PeekingDevice) { + // This is just the bulk form of addPeekingDevice + for roomID, peekingDevices := range roomIDToPeekingDevices { + if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { + n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) + } + for _, peekingDevice := range peekingDevices { + n.roomIDToPeekingDevices[roomID].add(peekingDevice) + } + } +} + // wakeupUsers will wake up the sync strems for all of the devices for all of the -// specified user IDs. -func (n *Notifier) wakeupUsers(userIDs []string, newPos types.StreamingToken) { +// specified user IDs, and also the specified peekingDevices +func (n *Notifier) wakeupUsers(userIDs []string, peekingDevices []types.PeekingDevice, newPos types.StreamingToken) { for _, userID := range userIDs { for _, stream := range n.fetchUserStreams(userID) { if stream == nil { @@ -206,6 +245,13 @@ func (n *Notifier) wakeupUsers(userIDs []string, newPos types.StreamingToken) { stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream } } + + for _, peekingDevice := range peekingDevices { + // TODO: don't bother waking up for devices whose users we already woke up + if stream := n.fetchUserDeviceStream(peekingDevice.UserID, peekingDevice.DeviceID, false); stream != nil { + stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream + } + } } // wakeupUserDevice will wake up the sync stream for a specific user device. Other @@ -284,6 +330,32 @@ func (n *Notifier) joinedUsers(roomID string) (userIDs []string) { return n.roomIDToJoinedUsers[roomID].values() } +// Not thread-safe: must be called on the OnNewEvent goroutine only +func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) { + if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { + n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) + } + n.roomIDToPeekingDevices[roomID].add(types.PeekingDevice{UserID: userID, DeviceID: deviceID}) +} + +// Not thread-safe: must be called on the OnNewEvent goroutine only +// nolint:unused +func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) { + if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { + n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) + } + // XXX: is this going to work as a key? + n.roomIDToPeekingDevices[roomID].remove(types.PeekingDevice{UserID: userID, DeviceID: deviceID}) +} + +// Not thread-safe: must be called on the OnNewEvent goroutine only +func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice) { + if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { + return + } + return n.roomIDToPeekingDevices[roomID].values() +} + // removeEmptyUserStreams iterates through the user stream map and removes any // that have been empty for a certain amount of time. This is a crude way of // ensuring that the userStreams map doesn't grow forver. @@ -329,3 +401,23 @@ func (s userIDSet) values() (vals []string) { } return } + +// A set of PeekingDevices, similar to userIDSet + +type peekingDeviceSet map[types.PeekingDevice]bool + +func (s peekingDeviceSet) add(d types.PeekingDevice) { + s[d] = true +} + +// nolint:unused +func (s peekingDeviceSet) remove(d types.PeekingDevice) { + delete(s, d) +} + +func (s peekingDeviceSet) values() (vals []types.PeekingDevice) { + for d := range s { + vals = append(vals, d) + } + return +} |