diff options
Diffstat (limited to 'syncapi/consumers')
-rw-r--r-- | syncapi/consumers/clientapi.go | 2 | ||||
-rw-r--r-- | syncapi/consumers/roomserver.go | 5 | ||||
-rw-r--r-- | syncapi/consumers/typingserver.go | 11 |
3 files changed, 12 insertions, 6 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index ed39cd2d..17f2c522 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -90,7 +90,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error }).Panicf("could not save account data") } - s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.SyncPosition{PDUPosition: pduPos}) + s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.PaginationToken{PDUPosition: pduPos}) return nil } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index cde2f508..ba1b7dc5 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -133,6 +133,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( msg.AddsStateEventIDs, msg.RemovesStateEventIDs, msg.TransactionID, + false, ) if err != nil { // panic rather than continue with an inconsistent database @@ -144,7 +145,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( }).Panicf("roomserver output log: write event failure") return nil } - s.notifier.OnNewEvent(&ev, "", nil, types.SyncPosition{PDUPosition: pduPos}) + s.notifier.OnNewEvent(&ev, "", nil, types.PaginationToken{PDUPosition: pduPos}) return nil } @@ -161,7 +162,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( }).Panicf("roomserver output log: write invite failure") return nil } - s.notifier.OnNewEvent(&msg.Event, "", nil, types.SyncPosition{PDUPosition: pduPos}) + s.notifier.OnNewEvent(&msg.Event, "", nil, types.PaginationToken{PDUPosition: pduPos}) return nil } diff --git a/syncapi/consumers/typingserver.go b/syncapi/consumers/typingserver.go index 392f7987..36925441 100644 --- a/syncapi/consumers/typingserver.go +++ b/syncapi/consumers/typingserver.go @@ -63,7 +63,12 @@ func NewOutputTypingEventConsumer( // Start consuming from typing api func (s *OutputTypingEventConsumer) Start() error { s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { - s.notifier.OnNewEvent(nil, roomID, nil, types.SyncPosition{TypingPosition: latestSyncPosition}) + s.notifier.OnNewEvent( + nil, roomID, nil, + types.PaginationToken{ + EDUTypingPosition: types.StreamPosition(latestSyncPosition), + }, + ) }) return s.typingConsumer.Start() @@ -83,7 +88,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error "typing": output.Event.Typing, }).Debug("received data from typing server") - var typingPos int64 + var typingPos types.StreamPosition typingEvent := output.Event if typingEvent.Typing { typingPos = s.db.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime) @@ -91,6 +96,6 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID) } - s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.SyncPosition{TypingPosition: typingPos}) + s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.PaginationToken{EDUTypingPosition: typingPos}) return nil } |