aboutsummaryrefslogtreecommitdiff
path: root/syncapi/types/types.go
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-07-29 19:00:04 +0100
committerGitHub <noreply@github.com>2020-07-29 19:00:04 +0100
commit0fdd4f14d123e76bd3d0368947d3aab84a787946 (patch)
tree024b7d526095bfe2a64d46d1eba4ca4e0c3b388b /syncapi/types/types.go
parent9a5fb489c5f80148a8512e61c95c8df7bb46d314 (diff)
Add support for logs in StreamingToken (#1229)
* Add support for logs in StreamingToken Tokens now end up looking like `s11_22|dl-0-123|ab-0-12224` where `dl` and `ab` are log names, `0` is the partition and `123` and `12224` are the offsets. * Also test reserialisation * s/|/./g so tokens url escape nicely
Diffstat (limited to 'syncapi/types/types.go')
-rw-r--r--syncapi/types/types.go105
1 files changed, 94 insertions, 11 deletions
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index 7dc02281..7bba8e52 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -39,6 +39,23 @@ var (
// StreamPosition represents the offset in the sync stream a client is at.
type StreamPosition int64
+// LogPosition represents the offset in a Kafka log a client is at.
+type LogPosition struct {
+ Partition int32
+ Offset int64
+}
+
+// IsAfter returns true if this position is after `lp`.
+func (p *LogPosition) IsAfter(lp *LogPosition) bool {
+ if lp == nil {
+ return false
+ }
+ if p.Partition != lp.Partition {
+ return false
+ }
+ return p.Offset > lp.Offset
+}
+
// StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event.
type StreamEvent struct {
gomatrixserverlib.HeaderedEvent
@@ -90,6 +107,15 @@ const (
type StreamingToken struct {
syncToken
+ logs map[string]*LogPosition
+}
+
+func (t *StreamingToken) Log(name string) *LogPosition {
+ l, ok := t.logs[name]
+ if !ok {
+ return nil
+ }
+ return l
}
func (t *StreamingToken) PDUPosition() StreamPosition {
@@ -99,7 +125,15 @@ func (t *StreamingToken) EDUPosition() StreamPosition {
return t.Positions[1]
}
func (t *StreamingToken) String() string {
- return t.syncToken.String()
+ logStrings := []string{
+ t.syncToken.String(),
+ }
+ for name, lp := range t.logs {
+ logStr := fmt.Sprintf("%s-%d-%d", name, lp.Partition, lp.Offset)
+ logStrings = append(logStrings, logStr)
+ }
+ // E.g s11_22_33.dl0-134.ab1-441
+ return strings.Join(logStrings, ".")
}
// IsAfter returns true if ANY position in this token is greater than `other`.
@@ -109,12 +143,22 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
return true
}
}
+ for name := range t.logs {
+ otherLog := other.Log(name)
+ if otherLog == nil {
+ continue
+ }
+ if t.logs[name].IsAfter(otherLog) {
+ return true
+ }
+ }
return false
}
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
// If the latter StreamingToken contains a field that is not 0, it is considered an update,
// and its value will replace the corresponding value in the StreamingToken on which WithUpdates is called.
+// If the other token has a log, they will replace any existing log on this token.
func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken) {
ret.Type = t.Type
ret.Positions = make([]StreamPosition, len(t.Positions))
@@ -125,6 +169,13 @@ func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken)
}
ret.Positions[i] = other.Positions[i]
}
+ for name := range t.logs {
+ otherLog := other.Log(name)
+ if otherLog == nil {
+ continue
+ }
+ t.logs[name] = otherLog
+ }
return ret
}
@@ -139,7 +190,7 @@ func (t *TopologyToken) PDUPosition() StreamPosition {
return t.Positions[1]
}
func (t *TopologyToken) StreamToken() StreamingToken {
- return NewStreamToken(t.PDUPosition(), 0)
+ return NewStreamToken(t.PDUPosition(), 0, nil)
}
func (t *TopologyToken) String() string {
return t.syncToken.String()
@@ -174,9 +225,9 @@ func (t *TopologyToken) Decrement() {
// error if the token couldn't be parsed into an int64, or if the token type
// isn't a known type (returns ErrInvalidSyncTokenType in the latter
// case).
-func newSyncTokenFromString(s string) (token *syncToken, err error) {
+func newSyncTokenFromString(s string) (token *syncToken, categories []string, err error) {
if len(s) == 0 {
- return nil, ErrInvalidSyncTokenLen
+ return nil, nil, ErrInvalidSyncTokenLen
}
token = new(syncToken)
@@ -185,16 +236,17 @@ func newSyncTokenFromString(s string) (token *syncToken, err error) {
switch t := SyncTokenType(s[:1]); t {
case SyncTokenTypeStream, SyncTokenTypeTopology:
token.Type = t
- positions = strings.Split(s[1:], "_")
+ categories = strings.Split(s[1:], ".")
+ positions = strings.Split(categories[0], "_")
default:
- return nil, ErrInvalidSyncTokenType
+ return nil, nil, ErrInvalidSyncTokenType
}
for _, pos := range positions {
if posInt, err := strconv.ParseInt(pos, 10, 64); err != nil {
- return nil, err
+ return nil, nil, err
} else if posInt < 0 {
- return nil, errors.New("negative position not allowed")
+ return nil, nil, errors.New("negative position not allowed")
} else {
token.Positions = append(token.Positions, StreamPosition(posInt))
}
@@ -215,7 +267,7 @@ func NewTopologyToken(depth, streamPos StreamPosition) TopologyToken {
}
}
func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) {
- t, err := newSyncTokenFromString(tok)
+ t, _, err := newSyncTokenFromString(tok)
if err != nil {
return
}
@@ -233,16 +285,20 @@ func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) {
}
// NewStreamToken creates a new sync token for /sync
-func NewStreamToken(pduPos, eduPos StreamPosition) StreamingToken {
+func NewStreamToken(pduPos, eduPos StreamPosition, logs map[string]*LogPosition) StreamingToken {
+ if logs == nil {
+ logs = make(map[string]*LogPosition)
+ }
return StreamingToken{
syncToken: syncToken{
Type: SyncTokenTypeStream,
Positions: []StreamPosition{pduPos, eduPos},
},
+ logs: logs,
}
}
func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
- t, err := newSyncTokenFromString(tok)
+ t, categories, err := newSyncTokenFromString(tok)
if err != nil {
return
}
@@ -254,8 +310,35 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
err = fmt.Errorf("token %s wrong number of values, got %d want at least 2", tok, len(t.Positions))
return
}
+ logs := make(map[string]*LogPosition)
+ if len(categories) > 1 {
+ // dl-0-1234
+ // $log_name-$partition-$offset
+ for _, logStr := range categories[1:] {
+ segments := strings.Split(logStr, "-")
+ if len(segments) != 3 {
+ err = fmt.Errorf("token %s - invalid log: %s", tok, logStr)
+ return
+ }
+ var partition int64
+ partition, err = strconv.ParseInt(segments[1], 10, 32)
+ if err != nil {
+ return
+ }
+ var offset int64
+ offset, err = strconv.ParseInt(segments[2], 10, 64)
+ if err != nil {
+ return
+ }
+ logs[segments[0]] = &LogPosition{
+ Partition: int32(partition),
+ Offset: offset,
+ }
+ }
+ }
return StreamingToken{
syncToken: *t,
+ logs: logs,
}, nil
}