#!/usr/bin/env python
#
# Test case for the QMP 'change' command and all other associated
# commands
#
# Copyright (C) 2015 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import os
import stat
import time
import iotests
from iotests import qemu_img

old_img = os.path.join(iotests.test_dir, 'test0.img')
new_img = os.path.join(iotests.test_dir, 'test1.img')

class ChangeBaseClass(iotests.QMPTestCase):
    has_opened = False
    has_closed = False

    def process_events(self):
        for event in self.vm.get_qmp_events(wait=False):
            if (event['event'] == 'DEVICE_TRAY_MOVED' and
                event['data']['device'] == 'drive0'):
                if event['data']['tray-open'] == False:
                    self.has_closed = True
                else:
                    self.has_opened = True

    def wait_for_open(self):
        if not self.has_real_tray:
            return

        timeout = time.clock() + 3
        while not self.has_opened and time.clock() < timeout:
            self.process_events()
        if not self.has_opened:
            self.fail('Timeout while waiting for the tray to open')

    def wait_for_close(self):
        if not self.has_real_tray:
            return

        timeout = time.clock() + 3
        while not self.has_closed and time.clock() < timeout:
            self.process_events()
        if not self.has_opened:
            self.fail('Timeout while waiting for the tray to close')

class GeneralChangeTestsBaseClass(ChangeBaseClass):
    def test_change(self):
        result = self.vm.qmp('change', device='drive0', target=new_img,
                                       arg=iotests.imgfmt)
        self.assert_qmp(result, 'return', {})

        self.wait_for_open()
        self.wait_for_close()

        result = self.vm.qmp('query-block')
        if self.has_real_tray:
            self.assert_qmp(result, 'return[0]/tray_open', False)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', new_img)

    def test_blockdev_change_medium(self):
        result = self.vm.qmp('blockdev-change-medium', device='drive0',
                                                       filename=new_img,
                                                       format=iotests.imgfmt)
        self.assert_qmp(result, 'return', {})

        self.wait_for_open()
        self.wait_for_close()

        result = self.vm.qmp('query-block')
        if self.has_real_tray:
            self.assert_qmp(result, 'return[0]/tray_open', False)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', new_img)

    def test_eject(self):
        result = self.vm.qmp('eject', device='drive0', force=True)
        self.assert_qmp(result, 'return', {})

        self.wait_for_open()

        result = self.vm.qmp('query-block')
        if self.has_real_tray:
            self.assert_qmp(result, 'return[0]/tray_open', True)
        self.assert_qmp_absent(result, 'return[0]/inserted')

    def test_tray_eject_change(self):
        result = self.vm.qmp('eject', device='drive0', force=True)
        self.assert_qmp(result, 'return', {})

        self.wait_for_open()

        result = self.vm.qmp('query-block')
        if self.has_real_tray:
            self.assert_qmp(result, 'return[0]/tray_open', True)
        self.assert_qmp_absent(result, 'return[0]/inserted')

        result = self.vm.qmp('blockdev-change-medium', device='drive0',
                                                       filename=new_img,
                                                       format=iotests.imgfmt)
        self.assert_qmp(result, 'return', {})

        self.wait_for_close()

        result = self.vm.qmp('query-block')
        if self.has_real_tray:
            self.assert_qmp(result, 'return[0]/tray_open', False)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', new_img)

    def test_tray_open_close(self):
        result = self.vm.qmp('blockdev-open-tray', device='drive0', force=True)
        self.assert_qmp(result, 'return', {})

        self.wait_for_open()

        result = self.vm.qmp('query-block')
        if self.has_real_tray:
            self.assert_qmp(result, 'return[0]/tray_open', True)
        if self.was_empty == True:
            self.assert_qmp_absent(result, 'return[0]/inserted')
        else:
            self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

        result = self.vm.qmp('blockdev-close-tray', device='drive0')
        self.assert_qmp(result, 'return', {})

        if self.has_real_tray or not self.was_empty:
            self.wait_for_close()

        result = self.vm.qmp('query-block')
        if self.has_real_tray:
            self.assert_qmp(result, 'return[0]/tray_open', False)
        if self.was_empty == True:
            self.assert_qmp_absent(result, 'return[0]/inserted')
        else:
            self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

    def test_tray_eject_close(self):
        result = self.vm.qmp('eject', device='drive0', force=True)
        self.assert_qmp(result, 'return', {})

        self.wait_for_open()

        result = self.vm.qmp('query-block')
        if self.has_real_tray:
            self.assert_qmp(result, 'return[0]/tray_open', True)
        self.assert_qmp_absent(result, 'return[0]/inserted')

        result = self.vm.qmp('blockdev-close-tray', device='drive0')
        self.assert_qmp(result, 'return', {})

        self.wait_for_close()

        result = self.vm.qmp('query-block')
        if self.has_real_tray:
            self.assert_qmp(result, 'return[0]/tray_open', False)
        self.assert_qmp_absent(result, 'return[0]/inserted')

    def test_tray_open_change(self):
        result = self.vm.qmp('blockdev-open-tray', device='drive0', force=True)
        self.assert_qmp(result, 'return', {})

        self.wait_for_open()

        result = self.vm.qmp('query-block')
        if self.has_real_tray:
            self.assert_qmp(result, 'return[0]/tray_open', True)
        if self.was_empty == True:
            self.assert_qmp_absent(result, 'return[0]/inserted')
        else:
            self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

        result = self.vm.qmp('blockdev-change-medium', device='drive0',
                                                       filename=new_img,
                                                       format=iotests.imgfmt)
        self.assert_qmp(result, 'return', {})

        self.wait_for_close()

        result = self.vm.qmp('query-block')
        if self.has_real_tray:
            self.assert_qmp(result, 'return[0]/tray_open', False)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', new_img)

    def test_cycle(self):
        result = self.vm.qmp('blockdev-add',
                             options={'node-name': 'new',
                                      'driver': iotests.imgfmt,
                                      'file': {'filename': new_img,
                                               'driver': 'file'}})
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('blockdev-open-tray', device='drive0', force=True)
        self.assert_qmp(result, 'return', {})

        self.wait_for_open()

        result = self.vm.qmp('query-block')
        if self.has_real_tray:
            self.assert_qmp(result, 'return[0]/tray_open', True)
        if self.was_empty == True:
            self.assert_qmp_absent(result, 'return[0]/inserted')
        else:
            self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

        result = self.vm.qmp('x-blockdev-remove-medium', device='drive0')
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-block')
        if self.has_real_tray:
            self.assert_qmp(result, 'return[0]/tray_open', True)
        self.assert_qmp_absent(result, 'return[0]/inserted')

        result = self.vm.qmp('x-blockdev-insert-medium', device='drive0',
                                                       node_name='new')
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-block')
        if self.has_real_tray:
            self.assert_qmp(result, 'return[0]/tray_open', True)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', new_img)

        result = self.vm.qmp('blockdev-close-tray', device='drive0')
        self.assert_qmp(result, 'return', {})

        self.wait_for_close()

        result = self.vm.qmp('query-block')
        if self.has_real_tray:
            self.assert_qmp(result, 'return[0]/tray_open', False)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', new_img)

    def test_close_on_closed(self):
        result = self.vm.qmp('blockdev-close-tray', device='drive0')
        # Should be a no-op
        self.assert_qmp(result, 'return', {})
        self.assertEquals(self.vm.get_qmp_events(wait=False), [])

    def test_remove_on_closed(self):
        if not self.has_real_tray:
            return

        result = self.vm.qmp('x-blockdev-remove-medium', device='drive0')
        self.assert_qmp(result, 'error/class', 'GenericError')

    def test_insert_on_closed(self):
        if not self.has_real_tray:
            return

        result = self.vm.qmp('blockdev-add',
                             options={'node-name': 'new',
                                      'driver': iotests.imgfmt,
                                      'file': {'filename': new_img,
                                               'driver': 'file'}})
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('x-blockdev-insert-medium', device='drive0',
                                                       node_name='new')
        self.assert_qmp(result, 'error/class', 'GenericError')

