aboutsummaryrefslogtreecommitdiff
path: root/syncapi/sync/requestpool.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/sync/requestpool.go')
-rw-r--r--syncapi/sync/requestpool.go60
1 files changed, 52 insertions, 8 deletions
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index 754d6983..f817f098 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -143,6 +143,55 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
}
}
+func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *userapi.Device) util.JSONResponse {
+ from := req.URL.Query().Get("from")
+ to := req.URL.Query().Get("to")
+ if from == "" || to == "" {
+ return util.JSONResponse{
+ Code: 400,
+ JSON: jsonerror.InvalidArgumentValue("missing ?from= or ?to="),
+ }
+ }
+ fromToken, err := types.NewStreamTokenFromString(from)
+ if err != nil {
+ return util.JSONResponse{
+ Code: 400,
+ JSON: jsonerror.InvalidArgumentValue("bad 'from' value"),
+ }
+ }
+ toToken, err := types.NewStreamTokenFromString(to)
+ if err != nil {
+ return util.JSONResponse{
+ Code: 400,
+ JSON: jsonerror.InvalidArgumentValue("bad 'to' value"),
+ }
+ }
+ // work out room joins/leaves
+ res, err := rp.db.IncrementalSync(
+ req.Context(), types.NewResponse(), *device, fromToken, toToken, 0, false,
+ )
+ if err != nil {
+ util.GetLogger(req.Context()).WithError(err).Error("Failed to IncrementalSync")
+ return jsonerror.InternalServerError()
+ }
+
+ res, err = rp.appendDeviceLists(res, device.UserID, fromToken, toToken)
+ if err != nil {
+ util.GetLogger(req.Context()).WithError(err).Error("Failed to appendDeviceLists info")
+ return jsonerror.InternalServerError()
+ }
+ return util.JSONResponse{
+ Code: 200,
+ JSON: struct {
+ Changed []string `json:"changed"`
+ Left []string `json:"left"`
+ }{
+ Changed: res.DeviceLists.Changed,
+ Left: res.DeviceLists.Left,
+ },
+ }
+}
+
func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (res *types.Response, err error) {
res = types.NewResponse()
@@ -172,7 +221,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
if err != nil {
return
}
- res, err = rp.appendDeviceLists(res, req.device.UserID, since)
+ res, err = rp.appendDeviceLists(res, req.device.UserID, since, latestPos)
if err != nil {
return
}
@@ -205,14 +254,9 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
}
func (rp *RequestPool) appendDeviceLists(
- data *types.Response, userID string, since types.StreamingToken,
+ data *types.Response, userID string, since, to types.StreamingToken,
) (*types.Response, error) {
- // TODO: Currently this code will race which may result in duplicates but not missing data.
- // This happens because, whilst we are told the range to fetch here (since / latest) the
- // QueryKeyChanges API only exposes a "from" value (on purpose to avoid racing, which then
- // returns the latest position with which the response has authority on). We'd need to tweak
- // the API to expose a "to" value to fix this.
- _, _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since)
+ _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since, to)
if err != nil {
return nil, err
}