aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-07-30 14:52:21 +0100
committerGitHub <noreply@github.com>2020-07-30 14:52:21 +0100
commita2174d3294841dbdf201bde76de3ffc44399fcbc (patch)
tree64f5c0d7dee038960941a937dd84631d53c1e9a9 /syncapi
parent9355fb5ac8c911bdbde6dcc0f279f716d8a8f60b (diff)
Implement /keys/changes (#1232)
* Implement /keys/changes And refactor QueryKeyChanges to accept a `to` offset. * Unbreak tests * Sort keys when serialising log tokens
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/internal/keychange.go26
-rw-r--r--syncapi/internal/keychange_test.go35
-rw-r--r--syncapi/routing/routing.go4
-rw-r--r--syncapi/sync/requestpool.go60
-rw-r--r--syncapi/types/types.go8
-rw-r--r--syncapi/types/types_test.go7
6 files changed, 99 insertions, 41 deletions
diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go
index b594cc62..cb4fca7d 100644
--- a/syncapi/internal/keychange.go
+++ b/syncapi/internal/keychange.go
@@ -33,15 +33,15 @@ const DeviceListLogName = "dl"
// be already filled in with join/leave information.
func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
- userID string, res *types.Response, tok types.StreamingToken,
-) (newTok *types.StreamingToken, hasNew bool, err error) {
+ userID string, res *types.Response, from, to types.StreamingToken,
+) (hasNew bool, err error) {
// Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
newlyJoinedRooms := joinedRooms(res, userID)
newlyLeftRooms := leftRooms(res)
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
changed, left, err := TrackChangedUsers(ctx, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms)
if err != nil {
- return nil, false, err
+ return false, err
}
res.DeviceLists.Changed = changed
res.DeviceLists.Left = left
@@ -54,7 +54,7 @@ func DeviceListCatchup(
var offset int64
// Extract partition/offset from sync token
// TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
- logOffset := tok.Log(DeviceListLogName)
+ logOffset := from.Log(DeviceListLogName)
if logOffset != nil {
partition = logOffset.Partition
offset = logOffset.Offset
@@ -62,15 +62,23 @@ func DeviceListCatchup(
partition = -1
offset = sarama.OffsetOldest
}
+ var toOffset int64
+ toLog := to.Log(DeviceListLogName)
+ if toLog != nil {
+ toOffset = toLog.Offset
+ } else {
+ toOffset = sarama.OffsetNewest
+ }
var queryRes api.QueryKeyChangesResponse
keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{
Partition: partition,
Offset: offset,
+ ToOffset: toOffset,
}, &queryRes)
if queryRes.Error != nil {
// don't fail the catchup because we may have got useful information by tracking membership
util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
- return
+ return hasNew, nil
}
userSet := make(map[string]bool)
for _, userID := range res.DeviceLists.Changed {
@@ -82,13 +90,7 @@ func DeviceListCatchup(
hasNew = true
}
}
- // Make a new streaming token using the new offset
- tok.SetLog(DeviceListLogName, &types.LogPosition{
- Offset: queryRes.Offset,
- Partition: queryRes.Partition,
- })
- newTok = &tok
- return
+ return hasNew, nil
}
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go
index d0d27e44..3f18696c 100644
--- a/syncapi/internal/keychange_test.go
+++ b/syncapi/internal/keychange_test.go
@@ -6,6 +6,7 @@ import (
"sort"
"testing"
+ "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/currentstateserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -15,6 +16,12 @@ import (
var (
syncingUser = "@alice:localhost"
emptyToken = types.NewStreamToken(0, 0, nil)
+ newestToken = types.NewStreamToken(0, 0, map[string]*types.LogPosition{
+ DeviceListLogName: &types.LogPosition{
+ Offset: sarama.OffsetNewest,
+ Partition: 0,
+ },
+ })
)
type mockKeyAPI struct{}
@@ -162,12 +169,12 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser},
"!another:room": {syncingUser},
},
- }, syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -184,12 +191,12 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {removeUser},
"!another:room": {syncingUser},
},
- }, syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -206,12 +213,12 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, existingUser},
"!another:room": {syncingUser, existingUser},
},
- }, syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -227,12 +234,12 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {existingUser},
"!another:room": {syncingUser, existingUser},
},
- }, syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -286,11 +293,11 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
jr.Timeline.Events = roomTimelineEvents
syncResponse.Rooms.Join[roomID] = jr
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {syncingUser, existingUser},
},
- }, syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -311,13 +318,13 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2},
newlyLeftRoom: {newlyLeftUser, newlyLeftUser2},
"!another:room": {syncingUser},
},
- }, syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -396,12 +403,12 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
lr.Timeline.Events = roomEvents
syncResponse.Rooms.Leave[roomID] = lr
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {newShareUser, newShareUser2},
"!another:room": {syncingUser},
},
- }, syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go
index a98955c5..ed0f872e 100644
--- a/syncapi/routing/routing.go
+++ b/syncapi/routing/routing.go
@@ -75,4 +75,8 @@ func Setup(
return GetFilter(req, device, syncDB, vars["userId"], vars["filterId"])
}),
).Methods(http.MethodGet, http.MethodOptions)
+
+ r0mux.Handle("/keys/changes", httputil.MakeAuthAPI("keys_changes", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
+ return srp.OnIncomingKeyChangeRequest(req, device)
+ })).Methods(http.MethodGet, http.MethodOptions)
}
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index 754d6983..f817f098 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -143,6 +143,55 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
}
}
+func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *userapi.Device) util.JSONResponse {
+ from := req.URL.Query().Get("from")
+ to := req.URL.Query().Get("to")
+ if from == "" || to == "" {
+ return util.JSONResponse{
+ Code: 400,
+ JSON: jsonerror.InvalidArgumentValue("missing ?from= or ?to="),
+ }
+ }
+ fromToken, err := types.NewStreamTokenFromString(from)
+ if err != nil {
+ return util.JSONResponse{
+ Code: 400,
+ JSON: jsonerror.InvalidArgumentValue("bad 'from' value"),
+ }
+ }
+ toToken, err := types.NewStreamTokenFromString(to)
+ if err != nil {
+ return util.JSONResponse{
+ Code: 400,
+ JSON: jsonerror.InvalidArgumentValue("bad 'to' value"),
+ }
+ }
+ // work out room joins/leaves
+ res, err := rp.db.IncrementalSync(
+ req.Context(), types.NewResponse(), *device, fromToken, toToken, 0, false,
+ )
+ if err != nil {
+ util.GetLogger(req.Context()).WithError(err).Error("Failed to IncrementalSync")
+ return jsonerror.InternalServerError()
+ }
+
+ res, err = rp.appendDeviceLists(res, device.UserID, fromToken, toToken)
+ if err != nil {
+ util.GetLogger(req.Context()).WithError(err).Error("Failed to appendDeviceLists info")
+ return jsonerror.InternalServerError()
+ }
+ return util.JSONResponse{
+ Code: 200,
+ JSON: struct {
+ Changed []string `json:"changed"`
+ Left []string `json:"left"`
+ }{
+ Changed: res.DeviceLists.Changed,
+ Left: res.DeviceLists.Left,
+ },
+ }
+}
+
func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (res *types.Response, err error) {
res = types.NewResponse()
@@ -172,7 +221,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
if err != nil {
return
}
- res, err = rp.appendDeviceLists(res, req.device.UserID, since)
+ res, err = rp.appendDeviceLists(res, req.device.UserID, since, latestPos)
if err != nil {
return
}
@@ -205,14 +254,9 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
}
func (rp *RequestPool) appendDeviceLists(
- data *types.Response, userID string, since types.StreamingToken,
+ data *types.Response, userID string, since, to types.StreamingToken,
) (*types.Response, error) {
- // TODO: Currently this code will race which may result in duplicates but not missing data.
- // This happens because, whilst we are told the range to fetch here (since / latest) the
- // QueryKeyChanges API only exposes a "from" value (on purpose to avoid racing, which then
- // returns the latest position with which the response has authority on). We'd need to tweak
- // the API to expose a "to" value to fix this.
- _, _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since)
+ _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since, to)
if err != nil {
return nil, err
}
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index f20c73bf..4761cce2 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -18,6 +18,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "sort"
"strconv"
"strings"
@@ -129,15 +130,14 @@ func (t *StreamingToken) EDUPosition() StreamPosition {
return t.Positions[1]
}
func (t *StreamingToken) String() string {
- logStrings := []string{
- t.syncToken.String(),
- }
+ var logStrings []string
for name, lp := range t.logs {
logStr := fmt.Sprintf("%s-%d-%d", name, lp.Partition, lp.Offset)
logStrings = append(logStrings, logStr)
}
+ sort.Strings(logStrings)
// E.g s11_22_33.dl0-134.ab1-441
- return strings.Join(logStrings, ".")
+ return strings.Join(append([]string{t.syncToken.String()}, logStrings...), ".")
}
// IsAfter returns true if ANY position in this token is greater than `other`.
diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go
index 7590ea52..634f84dc 100644
--- a/syncapi/types/types_test.go
+++ b/syncapi/types/types_test.go
@@ -20,7 +20,7 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
},
},
},
- "s4_0.dl-0-123.ab-1-14419482332": &StreamingToken{
+ "s4_0.ab-1-14419482332.dl-0-123": &StreamingToken{
syncToken: syncToken{Type: "s", Positions: []StreamPosition{4, 0}},
logs: map[string]*LogPosition{
"ab": &LogPosition{
@@ -46,8 +46,9 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
if !reflect.DeepEqual(got, *want) {
t.Errorf("%s mismatch: got %v want %v", tok, got, want)
}
- if got.String() != tok {
- t.Errorf("%s reserialisation mismatch: got %s want %s", tok, got.String(), tok)
+ gotStr := got.String()
+ if gotStr != tok {
+ t.Errorf("%s reserialisation mismatch: got %s want %s", tok, gotStr, tok)
}
}
}