class TestInitiallyFilled(GeneralChangeTestsBaseClass):
    was_empty = False

    def setUp(self, media, interface):
        qemu_img('create', '-f', iotests.imgfmt, old_img, '1440k')
        qemu_img('create', '-f', iotests.imgfmt, new_img, '1440k')
        self.vm = iotests.VM().add_drive(old_img, 'media=%s' % media, interface)
        self.vm.launch()

    def tearDown(self):
        self.vm.shutdown()
        os.remove(old_img)
        os.remove(new_img)

    def test_insert_on_filled(self):
        result = self.vm.qmp('blockdev-add',
                             options={'node-name': 'new',
                                      'driver': iotests.imgfmt,
                                      'file': {'filename': new_img,
                                               'driver': 'file'}})
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('blockdev-open-tray', device='drive0')
        self.assert_qmp(result, 'return', {})

        self.wait_for_open()

        result = self.vm.qmp('x-blockdev-insert-medium', device='drive0',
                                                       node_name='new')
        self.assert_qmp(result, 'error/class', 'GenericError')

class TestInitiallyEmpty(GeneralChangeTestsBaseClass):
    was_empty = True

    def setUp(self, media, interface):
        qemu_img('create', '-f', iotests.imgfmt, new_img, '1440k')
        self.vm = iotests.VM().add_drive(None, 'media=%s' % media, interface)
        self.vm.launch()

    def tearDown(self):
        self.vm.shutdown()
        os.remove(new_img)

    def test_remove_on_empty(self):
        result = self.vm.qmp('blockdev-open-tray', device='drive0')
        self.assert_qmp(result, 'return', {})

        self.wait_for_open()

        result = self.vm.qmp('x-blockdev-remove-medium', device='drive0')
        # Should be a no-op
        self.assert_qmp(result, 'return', {})

