diff options
author | kegsay <kegan@matrix.org> | 2022-05-19 09:00:56 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-19 09:00:56 +0100 |
commit | 21dd5a7176e52d018b91854db273424e4430af7b (patch) | |
tree | 36f1d5f22bc6736d372b59b90e10b023ee143a45 | |
parent | f321a7d55ea75e6a5276cd88eddcbbc82ceeaaeb (diff) |
syncapi: don't return early for no-op incremental syncs (#2473)
* syncapi: don't return early for no-op incremental syncs
Comments explain why, but basically it's an inefficient use
of bandwidth and some sytests rely on /sync to block.
* Honour timeouts
* Actually return a response with timeout=0
-rw-r--r-- | syncapi/sync/requestpool.go | 246 | ||||
-rw-r--r-- | syncapi/types/types.go | 13 |
2 files changed, 149 insertions, 110 deletions
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index ad151f70..7b9526b5 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -251,125 +251,151 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. waitingSyncRequests.Inc() defer waitingSyncRequests.Dec() - currentPos := rp.Notifier.CurrentPosition() - - if !rp.shouldReturnImmediately(syncReq, currentPos) { - timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above - defer timer.Stop() - - userStreamListener := rp.Notifier.GetListener(*syncReq) - defer userStreamListener.Close() - - giveup := func() util.JSONResponse { - syncReq.Log.Debugln("Responding to sync since client gave up or timeout was reached") - syncReq.Response.NextBatch = syncReq.Since - // We should always try to include OTKs in sync responses, otherwise clients might upload keys - // even if that's not required. See also: - // https://github.com/matrix-org/synapse/blob/29f06704b8871a44926f7c99e73cf4a978fb8e81/synapse/rest/client/sync.py#L276-L281 - // Only try to get OTKs if the context isn't already done. - if syncReq.Context.Err() == nil { - err = internal.DeviceOTKCounts(syncReq.Context, rp.keyAPI, syncReq.Device.UserID, syncReq.Device.ID, syncReq.Response) - if err != nil && err != context.Canceled { - syncReq.Log.WithError(err).Warn("failed to get OTK counts") + // loop until we get some data + for { + startTime := time.Now() + currentPos := rp.Notifier.CurrentPosition() + + // if the since token matches the current positions, wait via the notifier + if !rp.shouldReturnImmediately(syncReq, currentPos) { + timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above + defer timer.Stop() + + userStreamListener := rp.Notifier.GetListener(*syncReq) + defer userStreamListener.Close() + + giveup := func() util.JSONResponse { + syncReq.Log.Debugln("Responding to sync since client gave up or timeout was reached") + syncReq.Response.NextBatch = syncReq.Since + // We should always try to include OTKs in sync responses, otherwise clients might upload keys + // even if that's not required. See also: + // https://github.com/matrix-org/synapse/blob/29f06704b8871a44926f7c99e73cf4a978fb8e81/synapse/rest/client/sync.py#L276-L281 + // Only try to get OTKs if the context isn't already done. + if syncReq.Context.Err() == nil { + err = internal.DeviceOTKCounts(syncReq.Context, rp.keyAPI, syncReq.Device.UserID, syncReq.Device.ID, syncReq.Response) + if err != nil && err != context.Canceled { + syncReq.Log.WithError(err).Warn("failed to get OTK counts") + } + } + return util.JSONResponse{ + Code: http.StatusOK, + JSON: syncReq.Response, } } - return util.JSONResponse{ - Code: http.StatusOK, - JSON: syncReq.Response, - } - } - select { - case <-syncReq.Context.Done(): // Caller gave up - return giveup() + select { + case <-syncReq.Context.Done(): // Caller gave up + return giveup() - case <-timer.C: // Timeout reached - return giveup() + case <-timer.C: // Timeout reached + return giveup() - case <-userStreamListener.GetNotifyChannel(syncReq.Since): - syncReq.Log.Debugln("Responding to sync after wake-up") - currentPos.ApplyUpdates(userStreamListener.GetSyncPosition()) + case <-userStreamListener.GetNotifyChannel(syncReq.Since): + syncReq.Log.Debugln("Responding to sync after wake-up") + currentPos.ApplyUpdates(userStreamListener.GetSyncPosition()) + } + } else { + syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync immediately") } - } else { - syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync immediately") - } - if syncReq.Since.IsEmpty() { - // Complete sync - syncReq.Response.NextBatch = types.StreamingToken{ - PDUPosition: rp.streams.PDUStreamProvider.CompleteSync( - syncReq.Context, syncReq, - ), - TypingPosition: rp.streams.TypingStreamProvider.CompleteSync( - syncReq.Context, syncReq, - ), - ReceiptPosition: rp.streams.ReceiptStreamProvider.CompleteSync( - syncReq.Context, syncReq, - ), - InvitePosition: rp.streams.InviteStreamProvider.CompleteSync( - syncReq.Context, syncReq, - ), - SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.CompleteSync( - syncReq.Context, syncReq, - ), - AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync( - syncReq.Context, syncReq, - ), - NotificationDataPosition: rp.streams.NotificationDataStreamProvider.CompleteSync( - syncReq.Context, syncReq, - ), - DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync( - syncReq.Context, syncReq, - ), - PresencePosition: rp.streams.PresenceStreamProvider.CompleteSync( - syncReq.Context, syncReq, - ), - } - } else { - // Incremental sync - syncReq.Response.NextBatch = types.StreamingToken{ - PDUPosition: rp.streams.PDUStreamProvider.IncrementalSync( - syncReq.Context, syncReq, - syncReq.Since.PDUPosition, currentPos.PDUPosition, - ), - TypingPosition: rp.streams.TypingStreamProvider.IncrementalSync( - syncReq.Context, syncReq, - syncReq.Since.TypingPosition, currentPos.TypingPosition, - ), - ReceiptPosition: rp.streams.ReceiptStreamProvider.IncrementalSync( - syncReq.Context, syncReq, - syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition, - ), - InvitePosition: rp.streams.InviteStreamProvider.IncrementalSync( - syncReq.Context, syncReq, - syncReq.Since.InvitePosition, currentPos.InvitePosition, - ), - SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.IncrementalSync( - syncReq.Context, syncReq, - syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition, - ), - AccountDataPosition: rp.streams.AccountDataStreamProvider.IncrementalSync( - syncReq.Context, syncReq, - syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition, - ), - NotificationDataPosition: rp.streams.NotificationDataStreamProvider.IncrementalSync( - syncReq.Context, syncReq, - syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition, - ), - DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync( - syncReq.Context, syncReq, - syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition, - ), - PresencePosition: rp.streams.PresenceStreamProvider.IncrementalSync( - syncReq.Context, syncReq, - syncReq.Since.PresencePosition, currentPos.PresencePosition, - ), + if syncReq.Since.IsEmpty() { + // Complete sync + syncReq.Response.NextBatch = types.StreamingToken{ + PDUPosition: rp.streams.PDUStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), + TypingPosition: rp.streams.TypingStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), + ReceiptPosition: rp.streams.ReceiptStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), + InvitePosition: rp.streams.InviteStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), + SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), + AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), + NotificationDataPosition: rp.streams.NotificationDataStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), + DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), + PresencePosition: rp.streams.PresenceStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), + } + } else { + // Incremental sync + syncReq.Response.NextBatch = types.StreamingToken{ + PDUPosition: rp.streams.PDUStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.PDUPosition, currentPos.PDUPosition, + ), + TypingPosition: rp.streams.TypingStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.TypingPosition, currentPos.TypingPosition, + ), + ReceiptPosition: rp.streams.ReceiptStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition, + ), + InvitePosition: rp.streams.InviteStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.InvitePosition, currentPos.InvitePosition, + ), + SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition, + ), + AccountDataPosition: rp.streams.AccountDataStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition, + ), + NotificationDataPosition: rp.streams.NotificationDataStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition, + ), + DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition, + ), + PresencePosition: rp.streams.PresenceStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.PresencePosition, currentPos.PresencePosition, + ), + } + // it's possible for there to be no updates for this user even though since < current pos, + // e.g busy servers with a quiet user. In this scenario, we don't want to return a no-op + // response immediately, so let's try this again but pretend they bumped their since token. + // If the incremental sync was processed very quickly then we expect the next loop to block + // with a notifier, but if things are slow it's entirely possible that currentPos is no + // longer the current position so we will hit this code path again. We need to do this and + // not return a no-op response because: + // - It's an inefficient use of bandwidth. + // - Some sytests which test 'waking up' sync rely on some sync requests to block, which + // they weren't always doing, resulting in flakey tests. + if !syncReq.Response.HasUpdates() { + syncReq.Since = currentPos + // do not loop again if the ?timeout= is 0 as that means "return immediately" + if syncReq.Timeout > 0 { + syncReq.Timeout = syncReq.Timeout - time.Since(startTime) + if syncReq.Timeout < 0 { + syncReq.Timeout = 0 + } + continue + } + } } - } - return util.JSONResponse{ - Code: http.StatusOK, - JSON: syncReq.Response, + return util.JSONResponse{ + Code: http.StatusOK, + JSON: syncReq.Response, + } } } diff --git a/syncapi/types/types.go b/syncapi/types/types.go index ba6b4f8c..159fa08b 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -350,6 +350,19 @@ type Response struct { DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"` } +func (r *Response) HasUpdates() bool { + // purposefully exclude DeviceListsOTKCount as we always include them + return (len(r.AccountData.Events) > 0 || + len(r.Presence.Events) > 0 || + len(r.Rooms.Invite) > 0 || + len(r.Rooms.Join) > 0 || + len(r.Rooms.Leave) > 0 || + len(r.Rooms.Peek) > 0 || + len(r.ToDevice.Events) > 0 || + len(r.DeviceLists.Changed) > 0 || + len(r.DeviceLists.Left) > 0) +} + // NewResponse creates an empty response with initialised maps. func NewResponse() *Response { res := Response{} |