aboutsummaryrefslogtreecommitdiff
path: root/syncapi/internal
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2021-01-08 16:59:06 +0000
committerGitHub <noreply@github.com>2021-01-08 16:59:06 +0000
commitb5a8935042dfb358f4176bc1ca46d0b8ebd62615 (patch)
treeb3b0409cc184f6392eb0230f2127d10d0752b4af /syncapi/internal
parent56a7839aedfdf849193bf25c0e2fcd8f4a0146d8 (diff)
Sync refactor — Part 1 (#1688)
* It's half-alive * Wakeups largely working * Other tweaks, typing works * Fix bugs, add receipt stream * Delete notifier, other tweaks * Dedupe a bit, add a template for the invite stream * Clean up, add templates for other streams * Don't leak channels * Bring forward some more PDU logic, clean up other places * Add some more wakeups * Use addRoomDeltaToResponse * Log tweaks, typing fixed? * Fix timed out syncs * Don't reset next batch position on timeout * Add account data stream/position * End of day * Fix complete sync for receipt, typing * Streams package * Clean up a bit * Complete sync send-to-device * Don't drop errors * More lightweight notifications * Fix typing positions * Don't advance position on remove again unless needed * Device list updates * Advance account data position * Use limit for incremental sync * Limit fixes, amongst other things * Remove some fmt.Println * Tweaks * Re-add notifier * Fix invite position * Fixes * Notify account data without advancing PDU position in notifier * Apply account data position * Get initial position for account data * Fix position update * Fix complete sync positions * Review comments @Kegsay * Room consumer parameters
Diffstat (limited to 'syncapi/internal')
-rw-r--r--syncapi/internal/keychange.go21
-rw-r--r--syncapi/internal/keychange_test.go24
2 files changed, 21 insertions, 24 deletions
diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go
index 3f901f49..e980437e 100644
--- a/syncapi/internal/keychange.go
+++ b/syncapi/internal/keychange.go
@@ -49,8 +49,8 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID,
// nolint:gocyclo
func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
- userID string, res *types.Response, from, to types.StreamingToken,
-) (hasNew bool, err error) {
+ userID string, res *types.Response, from, to types.LogPosition,
+) (newPos types.LogPosition, hasNew bool, err error) {
// Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
newlyJoinedRooms := joinedRooms(res, userID)
@@ -58,7 +58,7 @@ func DeviceListCatchup(
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
changed, left, err := TrackChangedUsers(ctx, rsAPI, userID, newlyJoinedRooms, newlyLeftRooms)
if err != nil {
- return false, err
+ return to, false, err
}
res.DeviceLists.Changed = changed
res.DeviceLists.Left = left
@@ -73,13 +73,13 @@ func DeviceListCatchup(
offset = sarama.OffsetOldest
// Extract partition/offset from sync token
// TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
- if !from.DeviceListPosition.IsEmpty() {
- partition = from.DeviceListPosition.Partition
- offset = from.DeviceListPosition.Offset
+ if !from.IsEmpty() {
+ partition = from.Partition
+ offset = from.Offset
}
var toOffset int64
toOffset = sarama.OffsetNewest
- if toLog := to.DeviceListPosition; toLog.Partition == partition && toLog.Offset > 0 {
+ if toLog := to; toLog.Partition == partition && toLog.Offset > 0 {
toOffset = toLog.Offset
}
var queryRes api.QueryKeyChangesResponse
@@ -91,7 +91,7 @@ func DeviceListCatchup(
if queryRes.Error != nil {
// don't fail the catchup because we may have got useful information by tracking membership
util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
- return hasNew, nil
+ return to, hasNew, nil
}
// QueryKeyChanges gets ALL users who have changed keys, we want the ones who share rooms with the user.
var sharedUsersMap map[string]int
@@ -128,13 +128,12 @@ func DeviceListCatchup(
}
}
// set the new token
- to.DeviceListPosition = types.LogPosition{
+ to = types.LogPosition{
Partition: queryRes.Partition,
Offset: queryRes.Offset,
}
- res.NextBatch.ApplyUpdates(to)
- return hasNew, nil
+ return to, hasNew, nil
}
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go
index 9eaeda75..44c4a4dd 100644
--- a/syncapi/internal/keychange_test.go
+++ b/syncapi/internal/keychange_test.go
@@ -16,12 +16,10 @@ import (
var (
syncingUser = "@alice:localhost"
- emptyToken = types.StreamingToken{}
- newestToken = types.StreamingToken{
- DeviceListPosition: types.LogPosition{
- Offset: sarama.OffsetNewest,
- Partition: 0,
- },
+ emptyToken = types.LogPosition{}
+ newestToken = types.LogPosition{
+ Offset: sarama.OffsetNewest,
+ Partition: 0,
}
)
@@ -180,7 +178,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
"!another:room": {syncingUser},
},
}
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -203,7 +201,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
"!another:room": {syncingUser},
},
}
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -226,7 +224,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
"!another:room": {syncingUser, existingUser},
},
}
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -248,7 +246,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
"!another:room": {syncingUser, existingUser},
},
}
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -307,7 +305,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
roomID: {syncingUser, existingUser},
},
}
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -335,7 +333,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
"!another:room": {syncingUser},
},
}
- hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -420,7 +418,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
"!another:room": {syncingUser},
},
}
- hasNew, err := DeviceListCatchup(
+ _, hasNew, err := DeviceListCatchup(
context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken,
)
if err != nil {