class TestCDInitiallyFilled(TestInitiallyFilled):
    TestInitiallyFilled = TestInitiallyFilled
    has_real_tray = True

    def setUp(self):
        self.TestInitiallyFilled.setUp(self, 'cdrom', 'ide')

class TestCDInitiallyEmpty(TestInitiallyEmpty):
    TestInitiallyEmpty = TestInitiallyEmpty
    has_real_tray = True

    def setUp(self):
        self.TestInitiallyEmpty.setUp(self, 'cdrom', 'ide')

class TestFloppyInitiallyFilled(TestInitiallyFilled):
    TestInitiallyFilled = TestInitiallyFilled
    has_real_tray = False

    def setUp(self):
        self.TestInitiallyFilled.setUp(self, 'disk', 'floppy')

class TestFloppyInitiallyEmpty(TestInitiallyEmpty):
    TestInitiallyEmpty = TestInitiallyEmpty
    has_real_tray = False

    def setUp(self):
        self.TestInitiallyEmpty.setUp(self, 'disk', 'floppy')
        # FDDs not having a real tray and there not being a medium inside the
        # tray at startup means the tray will be considered open
        self.has_opened = True

class TestChangeReadOnly(ChangeBaseClass):
    def setUp(self):
        qemu_img('create', '-f', iotests.imgfmt, old_img, '1440k')
        qemu_img('create', '-f', iotests.imgfmt, new_img, '1440k')
        self.vm = iotests.VM()

    def tearDown(self):
        self.vm.shutdown()
        os.chmod(old_img, 0666)
        os.chmod(new_img, 0666)
        os.remove(old_img)
        os.remove(new_img)

    def test_ro_ro_retain(self):
        os.chmod(old_img, 0444)
        os.chmod(new_img, 0444)
        self.vm.add_drive(old_img, 'media=disk,read-only=on', 'floppy')
        self.vm.launch()

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', True)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

        result = self.vm.qmp('blockdev-change-medium', device='drive0',
                                                       filename=new_img,
                                                       format=iotests.imgfmt,
                                                       read_only_mode='retain')
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', True)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', new_img)

    def test_ro_rw_retain(self):
        os.chmod(old_img, 0444)
        self.vm.add_drive(old_img, 'media=disk,read-only=on', 'floppy')
        self.vm.launch()

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', True)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

        result = self.vm.qmp('blockdev-change-medium', device='drive0',
                                                       filename=new_img,
                                                       format=iotests.imgfmt,
                                                       read_only_mode='retain')
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', True)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', new_img)

    def test_rw_ro_retain(self):
        os.chmod(new_img, 0444)
        self.vm.add_drive(old_img, 'media=disk', 'floppy')
        self.vm.launch()

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', False)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

        result = self.vm.qmp('blockdev-change-medium', device='drive0',
                                                       filename=new_img,
                                                       format=iotests.imgfmt,
                                                       read_only_mode='retain')
        self.assert_qmp(result, 'error/class', 'GenericError')

        self.assertEquals(self.vm.get_qmp_events(wait=False), [])

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', False)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

    def test_ro_rw(self):
        os.chmod(old_img, 0444)
        self.vm.add_drive(old_img, 'media=disk,read-only=on', 'floppy')
        self.vm.launch()

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', True)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

        result = self.vm.qmp('blockdev-change-medium',
                             device='drive0',
                             filename=new_img,
                             format=iotests.imgfmt,
                             read_only_mode='read-write')
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', False)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', new_img)

    def test_rw_ro(self):
        os.chmod(new_img, 0444)
        self.vm.add_drive(old_img, 'media=disk', 'floppy')
        self.vm.launch()

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', False)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

        result = self.vm.qmp('blockdev-change-medium',
                             device='drive0',
                             filename=new_img,
                             format=iotests.imgfmt,
                             read_only_mode='read-only')
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', True)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', new_img)

    def test_make_rw_ro(self):
        self.vm.add_drive(old_img, 'media=disk', 'floppy')
        self.vm.launch()

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', False)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

        result = self.vm.qmp('blockdev-change-medium',
                             device='drive0',
                             filename=new_img,
                             format=iotests.imgfmt,
                             read_only_mode='read-only')
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', True)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', new_img)

    def test_make_ro_rw(self):
        os.chmod(new_img, 0444)
        self.vm.add_drive(old_img, 'media=disk', 'floppy')
        self.vm.launch()

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', False)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

        result = self.vm.qmp('blockdev-change-medium',
                             device='drive0',
                             filename=new_img,
                             format=iotests.imgfmt,
                             read_only_mode='read-write')
        self.assert_qmp(result, 'error/class', 'GenericError')

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', False)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

    def test_make_rw_ro_by_retain(self):
        os.chmod(old_img, 0444)
        self.vm.add_drive(old_img, 'media=disk,read-only=on', 'floppy')
        self.vm.launch()

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', True)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

        result = self.vm.qmp('blockdev-change-medium', device='drive0',
                                                       filename=new_img,
                                                       format=iotests.imgfmt,
                                                       read_only_mode='retain')
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', True)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', new_img)

    def test_make_ro_rw_by_retain(self):
        os.chmod(new_img, 0444)
        self.vm.add_drive(old_img, 'media=disk', 'floppy')
        self.vm.launch()

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', False)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

        result = self.vm.qmp('blockdev-change-medium', device='drive0',
                                                       filename=new_img,
                                                       format=iotests.imgfmt,
                                                       read_only_mode='retain')
        self.assert_qmp(result, 'error/class', 'GenericError')

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', False)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

    def test_rw_ro_cycle(self):
        os.chmod(new_img, 0444)
        self.vm.add_drive(old_img, 'media=disk', 'floppy')
        self.vm.launch()

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', False)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

        result = self.vm.qmp('blockdev-add',
                             options={'node-name': 'new',
                                      'driver': iotests.imgfmt,
                                      'read-only': True,
                                      'file': {'filename': new_img,
                                               'driver': 'file'}})
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', False)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

        result = self.vm.qmp('x-blockdev-remove-medium', device='drive0')
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-block')
        self.assert_qmp_absent(result, 'return[0]/inserted')

        result = self.vm.qmp('x-blockdev-insert-medium', device='drive0',
                                                       node_name='new')
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', True)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', new_img)

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/ro', True)
        self.assert_qmp(result, 'return[0]/inserted/image/filename', new_img)

