aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-07-30 14:52:21 +0100
committerGitHub <noreply@github.com>2020-07-30 14:52:21 +0100
commita2174d3294841dbdf201bde76de3ffc44399fcbc (patch)
tree64f5c0d7dee038960941a937dd84631d53c1e9a9
parent9355fb5ac8c911bdbde6dcc0f279f716d8a8f60b (diff)
Implement /keys/changes (#1232)
* Implement /keys/changes And refactor QueryKeyChanges to accept a `to` offset. * Unbreak tests * Sort keys when serialising log tokens
-rw-r--r--keyserver/api/api.go3
-rw-r--r--keyserver/internal/internal.go2
-rw-r--r--keyserver/storage/interface.go5
-rw-r--r--keyserver/storage/postgres/key_changes_table.go11
-rw-r--r--keyserver/storage/shared/storage.go4
-rw-r--r--keyserver/storage/sqlite3/key_changes_table.go11
-rw-r--r--keyserver/storage/storage_test.go26
-rw-r--r--keyserver/storage/tables/interface.go4
-rw-r--r--syncapi/internal/keychange.go26
-rw-r--r--syncapi/internal/keychange_test.go35
-rw-r--r--syncapi/routing/routing.go4
-rw-r--r--syncapi/sync/requestpool.go60
-rw-r--r--syncapi/types/types.go8
-rw-r--r--syncapi/types/types_test.go7
-rw-r--r--sytest-whitelist2
15 files changed, 153 insertions, 55 deletions
diff --git a/keyserver/api/api.go b/keyserver/api/api.go
index c9afb09c..98bcd944 100644
--- a/keyserver/api/api.go
+++ b/keyserver/api/api.go
@@ -138,6 +138,9 @@ type QueryKeyChangesRequest struct {
Partition int32
// The offset of the last received key event, or sarama.OffsetOldest if this is from the beginning
Offset int64
+ // The inclusive offset where to track key changes up to. Messages with this offset are included in the response.
+ // Use sarama.OffsetNewest if the offset is unknown (then check the response Offset to avoid racing).
+ ToOffset int64
}
type QueryKeyChangesResponse struct {
diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go
index 9a41e44f..70371353 100644
--- a/keyserver/internal/internal.go
+++ b/keyserver/internal/internal.go
@@ -44,7 +44,7 @@ func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyC
if req.Partition < 0 {
req.Partition = a.Producer.DefaultPartition()
}
- userIDs, latest, err := a.DB.KeyChanges(ctx, req.Partition, req.Offset)
+ userIDs, latest, err := a.DB.KeyChanges(ctx, req.Partition, req.Offset, req.ToOffset)
if err != nil {
res.Error = &api.KeyError{
Err: err.Error(),
diff --git a/keyserver/storage/interface.go b/keyserver/storage/interface.go
index f4787790..7a4fce6f 100644
--- a/keyserver/storage/interface.go
+++ b/keyserver/storage/interface.go
@@ -48,7 +48,8 @@ type Database interface {
// their keys in some way.
StoreKeyChange(ctx context.Context, partition int32, offset int64, userID string) error
- // KeyChanges returns a list of user IDs who have modified their keys from the offset given.
+ // KeyChanges returns a list of user IDs who have modified their keys from the offset given (exclusive) to the offset given (inclusive).
+ // A to offset of sarama.OffsetNewest means no upper limit.
// Returns the offset of the latest key change.
- KeyChanges(ctx context.Context, partition int32, fromOffset int64) (userIDs []string, latestOffset int64, err error)
+ KeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error)
}
diff --git a/keyserver/storage/postgres/key_changes_table.go b/keyserver/storage/postgres/key_changes_table.go
index 9d259f9f..d7f0991a 100644
--- a/keyserver/storage/postgres/key_changes_table.go
+++ b/keyserver/storage/postgres/key_changes_table.go
@@ -17,7 +17,9 @@ package postgres
import (
"context"
"database/sql"
+ "math"
+ "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
@@ -44,7 +46,7 @@ const upsertKeyChangeSQL = "" +
// select the highest offset for each user in the range. The grouping by user gives distinct entries and then we just
// take the max offset value as the latest offset.
const selectKeyChangesSQL = "" +
- "SELECT user_id, MAX(log_offset) FROM keyserver_key_changes WHERE partition = $1 AND log_offset > $2 GROUP BY user_id"
+ "SELECT user_id, MAX(log_offset) FROM keyserver_key_changes WHERE partition = $1 AND log_offset > $2 AND log_offset <= $3 GROUP BY user_id"
type keyChangesStatements struct {
db *sql.DB
@@ -75,9 +77,12 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, partition in
}
func (s *keyChangesStatements) SelectKeyChanges(
- ctx context.Context, partition int32, fromOffset int64,
+ ctx context.Context, partition int32, fromOffset, toOffset int64,
) (userIDs []string, latestOffset int64, err error) {
- rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset)
+ if toOffset == sarama.OffsetNewest {
+ toOffset = math.MaxInt64
+ }
+ rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset, toOffset)
if err != nil {
return nil, 0, err
}
diff --git a/keyserver/storage/shared/storage.go b/keyserver/storage/shared/storage.go
index 537a5f7b..8c2534f5 100644
--- a/keyserver/storage/shared/storage.go
+++ b/keyserver/storage/shared/storage.go
@@ -78,6 +78,6 @@ func (d *Database) StoreKeyChange(ctx context.Context, partition int32, offset i
return d.KeyChangesTable.InsertKeyChange(ctx, partition, offset, userID)
}
-func (d *Database) KeyChanges(ctx context.Context, partition int32, fromOffset int64) (userIDs []string, latestOffset int64, err error) {
- return d.KeyChangesTable.SelectKeyChanges(ctx, partition, fromOffset)
+func (d *Database) KeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) {
+ return d.KeyChangesTable.SelectKeyChanges(ctx, partition, fromOffset, toOffset)
}
diff --git a/keyserver/storage/sqlite3/key_changes_table.go b/keyserver/storage/sqlite3/key_changes_table.go
index b830214d..32721eae 100644
--- a/keyserver/storage/sqlite3/key_changes_table.go
+++ b/keyserver/storage/sqlite3/key_changes_table.go
@@ -17,7 +17,9 @@ package sqlite3
import (
"context"
"database/sql"
+ "math"
+ "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
@@ -45,7 +47,7 @@ const upsertKeyChangeSQL = "" +
// select the highest offset for each user in the range. The grouping by user gives distinct entries and then we just
// take the max offset value as the latest offset.
const selectKeyChangesSQL = "" +
- "SELECT user_id, MAX(offset) FROM keyserver_key_changes WHERE partition = $1 AND offset > $2 GROUP BY user_id"
+ "SELECT user_id, MAX(offset) FROM keyserver_key_changes WHERE partition = $1 AND offset > $2 AND offset <= $3 GROUP BY user_id"
type keyChangesStatements struct {
db *sql.DB
@@ -76,9 +78,12 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, partition in
}
func (s *keyChangesStatements) SelectKeyChanges(
- ctx context.Context, partition int32, fromOffset int64,
+ ctx context.Context, partition int32, fromOffset, toOffset int64,
) (userIDs []string, latestOffset int64, err error) {
- rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset)
+ if toOffset == sarama.OffsetNewest {
+ toOffset = math.MaxInt64
+ }
+ rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset, toOffset)
if err != nil {
return nil, 0, err
}
diff --git a/keyserver/storage/storage_test.go b/keyserver/storage/storage_test.go
index 88972478..66f6930f 100644
--- a/keyserver/storage/storage_test.go
+++ b/keyserver/storage/storage_test.go
@@ -4,6 +4,8 @@ import (
"context"
"reflect"
"testing"
+
+ "github.com/Shopify/sarama"
)
var ctx = context.Background()
@@ -24,7 +26,7 @@ func TestKeyChanges(t *testing.T) {
MustNotError(t, db.StoreKeyChange(ctx, 0, 0, "@alice:localhost"))
MustNotError(t, db.StoreKeyChange(ctx, 0, 1, "@bob:localhost"))
MustNotError(t, db.StoreKeyChange(ctx, 0, 2, "@charlie:localhost"))
- userIDs, latest, err := db.KeyChanges(ctx, 0, 1)
+ userIDs, latest, err := db.KeyChanges(ctx, 0, 1, sarama.OffsetNewest)
if err != nil {
t.Fatalf("Failed to KeyChanges: %s", err)
}
@@ -44,7 +46,7 @@ func TestKeyChangesNoDupes(t *testing.T) {
MustNotError(t, db.StoreKeyChange(ctx, 0, 0, "@alice:localhost"))
MustNotError(t, db.StoreKeyChange(ctx, 0, 1, "@alice:localhost"))
MustNotError(t, db.StoreKeyChange(ctx, 0, 2, "@alice:localhost"))
- userIDs, latest, err := db.KeyChanges(ctx, 0, 0)
+ userIDs, latest, err := db.KeyChanges(ctx, 0, 0, sarama.OffsetNewest)
if err != nil {
t.Fatalf("Failed to KeyChanges: %s", err)
}
@@ -55,3 +57,23 @@ func TestKeyChangesNoDupes(t *testing.T) {
t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs)
}
}
+
+func TestKeyChangesUpperLimit(t *testing.T) {
+ db, err := NewDatabase("file::memory:", nil)
+ if err != nil {
+ t.Fatalf("Failed to NewDatabase: %s", err)
+ }
+ MustNotError(t, db.StoreKeyChange(ctx, 0, 0, "@alice:localhost"))
+ MustNotError(t, db.StoreKeyChange(ctx, 0, 1, "@bob:localhost"))
+ MustNotError(t, db.StoreKeyChange(ctx, 0, 2, "@charlie:localhost"))
+ userIDs, latest, err := db.KeyChanges(ctx, 0, 0, 1)
+ if err != nil {
+ t.Fatalf("Failed to KeyChanges: %s", err)
+ }
+ if latest != 1 {
+ t.Fatalf("KeyChanges: got latest=%d want 1", latest)
+ }
+ if !reflect.DeepEqual(userIDs, []string{"@bob:localhost"}) {
+ t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs)
+ }
+}
diff --git a/keyserver/storage/tables/interface.go b/keyserver/storage/tables/interface.go
index 824b9f0f..8b89283f 100644
--- a/keyserver/storage/tables/interface.go
+++ b/keyserver/storage/tables/interface.go
@@ -38,5 +38,7 @@ type DeviceKeys interface {
type KeyChanges interface {
InsertKeyChange(ctx context.Context, partition int32, offset int64, userID string) error
- SelectKeyChanges(ctx context.Context, partition int32, fromOffset int64) (userIDs []string, latestOffset int64, err error)
+ // SelectKeyChanges returns the set (de-duplicated) of users who have changed their keys between the two offsets.
+ // Results are exclusive of fromOffset and inclusive of toOffset. A toOffset of sarama.OffsetNewest means no upper offset.
+ SelectKeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error)
}
diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go
index b594cc62..cb4fca7d 100644
--- a/syncapi/internal/keychange.go
+++ b/syncapi/internal/keychange.go
@@ -33,15 +33,15 @@ const DeviceListLogName = "dl"
// be already filled in with join/leave information.
func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
- userID string, res *types.Response, tok types.StreamingToken,
-) (newTok *types.StreamingToken, hasNew bool, err error) {
+ userID string, res *types.Response, from, to types.StreamingToken,
+) (hasNew bool, err error) {
// Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
newlyJoinedRooms := joinedRooms(res, userID)
newlyLeftRooms := leftRooms(res)
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
changed, left, err := TrackChangedUsers(ctx, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms)
if err != nil {
- return nil, false, err
+ return false, err
}
res.DeviceLists.Changed = changed
res.DeviceLists.Left = left
@@ -54,7 +54,7 @@ func DeviceListCatchup(
var offset int64
// Extract partition/offset from sync token
// TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
- logOffset := tok.Log(DeviceListLogName)
+ logOffset := from.Log(DeviceListLogName)
if logOffset != nil {
partition = logOffset.Partition
offset = logOffset.Offset
@@ -62,15 +62,23 @@ func DeviceListCatchup(
partition = -1
offset = sarama.OffsetOldest
}
+ var toOffset int64
+ toLog := to.Log(DeviceListLogName)
+ if toLog != nil {
+ toOffset = toLog.Offset
+ } else {
+ toOffset = sarama.OffsetNewest
+ }
var queryRes api.QueryKeyChangesResponse
keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{
Partition: partition,
Offset: offset,
+ ToOffset: toOffset,
}, &queryRes)
if queryRes.Error != nil {
// don't fail the catchup because we may have got useful information by tracking membership
util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
- return
+ return hasNew, nil
}
userSet := make(map[string]bool)
for _, userID := range res.DeviceLists.Changed {
@@ -82,13 +90,7 @@ func DeviceListCatchup(
hasNew = true
}
}
- // Make a new streaming token using the new offset
- tok.SetLog(DeviceListLogName, &types.LogPosition{
- Offset: queryRes.Offset,
- Partition: queryRes.Partition,
- })
- newTok = &tok
- return
+ return hasNew, nil
}
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go
index d0d27e44..3f18696c 100644
--- a/syncapi/internal/keychange_test.go
+++ b/syncapi/internal/keychange_test.go
@@ -6,6 +6,7 @@ import (
"sort"
"testing"
+ "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/currentstateserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -15,6 +16,12 @@ import (
var (
syncingUser = "@alice:localhost"
emptyToken = types.NewStreamToken(0, 0, nil)
+ newestToken = types.NewStreamToken(0, 0, map[string]*types.LogPosition{
+ DeviceListLogName: &types.LogPosition{
+ Offset: sarama.OffsetNewest,
+ Partition: 0,
+ },
+ })
)
type mockKeyAPI struct{}
@@ -162,12 +169,12 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser},
"!another:room": {syncingUser},
},
- }, syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -184,12 +191,12 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {removeUser},
"!another:room": {syncingUser},
},
- }, syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -206,12 +213,12 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, existingUser},
"!another:room": {syncingUser, existingUser},
},
- }, syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -227,12 +234,12 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {existingUser},
"!another:room": {syncingUser, existingUser},
},
- }, syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -286,11 +293,11 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
jr.Timeline.Events = roomTimelineEvents
syncResponse.Rooms.Join[roomID] = jr
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {syncingUser, existingUser},
},
- }, syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -311,13 +318,13 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2},
newlyLeftRoom: {newlyLeftUser, newlyLeftUser2},
"!another:room": {syncingUser},
},
- }, syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -396,12 +403,12 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
lr.Timeline.Events = roomEvents
syncResponse.Rooms.Leave[roomID] = lr
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {newShareUser, newShareUser2},
"!another:room": {syncingUser},
},
- }, syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go
index a98955c5..ed0f872e 100644
--- a/syncapi/routing/routing.go
+++ b/syncapi/routing/routing.go
@@ -75,4 +75,8 @@ func Setup(
return GetFilter(req, device, syncDB, vars["userId"], vars["filterId"])
}),
).Methods(http.MethodGet, http.MethodOptions)
+
+ r0mux.Handle("/keys/changes", httputil.MakeAuthAPI("keys_changes", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
+ return srp.OnIncomingKeyChangeRequest(req, device)
+ })).Methods(http.MethodGet, http.MethodOptions)
}
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index 754d6983..f817f098 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -143,6 +143,55 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
}
}
+func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *userapi.Device) util.JSONResponse {
+ from := req.URL.Query().Get("from")
+ to := req.URL.Query().Get("to")
+ if from == "" || to == "" {
+ return util.JSONResponse{
+ Code: 400,
+ JSON: jsonerror.InvalidArgumentValue("missing ?from= or ?to="),
+ }
+ }
+ fromToken, err := types.NewStreamTokenFromString(from)
+ if err != nil {
+ return util.JSONResponse{
+ Code: 400,
+ JSON: jsonerror.InvalidArgumentValue("bad 'from' value"),
+ }
+ }
+ toToken, err := types.NewStreamTokenFromString(to)
+ if err != nil {
+ return util.JSONResponse{
+ Code: 400,
+ JSON: jsonerror.InvalidArgumentValue("bad 'to' value"),
+ }
+ }
+ // work out room joins/leaves
+ res, err := rp.db.IncrementalSync(
+ req.Context(), types.NewResponse(), *device, fromToken, toToken, 0, false,
+ )
+ if err != nil {
+ util.GetLogger(req.Context()).WithError(err).Error("Failed to IncrementalSync")
+ return jsonerror.InternalServerError()
+ }
+
+ res, err = rp.appendDeviceLists(res, device.UserID, fromToken, toToken)
+ if err != nil {
+ util.GetLogger(req.Context()).WithError(err).Error("Failed to appendDeviceLists info")
+ return jsonerror.InternalServerError()
+ }
+ return util.JSONResponse{
+ Code: 200,
+ JSON: struct {
+ Changed []string `json:"changed"`
+ Left []string `json:"left"`
+ }{
+ Changed: res.DeviceLists.Changed,
+ Left: res.DeviceLists.Left,
+ },
+ }
+}
+
func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (res *types.Response, err error) {
res = types.NewResponse()
@@ -172,7 +221,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
if err != nil {
return
}
- res, err = rp.appendDeviceLists(res, req.device.UserID, since)
+ res, err = rp.appendDeviceLists(res, req.device.UserID, since, latestPos)
if err != nil {
return
}
@@ -205,14 +254,9 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
}
func (rp *RequestPool) appendDeviceLists(
- data *types.Response, userID string, since types.StreamingToken,
+ data *types.Response, userID string, since, to types.StreamingToken,
) (*types.Response, error) {
- // TODO: Currently this code will race which may result in duplicates but not missing data.
- // This happens because, whilst we are told the range to fetch here (since / latest) the
- // QueryKeyChanges API only exposes a "from" value (on purpose to avoid racing, which then
- // returns the latest position with which the response has authority on). We'd need to tweak
- // the API to expose a "to" value to fix this.
- _, _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since)
+ _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since, to)
if err != nil {
return nil, err
}
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index f20c73bf..4761cce2 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -18,6 +18,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "sort"
"strconv"
"strings"
@@ -129,15 +130,14 @@ func (t *StreamingToken) EDUPosition() StreamPosition {
return t.Positions[1]
}
func (t *StreamingToken) String() string {
- logStrings := []string{
- t.syncToken.String(),
- }
+ var logStrings []string
for name, lp := range t.logs {
logStr := fmt.Sprintf("%s-%d-%d", name, lp.Partition, lp.Offset)
logStrings = append(logStrings, logStr)
}
+ sort.Strings(logStrings)
// E.g s11_22_33.dl0-134.ab1-441
- return strings.Join(logStrings, ".")
+ return strings.Join(append([]string{t.syncToken.String()}, logStrings...), ".")
}
// IsAfter returns true if ANY position in this token is greater than `other`.
diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go
index 7590ea52..634f84dc 100644
--- a/syncapi/types/types_test.go
+++ b/syncapi/types/types_test.go
@@ -20,7 +20,7 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
},
},
},
- "s4_0.dl-0-123.ab-1-14419482332": &StreamingToken{
+ "s4_0.ab-1-14419482332.dl-0-123": &StreamingToken{
syncToken: syncToken{Type: "s", Positions: []StreamPosition{4, 0}},
logs: map[string]*LogPosition{
"ab": &LogPosition{
@@ -46,8 +46,9 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
if !reflect.DeepEqual(got, *want) {
t.Errorf("%s mismatch: got %v want %v", tok, got, want)
}
- if got.String() != tok {
- t.Errorf("%s reserialisation mismatch: got %s want %s", tok, got.String(), tok)
+ gotStr := got.String()
+ if gotStr != tok {
+ t.Errorf("%s reserialisation mismatch: got %s want %s", tok, gotStr, tok)
}
}
}
diff --git a/sytest-whitelist b/sytest-whitelist
index 26922df4..341df8a9 100644
--- a/sytest-whitelist
+++ b/sytest-whitelist
@@ -128,6 +128,8 @@ query for user with no keys returns empty key dict
Can claim one time key using POST
Can claim remote one time key using POST
Local device key changes appear in v2 /sync
+Local device key changes appear in /keys/changes
+Get left notifs for other users in sync and /keys/changes when user leaves
Can add account data
Can add account data to room
Can get account data without syncing