1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
|
#!/usr/bin/env python3
import dbus
import libvirttest
import pytest
import xmldata
DBUS_EXCEPTION_MISSING_FUNCTION = 'this function is not supported by the connection driver'
class TestDomain(libvirttest.BaseTestClass):
def test_api(self):
obj, domain = self.get_test_domain()
props = obj.GetAll('org.libvirt.Domain', dbus_interface=dbus.PROPERTIES_IFACE)
assert isinstance(props['Active'], dbus.Boolean)
assert isinstance(props['Autostart'], dbus.Boolean)
assert isinstance(props['Id'], dbus.UInt32)
assert isinstance(props['Name'], dbus.String)
assert isinstance(props['OSType'], dbus.String)
assert isinstance(props['Persistent'], dbus.Boolean)
assert any([isinstance(props['SchedulerType'], dbus.Struct),
isinstance(props['SchedulerType'][0], dbus.String),
isinstance(props['SchedulerType'][1], dbus.Int32)])
assert isinstance(props['Updated'], dbus.Boolean)
assert isinstance(props['UUID'], dbus.String)
# Call all methods except Reset and GetStats, because the test backend
# doesn't support those
xml = domain.GetXMLDesc(0)
assert isinstance(xml, dbus.String)
domain.Reboot(0)
domain.Shutdown(0)
domain.Create(0)
try:
domain.Destroy(0)
except dbus.exceptions.DBusException as e:
if not any(DBUS_EXCEPTION_MISSING_FUNCTION in arg for arg in e.args):
raise e
domain.Undefine(0)
def test_domain_autostart(self):
_, domain = self.get_test_domain()
autostart_expected = True
domain.Set('org.libvirt.Domain', 'Autostart', autostart_expected, dbus_interface=dbus.PROPERTIES_IFACE)
autostart_current = domain.Get('org.libvirt.Domain', 'Autostart', dbus_interface=dbus.PROPERTIES_IFACE)
assert autostart_current == dbus.Boolean(autostart_expected)
def test_domain_managed_save(self):
def domain_stopped(path, event, detail):
if event != libvirttest.DomainEvent.STOPPED:
return
assert detail == libvirttest.DomainEventStoppedDetailType.SAVED
assert isinstance(path, dbus.ObjectPath)
self.loop.quit()
self.connect.connect_to_signal('DomainEvent', domain_stopped)
obj, domain = self.get_test_domain()
domain.ManagedSave(0)
assert domain.HasManagedSaveImage(0) == dbus.Boolean(True)
state, _ = domain.GetState(0)
assert state == libvirttest.DomainState.SHUTOFF
domain.ManagedSaveRemove(0)
assert domain.HasManagedSaveImage(0) == dbus.Boolean(False)
self.main_loop()
def test_domain_metadata(self):
metadata_description = 0
obj, domain = self.get_test_domain()
description_expected = "This is the Test domain"
domain.SetMetadata(metadata_description,
description_expected, "", "", 0)
assert description_expected == domain.GetMetadata(metadata_description, "", 0)
def test_resume(self):
def domain_resumed(path, event, detail):
if event != libvirttest.DomainEvent.RESUMED:
return
assert detail == libvirttest.DomainEventResumedDetailType.UNPAUSED
assert isinstance(path, dbus.ObjectPath)
self.loop.quit()
self.connect.connect_to_signal('DomainEvent', domain_resumed)
obj, domain = self.get_test_domain()
domain.Suspend()
domain.Resume()
state, _ = domain.GetState(0)
assert state == libvirttest.DomainState.RUNNING
self.main_loop()
def test_shutdown(self):
def domain_stopped(path, event, detail):
if event != libvirttest.DomainEvent.STOPPED:
return
assert detail == libvirttest.DomainEventStoppedDetailType.SHUTDOWN
assert isinstance(path, dbus.ObjectPath)
self.loop.quit()
self.connect.connect_to_signal('DomainEvent', domain_stopped)
obj, domain = self.get_test_domain()
domain.Shutdown(0)
state, _ = domain.GetState(0)
assert state == libvirttest.DomainState.SHUTOFF
self.main_loop()
def test_suspend(self):
def domain_suspended(path, event, detail):
if event != libvirttest.DomainEvent.SUSPENDED:
return
assert detail == libvirttest.DomainEventSuspendedDetailType.PAUSED
assert isinstance(path, dbus.ObjectPath)
self.loop.quit()
self.connect.connect_to_signal('DomainEvent', domain_suspended)
obj, domain = self.get_test_domain()
domain.Suspend()
state, _ = domain.GetState(0)
assert state == libvirttest.DomainState.PAUSED
self.main_loop()
def test_undefine(self):
def domain_undefined(path, event, detail):
if event != libvirttest.DomainEvent.UNDEFINED:
return
assert detail == libvirttest.DomainEventUndefinedDetailType.REMOVED
assert isinstance(path, dbus.ObjectPath)
self.loop.quit()
self.connect.connect_to_signal('DomainEvent', domain_undefined)
_, domain = self.get_test_domain()
domain.Shutdown(0)
domain.Undefine(0)
self.main_loop()
def test_domain_vcpus(self):
obj, domain = self.get_test_domain()
vcpus_expected = 2
domain.SetVcpus(vcpus_expected, 0)
assert domain.GetVcpus(0) == dbus.Int32(vcpus_expected)
def test_domain_vcpu_pin_info(self):
# Test driver was broken in 6.6.0 accidentally depending
# on host CPU topology instead of its fake topology
obj = self.bus.get_object('org.libvirt', '/org/libvirt/Test')
ver = obj.Get('org.libvirt.Connect', "LibVersion", dbus_interface=dbus.PROPERTIES_IFACE)
if ver == 6006000:
pytest.skip("CPU toploogy is broken in libvirt 6.6.0 test driver")
obj, domain = self.get_test_domain()
pinInfo_expected = [
[True, True, True, True, True, True, True, True],
[True, True, True, True, True, True, True, True]
]
pinInfo = domain.GetVcpuPinInfo(0)
assert pinInfo == pinInfo_expected
def test_snapshot(self):
obj, domain = self.get_test_domain()
domain.SnapshotCreateXML(xmldata.minimal_snapshot_xml, 0)
assert isinstance(domain.SnapshotCurrent(0), dbus.ObjectPath)
assert isinstance(domain.SnapshotLookupByName("my_snapshot", 0), dbus.ObjectPath)
assert isinstance(domain.ListDomainSnapshots(0), dbus.Array)
if __name__ == '__main__':
libvirttest.run()
|