aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers/typingserver.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/consumers/typingserver.go')
-rw-r--r--syncapi/consumers/typingserver.go11
1 files changed, 8 insertions, 3 deletions
diff --git a/syncapi/consumers/typingserver.go b/syncapi/consumers/typingserver.go
index 392f7987..36925441 100644
--- a/syncapi/consumers/typingserver.go
+++ b/syncapi/consumers/typingserver.go
@@ -63,7 +63,12 @@ func NewOutputTypingEventConsumer(
// Start consuming from typing api
func (s *OutputTypingEventConsumer) Start() error {
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
- s.notifier.OnNewEvent(nil, roomID, nil, types.SyncPosition{TypingPosition: latestSyncPosition})
+ s.notifier.OnNewEvent(
+ nil, roomID, nil,
+ types.PaginationToken{
+ EDUTypingPosition: types.StreamPosition(latestSyncPosition),
+ },
+ )
})
return s.typingConsumer.Start()
@@ -83,7 +88,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
"typing": output.Event.Typing,
}).Debug("received data from typing server")
- var typingPos int64
+ var typingPos types.StreamPosition
typingEvent := output.Event
if typingEvent.Typing {
typingPos = s.db.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime)
@@ -91,6 +96,6 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
}
- s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.SyncPosition{TypingPosition: typingPos})
+ s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.PaginationToken{EDUTypingPosition: typingPos})
return nil
}