From f371783da765f96fc3764091e95fb8cb8004e208 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Fri, 7 Aug 2020 17:32:13 +0100 Subject: Finish inbound E2E device lists (#1243) * Add tests for device list updates * Add stale_device_lists table and use db before asking remote for device keys * Fetch remote keys if all devices are requested * Add display_name col to store remote device names Few other tweaks to make `Server correctly handles incoming m.device_list_update` pass. * Fix sqlite otk bug * Unbuffered channel to block /send causing sytest to not race anymore * Linting and fix bug whereby we didn't send updated dl tokens to the client causing a tightloop on /sync sometimes * No longer assert staleness as Update blocks on workers now * Back out tweaks * Bugfixes --- syncapi/internal/keychange.go | 22 ++++++++++++++++------ syncapi/types/types.go | 7 ++++++- 2 files changed, 22 insertions(+), 7 deletions(-) (limited to 'syncapi') diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index 66134d79..e0379aaf 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -46,6 +46,7 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID, // DeviceListCatchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response // was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST // be already filled in with join/leave information. +// nolint:gocyclo func DeviceListCatchup( ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, res *types.Response, from, to types.StreamingToken, @@ -68,22 +69,20 @@ func DeviceListCatchup( var partition int32 var offset int64 + partition = -1 + offset = sarama.OffsetOldest // Extract partition/offset from sync token // TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make. logOffset := from.Log(DeviceListLogName) if logOffset != nil { partition = logOffset.Partition offset = logOffset.Offset - } else { - partition = -1 - offset = sarama.OffsetOldest } var toOffset int64 + toOffset = sarama.OffsetNewest toLog := to.Log(DeviceListLogName) - if toLog != nil { + if toLog != nil && toLog.Offset > 0 { toOffset = toLog.Offset - } else { - toOffset = sarama.OffsetNewest } var queryRes api.QueryKeyChangesResponse keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{ @@ -96,6 +95,10 @@ func DeviceListCatchup( util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed") return hasNew, nil } + util.GetLogger(ctx).Debugf( + "QueryKeyChanges request p=%d,off=%d,to=%d response p=%d off=%d uids=%v", + partition, offset, toOffset, queryRes.Partition, queryRes.Offset, queryRes.UserIDs, + ) userSet := make(map[string]bool) for _, userID := range res.DeviceLists.Changed { userSet[userID] = true @@ -116,6 +119,13 @@ func DeviceListCatchup( userSet[userID] = true } } + // set the new token + to.SetLog(DeviceListLogName, &types.LogPosition{ + Partition: queryRes.Partition, + Offset: queryRes.Offset, + }) + res.NextBatch = to.String() + return hasNew, nil } diff --git a/syncapi/types/types.go b/syncapi/types/types.go index f465d9ff..f3324800 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -112,6 +112,9 @@ type StreamingToken struct { } func (t *StreamingToken) SetLog(name string, lp *LogPosition) { + if t.logs == nil { + t.logs = make(map[string]*LogPosition) + } t.logs[name] = lp } @@ -173,12 +176,14 @@ func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken) } ret.Positions[i] = other.Positions[i] } + ret.logs = make(map[string]*LogPosition) for name := range t.logs { otherLog := other.Log(name) if otherLog == nil { continue } - t.logs[name] = otherLog + copy := *otherLog + ret.logs[name] = © } return ret } -- cgit v1.2.3