1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
import os
import unittest
import time
import overrides_hack
from utils import create_sparse_tempfile, TestTags, tag_test
from gi.repository import BlockDev, GLib
class LoopTestCase(unittest.TestCase):
    """Common fixture for the loop plugin tests.

    Every test gets a fresh sparse backing file in ``self.dev_file`` and a
    ``self.loop`` attribute (initially ``None``) where the test stores the name
    of the loop device it creates, so that the device can be torn down during
    cleanup even if the test fails half-way through.
    """

    # plugin specs handed to BlockDev.init()/BlockDev.reinit() in setUpClass
    requested_plugins = BlockDev.plugin_specs_from_names(("loop",))

    @classmethod
    def setUpClass(cls):
        """Initialize libblockdev with the loop plugin (or re-initialize it)."""
        if not BlockDev.is_initialized():
            BlockDev.init(cls.requested_plugins, None)
        else:
            # NOTE(review): the ``True`` is presumably the "reload" flag of
            # BlockDev.reinit() -- confirm against the libblockdev API docs
            BlockDev.reinit(cls.requested_plugins, True, None)

    def setUp(self):
        """Create the backing file and register the per-test cleanup."""
        # Set up the attributes first and register the cleanup last: cleanups
        # run even when setUp() fails, and _clean_up() must never see an
        # instance without 'loop' and 'dev_file'. It is still the first cleanup
        # registered, so it keeps running after any cleanup added by the test.
        self.loop = None
        self.dev_file = create_sparse_tempfile("loop_test", 1024**3)
        self.addCleanup(self._clean_up)

    def _clean_up(self):
        """Tear down the loop device (best effort) and remove the backing file."""
        if self.loop:
            try:
                BlockDev.loop_teardown(self.loop)
            except Exception:
                # best effort only -- the test may have already torn the device
                # down itself; KeyboardInterrupt/SystemExit still propagate
                pass

        os.unlink(self.dev_file)
class LoopTestSetupBasic(LoopTestCase):
    """Set up a loop device with default arguments and tear it down again."""

    @tag_test(TestTags.CORE)
    def testLoop_setup_teardown_basic(self):
        """Verify that basic loop_setup and loop_teardown work as expected"""

        created, self.loop = BlockDev.loop_setup(self.dev_file)
        self.assertTrue(created)
        self.assertTrue(self.loop)

        self.assertTrue(BlockDev.loop_teardown(self.loop))

        # the kernel and udev need a moment to update /sys; after that the
        # torn down device must not report any backing file
        time.sleep(1)
        self.assertIsNone(BlockDev.loop_get_backing_file(self.loop))
class LoopTestSetupOffset(LoopTestCase):
    """Set up a loop device that starts at an offset into the backing file."""

    def testLoop_setup_with_offset(self):
        """Verify that loop_setup with offset specified works as expected"""

        offset = 10 * 1024**2
        created, self.loop = BlockDev.loop_setup(self.dev_file, offset)
        self.assertTrue(created)
        self.assertTrue(self.loop)

        # the value read from sysfs is multiplied by 512 to get bytes; the
        # device has to be smaller than the backing file by the offset
        size_path = "/sys/block/%s/size" % self.loop
        with open(size_path, "r") as size_file:
            sectors = int(size_file.read())
        self.assertEqual(sectors * 512, 1024**3 - offset)

        self.assertTrue(BlockDev.loop_teardown(self.loop))
class LoopTestSetupOffsetSize(LoopTestCase):
    """Set up a loop device with both an offset and an explicit size."""

    def testLoop_setup_with_offset_and_size(self):
        """Verify that loop_setup with offset and size specified works as expected"""

        offset = 10 * 1024**2
        requested_size = 50 * 1024**2
        created, self.loop = BlockDev.loop_setup(self.dev_file, offset, requested_size)
        self.assertTrue(created)
        self.assertTrue(self.loop)

        # the value read from sysfs is multiplied by 512 to get bytes; the
        # device has to be exactly as big as requested
        size_path = "/sys/block/%s/size" % self.loop
        with open(size_path, "r") as size_file:
            sectors = int(size_file.read())
        self.assertEqual(sectors * 512, requested_size)

        self.assertTrue(BlockDev.loop_teardown(self.loop))
