aboutsummaryrefslogtreecommitdiff
path: root/syncapi/sync
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2020-09-10 14:39:18 +0100
committerGitHub <noreply@github.com>2020-09-10 14:39:18 +0100
commit39507bacc3dbfc532e0d69b42957c87f27af4c77 (patch)
tree7ad845e1b25e03e7b7d7cd2d49278fe843c2ff86 /syncapi/sync
parent35564dd73c48b16b97cd1a972a9b9bc65ec6d7ef (diff)
Peeking via MSC2753 (#1370)
Initial implementation of MSC2753, as tested by https://github.com/matrix-org/sytest/pull/944. Doesn't yet handle unpeeks, peeked EDUs, or history viz changing during a peek - these will follow. https://github.com/matrix-org/dendrite/pull/1370 has full details.
Diffstat (limited to 'syncapi/sync')
-rw-r--r--syncapi/sync/notifier.go114
1 files changed, 103 insertions, 11 deletions
diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go
index df23a2f4..fcac3f16 100644
--- a/syncapi/sync/notifier.go
+++ b/syncapi/sync/notifier.go
@@ -33,6 +33,8 @@ import (
type Notifier struct {
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
roomIDToJoinedUsers map[string]userIDSet
+ // A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
+ roomIDToPeekingDevices map[string]peekingDeviceSet
// Protects currPos and userStreams.
streamLock *sync.Mutex
// The latest sync position
@@ -48,11 +50,12 @@ type Notifier struct {
// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
func NewNotifier(pos types.StreamingToken) *Notifier {
return &Notifier{
- currPos: pos,
- roomIDToJoinedUsers: make(map[string]userIDSet),
- userDeviceStreams: make(map[string]map[string]*UserDeviceStream),
- streamLock: &sync.Mutex{},
- lastCleanUpTime: time.Now(),
+ currPos: pos,
+ roomIDToJoinedUsers: make(map[string]userIDSet),
+ roomIDToPeekingDevices: make(map[string]peekingDeviceSet),
+ userDeviceStreams: make(map[string]map[string]*UserDeviceStream),
+ streamLock: &sync.Mutex{},
+ lastCleanUpTime: time.Now(),
}
}
@@ -82,6 +85,8 @@ func (n *Notifier) OnNewEvent(
if ev != nil {
// Map this event's room_id to a list of joined users, and wake them up.
usersToNotify := n.joinedUsers(ev.RoomID())
+ // Map this event's room_id to a list of peeking devices, and wake them up.
+ peekingDevicesToNotify := n.PeekingDevices(ev.RoomID())
// If this is an invite, also add in the invitee to this list.
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
targetUserID := *ev.StateKey()
@@ -108,11 +113,11 @@ func (n *Notifier) OnNewEvent(
}
}
- n.wakeupUsers(usersToNotify, latestPos)
+ n.wakeupUsers(usersToNotify, peekingDevicesToNotify, latestPos)
} else if roomID != "" {
- n.wakeupUsers(n.joinedUsers(roomID), latestPos)
+ n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), latestPos)
} else if len(userIDs) > 0 {
- n.wakeupUsers(userIDs, latestPos)
+ n.wakeupUsers(userIDs, nil, latestPos)
} else {
log.WithFields(log.Fields{
"posUpdate": posUpdate.String,
@@ -120,6 +125,18 @@ func (n *Notifier) OnNewEvent(
}
}
+func (n *Notifier) OnNewPeek(
+ roomID, userID, deviceID string,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.addPeekingDevice(roomID, userID, deviceID)
+
+ // we don't wake up devices here given the roomserver consumer will do this shortly afterwards
+ // by calling OnNewEvent.
+}
+
func (n *Notifier) OnNewSendToDevice(
userID string, deviceIDs []string,
posUpdate types.StreamingToken,
@@ -139,7 +156,7 @@ func (n *Notifier) OnNewKeyChange(
defer n.streamLock.Unlock()
latestPos := n.currPos.WithUpdates(posUpdate)
n.currPos = latestPos
- n.wakeupUsers([]string{wakeUserID}, latestPos)
+ n.wakeupUsers([]string{wakeUserID}, nil, latestPos)
}
// GetListener returns a UserStreamListener that can be used to wait for
@@ -169,6 +186,13 @@ func (n *Notifier) Load(ctx context.Context, db storage.Database) error {
return err
}
n.setUsersJoinedToRooms(roomToUsers)
+
+ roomToPeekingDevices, err := db.AllPeekingDevicesInRooms(ctx)
+ if err != nil {
+ return err
+ }
+ n.setPeekingDevices(roomToPeekingDevices)
+
return nil
}
@@ -195,9 +219,24 @@ func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
}
}
+// setPeekingDevices marks the given devices as peeking in the given rooms, such that new events from
+// these rooms will wake the given devices' /sync requests. This should be called prior to ANY calls to
+// OnNewEvent (eg on startup) to prevent racing.
+func (n *Notifier) setPeekingDevices(roomIDToPeekingDevices map[string][]types.PeekingDevice) {
+ // This is just the bulk form of addPeekingDevice
+ for roomID, peekingDevices := range roomIDToPeekingDevices {
+ if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
+ n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
+ }
+ for _, peekingDevice := range peekingDevices {
+ n.roomIDToPeekingDevices[roomID].add(peekingDevice)
+ }
+ }
+}
+
// wakeupUsers will wake up the sync strems for all of the devices for all of the
-// specified user IDs.
-func (n *Notifier) wakeupUsers(userIDs []string, newPos types.StreamingToken) {
+// specified user IDs, and also the specified peekingDevices
+func (n *Notifier) wakeupUsers(userIDs []string, peekingDevices []types.PeekingDevice, newPos types.StreamingToken) {
for _, userID := range userIDs {
for _, stream := range n.fetchUserStreams(userID) {
if stream == nil {
@@ -206,6 +245,13 @@ func (n *Notifier) wakeupUsers(userIDs []string, newPos types.StreamingToken) {
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
}
}
+
+ for _, peekingDevice := range peekingDevices {
+ // TODO: don't bother waking up for devices whose users we already woke up
+ if stream := n.fetchUserDeviceStream(peekingDevice.UserID, peekingDevice.DeviceID, false); stream != nil {
+ stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
+ }
+ }
}
// wakeupUserDevice will wake up the sync stream for a specific user device. Other
@@ -284,6 +330,32 @@ func (n *Notifier) joinedUsers(roomID string) (userIDs []string) {
return n.roomIDToJoinedUsers[roomID].values()
}
+// Not thread-safe: must be called on the OnNewEvent goroutine only
+func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) {
+ if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
+ n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
+ }
+ n.roomIDToPeekingDevices[roomID].add(types.PeekingDevice{UserID: userID, DeviceID: deviceID})
+}
+
+// Not thread-safe: must be called on the OnNewEvent goroutine only
+// nolint:unused
+func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) {
+ if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
+ n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
+ }
+ // XXX: is this going to work as a key?
+ n.roomIDToPeekingDevices[roomID].remove(types.PeekingDevice{UserID: userID, DeviceID: deviceID})
+}
+
+// Not thread-safe: must be called on the OnNewEvent goroutine only
+func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice) {
+ if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
+ return
+ }
+ return n.roomIDToPeekingDevices[roomID].values()
+}
+
// removeEmptyUserStreams iterates through the user stream map and removes any
// that have been empty for a certain amount of time. This is a crude way of
// ensuring that the userStreams map doesn't grow forver.
@@ -329,3 +401,23 @@ func (s userIDSet) values() (vals []string) {
}
return
}
+
+// A set of PeekingDevices, similar to userIDSet
+
+type peekingDeviceSet map[types.PeekingDevice]bool
+
+func (s peekingDeviceSet) add(d types.PeekingDevice) {
+ s[d] = true
+}
+
+// nolint:unused
+func (s peekingDeviceSet) remove(d types.PeekingDevice) {
+ delete(s, d)
+}
+
+func (s peekingDeviceSet) values() (vals []types.PeekingDevice) {
+ for d := range s {
+ vals = append(vals, d)
+ }
+ return
+}