diff options
author | Kegsay <kegan@matrix.org> | 2020-05-13 12:14:50 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-13 12:14:50 +0100 |
commit | 5e9dce1c0c66736937eeddd5c33c92700d9a65a7 (patch) | |
tree | e0c40667cb17714d160fba2fbc64470eedf9ca7b /syncapi/sync/userstream.go | |
parent | 31e6a7f1932c11d9b5b682ad06a5b8db9d74a44f (diff) |
syncapi: Rename and split out tokens (#1025)
* syncapi: Rename and split out tokens
Previously we used the badly named `PaginationToken` which was
used for both `/sync` and `/messages` requests. This quickly
became confusing because named fields like `PDUPosition` meant
different things depending on the token type. Instead, we now have
two token types: `TopologyToken` and `StreamingToken`, both of
which have fields which make more sense for their specific situations.
Updated the codebase to use one or the other. `PaginationToken` still
lives on as `syncToken`, an unexported type which both tokens rely on.
This allows us to guarantee that the specific mappings of positions
to a string remain solely under the control of the `types` package.
This enables us to move high-level conceptual things like
"decrement this topological token" to function calls e.g
`TopologicalToken.Decrement()`.
Currently broken because `/messages` seemingly used both stream and
topological tokens, though I need to confirm this.
* final tweaks/hacks
* spurious logging
* Review comments and linting
Diffstat (limited to 'syncapi/sync/userstream.go')
-rw-r--r-- | syncapi/sync/userstream.go | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/syncapi/sync/userstream.go b/syncapi/sync/userstream.go index 88867005..b2eafa3d 100644 --- a/syncapi/sync/userstream.go +++ b/syncapi/sync/userstream.go @@ -34,7 +34,7 @@ type UserStream struct { // Closed when there is an update. signalChannel chan struct{} // The last sync position that there may have been an update for the user - pos types.PaginationToken + pos types.StreamingToken // The last time when we had some listeners waiting timeOfLastChannel time.Time // The number of listeners waiting @@ -50,7 +50,7 @@ type UserStreamListener struct { } // NewUserStream creates a new user stream -func NewUserStream(userID string, currPos types.PaginationToken) *UserStream { +func NewUserStream(userID string, currPos types.StreamingToken) *UserStream { return &UserStream{ UserID: userID, timeOfLastChannel: time.Now(), @@ -83,7 +83,7 @@ func (s *UserStream) GetListener(ctx context.Context) UserStreamListener { } // Broadcast a new sync position for this user. -func (s *UserStream) Broadcast(pos types.PaginationToken) { +func (s *UserStream) Broadcast(pos types.StreamingToken) { s.lock.Lock() defer s.lock.Unlock() @@ -116,9 +116,9 @@ func (s *UserStream) TimeOfLastNonEmpty() time.Time { return s.timeOfLastChannel } -// GetStreamPosition returns last sync position which the UserStream was +// GetSyncPosition returns last sync position which the UserStream was // notified about -func (s *UserStreamListener) GetSyncPosition() types.PaginationToken { +func (s *UserStreamListener) GetSyncPosition() types.StreamingToken { s.userStream.lock.Lock() defer s.userStream.lock.Unlock() @@ -130,7 +130,7 @@ func (s *UserStreamListener) GetSyncPosition() types.PaginationToken { // sincePos specifies from which point we want to be notified about. If there // has already been an update after sincePos we'll return a closed channel // immediately. -func (s *UserStreamListener) GetNotifyChannel(sincePos types.PaginationToken) <-chan struct{} { +func (s *UserStreamListener) GetNotifyChannel(sincePos types.StreamingToken) <-chan struct{} { s.userStream.lock.Lock() defer s.userStream.lock.Unlock() |