aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/consumers')
-rw-r--r--syncapi/consumers/clientapi.go2
-rw-r--r--syncapi/consumers/roomserver.go5
-rw-r--r--syncapi/consumers/typingserver.go11
3 files changed, 12 insertions, 6 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index ed39cd2d..17f2c522 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -90,7 +90,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Panicf("could not save account data")
}
- s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.SyncPosition{PDUPosition: pduPos})
+ s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.PaginationToken{PDUPosition: pduPos})
return nil
}
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index cde2f508..ba1b7dc5 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -133,6 +133,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
msg.AddsStateEventIDs,
msg.RemovesStateEventIDs,
msg.TransactionID,
+ false,
)
if err != nil {
// panic rather than continue with an inconsistent database
@@ -144,7 +145,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}).Panicf("roomserver output log: write event failure")
return nil
}
- s.notifier.OnNewEvent(&ev, "", nil, types.SyncPosition{PDUPosition: pduPos})
+ s.notifier.OnNewEvent(&ev, "", nil, types.PaginationToken{PDUPosition: pduPos})
return nil
}
@@ -161,7 +162,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure")
return nil
}
- s.notifier.OnNewEvent(&msg.Event, "", nil, types.SyncPosition{PDUPosition: pduPos})
+ s.notifier.OnNewEvent(&msg.Event, "", nil, types.PaginationToken{PDUPosition: pduPos})
return nil
}
diff --git a/syncapi/consumers/typingserver.go b/syncapi/consumers/typingserver.go
index 392f7987..36925441 100644
--- a/syncapi/consumers/typingserver.go
+++ b/syncapi/consumers/typingserver.go
@@ -63,7 +63,12 @@ func NewOutputTypingEventConsumer(
// Start consuming from typing api
func (s *OutputTypingEventConsumer) Start() error {
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
- s.notifier.OnNewEvent(nil, roomID, nil, types.SyncPosition{TypingPosition: latestSyncPosition})
+ s.notifier.OnNewEvent(
+ nil, roomID, nil,
+ types.PaginationToken{
+ EDUTypingPosition: types.StreamPosition(latestSyncPosition),
+ },
+ )
})
return s.typingConsumer.Start()
@@ -83,7 +88,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
"typing": output.Event.Typing,
}).Debug("received data from typing server")
- var typingPos int64
+ var typingPos types.StreamPosition
typingEvent := output.Event
if typingEvent.Typing {
typingPos = s.db.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime)
@@ -91,6 +96,6 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
}
- s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.SyncPosition{TypingPosition: typingPos})
+ s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.PaginationToken{EDUTypingPosition: typingPos})
return nil
}