diff options
author | Kegsay <kegan@matrix.org> | 2020-07-30 14:52:21 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-30 14:52:21 +0100 |
commit | a2174d3294841dbdf201bde76de3ffc44399fcbc (patch) | |
tree | 64f5c0d7dee038960941a937dd84631d53c1e9a9 /syncapi/sync | |
parent | 9355fb5ac8c911bdbde6dcc0f279f716d8a8f60b (diff) |
Implement /keys/changes (#1232)
* Implement /keys/changes
And refactor QueryKeyChanges to accept a `to` offset.
* Unbreak tests
* Sort keys when serialising log tokens
Diffstat (limited to 'syncapi/sync')
-rw-r--r-- | syncapi/sync/requestpool.go | 60 |
1 files changed, 52 insertions, 8 deletions
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 754d6983..f817f098 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -143,6 +143,55 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. } } +func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *userapi.Device) util.JSONResponse { + from := req.URL.Query().Get("from") + to := req.URL.Query().Get("to") + if from == "" || to == "" { + return util.JSONResponse{ + Code: 400, + JSON: jsonerror.InvalidArgumentValue("missing ?from= or ?to="), + } + } + fromToken, err := types.NewStreamTokenFromString(from) + if err != nil { + return util.JSONResponse{ + Code: 400, + JSON: jsonerror.InvalidArgumentValue("bad 'from' value"), + } + } + toToken, err := types.NewStreamTokenFromString(to) + if err != nil { + return util.JSONResponse{ + Code: 400, + JSON: jsonerror.InvalidArgumentValue("bad 'to' value"), + } + } + // work out room joins/leaves + res, err := rp.db.IncrementalSync( + req.Context(), types.NewResponse(), *device, fromToken, toToken, 0, false, + ) + if err != nil { + util.GetLogger(req.Context()).WithError(err).Error("Failed to IncrementalSync") + return jsonerror.InternalServerError() + } + + res, err = rp.appendDeviceLists(res, device.UserID, fromToken, toToken) + if err != nil { + util.GetLogger(req.Context()).WithError(err).Error("Failed to appendDeviceLists info") + return jsonerror.InternalServerError() + } + return util.JSONResponse{ + Code: 200, + JSON: struct { + Changed []string `json:"changed"` + Left []string `json:"left"` + }{ + Changed: res.DeviceLists.Changed, + Left: res.DeviceLists.Left, + }, + } +} + func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (res *types.Response, err error) { res = types.NewResponse() @@ -172,7 +221,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea if err != nil { return } - res, err = rp.appendDeviceLists(res, req.device.UserID, since) + res, err = rp.appendDeviceLists(res, req.device.UserID, since, latestPos) if err != nil { return } @@ -205,14 +254,9 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea } func (rp *RequestPool) appendDeviceLists( - data *types.Response, userID string, since types.StreamingToken, + data *types.Response, userID string, since, to types.StreamingToken, ) (*types.Response, error) { - // TODO: Currently this code will race which may result in duplicates but not missing data. - // This happens because, whilst we are told the range to fetch here (since / latest) the - // QueryKeyChanges API only exposes a "from" value (on purpose to avoid racing, which then - // returns the latest position with which the response has authority on). We'd need to tweak - // the API to expose a "to" value to fix this. - _, _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since) + _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since, to) if err != nil { return nil, err } |