aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-08-07 17:32:13 +0100
committerGitHub <noreply@github.com>2020-08-07 17:32:13 +0100
commitf371783da765f96fc3764091e95fb8cb8004e208 (patch)
treedd1748eca0719054be508f2f026cb0e314d2565f /syncapi
parent30c2325eaf85f28f438f9a3c7b703978eee66cf7 (diff)
Finish inbound E2E device lists (#1243)
* Add tests for device list updates * Add stale_device_lists table and use db before asking remote for device keys * Fetch remote keys if all devices are requested * Add display_name col to store remote device names Few other tweaks to make `Server correctly handles incoming m.device_list_update` pass. * Fix sqlite otk bug * Unbuffered channel to block /send causing sytest to not race anymore * Linting and fix bug whereby we didn't send updated dl tokens to the client causing a tightloop on /sync sometimes * No longer assert staleness as Update blocks on workers now * Back out tweaks * Bugfixes
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/internal/keychange.go22
-rw-r--r--syncapi/types/types.go7
2 files changed, 22 insertions, 7 deletions
diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go
index 66134d79..e0379aaf 100644
--- a/syncapi/internal/keychange.go
+++ b/syncapi/internal/keychange.go
@@ -46,6 +46,7 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID,
// DeviceListCatchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response
// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
// be already filled in with join/leave information.
+// nolint:gocyclo
func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
userID string, res *types.Response, from, to types.StreamingToken,
@@ -68,22 +69,20 @@ func DeviceListCatchup(
var partition int32
var offset int64
+ partition = -1
+ offset = sarama.OffsetOldest
// Extract partition/offset from sync token
// TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
logOffset := from.Log(DeviceListLogName)
if logOffset != nil {
partition = logOffset.Partition
offset = logOffset.Offset
- } else {
- partition = -1
- offset = sarama.OffsetOldest
}
var toOffset int64
+ toOffset = sarama.OffsetNewest
toLog := to.Log(DeviceListLogName)
- if toLog != nil {
+ if toLog != nil && toLog.Offset > 0 {
toOffset = toLog.Offset
- } else {
- toOffset = sarama.OffsetNewest
}
var queryRes api.QueryKeyChangesResponse
keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{
@@ -96,6 +95,10 @@ func DeviceListCatchup(
util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
return hasNew, nil
}
+ util.GetLogger(ctx).Debugf(
+ "QueryKeyChanges request p=%d,off=%d,to=%d response p=%d off=%d uids=%v",
+ partition, offset, toOffset, queryRes.Partition, queryRes.Offset, queryRes.UserIDs,
+ )
userSet := make(map[string]bool)
for _, userID := range res.DeviceLists.Changed {
userSet[userID] = true
@@ -116,6 +119,13 @@ func DeviceListCatchup(
userSet[userID] = true
}
}
+ // set the new token
+ to.SetLog(DeviceListLogName, &types.LogPosition{
+ Partition: queryRes.Partition,
+ Offset: queryRes.Offset,
+ })
+ res.NextBatch = to.String()
+
return hasNew, nil
}
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index f465d9ff..f3324800 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -112,6 +112,9 @@ type StreamingToken struct {
}
func (t *StreamingToken) SetLog(name string, lp *LogPosition) {
+ if t.logs == nil {
+ t.logs = make(map[string]*LogPosition)
+ }
t.logs[name] = lp
}
@@ -173,12 +176,14 @@ func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken)
}
ret.Positions[i] = other.Positions[i]
}
+ ret.logs = make(map[string]*LogPosition)
for name := range t.logs {
otherLog := other.Log(name)
if otherLog == nil {
continue
}
- t.logs[name] = otherLog
+ copy := *otherLog
+ ret.logs[name] = &copy
}
return ret
}