diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-12-18 11:11:21 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-18 11:11:21 +0000 |
commit | 50963b724b12f4f8155ac00a677afe38d0bd69d8 (patch) | |
tree | 11a9ff7d4dca995b770e5cd3ad512c4ba3d1a98c /syncapi/sync | |
parent | a518e2971ab034cf4b97f39423e835452ca97540 (diff) |
More sane next batch handling, typing notification tweaks, give invites their own stream position, device list fix (#1641)
* Update sync responses
* Fix positions, add ApplyUpdates
* Fix MarshalText as non-pointer, PrevBatch is optional
* Increment by number of read receipts
* Merge branch 'master' into neilalexander/devicelist
* Tweak typing
* Include keyserver position tweak
* Fix typing next position in all cases
* Tweaks
* Fix typo
* Tweaks, restore StreamingToken.MarshalText which somehow went missing?
* Rely on positions from notifier rather than manually advancing them
* Revert "Rely on positions from notifier rather than manually advancing them"
This reverts commit 53112a62cc3bfd9989acab518e69eeb27938117a.
* Give invites their own position, fix other things
* Fix test
* Fix invites maybe
* Un-whitelist tests that look to be genuinely wrong
* Use real receipt positions
* Ensure send-to-device uses real positions too
Diffstat (limited to 'syncapi/sync')
-rw-r--r-- | syncapi/sync/notifier.go | 48 | ||||
-rw-r--r-- | syncapi/sync/notifier_test.go | 4 | ||||
-rw-r--r-- | syncapi/sync/request.go | 12 | ||||
-rw-r--r-- | syncapi/sync/requestpool.go | 21 |
4 files changed, 48 insertions, 37 deletions
diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go index 1d8cd624..66460a8d 100644 --- a/syncapi/sync/notifier.go +++ b/syncapi/sync/notifier.go @@ -77,9 +77,8 @@ func (n *Notifier) OnNewEvent( // This needs to be done PRIOR to waking up users as they will read this value. n.streamLock.Lock() defer n.streamLock.Unlock() - latestPos := n.currPos.WithUpdates(posUpdate) - n.currPos = latestPos + n.currPos.ApplyUpdates(posUpdate) n.removeEmptyUserStreams() if ev != nil { @@ -113,11 +112,11 @@ func (n *Notifier) OnNewEvent( } } - n.wakeupUsers(usersToNotify, peekingDevicesToNotify, latestPos) + n.wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos) } else if roomID != "" { - n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), latestPos) + n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), n.currPos) } else if len(userIDs) > 0 { - n.wakeupUsers(userIDs, nil, latestPos) + n.wakeupUsers(userIDs, nil, n.currPos) } else { log.WithFields(log.Fields{ "posUpdate": posUpdate.String, @@ -155,20 +154,33 @@ func (n *Notifier) OnNewSendToDevice( ) { n.streamLock.Lock() defer n.streamLock.Unlock() - latestPos := n.currPos.WithUpdates(posUpdate) - n.currPos = latestPos - n.wakeupUserDevice(userID, deviceIDs, latestPos) + n.currPos.ApplyUpdates(posUpdate) + n.wakeupUserDevice(userID, deviceIDs, n.currPos) +} + +// OnNewReceipt updates the current position +func (n *Notifier) OnNewTyping( + roomID string, + posUpdate types.StreamingToken, +) { + n.streamLock.Lock() + defer n.streamLock.Unlock() + + n.currPos.ApplyUpdates(posUpdate) + n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos) } // OnNewReceipt updates the current position func (n *Notifier) OnNewReceipt( + roomID string, posUpdate types.StreamingToken, ) { n.streamLock.Lock() defer n.streamLock.Unlock() - latestPos := n.currPos.WithUpdates(posUpdate) - n.currPos = latestPos + + n.currPos.ApplyUpdates(posUpdate) + n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos) } func (n *Notifier) OnNewKeyChange( @@ -176,9 +188,19 @@ func (n *Notifier) OnNewKeyChange( ) { n.streamLock.Lock() defer n.streamLock.Unlock() - latestPos := n.currPos.WithUpdates(posUpdate) - n.currPos = latestPos - n.wakeupUsers([]string{wakeUserID}, nil, latestPos) + + n.currPos.ApplyUpdates(posUpdate) + n.wakeupUsers([]string{wakeUserID}, nil, n.currPos) +} + +func (n *Notifier) OnNewInvite( + posUpdate types.StreamingToken, wakeUserID string, +) { + n.streamLock.Lock() + defer n.streamLock.Unlock() + + n.currPos.ApplyUpdates(posUpdate) + n.wakeupUsers([]string{wakeUserID}, nil, n.currPos) } // GetListener returns a UserStreamListener that can be used to wait for diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go index 39124214..d24da463 100644 --- a/syncapi/sync/notifier_test.go +++ b/syncapi/sync/notifier_test.go @@ -335,7 +335,7 @@ func waitForEvents(n *Notifier, req syncRequest) (types.StreamingToken, error) { return types.StreamingToken{}, fmt.Errorf( "waitForEvents timed out waiting for %s (pos=%v)", req.device.UserID, req.since, ) - case <-listener.GetNotifyChannel(*req.since): + case <-listener.GetNotifyChannel(req.since): p := listener.GetSyncPosition() return p, nil } @@ -365,7 +365,7 @@ func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) syn ID: deviceID, }, timeout: 1 * time.Minute, - since: &since, + since: since, wantFullState: false, limit: DefaultTimelineLimit, log: util.GetLogger(context.TODO()), diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index d5cf143d..f2f2894b 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -46,7 +46,7 @@ type syncRequest struct { device userapi.Device limit int timeout time.Duration - since *types.StreamingToken // nil means that no since token was supplied + since types.StreamingToken // nil means that no since token was supplied wantFullState bool log *log.Entry } @@ -55,17 +55,13 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat timeout := getTimeout(req.URL.Query().Get("timeout")) fullState := req.URL.Query().Get("full_state") wantFullState := fullState != "" && fullState != "false" - var since *types.StreamingToken - sinceStr := req.URL.Query().Get("since") + since, sinceStr := types.StreamingToken{}, req.URL.Query().Get("since") if sinceStr != "" { - tok, err := types.NewStreamTokenFromString(sinceStr) + var err error + since, err = types.NewStreamTokenFromString(sinceStr) if err != nil { return nil, err } - since = &tok - } - if since == nil { - since = &types.StreamingToken{} } timelineLimit := DefaultTimelineLimit // TODO: read from stored filters too diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 3a31edd0..0751487a 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -185,13 +185,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. // respond with, so we skip the return an go back to waiting for content to // be sent down or the request timing out. var hasTimedOut bool - sincePos := *syncReq.since + sincePos := syncReq.since for { select { // Wait for notifier to wake us up case <-userStreamListener.GetNotifyChannel(sincePos): currPos = userStreamListener.GetSyncPosition() - sincePos = currPos // Or for timeout to expire case <-timer.C: // We just need to ensure we get out of the select after reaching the @@ -279,7 +278,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea res := types.NewResponse() // See if we have any new tasks to do for the send-to-device messaging. - events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, *req.since) + lastPos, events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, req.since) if err != nil { return nil, fmt.Errorf("rp.db.SendToDeviceUpdatesForSync: %w", err) } @@ -291,7 +290,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea return res, fmt.Errorf("rp.db.CompleteSync: %w", err) } } else { - res, err = rp.db.IncrementalSync(req.ctx, res, req.device, *req.since, latestPos, req.limit, req.wantFullState) + res, err = rp.db.IncrementalSync(req.ctx, res, req.device, req.since, latestPos, req.limit, req.wantFullState) if err != nil { return res, fmt.Errorf("rp.db.IncrementalSync: %w", err) } @@ -302,7 +301,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea if err != nil { return res, fmt.Errorf("rp.appendAccountData: %w", err) } - res, err = rp.appendDeviceLists(res, req.device.UserID, *req.since, latestPos) + res, err = rp.appendDeviceLists(res, req.device.UserID, req.since, latestPos) if err != nil { return res, fmt.Errorf("rp.appendDeviceLists: %w", err) } @@ -316,7 +315,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea // Then add the updates into the sync response. if len(updates) > 0 || len(deletions) > 0 { // Handle the updates and deletions in the database. - err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, *req.since) + err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, req.since) if err != nil { return res, fmt.Errorf("rp.db.CleanSendToDeviceUpdates: %w", err) } @@ -326,15 +325,9 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea for _, event := range events { res.ToDevice.Events = append(res.ToDevice.Events, event.SendToDeviceEvent) } - - // Get the next_batch from the sync response and increase the - // EDU counter. - if pos, perr := types.NewStreamTokenFromString(res.NextBatch); perr == nil { - pos.SendToDevicePosition++ - res.NextBatch = pos.String() - } } + res.NextBatch.SendToDevicePosition = lastPos return res, err } @@ -464,7 +457,7 @@ func (rp *RequestPool) appendAccountData( // or timeout=0, or full_state=true, in any of the cases the request should // return immediately. func (rp *RequestPool) shouldReturnImmediately(syncReq *syncRequest) bool { - if syncReq.since == nil || syncReq.timeout == 0 || syncReq.wantFullState { + if syncReq.since.IsEmpty() || syncReq.timeout == 0 || syncReq.wantFullState { return true } waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.device.UserID, syncReq.device.ID) |