class LoopTestSetupReadOnly(LoopTestCase):
    """Set up a loop device with the read-only argument set to True."""

    def testLoop_setup_read_only(self):
        """Verify that loop_setup with read_only specified works as expected"""

        # no offset, no size limit, read-only
        created, self.loop = BlockDev.loop_setup(self.dev_file, 0, 0, True)
        self.assertTrue(created)
        self.assertTrue(self.loop)

        # the 'ro' attribute in sysfs has to report the device as read-only
        ro_path = "/sys/block/%s/ro" % self.loop
        with open(ro_path, "r") as ro_file:
            ro_flag = ro_file.read().strip()
        self.assertEqual(ro_flag, "1")

    # XXX: is there any sane way to test part_probe=True/False?
class LoopTestGetLoopName(LoopTestCase):
    """Look up the loop device name from its backing file."""

    @tag_test(TestTags.CORE)
    def testLoop_get_loop_name(self):
        """Verify that loop_get_loop_name works as expected"""

        # no loop device is backed by a non-existing file
        self.assertIsNone(BlockDev.loop_get_loop_name("/non/existing"))

        succ, self.loop = BlockDev.loop_setup(self.dev_file)
        # make sure a device was really created -- otherwise the comparison
        # below could pass vacuously with None on both sides
        self.assertTrue(succ)
        self.assertTrue(self.loop)

        ret_loop = BlockDev.loop_get_loop_name(self.dev_file)
        self.assertEqual(ret_loop, self.loop)
class LoopTestGetBackingFile(LoopTestCase):
    """Look up the backing file from the loop device name."""

    @tag_test(TestTags.CORE)
    def testLoop_get_backing_file(self):
        """Verify that loop_get_backing_file works as expected"""

        # a non-existing device has no backing file
        self.assertIsNone(BlockDev.loop_get_backing_file("/non/existing"))

        succ, self.loop = BlockDev.loop_setup(self.dev_file)
        # fail right here with a clear message if the setup did not work
        self.assertTrue(succ)
        self.assertTrue(self.loop)

        f_name = BlockDev.loop_get_backing_file(self.loop)
        self.assertEqual(f_name, self.dev_file)
class LoopTestGetSetAutoclear(LoopTestCase):
    """Get and set the autoclear flag of a loop device."""

    def _check_autoclear_toggle(self, device):
        """Switch autoclear on and off again for *device*, checking each step."""
        self.assertTrue(BlockDev.loop_set_autoclear(device, True))
        self.assertTrue(BlockDev.loop_get_autoclear(device))
        self.assertTrue(BlockDev.loop_set_autoclear(device, False))
        self.assertFalse(BlockDev.loop_get_autoclear(device))

    def testLoop_get_set_autoclear(self):
        """Verify that getting and setting the autoclear flag works as expected"""

        # both functions have to fail for a non-existing device
        with self.assertRaises(GLib.Error):
            BlockDev.loop_get_autoclear("/non/existing")

        with self.assertRaises(GLib.Error):
            BlockDev.loop_set_autoclear("/non/existing", True)

        succ, self.loop = BlockDev.loop_setup(self.dev_file)
        # fail right here with a clear message if the setup did not work
        self.assertTrue(succ)
        self.assertTrue(self.loop)

        # a freshly created device is expected to have autoclear unset
        self.assertFalse(BlockDev.loop_get_autoclear(self.loop))

        # open the loop device so that it doesn't disappear once we set
        # autoclear to True (it's otherwise not being used so it may get cleared
        # automatically)
        fd = os.open("/dev/" + self.loop, os.O_RDONLY)
        self.addCleanup(os.close, fd)

        # first with the bare device name...
        self._check_autoclear_toggle(self.loop)

        # ...now the same, but with the "/dev/" prefix
        self._check_autoclear_toggle("/dev/" + self.loop)
|