diff options
author | Kegsay <kegan@matrix.org> | 2020-08-07 17:32:13 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-07 17:32:13 +0100 |
commit | f371783da765f96fc3764091e95fb8cb8004e208 (patch) | |
tree | dd1748eca0719054be508f2f026cb0e314d2565f /syncapi | |
parent | 30c2325eaf85f28f438f9a3c7b703978eee66cf7 (diff) |
Finish inbound E2E device lists (#1243)
* Add tests for device list updates
* Add stale_device_lists table and use db before asking remote for device keys
* Fetch remote keys if all devices are requested
* Add display_name col to store remote device names
Few other tweaks to make `Server correctly handles incoming m.device_list_update`
pass.
* Fix sqlite otk bug
* Unbuffered channel to block /send causing sytest to not race anymore
* Linting and fix bug whereby we didn't send updated dl tokens to the client causing a tightloop on /sync sometimes
* No longer assert staleness as Update blocks on workers now
* Back out tweaks
* Bugfixes
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/internal/keychange.go | 22 | ||||
-rw-r--r-- | syncapi/types/types.go | 7 |
2 files changed, 22 insertions, 7 deletions
diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index 66134d79..e0379aaf 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -46,6 +46,7 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID, // DeviceListCatchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response // was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST // be already filled in with join/leave information. +// nolint:gocyclo func DeviceListCatchup( ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, res *types.Response, from, to types.StreamingToken, @@ -68,22 +69,20 @@ func DeviceListCatchup( var partition int32 var offset int64 + partition = -1 + offset = sarama.OffsetOldest // Extract partition/offset from sync token // TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make. logOffset := from.Log(DeviceListLogName) if logOffset != nil { partition = logOffset.Partition offset = logOffset.Offset - } else { - partition = -1 - offset = sarama.OffsetOldest } var toOffset int64 + toOffset = sarama.OffsetNewest toLog := to.Log(DeviceListLogName) - if toLog != nil { + if toLog != nil && toLog.Offset > 0 { toOffset = toLog.Offset - } else { - toOffset = sarama.OffsetNewest } var queryRes api.QueryKeyChangesResponse keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{ @@ -96,6 +95,10 @@ func DeviceListCatchup( util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed") return hasNew, nil } + util.GetLogger(ctx).Debugf( + "QueryKeyChanges request p=%d,off=%d,to=%d response p=%d off=%d uids=%v", + partition, offset, toOffset, queryRes.Partition, queryRes.Offset, queryRes.UserIDs, + ) userSet := make(map[string]bool) for _, userID := range res.DeviceLists.Changed { userSet[userID] = true @@ -116,6 +119,13 @@ func DeviceListCatchup( userSet[userID] = true } } + // set the new token + to.SetLog(DeviceListLogName, &types.LogPosition{ + Partition: queryRes.Partition, + Offset: queryRes.Offset, + }) + res.NextBatch = to.String() + return hasNew, nil } diff --git a/syncapi/types/types.go b/syncapi/types/types.go index f465d9ff..f3324800 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -112,6 +112,9 @@ type StreamingToken struct { } func (t *StreamingToken) SetLog(name string, lp *LogPosition) { + if t.logs == nil { + t.logs = make(map[string]*LogPosition) + } t.logs[name] = lp } @@ -173,12 +176,14 @@ func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken) } ret.Positions[i] = other.Positions[i] } + ret.logs = make(map[string]*LogPosition) for name := range t.logs { otherLog := other.Log(name) if otherLog == nil { continue } - t.logs[name] = otherLog + copy := *otherLog + ret.logs[name] = © } return ret } |