1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
|
# Copyright (C) 2013, 2014 Red Hat, Inc.
#
# This work is licensed under the GNU GPLv2 or later.
# See the COPYING file in the top-level directory.
import os
import tempfile
import pytest
import virtinst
from virtinst import DeviceDisk
from tests import utils
def test_disk_numtotarget():
    # Exercise disk target naming: number -> suffix, target -> number,
    # and picking the next target name for a bus.
    #
    # NOTE: the 'ba' literal below is an intentional expected value;
    # spell checkers such as codespell may flag it.
    suffix_cases = [
        (1, "a"),
        (2, "b"),
        (26, "z"),
        (27, "aa"),
        (28, "ab"),
        (52, "az"),
        (53, "ba"),
        (27 * 26, "zz"),
        (27 * 26 + 1, "aaa"),
    ]
    for number, suffix in suffix_cases:
        assert DeviceDisk.num_to_target(number) == suffix

    # The reverse mapping takes a full target name (bus prefix included)
    # and, per the expectations below, counts from zero.
    index_cases = [
        ("hda", 0),
        ("hdb", 1),
        ("sdz", 25),
        ("sdaa", 26),
        ("vdab", 27),
        ("vdaz", 51),
        ("xvdba", 52),
        ("xvdzz", 26 * (25 + 1) + 25),
        ("xvdaaa", 26 * 26 * 1 + 26 * 1 + 0),
    ]
    for target, index in index_cases:
        assert DeviceDisk.target_to_num(target) == index

    # Target generation for an IDE disk, given lists of targets
    # that are already taken.
    conn = utils.URIs.open_testdefault_cached()
    ide_disk = virtinst.DeviceDisk(conn)
    ide_disk.bus = "ide"
    generate_cases = [
        ([], "hda"),
        (["hda"], "hdb"),
        (["hdb", "sda"], "hdc"),
        (["hda", "hdd"], "hdb"),
    ]
    for taken, expected in generate_cases:
        assert ide_disk.generate_target(taken) == expected
def test_disk_dir_searchable(monkeypatch):
    # Normally the dir searchable test is skipped in the test suite,
    # but let's contrive an example that should trigger all the code
    # to ensure it isn't horribly broken
    #
    # monkeypatch: pytest fixture, used below to swap the setfacl
    # command name for one that is expected to fail.
    conn = utils.URIs.open_kvm()

    def _set_caps_baselabel_uid(uid):
        # Overwrite the qemu/kvm baselabels of the "dac" security model
        # in the connection's capabilities with "+<uid>:+<uid>".
        secmodel = [s for s in conn.caps.host.secmodels if s.model == "dac"][0]
        for baselabel in secmodel.baselabels:
            if baselabel.type in ["qemu", "kvm"]:
                baselabel.content = "+%s:+%s" % (uid, uid)

    # Use the context manager so the directory is removed deterministically
    # instead of relying on garbage collection of the TemporaryDirectory.
    with tempfile.TemporaryDirectory(prefix="virtinst-test-search") as tmpdir:
        try:
            # path="" should trigger early return
            searchdata = virtinst.DeviceDisk.check_path_search(conn, "")
            assert searchdata.uid is None

            # path=None should trigger early return
            searchdata = virtinst.DeviceDisk.check_path_search(conn, None)
            assert searchdata.uid is None

            # Invalid uid
            _set_caps_baselabel_uid(-1)
            searchdata = virtinst.DeviceDisk.check_path_search(conn, tmpdir)
            assert searchdata.uid is None

            # Use our uid, verify it shows we have expected access
            _set_caps_baselabel_uid(os.getuid())
            searchdata = virtinst.DeviceDisk.check_path_search(conn, tmpdir + "/footest")
            assert searchdata.uid == os.getuid()
            # pylint: disable=use-implicit-booleaness-not-comparison
            assert searchdata.fixlist == []

            # Remove perms on the tmpdir, now it should report failures
            os.chmod(tmpdir, 0o000)
            searchdata = virtinst.DeviceDisk.check_path_search(conn, tmpdir)
            assert searchdata.fixlist == [tmpdir]

            errdict = virtinst.DeviceDisk.fix_path_search(searchdata)
            assert not bool(errdict)

            # Mock setfacl to definitely fail. The result is not checked
            # here; the call only has to complete without raising.
            with monkeypatch.context() as m:
                m.setattr("virtinst.diskbackend.SETFACL", "getfacl")
                virtinst.DeviceDisk.fix_path_search(searchdata)
        finally:
            # Reset changes we made. Permissions must be restored before
            # the enclosing "with" tries to delete the directory.
            conn.invalidate_caps()
            os.chmod(tmpdir, 0o777)
def test_disk_path_in_use_kernel():
    # Extra coverage for DeviceDisk.path_in_use_by: the queried path is
    # one used as a kernel, and a single VM name is expected back.
    kvm_conn = utils.URIs.open_kvm()
    kernel_path = "/pool-dir/test-arm-kernel"
    users = virtinst.DeviceDisk.path_in_use_by(kvm_conn, kernel_path)
    assert users == ["test-arm-kernel"]
def test_disk_paths_in_use():
    # Query several paths in one call; the expected result holds one
    # list of VM names per queried path, in the same order.
    kvm_conn = utils.URIs.open_kvm()
    queried_paths = ["/pool-dir/test-arm-kernel", "/pool-dir/test-arm-initrd"]
    users_per_path = virtinst.DeviceDisk.paths_in_use_by(kvm_conn, queried_paths)
    assert users_per_path == [["test-arm-kernel"], ["test-arm-kernel"]]
