diff options
author | Kegsay <kegan@matrix.org> | 2020-07-30 14:52:21 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-30 14:52:21 +0100 |
commit | a2174d3294841dbdf201bde76de3ffc44399fcbc (patch) | |
tree | 64f5c0d7dee038960941a937dd84631d53c1e9a9 /syncapi | |
parent | 9355fb5ac8c911bdbde6dcc0f279f716d8a8f60b (diff) |
Implement /keys/changes (#1232)
* Implement /keys/changes
And refactor QueryKeyChanges to accept a `to` offset.
* Unbreak tests
* Sort keys when serialising log tokens
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/internal/keychange.go | 26 | ||||
-rw-r--r-- | syncapi/internal/keychange_test.go | 35 | ||||
-rw-r--r-- | syncapi/routing/routing.go | 4 | ||||
-rw-r--r-- | syncapi/sync/requestpool.go | 60 | ||||
-rw-r--r-- | syncapi/types/types.go | 8 | ||||
-rw-r--r-- | syncapi/types/types_test.go | 7 |
6 files changed, 99 insertions, 41 deletions
diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index b594cc62..cb4fca7d 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -33,15 +33,15 @@ const DeviceListLogName = "dl" // be already filled in with join/leave information. func DeviceListCatchup( ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, - userID string, res *types.Response, tok types.StreamingToken, -) (newTok *types.StreamingToken, hasNew bool, err error) { + userID string, res *types.Response, from, to types.StreamingToken, +) (hasNew bool, err error) { // Track users who we didn't track before but now do by virtue of sharing a room with them, or not. newlyJoinedRooms := joinedRooms(res, userID) newlyLeftRooms := leftRooms(res) if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 { changed, left, err := TrackChangedUsers(ctx, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms) if err != nil { - return nil, false, err + return false, err } res.DeviceLists.Changed = changed res.DeviceLists.Left = left @@ -54,7 +54,7 @@ func DeviceListCatchup( var offset int64 // Extract partition/offset from sync token // TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make. - logOffset := tok.Log(DeviceListLogName) + logOffset := from.Log(DeviceListLogName) if logOffset != nil { partition = logOffset.Partition offset = logOffset.Offset @@ -62,15 +62,23 @@ func DeviceListCatchup( partition = -1 offset = sarama.OffsetOldest } + var toOffset int64 + toLog := to.Log(DeviceListLogName) + if toLog != nil { + toOffset = toLog.Offset + } else { + toOffset = sarama.OffsetNewest + } var queryRes api.QueryKeyChangesResponse keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{ Partition: partition, Offset: offset, + ToOffset: toOffset, }, &queryRes) if queryRes.Error != nil { // don't fail the catchup because we may have got useful information by tracking membership util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed") - return + return hasNew, nil } userSet := make(map[string]bool) for _, userID := range res.DeviceLists.Changed { @@ -82,13 +90,7 @@ func DeviceListCatchup( hasNew = true } } - // Make a new streaming token using the new offset - tok.SetLog(DeviceListLogName, &types.LogPosition{ - Offset: queryRes.Offset, - Partition: queryRes.Partition, - }) - newTok = &tok - return + return hasNew, nil } // TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response. diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index d0d27e44..3f18696c 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -6,6 +6,7 @@ import ( "sort" "testing" + "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/currentstateserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/syncapi/types" @@ -15,6 +16,12 @@ import ( var ( syncingUser = "@alice:localhost" emptyToken = types.NewStreamToken(0, 0, nil) + newestToken = types.NewStreamToken(0, 0, map[string]*types.LogPosition{ + DeviceListLogName: &types.LogPosition{ + Offset: sarama.OffsetNewest, + Partition: 0, + }, + }) ) type mockKeyAPI struct{} @@ -162,12 +169,12 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { syncResponse := types.NewResponse() syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, newShareUser}, "!another:room": {syncingUser}, }, - }, syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -184,12 +191,12 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { syncResponse := types.NewResponse() syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyLeftRoom: {removeUser}, "!another:room": {syncingUser}, }, - }, syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -206,12 +213,12 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { syncResponse := types.NewResponse() syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, existingUser}, "!another:room": {syncingUser, existingUser}, }, - }, syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -227,12 +234,12 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { syncResponse := types.NewResponse() syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyLeftRoom: {existingUser}, "!another:room": {syncingUser, existingUser}, }, - }, syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -286,11 +293,11 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { jr.Timeline.Events = roomTimelineEvents syncResponse.Rooms.Join[roomID] = jr - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ roomID: {syncingUser, existingUser}, }, - }, syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -311,13 +318,13 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) { syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2}, newlyLeftRoom: {newlyLeftUser, newlyLeftUser2}, "!another:room": {syncingUser}, }, - }, syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -396,12 +403,12 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { lr.Timeline.Events = roomEvents syncResponse.Rooms.Leave[roomID] = lr - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ roomID: {newShareUser, newShareUser2}, "!another:room": {syncingUser}, }, - }, syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go index a98955c5..ed0f872e 100644 --- a/syncapi/routing/routing.go +++ b/syncapi/routing/routing.go @@ -75,4 +75,8 @@ func Setup( return GetFilter(req, device, syncDB, vars["userId"], vars["filterId"]) }), ).Methods(http.MethodGet, http.MethodOptions) + + r0mux.Handle("/keys/changes", httputil.MakeAuthAPI("keys_changes", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + return srp.OnIncomingKeyChangeRequest(req, device) + })).Methods(http.MethodGet, http.MethodOptions) } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 754d6983..f817f098 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -143,6 +143,55 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. } } +func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *userapi.Device) util.JSONResponse { + from := req.URL.Query().Get("from") + to := req.URL.Query().Get("to") + if from == "" || to == "" { + return util.JSONResponse{ + Code: 400, + JSON: jsonerror.InvalidArgumentValue("missing ?from= or ?to="), + } + } + fromToken, err := types.NewStreamTokenFromString(from) + if err != nil { + return util.JSONResponse{ + Code: 400, + JSON: jsonerror.InvalidArgumentValue("bad 'from' value"), + } + } + toToken, err := types.NewStreamTokenFromString(to) + if err != nil { + return util.JSONResponse{ + Code: 400, + JSON: jsonerror.InvalidArgumentValue("bad 'to' value"), + } + } + // work out room joins/leaves + res, err := rp.db.IncrementalSync( + req.Context(), types.NewResponse(), *device, fromToken, toToken, 0, false, + ) + if err != nil { + util.GetLogger(req.Context()).WithError(err).Error("Failed to IncrementalSync") + return jsonerror.InternalServerError() + } + + res, err = rp.appendDeviceLists(res, device.UserID, fromToken, toToken) + if err != nil { + util.GetLogger(req.Context()).WithError(err).Error("Failed to appendDeviceLists info") + return jsonerror.InternalServerError() + } + return util.JSONResponse{ + Code: 200, + JSON: struct { + Changed []string `json:"changed"` + Left []string `json:"left"` + }{ + Changed: res.DeviceLists.Changed, + Left: res.DeviceLists.Left, + }, + } +} + func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (res *types.Response, err error) { res = types.NewResponse() @@ -172,7 +221,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea if err != nil { return } - res, err = rp.appendDeviceLists(res, req.device.UserID, since) + res, err = rp.appendDeviceLists(res, req.device.UserID, since, latestPos) if err != nil { return } @@ -205,14 +254,9 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea } func (rp *RequestPool) appendDeviceLists( - data *types.Response, userID string, since types.StreamingToken, + data *types.Response, userID string, since, to types.StreamingToken, ) (*types.Response, error) { - // TODO: Currently this code will race which may result in duplicates but not missing data. - // This happens because, whilst we are told the range to fetch here (since / latest) the - // QueryKeyChanges API only exposes a "from" value (on purpose to avoid racing, which then - // returns the latest position with which the response has authority on). We'd need to tweak - // the API to expose a "to" value to fix this. - _, _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since) + _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since, to) if err != nil { return nil, err } diff --git a/syncapi/types/types.go b/syncapi/types/types.go index f20c73bf..4761cce2 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -18,6 +18,7 @@ import ( "encoding/json" "errors" "fmt" + "sort" "strconv" "strings" @@ -129,15 +130,14 @@ func (t *StreamingToken) EDUPosition() StreamPosition { return t.Positions[1] } func (t *StreamingToken) String() string { - logStrings := []string{ - t.syncToken.String(), - } + var logStrings []string for name, lp := range t.logs { logStr := fmt.Sprintf("%s-%d-%d", name, lp.Partition, lp.Offset) logStrings = append(logStrings, logStr) } + sort.Strings(logStrings) // E.g s11_22_33.dl0-134.ab1-441 - return strings.Join(logStrings, ".") + return strings.Join(append([]string{t.syncToken.String()}, logStrings...), ".") } // IsAfter returns true if ANY position in this token is greater than `other`. diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go index 7590ea52..634f84dc 100644 --- a/syncapi/types/types_test.go +++ b/syncapi/types/types_test.go @@ -20,7 +20,7 @@ func TestNewSyncTokenWithLogs(t *testing.T) { }, }, }, - "s4_0.dl-0-123.ab-1-14419482332": &StreamingToken{ + "s4_0.ab-1-14419482332.dl-0-123": &StreamingToken{ syncToken: syncToken{Type: "s", Positions: []StreamPosition{4, 0}}, logs: map[string]*LogPosition{ "ab": &LogPosition{ @@ -46,8 +46,9 @@ func TestNewSyncTokenWithLogs(t *testing.T) { if !reflect.DeepEqual(got, *want) { t.Errorf("%s mismatch: got %v want %v", tok, got, want) } - if got.String() != tok { - t.Errorf("%s reserialisation mismatch: got %s want %s", tok, got.String(), tok) + gotStr := got.String() + if gotStr != tok { + t.Errorf("%s reserialisation mismatch: got %s want %s", tok, gotStr, tok) } } } |