1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
import asyncio
import libvirt
import libvirtaio
import sys
import unittest
from unittest import mock
import eventmock
class TestLibvirtAio(unittest.TestCase):
async def _run(self, register):
def lifecycleCallback(conn, dom, event, detail, domainChangedEvent):
if (event == libvirt.VIR_DOMAIN_EVENT_STOPPED or
event == libvirt.VIR_DOMAIN_EVENT_STARTED):
domainChangedEvent.set()
if register:
libvirtEvents = libvirtaio.virEventRegisterAsyncIOImpl()
else:
libvirtEvents = libvirtaio.getCurrentImpl()
conn = libvirt.open("test:///default")
dom = conn.lookupByName("test")
eventRegistered = False
domainStopped = False
try:
# Ensure the VM is running.
self.assertEqual([libvirt.VIR_DOMAIN_RUNNING, libvirt.VIR_DOMAIN_RUNNING_UNKNOWN], dom.state())
self.assertTrue(libvirtEvents.is_idle())
# Register VM start/stopped event handler.
domainChangedEvent = asyncio.Event()
conn.domainEventRegisterAny(dom, libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE, lifecycleCallback, domainChangedEvent)
eventRegistered = True
self.assertFalse(libvirtEvents.is_idle())
# Stop the VM.
dom.destroy()
domainStopped = True
# Ensure domain stopped event is received.
await asyncio.wait_for(domainChangedEvent.wait(), 2)
self.assertEqual([libvirt.VIR_DOMAIN_SHUTOFF, libvirt.VIR_DOMAIN_SHUTOFF_DESTROYED], dom.state())
# Start the VM.
domainChangedEvent.clear()
domainStopped = False
dom.create()
# Ensure domain started event is received.
await asyncio.wait_for(domainChangedEvent.wait(), 2)
self.assertEqual([libvirt.VIR_DOMAIN_RUNNING, libvirt.VIR_DOMAIN_RUNNING_BOOTED], dom.state())
self.assertFalse(libvirtEvents.is_idle())
# Deregister the VM start/stopped event handler.
eventRegistered = False
conn.domainEventDeregisterAny(libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE)
# Wait for event queue to clear.
await libvirtEvents.drain()
# Make sure event queue is cleared.
self.assertTrue(libvirtEvents.is_idle())
finally:
if eventRegistered:
conn.domainEventDeregisterAny(libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE)
if domainStopped:
dom.create()
@mock.patch('libvirt.virEventRegisterImpl',
side_effect=eventmock.virEventRegisterImplMock)
def testEventsWithManualLoopSetup(self, mock_event_register):
# Register libvirt events after starting the asyncio loop.
#
# Manually create and set the event loop against this
# thread.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._run(register=True))
loop.close()
asyncio.set_event_loop(None)
mock_event_register.assert_called_once()
@mock.patch('libvirt.virEventRegisterImpl',
side_effect=eventmock.virEventRegisterImplMock)
@unittest.skipIf(sys.version_info < (3,7), "test requires Python 3.7+")
def testEventsWithAsyncioRun(self, mock_event_register):
# Register libvirt events after starting the asyncio loop.
#
# Use asyncio helper to create and set the event loop
# against this thread.
asyncio.run(self._run(register=True))
mock_event_register.assert_called_once()
@mock.patch('libvirt.virEventRegisterImpl',
side_effect=eventmock.virEventRegisterImplMock)
def testEventsPreInitExplicit(self, mock_event_register):
# Register libvirt events before starting the asyncio loop.
#
# Tell virEventRegisterAsyncIOImpl() explicitly what loop
# to use before we set a loop for this thread.
loop = asyncio.new_event_loop()
libvirtaio.virEventRegisterAsyncIOImpl(loop)
asyncio.set_event_loop(loop)
loop.run_until_complete(self._run(register=False))
loop.close()
asyncio.set_event_loop(None)
mock_event_register.assert_called_once()
@mock.patch('libvirt.virEventRegisterImpl',
side_effect=eventmock.virEventRegisterImplMock)
@unittest.skipIf(sys.version_info >= (3,10), "test incompatible with 3.10+")
def testEventsPreInitImplicit(self, mock_event_register):
# Register libvirt events before starting the asyncio loop.
#
# Allow virEventRegisterAsyncIOImpl() to implicitly find the
# loop we set for this thread.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
libvirtaio.virEventRegisterAsyncIOImpl()
loop.run_until_complete(self._run(register=False))
loop.close()
asyncio.set_event_loop(None)
mock_event_register.assert_called_once()
@mock.patch('libvirt.virEventRegisterImpl',
side_effect=eventmock.virEventRegisterImplMock)
@unittest.skipIf(sys.version_info >= (3,10), "test incompatible with 3.10+")
def testEventsImplicitLoopInit(self, mock_event_register):
# Register libvirt events before starting the asyncio loop.
#
# Let virEventRegisterAsyncIOImpl() auto-create a default
# event loop, which we then register against this thread.
#
# Historically this often worked if called from the main thead,
# but since Python 3.10 this triggers a deprecation warning,
# which will turn into a RuntimeError in a later release.
libvirtaio.virEventRegisterAsyncIOImpl()
loop = asyncio.get_event_loop()
loop.run_until_complete(self._run(register=False))
loop.close()
asyncio.set_event_loop(None)
mock_event_register.assert_called_once()
|