def test_disk_diskbackend_misc():
    # Assorted DeviceDisk storage backend checks: size reporting,
    # block device inspection, cloning, and pool install failure.

    # Test get_size() with vol_install
    conn = utils.URIs.open_testdefault_cached()
    disk = virtinst.DeviceDisk(conn)
    pool = conn.storagePoolLookupByName("pool-dir")
    vol_install = disk.build_vol_install(conn, "newvol1.img", pool, 1, False)
    disk.set_vol_install(vol_install)
    assert disk.get_size() == 1.0

    # Test some blockdev inspecting
    conn = utils.URIs.openconn("test:///default")
    if os.path.exists("/dev/loop0"):
        disk = virtinst.DeviceDisk(conn)
        disk.set_source_path("/dev/loop0")
        assert disk.type == "block"
        disk.get_size()

    # Test sparse cloning
    # The temporary file objects are kept referenced for the rest of the
    # test so the paths handed to the disks stay usable.
    tmpinput = tempfile.NamedTemporaryFile()
    # Close the handle explicitly so the zero bytes are flushed to disk
    # before the clone reads them, instead of relying on garbage collection.
    with open(tmpinput.name, "wb") as fobj:
        fobj.write(b"\0" * 10000)

    srcdisk = virtinst.DeviceDisk(conn)
    srcdisk.set_source_path(tmpinput.name)

    newdisk = virtinst.DeviceDisk(conn)
    tmpoutput = tempfile.NamedTemporaryFile()
    # Only the unique name is wanted; the clone target must not exist yet.
    os.unlink(tmpoutput.name)
    newdisk.set_source_path(tmpoutput.name)
    newdisk.set_local_disk_to_clone(srcdisk, True)
    newdisk.build_storage(None)

    # Test cloning onto existing disk
    newdisk = virtinst.DeviceDisk(conn, parsexml=newdisk.get_xml())
    newdisk.set_source_path(newdisk.get_source_path())
    newdisk.set_local_disk_to_clone(srcdisk, True)
    newdisk.build_storage(None)

    # A block disk pointing at a path that does not exist reports size 0
    newdisk = virtinst.DeviceDisk(conn)
    newdisk.type = "block"
    newdisk.set_source_path("/dev/foo/idontexist")
    assert newdisk.get_size() == 0

    # An existing volume path is detected and has a non-zero size
    conn = utils.URIs.open_testdriver_cached()
    volpath = "/pool-dir/test-clone-simple.img"
    assert virtinst.DeviceDisk.path_definitely_exists(conn, volpath)
    disk = virtinst.DeviceDisk(conn)
    disk.set_source_path(volpath)
    assert disk.get_size()

    # The test suite's mocked pool install failure must surface
    # as a RuntimeError
    disk = virtinst.DeviceDisk(conn)
    with pytest.raises(RuntimeError, match="StoragePool.install testsuite mocked failure"):
        disk.set_source_path("/virtinst-testsuite-fail-pool-install/test.qcow2")
def test_disk_diskbackend_parse():
    # Parsed disk XML is assumed to describe a working config, so
    # validate() on it must not try to verify that the path exists.
    conn = utils.URIs.open_testdriver_cached()
    missing_xml = "<disk type='file' device='disk'><source file='/A/B/C/D/NOPE'/></disk>"
    parsed = virtinst.DeviceDisk(conn, parsexml=missing_xml)
    parsed.validate()
    parsed.is_size_conflict()
    parsed.build_storage(None)

    # Stub backend coverage testing. The private attribute is read
    # through getattr(), as elsewhere in this test.
    stub = getattr(parsed, "_storage_backend")
    assert stub.is_stub() is True
    assert parsed.get_parent_pool() is None
    assert parsed.get_vol_object() is None
    assert parsed.get_vol_install() is None
    assert parsed.get_size() == 0
    assert stub.get_vol_xml() is None
    assert stub.get_dev_type() == "file"
    assert stub.get_driver_type() is None
    assert stub.get_parent_pool() is None

    # After this call the backend is no longer the stub, and
    # validate() is expected to reject the nonexistent path.
    parsed.set_backend_for_existing_path()
    resolved = getattr(parsed, "_storage_backend")
    assert resolved.is_stub() is False

    with pytest.raises(ValueError):
        parsed.validate()

    # Ensure set_backend_for_existing_path resolves a path
    # to its existing storage volume
    existing_xml = "<disk type='file' device='disk'><source file='/pool-dir/default-vol'/></disk>"
    existing = virtinst.DeviceDisk(conn, parsexml=existing_xml)
    existing.set_backend_for_existing_path()
    assert existing.get_vol_object()

    # Verify set_backend_for_existing_path doesn't error
    # for a variety of disks
    domain = conn.lookupByName("test-many-devices")
    guest = virtinst.Guest(conn, parsexml=domain.XMLDesc(0))
    for guest_disk in guest.devices.disk:
        guest_disk.set_backend_for_existing_path()
def test_disk_rbd_path():
    # Resolve the backend for an rbd network disk and check that the
    # resulting volume object is named "some-rbd-vol".
    conn = utils.URIs.open_testdriver_cached()
    rbd_xml = """
<disk type="network" device="disk">
<source protocol="rbd" name="rbd-sourcename/some-rbd-vol">
<host name="ceph-mon-1.example.com" port="6789"/>
<host name="ceph-mon-2.example.com" port="6789"/>
<host name="ceph-mon-3.example.com" port="6789"/>
</source>
<target dev="vdag" bus="virtio"/>
</disk>
"""
    rbd_disk = virtinst.DeviceDisk(conn, parsexml=rbd_xml)
    rbd_disk.set_backend_for_existing_path()
    volume = rbd_disk.get_vol_object()
    assert volume.name() == "some-rbd-vol"
|