aboutsummaryrefslogtreecommitdiff
path: root/syncapi/sync/requestpool.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/sync/requestpool.go')
-rw-r--r--syncapi/sync/requestpool.go21
1 files changed, 7 insertions, 14 deletions
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index 3a31edd0..0751487a 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -185,13 +185,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
// respond with, so we skip the return an go back to waiting for content to
// be sent down or the request timing out.
var hasTimedOut bool
- sincePos := *syncReq.since
+ sincePos := syncReq.since
for {
select {
// Wait for notifier to wake us up
case <-userStreamListener.GetNotifyChannel(sincePos):
currPos = userStreamListener.GetSyncPosition()
- sincePos = currPos
// Or for timeout to expire
case <-timer.C:
// We just need to ensure we get out of the select after reaching the
@@ -279,7 +278,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
res := types.NewResponse()
// See if we have any new tasks to do for the send-to-device messaging.
- events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, *req.since)
+ lastPos, events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, req.since)
if err != nil {
return nil, fmt.Errorf("rp.db.SendToDeviceUpdatesForSync: %w", err)
}
@@ -291,7 +290,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
return res, fmt.Errorf("rp.db.CompleteSync: %w", err)
}
} else {
- res, err = rp.db.IncrementalSync(req.ctx, res, req.device, *req.since, latestPos, req.limit, req.wantFullState)
+ res, err = rp.db.IncrementalSync(req.ctx, res, req.device, req.since, latestPos, req.limit, req.wantFullState)
if err != nil {
return res, fmt.Errorf("rp.db.IncrementalSync: %w", err)
}
@@ -302,7 +301,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
if err != nil {
return res, fmt.Errorf("rp.appendAccountData: %w", err)
}
- res, err = rp.appendDeviceLists(res, req.device.UserID, *req.since, latestPos)
+ res, err = rp.appendDeviceLists(res, req.device.UserID, req.since, latestPos)
if err != nil {
return res, fmt.Errorf("rp.appendDeviceLists: %w", err)
}
@@ -316,7 +315,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
// Then add the updates into the sync response.
if len(updates) > 0 || len(deletions) > 0 {
// Handle the updates and deletions in the database.
- err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, *req.since)
+ err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, req.since)
if err != nil {
return res, fmt.Errorf("rp.db.CleanSendToDeviceUpdates: %w", err)
}
@@ -326,15 +325,9 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
for _, event := range events {
res.ToDevice.Events = append(res.ToDevice.Events, event.SendToDeviceEvent)
}
-
- // Get the next_batch from the sync response and increase the
- // EDU counter.
- if pos, perr := types.NewStreamTokenFromString(res.NextBatch); perr == nil {
- pos.SendToDevicePosition++
- res.NextBatch = pos.String()
- }
}
+ res.NextBatch.SendToDevicePosition = lastPos
return res, err
}
@@ -464,7 +457,7 @@ func (rp *RequestPool) appendAccountData(
// or timeout=0, or full_state=true, in any of the cases the request should
// return immediately.
func (rp *RequestPool) shouldReturnImmediately(syncReq *syncRequest) bool {
- if syncReq.since == nil || syncReq.timeout == 0 || syncReq.wantFullState {
+ if syncReq.since.IsEmpty() || syncReq.timeout == 0 || syncReq.wantFullState {
return true
}
waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.device.UserID, syncReq.device.ID)