aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers/keychange.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/consumers/keychange.go')
-rw-r--r--syncapi/consumers/keychange.go12
1 files changed, 7 insertions, 5 deletions
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index 3fc6120d..0d82f7a5 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -114,12 +114,14 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
return err
}
// TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams
- posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{
- syncinternal.DeviceListLogName: {
- Offset: msg.Offset,
- Partition: msg.Partition,
+ posUpdate := types.StreamingToken{
+ Logs: map[string]*types.LogPosition{
+ syncinternal.DeviceListLogName: {
+ Offset: msg.Offset,
+ Partition: msg.Partition,
+ },
},
- })
+ }
for userID := range queryRes.UserIDsToCount {
s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID)
}