aboutsummaryrefslogtreecommitdiff
path: root/syncapi/sync/notifier.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/sync/notifier.go')
-rw-r--r--syncapi/sync/notifier.go48
1 files changed, 35 insertions, 13 deletions
diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go
index 1d8cd624..66460a8d 100644
--- a/syncapi/sync/notifier.go
+++ b/syncapi/sync/notifier.go
@@ -77,9 +77,8 @@ func (n *Notifier) OnNewEvent(
// This needs to be done PRIOR to waking up users as they will read this value.
n.streamLock.Lock()
defer n.streamLock.Unlock()
- latestPos := n.currPos.WithUpdates(posUpdate)
- n.currPos = latestPos
+ n.currPos.ApplyUpdates(posUpdate)
n.removeEmptyUserStreams()
if ev != nil {
@@ -113,11 +112,11 @@ func (n *Notifier) OnNewEvent(
}
}
- n.wakeupUsers(usersToNotify, peekingDevicesToNotify, latestPos)
+ n.wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos)
} else if roomID != "" {
- n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), latestPos)
+ n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), n.currPos)
} else if len(userIDs) > 0 {
- n.wakeupUsers(userIDs, nil, latestPos)
+ n.wakeupUsers(userIDs, nil, n.currPos)
} else {
log.WithFields(log.Fields{
"posUpdate": posUpdate.String,
@@ -155,20 +154,33 @@ func (n *Notifier) OnNewSendToDevice(
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
- latestPos := n.currPos.WithUpdates(posUpdate)
- n.currPos = latestPos
- n.wakeupUserDevice(userID, deviceIDs, latestPos)
+ n.currPos.ApplyUpdates(posUpdate)
+ n.wakeupUserDevice(userID, deviceIDs, n.currPos)
+}
+
+// OnNewReceipt updates the current position
+func (n *Notifier) OnNewTyping(
+ roomID string,
+ posUpdate types.StreamingToken,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
}
// OnNewReceipt updates the current position
func (n *Notifier) OnNewReceipt(
+ roomID string,
posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
- latestPos := n.currPos.WithUpdates(posUpdate)
- n.currPos = latestPos
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
}
func (n *Notifier) OnNewKeyChange(
@@ -176,9 +188,19 @@ func (n *Notifier) OnNewKeyChange(
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
- latestPos := n.currPos.WithUpdates(posUpdate)
- n.currPos = latestPos
- n.wakeupUsers([]string{wakeUserID}, nil, latestPos)
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.wakeupUsers([]string{wakeUserID}, nil, n.currPos)
+}
+
+func (n *Notifier) OnNewInvite(
+ posUpdate types.StreamingToken, wakeUserID string,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.wakeupUsers([]string{wakeUserID}, nil, n.currPos)
}
// GetListener returns a UserStreamListener that can be used to wait for