aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-12-15 15:09:10 +0000
committerGitHub <noreply@github.com>2020-12-15 15:09:10 +0000
commit38318b0f162a41ac9d1aa60dc3b7693eae033851 (patch)
tree9c032fc27d8b06e1b747da410789d572ac8aa790
parent98ebbd01e552aa00c37abbc635fca61b91c40683 (diff)
De-map device list positions in streaming tokens (#1642)
* De-map device list positions in streaming tokens * Fix lint error * Tweak toOffset
-rw-r--r--syncapi/consumers/keychange.go9
-rw-r--r--syncapi/internal/keychange.go14
-rw-r--r--syncapi/internal/keychange_test.go8
-rw-r--r--syncapi/types/types.go85
-rw-r--r--syncapi/types/types_test.go30
5 files changed, 47 insertions, 99 deletions
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index 0d82f7a5..128f6011 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -23,7 +23,6 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
- syncinternal "github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
syncapi "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -115,11 +114,9 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
}
// TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams
posUpdate := types.StreamingToken{
- Logs: map[string]*types.LogPosition{
- syncinternal.DeviceListLogName: {
- Offset: msg.Offset,
- Partition: msg.Partition,
- },
+ DeviceListPosition: types.LogPosition{
+ Offset: msg.Offset,
+ Partition: msg.Partition,
},
}
for userID := range queryRes.UserIDsToCount {
diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go
index 090e0c65..6af48a9c 100644
--- a/syncapi/internal/keychange.go
+++ b/syncapi/internal/keychange.go
@@ -73,15 +73,13 @@ func DeviceListCatchup(
offset = sarama.OffsetOldest
// Extract partition/offset from sync token
// TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
- logOffset := from.Log(DeviceListLogName)
- if logOffset != nil {
- partition = logOffset.Partition
- offset = logOffset.Offset
+ if !from.DeviceListPosition.IsEmpty() {
+ partition = from.DeviceListPosition.Partition
+ offset = from.DeviceListPosition.Offset
}
var toOffset int64
toOffset = sarama.OffsetNewest
- toLog := to.Log(DeviceListLogName)
- if toLog != nil && toLog.Offset > 0 {
+ if toLog := to.DeviceListPosition; toLog.Partition == partition && toLog.Offset > 0 {
toOffset = toLog.Offset
}
var queryRes api.QueryKeyChangesResponse
@@ -130,10 +128,10 @@ func DeviceListCatchup(
}
}
// set the new token
- to.SetLog(DeviceListLogName, &types.LogPosition{
+ to.DeviceListPosition = types.LogPosition{
Partition: queryRes.Partition,
Offset: queryRes.Offset,
- })
+ }
res.NextBatch = to.String()
return hasNew, nil
diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go
index f65db0a5..9eaeda75 100644
--- a/syncapi/internal/keychange_test.go
+++ b/syncapi/internal/keychange_test.go
@@ -18,11 +18,9 @@ var (
syncingUser = "@alice:localhost"
emptyToken = types.StreamingToken{}
newestToken = types.StreamingToken{
- Logs: map[string]*types.LogPosition{
- DeviceListLogName: {
- Offset: sarama.OffsetNewest,
- Partition: 0,
- },
+ DeviceListPosition: types.LogPosition{
+ Offset: sarama.OffsetNewest,
+ Partition: 0,
},
}
)
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index fe76b74e..78c3a41a 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -17,7 +17,6 @@ package types
import (
"encoding/json"
"fmt"
- "sort"
"strconv"
"strings"
@@ -45,6 +44,10 @@ type LogPosition struct {
Offset int64
}
+func (p *LogPosition) IsEmpty() bool {
+ return p.Offset == 0
+}
+
// IsAfter returns true if this position is after `lp`.
func (p *LogPosition) IsAfter(lp *LogPosition) bool {
if lp == nil {
@@ -110,22 +113,7 @@ type StreamingToken struct {
TypingPosition StreamPosition
ReceiptPosition StreamPosition
SendToDevicePosition StreamPosition
- Logs map[string]*LogPosition
-}
-
-func (t *StreamingToken) SetLog(name string, lp *LogPosition) {
- if t.Logs == nil {
- t.Logs = make(map[string]*LogPosition)
- }
- t.Logs[name] = lp
-}
-
-func (t *StreamingToken) Log(name string) *LogPosition {
- l, ok := t.Logs[name]
- if !ok {
- return nil
- }
- return l
+ DeviceListPosition LogPosition
}
func (t StreamingToken) String() string {
@@ -134,14 +122,10 @@ func (t StreamingToken) String() string {
t.PDUPosition, t.TypingPosition,
t.ReceiptPosition, t.SendToDevicePosition,
)
- var logStrings []string
- for name, lp := range t.Logs {
- logStr := fmt.Sprintf("%s-%d-%d", name, lp.Partition, lp.Offset)
- logStrings = append(logStrings, logStr)
+ if dl := t.DeviceListPosition; !dl.IsEmpty() {
+ posStr += fmt.Sprintf(".dl-%d-%d", dl.Partition, dl.Offset)
}
- sort.Strings(logStrings)
- // E.g s11_22_33_44.dl0-134.ab1-441
- return strings.Join(append([]string{posStr}, logStrings...), ".")
+ return posStr
}
// IsAfter returns true if ANY position in this token is greater than `other`.
@@ -155,21 +139,14 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
return true
case t.SendToDevicePosition > other.SendToDevicePosition:
return true
- }
- for name := range t.Logs {
- otherLog := other.Log(name)
- if otherLog == nil {
- continue
- }
- if t.Logs[name].IsAfter(otherLog) {
- return true
- }
+ case t.DeviceListPosition.IsAfter(&other.DeviceListPosition):
+ return true
}
return false
}
func (t *StreamingToken) IsEmpty() bool {
- return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition == 0
+ return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition == 0 && t.DeviceListPosition.IsEmpty()
}
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
@@ -187,15 +164,8 @@ func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken)
ret.ReceiptPosition = other.ReceiptPosition
case other.SendToDevicePosition > 0:
ret.SendToDevicePosition = other.SendToDevicePosition
- }
- ret.Logs = make(map[string]*LogPosition)
- for name := range t.Logs {
- otherLog := other.Log(name)
- if otherLog == nil {
- continue
- }
- copy := *otherLog
- ret.Logs[name] = &copy
+ case other.DeviceListPosition.Offset > 0:
+ ret.DeviceListPosition = other.DeviceListPosition
}
return ret
}
@@ -294,30 +264,31 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
TypingPosition: positions[1],
ReceiptPosition: positions[2],
SendToDevicePosition: positions[3],
- Logs: make(map[string]*LogPosition),
}
// dl-0-1234
// $log_name-$partition-$offset
for _, logStr := range categories[1:] {
segments := strings.Split(logStr, "-")
if len(segments) != 3 {
- err = fmt.Errorf("token %s - invalid log: %s", tok, logStr)
+ err = fmt.Errorf("invalid log position %q", logStr)
return
}
- var partition int64
- partition, err = strconv.ParseInt(segments[1], 10, 32)
- if err != nil {
+ switch segments[0] {
+ case "dl":
+ // Device list syncing
+ var partition, offset int
+ if partition, err = strconv.Atoi(segments[1]); err != nil {
+ return
+ }
+ if offset, err = strconv.Atoi(segments[2]); err != nil {
+ return
+ }
+ token.DeviceListPosition.Partition = int32(partition)
+ token.DeviceListPosition.Offset = int64(offset)
+ default:
+ err = fmt.Errorf("unrecognised token type %q", segments[0])
return
}
- var offset int64
- offset, err = strconv.ParseInt(segments[2], 10, 64)
- if err != nil {
- return
- }
- token.Logs[segments[0]] = &LogPosition{
- Partition: int32(partition),
- Offset: offset,
- }
}
return token, nil
}
diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go
index 15079188..ecb0ab6f 100644
--- a/syncapi/types/types_test.go
+++ b/syncapi/types/types_test.go
@@ -12,28 +12,12 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
tests := map[string]*StreamingToken{
"s4_0_0_0": {
PDUPosition: 4,
- Logs: make(map[string]*LogPosition),
},
"s4_0_0_0.dl-0-123": {
PDUPosition: 4,
- Logs: map[string]*LogPosition{
- "dl": {
- Partition: 0,
- Offset: 123,
- },
- },
- },
- "s4_0_0_0.ab-1-14419482332.dl-0-123": {
- PDUPosition: 4,
- Logs: map[string]*LogPosition{
- "ab": {
- Partition: 1,
- Offset: 14419482332,
- },
- "dl": {
- Partition: 0,
- Offset: 123,
- },
+ DeviceListPosition: LogPosition{
+ Partition: 0,
+ Offset: 123,
},
},
}
@@ -58,10 +42,10 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
func TestSyncTokens(t *testing.T) {
shouldPass := map[string]string{
- "s4_0_0_0": StreamingToken{4, 0, 0, 0, nil}.String(),
- "s3_1_0_0": StreamingToken{3, 1, 0, 0, nil}.String(),
- "s3_1_2_3": StreamingToken{3, 1, 2, 3, nil}.String(),
- "t3_1": TopologyToken{3, 1}.String(),
+ "s4_0_0_0": StreamingToken{4, 0, 0, 0, LogPosition{}}.String(),
+ "s3_1_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, LogPosition{1, 2}}.String(),
+ "s3_1_2_3": StreamingToken{3, 1, 2, 3, LogPosition{}}.String(),
+ "t3_1": TopologyToken{3, 1}.String(),
}
for a, b := range shouldPass {