GeneralChangeTestsBaseClass = None
TestInitiallyFilled = None
TestInitiallyEmpty = None


class TestBlockJobsAfterCycle(ChangeBaseClass):
    def setUp(self):
        qemu_img('create', '-f', iotests.imgfmt, old_img, '1M')

        self.vm = iotests.VM()
        self.vm.launch()

        result = self.vm.qmp('blockdev-add',
                             options={'id': 'drive0',
                                      'driver': 'null-co'})
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/image/format', 'null-co')

        # For device-less BBs, calling blockdev-open-tray or blockdev-close-tray
        # is not necessary
        result = self.vm.qmp('x-blockdev-remove-medium', device='drive0')
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-block')
        self.assert_qmp_absent(result, 'return[0]/inserted')

        result = self.vm.qmp('blockdev-add',
                             options={'node-name': 'node0',
                                      'driver': iotests.imgfmt,
                                      'file': {'filename': old_img,
                                               'driver': 'file'}})
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('x-blockdev-insert-medium', device='drive0',
                                                       node_name='node0')
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/image/filename', old_img)

    def tearDown(self):
        self.vm.shutdown()
        os.remove(old_img)
        try:
            os.remove(new_img)
        except OSError:
            pass

    def test_snapshot_and_commit(self):
        # We need backing file support
        if iotests.imgfmt != 'qcow2' and iotests.imgfmt != 'qed':
            return

        result = self.vm.qmp('blockdev-snapshot-sync', device='drive0',
                                                       snapshot_file=new_img,
                                                       format=iotests.imgfmt)
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-block')
        self.assert_qmp(result, 'return[0]/inserted/image/filename', new_img)
        self.assert_qmp(result,
                        'return[0]/inserted/image/backing-image/filename',
                        old_img)

        result = self.vm.qmp('block-commit', device='drive0')
        self.assert_qmp(result, 'return', {})

        self.vm.event_wait(name='BLOCK_JOB_READY')

        result = self.vm.qmp('query-block-jobs')
        self.assert_qmp(result, 'return[0]/device', 'drive0')

        result = self.vm.qmp('block-job-complete', device='drive0')
        self.assert_qmp(result, 'return', {})

        self.vm.event_wait(name='BLOCK_JOB_COMPLETED')


if __name__ == '__main__':
    if iotests.qemu_default_machine != 'pc':
        # We need floppy and IDE CD-ROM
        iotests.notrun('not suitable for this machine type: %s' %
                       iotests.qemu_default_machine)
    # Need to support image creation
    iotests.main(supported_fmts=['vpc', 'parallels', 'qcow', 'vdi', 'qcow2',
                                 'vmdk', 'raw', 'vhdx', 'qed'])