diff options
author | Kegsay <kegan@matrix.org> | 2020-05-13 12:14:50 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-13 12:14:50 +0100 |
commit | 5e9dce1c0c66736937eeddd5c33c92700d9a65a7 (patch) | |
tree | e0c40667cb17714d160fba2fbc64470eedf9ca7b /syncapi/consumers | |
parent | 31e6a7f1932c11d9b5b682ad06a5b8db9d74a44f (diff) |
syncapi: Rename and split out tokens (#1025)
* syncapi: Rename and split out tokens
Previously we used the badly named `PaginationToken` which was
used for both `/sync` and `/messages` requests. This quickly
became confusing because named fields like `PDUPosition` meant
different things depending on the token type. Instead, we now have
two token types: `TopologyToken` and `StreamingToken`, both of
which have fields which make more sense for their specific situations.
Updated the codebase to use one or the other. `PaginationToken` still
lives on as `syncToken`, an unexported type which both tokens rely on.
This allows us to guarantee that the specific mappings of positions
to a string remain solely under the control of the `types` package.
This enables us to move high-level conceptual things like
"decrement this topological token" to function calls e.g
`TopologicalToken.Decrement()`.
Currently broken because `/messages` seemingly used both stream and
topological tokens, though I need to confirm this.
* final tweaks/hacks
* spurious logging
* Review comments and linting
Diffstat (limited to 'syncapi/consumers')
-rw-r--r-- | syncapi/consumers/clientapi.go | 2 | ||||
-rw-r--r-- | syncapi/consumers/eduserver.go | 6 | ||||
-rw-r--r-- | syncapi/consumers/roomserver.go | 4 |
3 files changed, 5 insertions, 7 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index f5b8c43e..b65d01a0 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -90,7 +90,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error }).Panicf("could not save account data") } - s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.PaginationToken{PDUPosition: pduPos}) + s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.NewStreamToken(pduPos, 0)) return nil } diff --git a/syncapi/consumers/eduserver.go b/syncapi/consumers/eduserver.go index 249452af..ece999d5 100644 --- a/syncapi/consumers/eduserver.go +++ b/syncapi/consumers/eduserver.go @@ -65,9 +65,7 @@ func (s *OutputTypingEventConsumer) Start() error { s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { s.notifier.OnNewEvent( nil, roomID, nil, - types.PaginationToken{ - EDUTypingPosition: types.StreamPosition(latestSyncPosition), - }, + types.NewStreamToken(0, types.StreamPosition(latestSyncPosition)), ) }) @@ -96,6 +94,6 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID) } - s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.PaginationToken{EDUTypingPosition: typingPos}) + s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.NewStreamToken(0, typingPos)) return nil } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 987cc5df..368420a6 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -146,7 +146,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( }).Panicf("roomserver output log: write event failure") return nil } - s.notifier.OnNewEvent(&ev, "", nil, types.PaginationToken{PDUPosition: pduPos}) + s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0)) return nil } @@ -164,7 +164,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( }).Panicf("roomserver output log: write invite failure") return nil } - s.notifier.OnNewEvent(&msg.Event, "", nil, types.PaginationToken{PDUPosition: pduPos}) + s.notifier.OnNewEvent(&msg.Event, "", nil, types.NewStreamToken(pduPos, 0)) return nil } |