diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-12-15 15:09:10 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-15 15:09:10 +0000 |
commit | 38318b0f162a41ac9d1aa60dc3b7693eae033851 (patch) | |
tree | 9c032fc27d8b06e1b747da410789d572ac8aa790 | |
parent | 98ebbd01e552aa00c37abbc635fca61b91c40683 (diff) |
De-map device list positions in streaming tokens (#1642)
* De-map device list positions in streaming tokens
* Fix lint error
* Tweak toOffset
-rw-r--r-- | syncapi/consumers/keychange.go | 9 | ||||
-rw-r--r-- | syncapi/internal/keychange.go | 14 | ||||
-rw-r--r-- | syncapi/internal/keychange_test.go | 8 | ||||
-rw-r--r-- | syncapi/types/types.go | 85 | ||||
-rw-r--r-- | syncapi/types/types_test.go | 30 |
5 files changed, 47 insertions, 99 deletions
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 0d82f7a5..128f6011 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -23,7 +23,6 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - syncinternal "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" syncapi "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" @@ -115,11 +114,9 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er } // TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams posUpdate := types.StreamingToken{ - Logs: map[string]*types.LogPosition{ - syncinternal.DeviceListLogName: { - Offset: msg.Offset, - Partition: msg.Partition, - }, + DeviceListPosition: types.LogPosition{ + Offset: msg.Offset, + Partition: msg.Partition, }, } for userID := range queryRes.UserIDsToCount { diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index 090e0c65..6af48a9c 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -73,15 +73,13 @@ func DeviceListCatchup( offset = sarama.OffsetOldest // Extract partition/offset from sync token // TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make. - logOffset := from.Log(DeviceListLogName) - if logOffset != nil { - partition = logOffset.Partition - offset = logOffset.Offset + if !from.DeviceListPosition.IsEmpty() { + partition = from.DeviceListPosition.Partition + offset = from.DeviceListPosition.Offset } var toOffset int64 toOffset = sarama.OffsetNewest - toLog := to.Log(DeviceListLogName) - if toLog != nil && toLog.Offset > 0 { + if toLog := to.DeviceListPosition; toLog.Partition == partition && toLog.Offset > 0 { toOffset = toLog.Offset } var queryRes api.QueryKeyChangesResponse @@ -130,10 +128,10 @@ func DeviceListCatchup( } } // set the new token - to.SetLog(DeviceListLogName, &types.LogPosition{ + to.DeviceListPosition = types.LogPosition{ Partition: queryRes.Partition, Offset: queryRes.Offset, - }) + } res.NextBatch = to.String() return hasNew, nil diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index f65db0a5..9eaeda75 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -18,11 +18,9 @@ var ( syncingUser = "@alice:localhost" emptyToken = types.StreamingToken{} newestToken = types.StreamingToken{ - Logs: map[string]*types.LogPosition{ - DeviceListLogName: { - Offset: sarama.OffsetNewest, - Partition: 0, - }, + DeviceListPosition: types.LogPosition{ + Offset: sarama.OffsetNewest, + Partition: 0, }, } ) diff --git a/syncapi/types/types.go b/syncapi/types/types.go index fe76b74e..78c3a41a 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -17,7 +17,6 @@ package types import ( "encoding/json" "fmt" - "sort" "strconv" "strings" @@ -45,6 +44,10 @@ type LogPosition struct { Offset int64 } +func (p *LogPosition) IsEmpty() bool { + return p.Offset == 0 +} + // IsAfter returns true if this position is after `lp`. func (p *LogPosition) IsAfter(lp *LogPosition) bool { if lp == nil { @@ -110,22 +113,7 @@ type StreamingToken struct { TypingPosition StreamPosition ReceiptPosition StreamPosition SendToDevicePosition StreamPosition - Logs map[string]*LogPosition -} - -func (t *StreamingToken) SetLog(name string, lp *LogPosition) { - if t.Logs == nil { - t.Logs = make(map[string]*LogPosition) - } - t.Logs[name] = lp -} - -func (t *StreamingToken) Log(name string) *LogPosition { - l, ok := t.Logs[name] - if !ok { - return nil - } - return l + DeviceListPosition LogPosition } func (t StreamingToken) String() string { @@ -134,14 +122,10 @@ func (t StreamingToken) String() string { t.PDUPosition, t.TypingPosition, t.ReceiptPosition, t.SendToDevicePosition, ) - var logStrings []string - for name, lp := range t.Logs { - logStr := fmt.Sprintf("%s-%d-%d", name, lp.Partition, lp.Offset) - logStrings = append(logStrings, logStr) + if dl := t.DeviceListPosition; !dl.IsEmpty() { + posStr += fmt.Sprintf(".dl-%d-%d", dl.Partition, dl.Offset) } - sort.Strings(logStrings) - // E.g s11_22_33_44.dl0-134.ab1-441 - return strings.Join(append([]string{posStr}, logStrings...), ".") + return posStr } // IsAfter returns true if ANY position in this token is greater than `other`. @@ -155,21 +139,14 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool { return true case t.SendToDevicePosition > other.SendToDevicePosition: return true - } - for name := range t.Logs { - otherLog := other.Log(name) - if otherLog == nil { - continue - } - if t.Logs[name].IsAfter(otherLog) { - return true - } + case t.DeviceListPosition.IsAfter(&other.DeviceListPosition): + return true } return false } func (t *StreamingToken) IsEmpty() bool { - return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition == 0 + return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition == 0 && t.DeviceListPosition.IsEmpty() } // WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken. @@ -187,15 +164,8 @@ func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken) ret.ReceiptPosition = other.ReceiptPosition case other.SendToDevicePosition > 0: ret.SendToDevicePosition = other.SendToDevicePosition - } - ret.Logs = make(map[string]*LogPosition) - for name := range t.Logs { - otherLog := other.Log(name) - if otherLog == nil { - continue - } - copy := *otherLog - ret.Logs[name] = © + case other.DeviceListPosition.Offset > 0: + ret.DeviceListPosition = other.DeviceListPosition } return ret } @@ -294,30 +264,31 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { TypingPosition: positions[1], ReceiptPosition: positions[2], SendToDevicePosition: positions[3], - Logs: make(map[string]*LogPosition), } // dl-0-1234 // $log_name-$partition-$offset for _, logStr := range categories[1:] { segments := strings.Split(logStr, "-") if len(segments) != 3 { - err = fmt.Errorf("token %s - invalid log: %s", tok, logStr) + err = fmt.Errorf("invalid log position %q", logStr) return } - var partition int64 - partition, err = strconv.ParseInt(segments[1], 10, 32) - if err != nil { + switch segments[0] { + case "dl": + // Device list syncing + var partition, offset int + if partition, err = strconv.Atoi(segments[1]); err != nil { + return + } + if offset, err = strconv.Atoi(segments[2]); err != nil { + return + } + token.DeviceListPosition.Partition = int32(partition) + token.DeviceListPosition.Offset = int64(offset) + default: + err = fmt.Errorf("unrecognised token type %q", segments[0]) return } - var offset int64 - offset, err = strconv.ParseInt(segments[2], 10, 64) - if err != nil { - return - } - token.Logs[segments[0]] = &LogPosition{ - Partition: int32(partition), - Offset: offset, - } } return token, nil } diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go index 15079188..ecb0ab6f 100644 --- a/syncapi/types/types_test.go +++ b/syncapi/types/types_test.go @@ -12,28 +12,12 @@ func TestNewSyncTokenWithLogs(t *testing.T) { tests := map[string]*StreamingToken{ "s4_0_0_0": { PDUPosition: 4, - Logs: make(map[string]*LogPosition), }, "s4_0_0_0.dl-0-123": { PDUPosition: 4, - Logs: map[string]*LogPosition{ - "dl": { - Partition: 0, - Offset: 123, - }, - }, - }, - "s4_0_0_0.ab-1-14419482332.dl-0-123": { - PDUPosition: 4, - Logs: map[string]*LogPosition{ - "ab": { - Partition: 1, - Offset: 14419482332, - }, - "dl": { - Partition: 0, - Offset: 123, - }, + DeviceListPosition: LogPosition{ + Partition: 0, + Offset: 123, }, }, } @@ -58,10 +42,10 @@ func TestNewSyncTokenWithLogs(t *testing.T) { func TestSyncTokens(t *testing.T) { shouldPass := map[string]string{ - "s4_0_0_0": StreamingToken{4, 0, 0, 0, nil}.String(), - "s3_1_0_0": StreamingToken{3, 1, 0, 0, nil}.String(), - "s3_1_2_3": StreamingToken{3, 1, 2, 3, nil}.String(), - "t3_1": TopologyToken{3, 1}.String(), + "s4_0_0_0": StreamingToken{4, 0, 0, 0, LogPosition{}}.String(), + "s3_1_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, LogPosition{1, 2}}.String(), + "s3_1_2_3": StreamingToken{3, 1, 2, 3, LogPosition{}}.String(), + "t3_1": TopologyToken{3, 1}.String(), } for a, b := range shouldPass { |