diff options
Diffstat (limited to 'syncapi/consumers/keychange.go')
-rw-r--r-- | syncapi/consumers/keychange.go | 12 |
1 files changed, 7 insertions, 5 deletions
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 3fc6120d..0d82f7a5 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -114,12 +114,14 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er return err } // TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams - posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{ - syncinternal.DeviceListLogName: { - Offset: msg.Offset, - Partition: msg.Partition, + posUpdate := types.StreamingToken{ + Logs: map[string]*types.LogPosition{ + syncinternal.DeviceListLogName: { + Offset: msg.Offset, + Partition: msg.Partition, + }, }, - }) + } for userID := range queryRes.UserIDsToCount { s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID) } |