aboutsummaryrefslogtreecommitdiff
path: root/syncapi/sync
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-12-18 11:11:21 +0000
committerGitHub <noreply@github.com>2020-12-18 11:11:21 +0000
commit50963b724b12f4f8155ac00a677afe38d0bd69d8 (patch)
tree11a9ff7d4dca995b770e5cd3ad512c4ba3d1a98c /syncapi/sync
parenta518e2971ab034cf4b97f39423e835452ca97540 (diff)
More sane next batch handling, typing notification tweaks, give invites their own stream position, device list fix (#1641)
* Update sync responses * Fix positions, add ApplyUpdates * Fix MarshalText as non-pointer, PrevBatch is optional * Increment by number of read receipts * Merge branch 'master' into neilalexander/devicelist * Tweak typing * Include keyserver position tweak * Fix typing next position in all cases * Tweaks * Fix typo * Tweaks, restore StreamingToken.MarshalText which somehow went missing? * Rely on positions from notifier rather than manually advancing them * Revert "Rely on positions from notifier rather than manually advancing them" This reverts commit 53112a62cc3bfd9989acab518e69eeb27938117a. * Give invites their own position, fix other things * Fix test * Fix invites maybe * Un-whitelist tests that look to be genuinely wrong * Use real receipt positions * Ensure send-to-device uses real positions too
Diffstat (limited to 'syncapi/sync')
-rw-r--r--syncapi/sync/notifier.go48
-rw-r--r--syncapi/sync/notifier_test.go4
-rw-r--r--syncapi/sync/request.go12
-rw-r--r--syncapi/sync/requestpool.go21
4 files changed, 48 insertions, 37 deletions
diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go
index 1d8cd624..66460a8d 100644
--- a/syncapi/sync/notifier.go
+++ b/syncapi/sync/notifier.go
@@ -77,9 +77,8 @@ func (n *Notifier) OnNewEvent(
// This needs to be done PRIOR to waking up users as they will read this value.
n.streamLock.Lock()
defer n.streamLock.Unlock()
- latestPos := n.currPos.WithUpdates(posUpdate)
- n.currPos = latestPos
+ n.currPos.ApplyUpdates(posUpdate)
n.removeEmptyUserStreams()
if ev != nil {
@@ -113,11 +112,11 @@ func (n *Notifier) OnNewEvent(
}
}
- n.wakeupUsers(usersToNotify, peekingDevicesToNotify, latestPos)
+ n.wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos)
} else if roomID != "" {
- n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), latestPos)
+ n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), n.currPos)
} else if len(userIDs) > 0 {
- n.wakeupUsers(userIDs, nil, latestPos)
+ n.wakeupUsers(userIDs, nil, n.currPos)
} else {
log.WithFields(log.Fields{
"posUpdate": posUpdate.String,
@@ -155,20 +154,33 @@ func (n *Notifier) OnNewSendToDevice(
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
- latestPos := n.currPos.WithUpdates(posUpdate)
- n.currPos = latestPos
- n.wakeupUserDevice(userID, deviceIDs, latestPos)
+ n.currPos.ApplyUpdates(posUpdate)
+ n.wakeupUserDevice(userID, deviceIDs, n.currPos)
+}
+
+// OnNewReceipt updates the current position
+func (n *Notifier) OnNewTyping(
+ roomID string,
+ posUpdate types.StreamingToken,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
}
// OnNewReceipt updates the current position
func (n *Notifier) OnNewReceipt(
+ roomID string,
posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
- latestPos := n.currPos.WithUpdates(posUpdate)
- n.currPos = latestPos
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
}
func (n *Notifier) OnNewKeyChange(
@@ -176,9 +188,19 @@ func (n *Notifier) OnNewKeyChange(
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
- latestPos := n.currPos.WithUpdates(posUpdate)
- n.currPos = latestPos
- n.wakeupUsers([]string{wakeUserID}, nil, latestPos)
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.wakeupUsers([]string{wakeUserID}, nil, n.currPos)
+}
+
+func (n *Notifier) OnNewInvite(
+ posUpdate types.StreamingToken, wakeUserID string,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.wakeupUsers([]string{wakeUserID}, nil, n.currPos)
}
// GetListener returns a UserStreamListener that can be used to wait for
diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go
index 39124214..d24da463 100644
--- a/syncapi/sync/notifier_test.go
+++ b/syncapi/sync/notifier_test.go
@@ -335,7 +335,7 @@ func waitForEvents(n *Notifier, req syncRequest) (types.StreamingToken, error) {
return types.StreamingToken{}, fmt.Errorf(
"waitForEvents timed out waiting for %s (pos=%v)", req.device.UserID, req.since,
)
- case <-listener.GetNotifyChannel(*req.since):
+ case <-listener.GetNotifyChannel(req.since):
p := listener.GetSyncPosition()
return p, nil
}
@@ -365,7 +365,7 @@ func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) syn
ID: deviceID,
},
timeout: 1 * time.Minute,
- since: &since,
+ since: since,
wantFullState: false,
limit: DefaultTimelineLimit,
log: util.GetLogger(context.TODO()),
diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go
index d5cf143d..f2f2894b 100644
--- a/syncapi/sync/request.go
+++ b/syncapi/sync/request.go
@@ -46,7 +46,7 @@ type syncRequest struct {
device userapi.Device
limit int
timeout time.Duration
- since *types.StreamingToken // nil means that no since token was supplied
+ since types.StreamingToken // nil means that no since token was supplied
wantFullState bool
log *log.Entry
}
@@ -55,17 +55,13 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
timeout := getTimeout(req.URL.Query().Get("timeout"))
fullState := req.URL.Query().Get("full_state")
wantFullState := fullState != "" && fullState != "false"
- var since *types.StreamingToken
- sinceStr := req.URL.Query().Get("since")
+ since, sinceStr := types.StreamingToken{}, req.URL.Query().Get("since")
if sinceStr != "" {
- tok, err := types.NewStreamTokenFromString(sinceStr)
+ var err error
+ since, err = types.NewStreamTokenFromString(sinceStr)
if err != nil {
return nil, err
}
- since = &tok
- }
- if since == nil {
- since = &types.StreamingToken{}
}
timelineLimit := DefaultTimelineLimit
// TODO: read from stored filters too
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index 3a31edd0..0751487a 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -185,13 +185,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
// respond with, so we skip the return an go back to waiting for content to
// be sent down or the request timing out.
var hasTimedOut bool
- sincePos := *syncReq.since
+ sincePos := syncReq.since
for {
select {
// Wait for notifier to wake us up
case <-userStreamListener.GetNotifyChannel(sincePos):
currPos = userStreamListener.GetSyncPosition()
- sincePos = currPos
// Or for timeout to expire
case <-timer.C:
// We just need to ensure we get out of the select after reaching the
@@ -279,7 +278,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
res := types.NewResponse()
// See if we have any new tasks to do for the send-to-device messaging.
- events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, *req.since)
+ lastPos, events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, req.since)
if err != nil {
return nil, fmt.Errorf("rp.db.SendToDeviceUpdatesForSync: %w", err)
}
@@ -291,7 +290,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
return res, fmt.Errorf("rp.db.CompleteSync: %w", err)
}
} else {
- res, err = rp.db.IncrementalSync(req.ctx, res, req.device, *req.since, latestPos, req.limit, req.wantFullState)
+ res, err = rp.db.IncrementalSync(req.ctx, res, req.device, req.since, latestPos, req.limit, req.wantFullState)
if err != nil {
return res, fmt.Errorf("rp.db.IncrementalSync: %w", err)
}
@@ -302,7 +301,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
if err != nil {
return res, fmt.Errorf("rp.appendAccountData: %w", err)
}
- res, err = rp.appendDeviceLists(res, req.device.UserID, *req.since, latestPos)
+ res, err = rp.appendDeviceLists(res, req.device.UserID, req.since, latestPos)
if err != nil {
return res, fmt.Errorf("rp.appendDeviceLists: %w", err)
}
@@ -316,7 +315,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
// Then add the updates into the sync response.
if len(updates) > 0 || len(deletions) > 0 {
// Handle the updates and deletions in the database.
- err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, *req.since)
+ err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, req.since)
if err != nil {
return res, fmt.Errorf("rp.db.CleanSendToDeviceUpdates: %w", err)
}
@@ -326,15 +325,9 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
for _, event := range events {
res.ToDevice.Events = append(res.ToDevice.Events, event.SendToDeviceEvent)
}
-
- // Get the next_batch from the sync response and increase the
- // EDU counter.
- if pos, perr := types.NewStreamTokenFromString(res.NextBatch); perr == nil {
- pos.SendToDevicePosition++
- res.NextBatch = pos.String()
- }
}
+ res.NextBatch.SendToDevicePosition = lastPos
return res, err
}
@@ -464,7 +457,7 @@ func (rp *RequestPool) appendAccountData(
// or timeout=0, or full_state=true, in any of the cases the request should
// return immediately.
func (rp *RequestPool) shouldReturnImmediately(syncReq *syncRequest) bool {
- if syncReq.since == nil || syncReq.timeout == 0 || syncReq.wantFullState {
+ if syncReq.since.IsEmpty() || syncReq.timeout == 0 || syncReq.wantFullState {
return true
}
waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.device.UserID, syncReq.device.ID)