aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkegsay <kegan@matrix.org>2022-01-20 15:26:45 +0000
committerGitHub <noreply@github.com>2022-01-20 15:26:45 +0000
commitdb7d9cba8ad28c300dd66c01b9b0ce2414e8cbe0 (patch)
treead91c627b4ce44490bd484c53a23f343ee49041e
parent16035b97373849d74961e15616f3f1449f0a5abd (diff)
BREAKING: Remove Partitioned Stream Positions (#2096)
* go mod tidy * Break complement to check it fails CI * Remove partitioned stream positions This was used by the device list stream position. The device list position now corresponds to the `Offset`, and the partition is always 0, in prep for removing reliance on Kafka topics for device list changes. * Linting * Migrate old style tokens to new style because element-web doesn't soft-logoout on 4xx errors on /sync
-rw-r--r--go.mod2
-rw-r--r--keyserver/api/api.go4
-rw-r--r--keyserver/internal/internal.go7
-rw-r--r--keyserver/producers/keychange.go9
-rw-r--r--syncapi/consumers/keychange.go22
-rw-r--r--syncapi/internal/keychange.go40
-rw-r--r--syncapi/internal/keychange_test.go21
-rw-r--r--syncapi/streams/stream_devicelist.go8
-rw-r--r--syncapi/streams/streams.go8
-rw-r--r--syncapi/streams/template_pstream.go38
-rw-r--r--syncapi/sync/requestpool.go6
-rw-r--r--syncapi/types/provider.go8
-rw-r--r--syncapi/types/types.go87
-rw-r--r--syncapi/types/types_test.go41
14 files changed, 70 insertions, 231 deletions
diff --git a/go.mod b/go.mod
index eb421ce1..e6202f66 100644
--- a/go.mod
+++ b/go.mod
@@ -9,7 +9,7 @@ require (
github.com/Arceliar/phony v0.0.0-20210209235338-dde1a8dca979
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
- github.com/MFAshby/stdemuxerhook v1.0.0 // indirect
+ github.com/MFAshby/stdemuxerhook v1.0.0
github.com/Masterminds/semver/v3 v3.1.1
github.com/S7evinK/saramajetstream v0.0.0-20210709110708-de6efc8c4a32
github.com/Shopify/sarama v1.29.0
diff --git a/keyserver/api/api.go b/keyserver/api/api.go
index 5a109cc6..eae14ae1 100644
--- a/keyserver/api/api.go
+++ b/keyserver/api/api.go
@@ -224,8 +224,6 @@ type QueryKeysResponse struct {
}
type QueryKeyChangesRequest struct {
- // The partition which had key events sent to
- Partition int32
// The offset of the last received key event, or sarama.OffsetOldest if this is from the beginning
Offset int64
// The inclusive offset where to track key changes up to. Messages with this offset are included in the response.
@@ -236,8 +234,6 @@ type QueryKeyChangesRequest struct {
type QueryKeyChangesResponse struct {
// The set of users who have had their keys change.
UserIDs []string
- // The partition being served - useful if the partition is unknown at request time
- Partition int32
// The latest offset represented in this response.
Offset int64
// Set if there was a problem handling the request.
diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go
index 3e91962e..0140a8f4 100644
--- a/keyserver/internal/internal.go
+++ b/keyserver/internal/internal.go
@@ -59,17 +59,14 @@ func (a *KeyInternalAPI) InputDeviceListUpdate(
}
func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) {
- if req.Partition < 0 {
- req.Partition = a.Producer.DefaultPartition()
- }
- userIDs, latest, err := a.DB.KeyChanges(ctx, req.Partition, req.Offset, req.ToOffset)
+ partition := 0
+ userIDs, latest, err := a.DB.KeyChanges(ctx, int32(partition), req.Offset, req.ToOffset)
if err != nil {
res.Error = &api.KeyError{
Err: err.Error(),
}
}
res.Offset = latest
- res.Partition = req.Partition
res.UserIDs = userIDs
}
diff --git a/keyserver/producers/keychange.go b/keyserver/producers/keychange.go
index 782675c2..c5ddf2c1 100644
--- a/keyserver/producers/keychange.go
+++ b/keyserver/producers/keychange.go
@@ -32,15 +32,6 @@ type KeyChange struct {
DB storage.Database
}
-// DefaultPartition returns the default partition this process is sending key changes to.
-// NB: A keyserver MUST send key changes to only 1 partition or else query operations will
-// become inconsistent. Partitions can be sharded (e.g by hash of user ID of key change) but
-// then all keyservers must be queried to calculate the entire set of key changes between
-// two sync tokens.
-func (p *KeyChange) DefaultPartition() int32 {
- return 0
-}
-
// ProduceKeyChanges creates new change events for each key
func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error {
userToDeviceCount := make(map[string]int)
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index 76b143d8..d63e4832 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -38,7 +38,7 @@ type OutputKeyChangeEventConsumer struct {
keyChangeConsumer *internal.ContinualConsumer
db storage.Database
notifier *notifier.Notifier
- stream types.PartitionedStreamProvider
+ stream types.StreamProvider
serverName gomatrixserverlib.ServerName // our server name
rsAPI roomserverAPI.RoomserverInternalAPI
keyAPI api.KeyInternalAPI
@@ -57,7 +57,7 @@ func NewOutputKeyChangeEventConsumer(
rsAPI roomserverAPI.RoomserverInternalAPI,
store storage.Database,
notifier *notifier.Notifier,
- stream types.PartitionedStreamProvider,
+ stream types.StreamProvider,
) *OutputKeyChangeEventConsumer {
consumer := internal.ContinualConsumer{
@@ -118,15 +118,15 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
}
switch m.Type {
case api.TypeCrossSigningUpdate:
- return s.onCrossSigningMessage(m, msg.Offset, msg.Partition)
+ return s.onCrossSigningMessage(m, msg.Offset)
case api.TypeDeviceKeyUpdate:
fallthrough
default:
- return s.onDeviceKeyMessage(m, msg.Offset, msg.Partition)
+ return s.onDeviceKeyMessage(m, msg.Offset)
}
}
-func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, offset int64, partition int32) error {
+func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, offset int64) error {
if m.DeviceKeys == nil {
return nil
}
@@ -143,10 +143,7 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, o
}
// make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1
- posUpdate := types.LogPosition{
- Offset: offset,
- Partition: partition,
- }
+ posUpdate := types.StreamPosition(offset)
s.stream.Advance(posUpdate)
for userID := range queryRes.UserIDsToCount {
@@ -156,7 +153,7 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, o
return nil
}
-func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, offset int64, partition int32) error {
+func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, offset int64) error {
output := m.CrossSigningKeyUpdate
// work out who we need to notify about the new key
var queryRes roomserverAPI.QuerySharedUsersResponse
@@ -170,10 +167,7 @@ func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage
}
// make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1
- posUpdate := types.LogPosition{
- Offset: offset,
- Partition: partition,
- }
+ posUpdate := types.StreamPosition(offset)
s.stream.Advance(posUpdate)
for userID := range queryRes.UserIDsToCount {
diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go
index 56a438fb..41efd4a0 100644
--- a/syncapi/internal/keychange.go
+++ b/syncapi/internal/keychange.go
@@ -47,8 +47,8 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID,
// be already filled in with join/leave information.
func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
- userID string, res *types.Response, from, to types.LogPosition,
-) (newPos types.LogPosition, hasNew bool, err error) {
+ userID string, res *types.Response, from, to types.StreamPosition,
+) (newPos types.StreamPosition, hasNew bool, err error) {
// Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
newlyJoinedRooms := joinedRooms(res, userID)
@@ -64,27 +64,18 @@ func DeviceListCatchup(
}
// now also track users who we already share rooms with but who have updated their devices between the two tokens
-
- var partition int32
- var offset int64
- partition = -1
- offset = sarama.OffsetOldest
- // Extract partition/offset from sync token
- // TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
- if !from.IsEmpty() {
- partition = from.Partition
- offset = from.Offset
+ offset := sarama.OffsetOldest
+ toOffset := sarama.OffsetNewest
+ if to > 0 && to > from {
+ toOffset = int64(to)
}
- var toOffset int64
- toOffset = sarama.OffsetNewest
- if toLog := to; toLog.Partition == partition && toLog.Offset > 0 {
- toOffset = toLog.Offset
+ if from > 0 {
+ offset = int64(from)
}
var queryRes keyapi.QueryKeyChangesResponse
keyAPI.QueryKeyChanges(ctx, &keyapi.QueryKeyChangesRequest{
- Partition: partition,
- Offset: offset,
- ToOffset: toOffset,
+ Offset: offset,
+ ToOffset: toOffset,
}, &queryRes)
if queryRes.Error != nil {
// don't fail the catchup because we may have got useful information by tracking membership
@@ -95,8 +86,8 @@ func DeviceListCatchup(
var sharedUsersMap map[string]int
sharedUsersMap, queryRes.UserIDs = filterSharedUsers(ctx, rsAPI, userID, queryRes.UserIDs)
util.GetLogger(ctx).Debugf(
- "QueryKeyChanges request p=%d,off=%d,to=%d response p=%d off=%d uids=%v",
- partition, offset, toOffset, queryRes.Partition, queryRes.Offset, queryRes.UserIDs,
+ "QueryKeyChanges request off=%d,to=%d response off=%d uids=%v",
+ offset, toOffset, queryRes.Offset, queryRes.UserIDs,
)
userSet := make(map[string]bool)
for _, userID := range res.DeviceLists.Changed {
@@ -125,13 +116,8 @@ func DeviceListCatchup(
res.DeviceLists.Left = append(res.DeviceLists.Left, userID)
}
}
- // set the new token
- to = types.LogPosition{
- Partition: queryRes.Partition,
- Offset: queryRes.Offset,
- }
- return to, hasNew, nil
+ return types.StreamPosition(queryRes.Offset), hasNew, nil
}
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go
index e52e5556..d9fb9cf8 100644
--- a/syncapi/internal/keychange_test.go
+++ b/syncapi/internal/keychange_test.go
@@ -6,7 +6,6 @@ import (
"sort"
"testing"
- "github.com/Shopify/sarama"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -16,11 +15,7 @@ import (
var (
syncingUser = "@alice:localhost"
- emptyToken = types.LogPosition{}
- newestToken = types.LogPosition{
- Offset: sarama.OffsetNewest,
- Partition: 0,
- }
+ emptyToken = types.StreamPosition(0)
)
type mockKeyAPI struct{}
@@ -186,7 +181,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
"!another:room": {syncingUser},
},
}
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -209,7 +204,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
"!another:room": {syncingUser},
},
}
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -232,7 +227,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
"!another:room": {syncingUser, existingUser},
},
}
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -254,7 +249,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
"!another:room": {syncingUser, existingUser},
},
}
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -313,7 +308,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
roomID: {syncingUser, existingUser},
},
}
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@@ -341,7 +336,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
"!another:room": {syncingUser},
},
}
- _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -427,7 +422,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
},
}
_, hasNew, err := DeviceListCatchup(
- context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken,
+ context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken,
)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
diff --git a/syncapi/streams/stream_devicelist.go b/syncapi/streams/stream_devicelist.go
index 9ea9d088..6ff8a7fd 100644
--- a/syncapi/streams/stream_devicelist.go
+++ b/syncapi/streams/stream_devicelist.go
@@ -10,7 +10,7 @@ import (
)
type DeviceListStreamProvider struct {
- PartitionedStreamProvider
+ StreamProvider
rsAPI api.RoomserverInternalAPI
keyAPI keyapi.KeyInternalAPI
}
@@ -18,15 +18,15 @@ type DeviceListStreamProvider struct {
func (p *DeviceListStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
-) types.LogPosition {
+) types.StreamPosition {
return p.LatestPosition(ctx)
}
func (p *DeviceListStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
- from, to types.LogPosition,
-) types.LogPosition {
+ from, to types.StreamPosition,
+) types.StreamPosition {
var err error
to, _, err = internal.DeviceListCatchup(context.Background(), p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to)
if err != nil {
diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go
index ba4118df..6b02c75e 100644
--- a/syncapi/streams/streams.go
+++ b/syncapi/streams/streams.go
@@ -18,7 +18,7 @@ type Streams struct {
InviteStreamProvider types.StreamProvider
SendToDeviceStreamProvider types.StreamProvider
AccountDataStreamProvider types.StreamProvider
- DeviceListStreamProvider types.PartitionedStreamProvider
+ DeviceListStreamProvider types.StreamProvider
}
func NewSyncStreamProviders(
@@ -48,9 +48,9 @@ func NewSyncStreamProviders(
userAPI: userAPI,
},
DeviceListStreamProvider: &DeviceListStreamProvider{
- PartitionedStreamProvider: PartitionedStreamProvider{DB: d},
- rsAPI: rsAPI,
- keyAPI: keyAPI,
+ StreamProvider: StreamProvider{DB: d},
+ rsAPI: rsAPI,
+ keyAPI: keyAPI,
},
}
diff --git a/syncapi/streams/template_pstream.go b/syncapi/streams/template_pstream.go
deleted file mode 100644
index 265e22a2..00000000
--- a/syncapi/streams/template_pstream.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package streams
-
-import (
- "context"
- "sync"
-
- "github.com/matrix-org/dendrite/syncapi/storage"
- "github.com/matrix-org/dendrite/syncapi/types"
-)
-
-type PartitionedStreamProvider struct {
- DB storage.Database
- latest types.LogPosition
- latestMutex sync.RWMutex
-}
-
-func (p *PartitionedStreamProvider) Setup() {
-}
-
-func (p *PartitionedStreamProvider) Advance(
- latest types.LogPosition,
-) {
- p.latestMutex.Lock()
- defer p.latestMutex.Unlock()
-
- if latest.IsAfter(&p.latest) {
- p.latest = latest
- }
-}
-
-func (p *PartitionedStreamProvider) LatestPosition(
- ctx context.Context,
-) types.LogPosition {
- p.latestMutex.RLock()
- defer p.latestMutex.RUnlock()
-
- return p.latest
-}
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index a4573610..ca35951a 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -140,6 +140,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
// Extract values from request
syncReq, err := newSyncRequest(req, *device, rp.db)
if err != nil {
+ if err == types.ErrMalformedSyncToken {
+ return util.JSONResponse{
+ Code: http.StatusBadRequest,
+ JSON: jsonerror.InvalidArgumentValue(err.Error()),
+ }
+ }
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.Unknown(err.Error()),
diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go
index 93ed1266..f6185fcb 100644
--- a/syncapi/types/provider.go
+++ b/syncapi/types/provider.go
@@ -42,11 +42,3 @@ type StreamProvider interface {
// LatestPosition returns the latest stream position for this stream.
LatestPosition(ctx context.Context) StreamPosition
}
-
-type PartitionedStreamProvider interface {
- Setup()
- Advance(latest LogPosition)
- CompleteSync(ctx context.Context, req *SyncRequest) LogPosition
- IncrementalSync(ctx context.Context, req *SyncRequest, from, to LogPosition) LogPosition
- LatestPosition(ctx context.Context) LogPosition
-}
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index 44e718b3..68c308d8 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -16,6 +16,7 @@ package types
import (
"encoding/json"
+ "errors"
"fmt"
"strconv"
"strings"
@@ -26,13 +27,10 @@ import (
)
var (
- // ErrInvalidSyncTokenType is returned when an attempt at creating a
- // new instance of SyncToken with an invalid type (i.e. neither "s"
- // nor "t").
- ErrInvalidSyncTokenType = fmt.Errorf("sync token has an unknown prefix (should be either s or t)")
- // ErrInvalidSyncTokenLen is returned when the pagination token is an
- // invalid length
- ErrInvalidSyncTokenLen = fmt.Errorf("sync token has an invalid length")
+ // This error is returned when parsing sync tokens if the token is invalid. Callers can use this
+ // error to detect whether to 400 or 401 the client. It is recommended to 401 them to force a
+ // logout.
+ ErrMalformedSyncToken = errors.New("malformed sync token")
)
type StateDelta struct {
@@ -47,27 +45,6 @@ type StateDelta struct {
// StreamPosition represents the offset in the sync stream a client is at.
type StreamPosition int64
-// LogPosition represents the offset in a Kafka log a client is at.
-type LogPosition struct {
- Partition int32
- Offset int64
-}
-
-func (p *LogPosition) IsEmpty() bool {
- return p.Offset == 0
-}
-
-// IsAfter returns true if this position is after `lp`.
-func (p *LogPosition) IsAfter(lp *LogPosition) bool {
- if lp == nil {
- return false
- }
- if p.Partition != lp.Partition {
- return false
- }
- return p.Offset > lp.Offset
-}
-
// StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event.
type StreamEvent struct {
*gomatrixserverlib.HeaderedEvent
@@ -124,7 +101,7 @@ type StreamingToken struct {
SendToDevicePosition StreamPosition
InvitePosition StreamPosition
AccountDataPosition StreamPosition
- DeviceListPosition LogPosition
+ DeviceListPosition StreamPosition
}
// This will be used as a fallback by json.Marshal.
@@ -140,14 +117,11 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) {
func (t StreamingToken) String() string {
posStr := fmt.Sprintf(
- "s%d_%d_%d_%d_%d_%d",
+ "s%d_%d_%d_%d_%d_%d_%d",
t.PDUPosition, t.TypingPosition,
t.ReceiptPosition, t.SendToDevicePosition,
- t.InvitePosition, t.AccountDataPosition,
+ t.InvitePosition, t.AccountDataPosition, t.DeviceListPosition,
)
- if dl := t.DeviceListPosition; !dl.IsEmpty() {
- posStr += fmt.Sprintf(".dl-%d-%d", dl.Partition, dl.Offset)
- }
return posStr
}
@@ -166,14 +140,14 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
return true
case t.AccountDataPosition > other.AccountDataPosition:
return true
- case t.DeviceListPosition.IsAfter(&other.DeviceListPosition):
+ case t.DeviceListPosition > other.DeviceListPosition:
return true
}
return false
}
func (t *StreamingToken) IsEmpty() bool {
- return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition == 0 && t.DeviceListPosition.IsEmpty()
+ return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition == 0
}
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
@@ -208,7 +182,7 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) {
if other.AccountDataPosition > t.AccountDataPosition {
t.AccountDataPosition = other.AccountDataPosition
}
- if other.DeviceListPosition.IsAfter(&t.DeviceListPosition) {
+ if other.DeviceListPosition > t.DeviceListPosition {
t.DeviceListPosition = other.DeviceListPosition
}
}
@@ -292,16 +266,18 @@ func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) {
func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
if len(tok) < 1 {
- err = fmt.Errorf("empty stream token")
+ err = ErrMalformedSyncToken
return
}
if tok[0] != SyncTokenTypeStream[0] {
- err = fmt.Errorf("stream token must start with 's'")
+ err = ErrMalformedSyncToken
return
}
- categories := strings.Split(tok[1:], ".")
- parts := strings.Split(categories[0], "_")
- var positions [6]StreamPosition
+ // Migration: Remove everything after and including '.' - we previously had tokens like:
+ // s478_0_0_0_0_13.dl-0-2 but we have now removed partitioned stream positions
+ tok = strings.Split(tok, ".")[0]
+ parts := strings.Split(tok[1:], "_")
+ var positions [7]StreamPosition
for i, p := range parts {
if i > len(positions) {
break
@@ -309,6 +285,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
var pos int
pos, err = strconv.Atoi(p)
if err != nil {
+ err = ErrMalformedSyncToken
return
}
positions[i] = StreamPosition(pos)
@@ -320,31 +297,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
SendToDevicePosition: positions[3],
InvitePosition: positions[4],
AccountDataPosition: positions[5],
- }
- // dl-0-1234
- // $log_name-$partition-$offset
- for _, logStr := range categories[1:] {
- segments := strings.Split(logStr, "-")
- if len(segments) != 3 {
- err = fmt.Errorf("invalid log position %q", logStr)
- return
- }
- switch segments[0] {
- case "dl":
- // Device list syncing
- var partition, offset int
- if partition, err = strconv.Atoi(segments[1]); err != nil {
- return
- }
- if offset, err = strconv.Atoi(segments[2]); err != nil {
- return
- }
- token.DeviceListPosition.Partition = int32(partition)
- token.DeviceListPosition.Offset = int64(offset)
- default:
- err = fmt.Errorf("unrecognised token type %q", segments[0])
- return
- }
+ DeviceListPosition: positions[6],
}
return token, nil
}
diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go
index 3e577788..cda178b3 100644
--- a/syncapi/types/types_test.go
+++ b/syncapi/types/types_test.go
@@ -2,50 +2,17 @@ package types
import (
"encoding/json"
- "reflect"
"testing"
"github.com/matrix-org/gomatrixserverlib"
)
-func TestNewSyncTokenWithLogs(t *testing.T) {
- tests := map[string]*StreamingToken{
- "s4_0_0_0_0_0": {
- PDUPosition: 4,
- },
- "s4_0_0_0_0_0.dl-0-123": {
- PDUPosition: 4,
- DeviceListPosition: LogPosition{
- Partition: 0,
- Offset: 123,
- },
- },
- }
- for tok, want := range tests {
- got, err := NewStreamTokenFromString(tok)
- if err != nil {
- if want == nil {
- continue // error expected
- }
- t.Errorf("%s errored: %s", tok, err)
- continue
- }
- if !reflect.DeepEqual(got, *want) {
- t.Errorf("%s mismatch: got %v want %v", tok, got, want)
- }
- gotStr := got.String()
- if gotStr != tok {
- t.Errorf("%s reserialisation mismatch: got %s want %s", tok, gotStr, tok)
- }
- }
-}
-
func TestSyncTokens(t *testing.T) {
shouldPass := map[string]string{
- "s4_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, LogPosition{}}.String(),
- "s3_1_0_0_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, 0, 0, LogPosition{1, 2}}.String(),
- "s3_1_2_3_5_0": StreamingToken{3, 1, 2, 3, 5, 0, LogPosition{}}.String(),
- "t3_1": TopologyToken{3, 1}.String(),
+ "s4_0_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, 0}.String(),
+ "s3_1_0_0_0_0_2": StreamingToken{3, 1, 0, 0, 0, 0, 2}.String(),
+ "s3_1_2_3_5_0_0": StreamingToken{3, 1, 2, 3, 5, 0, 0}.String(),
+ "t3_1": TopologyToken{3, 1}.String(),
}
for a, b := range shouldPass {