diff options
author | Kegsay <kegan@matrix.org> | 2020-07-29 19:00:04 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-29 19:00:04 +0100 |
commit | 0fdd4f14d123e76bd3d0368947d3aab84a787946 (patch) | |
tree | 024b7d526095bfe2a64d46d1eba4ca4e0c3b388b /syncapi/types/types.go | |
parent | 9a5fb489c5f80148a8512e61c95c8df7bb46d314 (diff) |
Add support for logs in StreamingToken (#1229)
* Add support for logs in StreamingToken
Tokens now end up looking like `s11_22|dl-0-123|ab-0-12224`
where `dl` and `ab` are log names, `0` is the partition and
`123` and `12224` are the offsets.
* Also test reserialisation
* s/|/./g so tokens url escape nicely
Diffstat (limited to 'syncapi/types/types.go')
-rw-r--r-- | syncapi/types/types.go | 105 |
1 files changed, 94 insertions, 11 deletions
diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 7dc02281..7bba8e52 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -39,6 +39,23 @@ var ( // StreamPosition represents the offset in the sync stream a client is at. type StreamPosition int64 +// LogPosition represents the offset in a Kafka log a client is at. +type LogPosition struct { + Partition int32 + Offset int64 +} + +// IsAfter returns true if this position is after `lp`. +func (p *LogPosition) IsAfter(lp *LogPosition) bool { + if lp == nil { + return false + } + if p.Partition != lp.Partition { + return false + } + return p.Offset > lp.Offset +} + // StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event. type StreamEvent struct { gomatrixserverlib.HeaderedEvent @@ -90,6 +107,15 @@ const ( type StreamingToken struct { syncToken + logs map[string]*LogPosition +} + +func (t *StreamingToken) Log(name string) *LogPosition { + l, ok := t.logs[name] + if !ok { + return nil + } + return l } func (t *StreamingToken) PDUPosition() StreamPosition { @@ -99,7 +125,15 @@ func (t *StreamingToken) EDUPosition() StreamPosition { return t.Positions[1] } func (t *StreamingToken) String() string { - return t.syncToken.String() + logStrings := []string{ + t.syncToken.String(), + } + for name, lp := range t.logs { + logStr := fmt.Sprintf("%s-%d-%d", name, lp.Partition, lp.Offset) + logStrings = append(logStrings, logStr) + } + // E.g s11_22_33.dl0-134.ab1-441 + return strings.Join(logStrings, ".") } // IsAfter returns true if ANY position in this token is greater than `other`. @@ -109,12 +143,22 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool { return true } } + for name := range t.logs { + otherLog := other.Log(name) + if otherLog == nil { + continue + } + if t.logs[name].IsAfter(otherLog) { + return true + } + } return false } // WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken. // If the latter StreamingToken contains a field that is not 0, it is considered an update, // and its value will replace the corresponding value in the StreamingToken on which WithUpdates is called. +// If the other token has a log, they will replace any existing log on this token. func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken) { ret.Type = t.Type ret.Positions = make([]StreamPosition, len(t.Positions)) @@ -125,6 +169,13 @@ func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken) } ret.Positions[i] = other.Positions[i] } + for name := range t.logs { + otherLog := other.Log(name) + if otherLog == nil { + continue + } + t.logs[name] = otherLog + } return ret } @@ -139,7 +190,7 @@ func (t *TopologyToken) PDUPosition() StreamPosition { return t.Positions[1] } func (t *TopologyToken) StreamToken() StreamingToken { - return NewStreamToken(t.PDUPosition(), 0) + return NewStreamToken(t.PDUPosition(), 0, nil) } func (t *TopologyToken) String() string { return t.syncToken.String() @@ -174,9 +225,9 @@ func (t *TopologyToken) Decrement() { // error if the token couldn't be parsed into an int64, or if the token type // isn't a known type (returns ErrInvalidSyncTokenType in the latter // case). -func newSyncTokenFromString(s string) (token *syncToken, err error) { +func newSyncTokenFromString(s string) (token *syncToken, categories []string, err error) { if len(s) == 0 { - return nil, ErrInvalidSyncTokenLen + return nil, nil, ErrInvalidSyncTokenLen } token = new(syncToken) @@ -185,16 +236,17 @@ func newSyncTokenFromString(s string) (token *syncToken, err error) { switch t := SyncTokenType(s[:1]); t { case SyncTokenTypeStream, SyncTokenTypeTopology: token.Type = t - positions = strings.Split(s[1:], "_") + categories = strings.Split(s[1:], ".") + positions = strings.Split(categories[0], "_") default: - return nil, ErrInvalidSyncTokenType + return nil, nil, ErrInvalidSyncTokenType } for _, pos := range positions { if posInt, err := strconv.ParseInt(pos, 10, 64); err != nil { - return nil, err + return nil, nil, err } else if posInt < 0 { - return nil, errors.New("negative position not allowed") + return nil, nil, errors.New("negative position not allowed") } else { token.Positions = append(token.Positions, StreamPosition(posInt)) } @@ -215,7 +267,7 @@ func NewTopologyToken(depth, streamPos StreamPosition) TopologyToken { } } func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) { - t, err := newSyncTokenFromString(tok) + t, _, err := newSyncTokenFromString(tok) if err != nil { return } @@ -233,16 +285,20 @@ func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) { } // NewStreamToken creates a new sync token for /sync -func NewStreamToken(pduPos, eduPos StreamPosition) StreamingToken { +func NewStreamToken(pduPos, eduPos StreamPosition, logs map[string]*LogPosition) StreamingToken { + if logs == nil { + logs = make(map[string]*LogPosition) + } return StreamingToken{ syncToken: syncToken{ Type: SyncTokenTypeStream, Positions: []StreamPosition{pduPos, eduPos}, }, + logs: logs, } } func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { - t, err := newSyncTokenFromString(tok) + t, categories, err := newSyncTokenFromString(tok) if err != nil { return } @@ -254,8 +310,35 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { err = fmt.Errorf("token %s wrong number of values, got %d want at least 2", tok, len(t.Positions)) return } + logs := make(map[string]*LogPosition) + if len(categories) > 1 { + // dl-0-1234 + // $log_name-$partition-$offset + for _, logStr := range categories[1:] { + segments := strings.Split(logStr, "-") + if len(segments) != 3 { + err = fmt.Errorf("token %s - invalid log: %s", tok, logStr) + return + } + var partition int64 + partition, err = strconv.ParseInt(segments[1], 10, 32) + if err != nil { + return + } + var offset int64 + offset, err = strconv.ParseInt(segments[2], 10, 64) + if err != nil { + return + } + logs[segments[0]] = &LogPosition{ + Partition: int32(partition), + Offset: offset, + } + } + } return StreamingToken{ syncToken: *t, + logs: logs, }, nil } |