aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrendan Abolivier <contact@brendanabolivier.com>2018-12-18 17:56:08 +0000
committerGitHub <noreply@github.com>2018-12-18 17:56:08 +0000
commit170264404e66dac3350c3a977a3bc9ba61f70e67 (patch)
tree663d39521583bfc09ff56aef820b6060be213f63
parent2133e6bf59cf83401e2e467ad3c5a5af85b28202 (diff)
Fix the sync api returning an empty sync response when reaching timeout, regardless of the since token (#595)
* Fix the sync api returning an empty sync response when reaching timeout, regardless of the since token * Declare syncData from the start to avoid declaration shadowing * Bump Kafka version Since this is a huge bump, it might completely blow up - I'll revert it if that's the case * Put the loop and the IsEmpty() check back in sync handler * Add doc and allow returning if timeout is reached
-rwxr-xr-xscripts/install-local-kafka.sh4
-rw-r--r--src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go25
2 files changed, 19 insertions, 10 deletions
diff --git a/scripts/install-local-kafka.sh b/scripts/install-local-kafka.sh
index d1fef38e..19ce0911 100755
--- a/scripts/install-local-kafka.sh
+++ b/scripts/install-local-kafka.sh
@@ -9,9 +9,9 @@ cd `dirname $0`/..
mkdir -p .downloads
# The mirror to download kafka from is picked from the list of mirrors at
-# https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.11-0.11.0.2.tgz
+# https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.0/kafka_2.11-2.1.0.tgz
# TODO: Check the signature since we are downloading over HTTP.
-MIRROR=http://apache.mirror.anlx.net/kafka/0.11.0.2/kafka_2.11-0.11.0.2.tgz
+MIRROR=http://apache.mirror.anlx.net/kafka/2.1.0/kafka_2.11-2.1.0.tgz
# Only download the kafka if it isn't already downloaded.
test -f .downloads/kafka.tgz || wget $MIRROR -O .downloads/kafka.tgz
diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go
index 5c560ff5..89137eb5 100644
--- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go
+++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go
@@ -45,6 +45,8 @@ func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier, adb *accounts.D
// called in a dedicated goroutine for this request. This function will block the goroutine
// until a response is ready, or it times out.
func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtypes.Device) util.JSONResponse {
+ var syncData *types.Response
+
// Extract values from request
logger := util.GetLogger(req.Context())
userID := device.UserID
@@ -65,7 +67,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
// If this is an initial sync or timeout=0 we return immediately
if syncReq.since == nil || syncReq.timeout == 0 {
- syncData, err := rp.currentSyncForUser(*syncReq, currPos)
+ syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil {
return httputil.LogThenError(req, err)
}
@@ -84,6 +86,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
userStreamListener := rp.notifier.GetListener(*syncReq)
defer userStreamListener.Close()
+ // We need the loop in case userStreamListener wakes up even if there isn't
+ // anything to send down. In this case, we'll jump out of the select but
+ // don't want to send anything back until we get some actual content to
+ // respond with, so we skip the return an go back to waiting for content to
+ // be sent down or the request timing out.
+ var hasTimedOut bool
for {
select {
// Wait for notifier to wake us up
@@ -91,10 +99,11 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
currPos = userStreamListener.GetStreamPosition()
// Or for timeout to expire
case <-timer.C:
- return util.JSONResponse{
- Code: http.StatusOK,
- JSON: types.NewResponse(currPos),
- }
+ // We just need to ensure we get out of the select after reaching the
+ // timeout, but there's nothing specific we want to do in this case
+ // apart from that, so we do nothing except stating we're timing out
+ // and need to respond.
+ hasTimedOut = true
// Or for the request to be cancelled
case <-req.Context().Done():
return httputil.LogThenError(req, req.Context().Err())
@@ -105,17 +114,17 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
// of calculating the sync only to get timed out before we
// can respond
- syncData, err := rp.currentSyncForUser(*syncReq, currPos)
+ syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil {
return httputil.LogThenError(req, err)
}
- if !syncData.IsEmpty() {
+
+ if !syncData.IsEmpty() || hasTimedOut {
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncData,
}
}
-
}
}