aboutsummaryrefslogtreecommitdiff
path: root/tests/qemu-iotests/302
blob: a6d79e727b55dd2d0d0e6fb286507f5f87bfd3ef (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python3
# group: quick
#
# Tests converting qcow2 compressed to NBD
#
# Copyright (c) 2020 Nir Soffer <nirsof@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# owner=nirsof@gmail.com

import io
import tarfile

import iotests

from iotests import (
    file_path,
    qemu_img,
    qemu_img_check,
    qemu_img_create,
    qemu_img_log,
    qemu_img_measure,
    qemu_io,
    qemu_nbd_popen,
    img_info_log,
)

iotests.script_initialize(supported_fmts=["qcow2"])

# Create source disk. Using qcow2 to enable strict comparing later, and
# avoid issues with random filesystem on CI environment.
src_disk = file_path("disk.qcow2")
qemu_img_create("-f", iotests.imgfmt, src_disk, "1g")
qemu_io("-f", iotests.imgfmt, "-c", "write 1m 64k", src_disk)

# The use case is writing qcow2 image directly into an ova file, which
# is a tar file with specific layout. This is tricky since we don't know the
# size of the image before compressing, so we have to do:
# 1. Add an ovf file.
# 2. Find the offset of the next member data.
# 3. Make room for image data, allocating for the worst case.
# 4. Write compressed image data into the tar.
# 5. Add a tar entry with the actual image size.
# 6. Shrink the tar to the actual size, aligned to 512 bytes.

tar_file = file_path("test.ova")

with tarfile.open(tar_file, "w") as tar:

    # 1. Add an ovf file.

    ovf_data = b"<xml/>"
    ovf = tarfile.TarInfo("vm.ovf")
    ovf.size = len(ovf_data)
    tar.addfile(ovf, io.BytesIO(ovf_data))

    # 2. Find the offset of the next member data.

    offset = tar.fileobj.tell() + 512

    # 3. Make room for image data, allocating for the worst case.

    measure = qemu_img_measure("-O", "qcow2", src_disk)
    tar.fileobj.truncate(offset + measure["required"])

    # 4. Write compressed image data into the tar.

    nbd_sock = file_path("nbd-sock", base_dir=iotests.sock_dir)
    nbd_uri = "nbd+unix:///exp?socket=" + nbd_sock

    # Use raw format to allow creating qcow2 directly into tar file.
    with qemu_nbd_popen(
            "--socket", nbd_sock,
            "--export-name", "exp",
            "--format", "raw",
            "--offset", str(offset),
            tar_file):

        iotests.log("=== Target image info ===")
        # Not img_info_log as it enforces imgfmt, but now we print info on raw
        qemu_img_log("info", nbd_uri)

        qemu_img(
            "convert",
            "-f", iotests.imgfmt,
            "-O", "qcow2",
            "-c",
            src_disk,
            nbd_uri)

        iotests.log("=== Converted image info ===")
        img_info_log(nbd_uri)

        iotests.log("=== Converted image check ===")
        qemu_img_log("check", nbd_uri)

        iotests.log("=== Comparing to source disk ===")
        qemu_img_log("compare", src_disk, nbd_uri)

        actual_size = qemu_img_check(nbd_uri)["image-end-offset"]

    # 5. Add a tar entry with the actual image size.

    disk = tarfile.TarInfo("disk")
    disk.size = actual_size
    tar.addfile(disk)

    # 6. Shrink the tar to the actual size, aligned to 512 bytes.

    tar_size = offset + (disk.size + 511) & ~511
    tar.fileobj.seek(tar_size)
    tar.fileobj.truncate(tar_size)

with tarfile.open(tar_file) as tar:
    members = [{"name": m.name, "size": m.size, "offset": m.offset_data}
               for m in tar]
    iotests.log("=== OVA file contents ===")
    iotests.log(members)