aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorkegsay <kegan@matrix.org>2022-05-19 09:00:56 +0100
committerGitHub <noreply@github.com>2022-05-19 09:00:56 +0100
commit21dd5a7176e52d018b91854db273424e4430af7b (patch)
tree36f1d5f22bc6736d372b59b90e10b023ee143a45 /syncapi
parentf321a7d55ea75e6a5276cd88eddcbbc82ceeaaeb (diff)
syncapi: don't return early for no-op incremental syncs (#2473)
* syncapi: don't return early for no-op incremental syncs Comments explain why, but basically it's an inefficient use of bandwidth and some sytests rely on /sync to block. * Honour timeouts * Actually return a response with timeout=0
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/sync/requestpool.go246
-rw-r--r--syncapi/types/types.go13
2 files changed, 149 insertions, 110 deletions
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index ad151f70..7b9526b5 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -251,125 +251,151 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
waitingSyncRequests.Inc()
defer waitingSyncRequests.Dec()
- currentPos := rp.Notifier.CurrentPosition()
-
- if !rp.shouldReturnImmediately(syncReq, currentPos) {
- timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above
- defer timer.Stop()
-
- userStreamListener := rp.Notifier.GetListener(*syncReq)
- defer userStreamListener.Close()
-
- giveup := func() util.JSONResponse {
- syncReq.Log.Debugln("Responding to sync since client gave up or timeout was reached")
- syncReq.Response.NextBatch = syncReq.Since
- // We should always try to include OTKs in sync responses, otherwise clients might upload keys
- // even if that's not required. See also:
- // https://github.com/matrix-org/synapse/blob/29f06704b8871a44926f7c99e73cf4a978fb8e81/synapse/rest/client/sync.py#L276-L281
- // Only try to get OTKs if the context isn't already done.
- if syncReq.Context.Err() == nil {
- err = internal.DeviceOTKCounts(syncReq.Context, rp.keyAPI, syncReq.Device.UserID, syncReq.Device.ID, syncReq.Response)
- if err != nil && err != context.Canceled {
- syncReq.Log.WithError(err).Warn("failed to get OTK counts")
+ // loop until we get some data
+ for {
+ startTime := time.Now()
+ currentPos := rp.Notifier.CurrentPosition()
+
+ // if the since token matches the current positions, wait via the notifier
+ if !rp.shouldReturnImmediately(syncReq, currentPos) {
+ timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above
+ defer timer.Stop()
+
+ userStreamListener := rp.Notifier.GetListener(*syncReq)
+ defer userStreamListener.Close()
+
+ giveup := func() util.JSONResponse {
+ syncReq.Log.Debugln("Responding to sync since client gave up or timeout was reached")
+ syncReq.Response.NextBatch = syncReq.Since
+ // We should always try to include OTKs in sync responses, otherwise clients might upload keys
+ // even if that's not required. See also:
+ // https://github.com/matrix-org/synapse/blob/29f06704b8871a44926f7c99e73cf4a978fb8e81/synapse/rest/client/sync.py#L276-L281
+ // Only try to get OTKs if the context isn't already done.
+ if syncReq.Context.Err() == nil {
+ err = internal.DeviceOTKCounts(syncReq.Context, rp.keyAPI, syncReq.Device.UserID, syncReq.Device.ID, syncReq.Response)
+ if err != nil && err != context.Canceled {
+ syncReq.Log.WithError(err).Warn("failed to get OTK counts")
+ }
+ }
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: syncReq.Response,
}
}
- return util.JSONResponse{
- Code: http.StatusOK,
- JSON: syncReq.Response,
- }
- }
- select {
- case <-syncReq.Context.Done(): // Caller gave up
- return giveup()
+ select {
+ case <-syncReq.Context.Done(): // Caller gave up
+ return giveup()
- case <-timer.C: // Timeout reached
- return giveup()
+ case <-timer.C: // Timeout reached
+ return giveup()
- case <-userStreamListener.GetNotifyChannel(syncReq.Since):
- syncReq.Log.Debugln("Responding to sync after wake-up")
- currentPos.ApplyUpdates(userStreamListener.GetSyncPosition())
+ case <-userStreamListener.GetNotifyChannel(syncReq.Since):
+ syncReq.Log.Debugln("Responding to sync after wake-up")
+ currentPos.ApplyUpdates(userStreamListener.GetSyncPosition())
+ }
+ } else {
+ syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync immediately")
}
- } else {
- syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync immediately")
- }
- if syncReq.Since.IsEmpty() {
- // Complete sync
- syncReq.Response.NextBatch = types.StreamingToken{
- PDUPosition: rp.streams.PDUStreamProvider.CompleteSync(
- syncReq.Context, syncReq,
- ),
- TypingPosition: rp.streams.TypingStreamProvider.CompleteSync(
- syncReq.Context, syncReq,
- ),
- ReceiptPosition: rp.streams.ReceiptStreamProvider.CompleteSync(
- syncReq.Context, syncReq,
- ),
- InvitePosition: rp.streams.InviteStreamProvider.CompleteSync(
- syncReq.Context, syncReq,
- ),
- SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.CompleteSync(
- syncReq.Context, syncReq,
- ),
- AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync(
- syncReq.Context, syncReq,
- ),
- NotificationDataPosition: rp.streams.NotificationDataStreamProvider.CompleteSync(
- syncReq.Context, syncReq,
- ),
- DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
- syncReq.Context, syncReq,
- ),
- PresencePosition: rp.streams.PresenceStreamProvider.CompleteSync(
- syncReq.Context, syncReq,
- ),
- }
- } else {
- // Incremental sync
- syncReq.Response.NextBatch = types.StreamingToken{
- PDUPosition: rp.streams.PDUStreamProvider.IncrementalSync(
- syncReq.Context, syncReq,
- syncReq.Since.PDUPosition, currentPos.PDUPosition,
- ),
- TypingPosition: rp.streams.TypingStreamProvider.IncrementalSync(
- syncReq.Context, syncReq,
- syncReq.Since.TypingPosition, currentPos.TypingPosition,
- ),
- ReceiptPosition: rp.streams.ReceiptStreamProvider.IncrementalSync(
- syncReq.Context, syncReq,
- syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition,
- ),
- InvitePosition: rp.streams.InviteStreamProvider.IncrementalSync(
- syncReq.Context, syncReq,
- syncReq.Since.InvitePosition, currentPos.InvitePosition,
- ),
- SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.IncrementalSync(
- syncReq.Context, syncReq,
- syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition,
- ),
- AccountDataPosition: rp.streams.AccountDataStreamProvider.IncrementalSync(
- syncReq.Context, syncReq,
- syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition,
- ),
- NotificationDataPosition: rp.streams.NotificationDataStreamProvider.IncrementalSync(
- syncReq.Context, syncReq,
- syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition,
- ),
- DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync(
- syncReq.Context, syncReq,
- syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
- ),
- PresencePosition: rp.streams.PresenceStreamProvider.IncrementalSync(
- syncReq.Context, syncReq,
- syncReq.Since.PresencePosition, currentPos.PresencePosition,
- ),
+ if syncReq.Since.IsEmpty() {
+ // Complete sync
+ syncReq.Response.NextBatch = types.StreamingToken{
+ PDUPosition: rp.streams.PDUStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
+ TypingPosition: rp.streams.TypingStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
+ ReceiptPosition: rp.streams.ReceiptStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
+ InvitePosition: rp.streams.InviteStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
+ SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
+ AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
+ NotificationDataPosition: rp.streams.NotificationDataStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
+ DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
+ PresencePosition: rp.streams.PresenceStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
+ }
+ } else {
+ // Incremental sync
+ syncReq.Response.NextBatch = types.StreamingToken{
+ PDUPosition: rp.streams.PDUStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.PDUPosition, currentPos.PDUPosition,
+ ),
+ TypingPosition: rp.streams.TypingStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.TypingPosition, currentPos.TypingPosition,
+ ),
+ ReceiptPosition: rp.streams.ReceiptStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition,
+ ),
+ InvitePosition: rp.streams.InviteStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.InvitePosition, currentPos.InvitePosition,
+ ),
+ SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition,
+ ),
+ AccountDataPosition: rp.streams.AccountDataStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition,
+ ),
+ NotificationDataPosition: rp.streams.NotificationDataStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition,
+ ),
+ DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
+ ),
+ PresencePosition: rp.streams.PresenceStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.PresencePosition, currentPos.PresencePosition,
+ ),
+ }
+ // it's possible for there to be no updates for this user even though since < current pos,
+ // e.g busy servers with a quiet user. In this scenario, we don't want to return a no-op
+ // response immediately, so let's try this again but pretend they bumped their since token.
+ // If the incremental sync was processed very quickly then we expect the next loop to block
+ // with a notifier, but if things are slow it's entirely possible that currentPos is no
+ // longer the current position so we will hit this code path again. We need to do this and
+ // not return a no-op response because:
+ // - It's an inefficient use of bandwidth.
+ // - Some sytests which test 'waking up' sync rely on some sync requests to block, which
+ // they weren't always doing, resulting in flakey tests.
+ if !syncReq.Response.HasUpdates() {
+ syncReq.Since = currentPos
+ // do not loop again if the ?timeout= is 0 as that means "return immediately"
+ if syncReq.Timeout > 0 {
+ syncReq.Timeout = syncReq.Timeout - time.Since(startTime)
+ if syncReq.Timeout < 0 {
+ syncReq.Timeout = 0
+ }
+ continue
+ }
+ }
}
- }
- return util.JSONResponse{
- Code: http.StatusOK,
- JSON: syncReq.Response,
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: syncReq.Response,
+ }
}
}
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index ba6b4f8c..159fa08b 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -350,6 +350,19 @@ type Response struct {
DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"`
}
+func (r *Response) HasUpdates() bool {
+ // purposefully exclude DeviceListsOTKCount as we always include them
+ return (len(r.AccountData.Events) > 0 ||
+ len(r.Presence.Events) > 0 ||
+ len(r.Rooms.Invite) > 0 ||
+ len(r.Rooms.Join) > 0 ||
+ len(r.Rooms.Leave) > 0 ||
+ len(r.Rooms.Peek) > 0 ||
+ len(r.ToDevice.Events) > 0 ||
+ len(r.DeviceLists.Changed) > 0 ||
+ len(r.DeviceLists.Left) > 0)
+}
+
// NewResponse creates an empty response with initialised maps.
func NewResponse() *Response {
res := Response{}