File: baseclass.py

package info (click to toggle)
pyparted 3.13.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 952 kB
  • sloc: ansic: 7,453; python: 4,579; makefile: 91; sh: 4
file content (257 lines) | stat: -rw-r--r-- 7,473 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
#
# Copyright The pyparted Project Authors
# SPDX-License-Identifier: GPL-2.0-or-later
#

import _ped
import parted
import os
import tempfile
import unittest

# Base class for any test case that requires a temp device node
class RequiresDeviceNode(unittest.TestCase):
    """Test fixture that provides a temporary file to act as a device node.

    After ``setUp`` the instance exposes:

    * ``temp_prefix`` -- file name prefix handed to ``tempfile.mkstemp``
    * ``fd`` -- the OS-level descriptor returned by ``mkstemp``
    * ``path`` -- the path of the temporary file
    * ``f`` -- a file object wrapping ``fd``

    ``tearDown`` closes the file, removes it and resets the attributes.
    """

    def setUp(self):
        """Create the temporary file and write one byte at offset 140000."""
        super().setUp()

        self.temp_prefix = "temp-device-"
        (self.fd, self.path) = tempfile.mkstemp(prefix=self.temp_prefix)
        self.f = os.fdopen(self.fd)
        # Position the descriptor first so the raw write below lands at
        # offset 140000 rather than at the start of the file.
        self.f.seek(140000)
        os.write(self.fd, b"0")

    def tearDown(self):
        """Close the temporary file, delete it and clear the attributes."""
        # self.f owns self.fd: closing the file object closes the descriptor.
        # Calling os.close(self.fd) directly would leave self.f believing it
        # is still open, and it would close the same descriptor number a
        # second time when finalised (by then possibly reused by another
        # file).
        self.f.close()

        if self.path and os.path.exists(self.path):
            os.unlink(self.path)

        self.f = None
        self.fd = None
        self.path = None
        self.temp_prefix = None


# Base class for any test case that requires a _ped.Device or parted.Device
# object first.
class RequiresDevice(RequiresDeviceNode):
    """Fixture that opens the temporary device node with both bindings.

    ``_device`` holds the object returned by ``_ped.device_get`` and
    ``device`` the one returned by ``parted.getDevice``, both for
    ``self.path``.
    """

    def setUp(self):
        """Create the device node, then open it via _ped and parted."""
        super().setUp()
        # Register the cleanup before opening anything so the references
        # are cleared even if one of the calls below raises.
        self.addCleanup(self.removeDevice)
        node = self.path
        self._device = _ped.device_get(node)
        self.device = parted.getDevice(node)

    def removeDevice(self):
        """Drop the references to both device objects."""
        self.device = self._device = None


# Base class for any test case that requires a filesystem on a device.
class RequiresFileSystem(unittest.TestCase):
    """Fixture providing a temporary file that mke2fs has been run on.

    ``setUp`` fills ``_fileSystemType`` (type name -> _ped file system type),
    creates a temporary file at ``path``, runs ``mke2fs`` on it, opens it as
    ``_device`` and builds ``_geometry`` from 0 and ``_device.length - 1``.
    """

    def setUp(self):
        """Collect the file system types and prepare the formatted file."""
        super().setUp()

        # Walk the _get_next chain and index every type by name.  The walk
        # stops as soon as one of the listed exceptions is raised.
        known = self._fileSystemType = {}
        fs_type = _ped.file_system_type_get_next()
        known[fs_type.name] = fs_type

        while True:
            try:
                fs_type = _ped.file_system_type_get_next(fs_type)
                known[fs_type.name] = fs_type
            except (IndexError, TypeError, _ped.UnknownTypeException):
                break

        # Temporary backing file with a single byte written at offset 140000.
        self.temp_prefix = "temp-device-"
        self.fd, self.path = tempfile.mkstemp(prefix=self.temp_prefix)
        self.f = os.fdopen(self.fd)
        self.f.seek(140000)
        os.write(self.fd, b"0")
        self.f.close()

        # The exit status of mke2fs is not checked.
        command = "mke2fs -F -q %s" % (self.path,)
        os.system(command)

        self._device = _ped.device_get(self.path)
        extent = self._device.length - 1
        self._geometry = _ped.Geometry(self._device, 0, extent)

    def tearDown(self):
        """Delete the temporary file if it still exists."""
        target = self.path
        if target and os.path.exists(target):
            os.unlink(target)

        self.mountpoint = None


# Base class for certain alignment tests that require a _ped.Device
class RequiresDeviceAlignment(RequiresDevice):
    """Device fixture with sector-rounding helpers for alignment tests."""

    def roundDownTo(self, sector, grain_size):
        """Return the largest multiple of grain_size that is <= sector.

        Python's ``%`` yields a remainder with the sign of the divisor, so
        for a positive grain_size the remainder is already non-negative even
        when sector is negative.  Adding grain_size on top of it for negative
        sectors would land one whole grain too low, which in turn made
        roundUpTo return a value below its input.
        """
        return sector - sector % grain_size

    def roundUpTo(self, sector, grain_size):
        """Return the smallest multiple of grain_size that is >= sector."""
        if sector % grain_size:
            return self.roundDownTo(sector, grain_size) + grain_size

        # Already a multiple of the grain size.
        return sector

    def closestInsideGeometry(self, alignment, geometry, sector):
        """Return a sector inside geometry derived from sector, or -1.

        With a grain size of 0 the sector cannot be moved: it is returned
        unchanged when ``alignment.is_aligned`` accepts it and geometry is
        either None or contains it, and -1 is returned otherwise.

        With a non-zero grain size the sector is shifted by whole grains
        towards the geometry (up when it is below ``geometry.start``, down
        when it is above ``geometry.end``); the result is returned if
        ``geometry.test_sector_inside`` accepts it, else -1.
        """
        if alignment.grain_size == 0:
            if alignment.is_aligned(geometry, sector) and (
                (geometry is None) or geometry.test_sector_inside(sector)
            ):
                return sector
            else:
                return -1

        if sector < geometry.start:
            sector += self.roundUpTo(geometry.start - sector, alignment.grain_size)

        if sector > geometry.end:
            sector -= self.roundUpTo(sector - geometry.end, alignment.grain_size)

        if not geometry.test_sector_inside(sector):
            return -1

        return sector

    def closest(self, sector, a, b):
        """Return whichever of a and b is nearer to sector.

        -1 marks a missing candidate: if a is -1 then b is returned, and if
        b is -1 then a is returned.  On a tie b wins.
        """
        if a == -1:
            return b

        if b == -1:
            return a

        if abs(sector - a) < abs(sector - b):
            return a
        else:
            return b


# Base class for any test case that requires a labeled device
class RequiresLabeledDevice(RequiresDevice):
    """Device fixture whose backing file is labeled msdos by parted."""

    def setUp(self):
        """Open the device, then run ``parted -s <path> mklabel msdos``."""
        super().setUp()
        # Shells out to the parted command; its exit status is not checked.
        label_cmd = "parted -s %s mklabel msdos" % (self.path,)
        os.system(label_cmd)


# Base class for any test case that requires a _ped.Disk or parted.Disk.
class RequiresDisk(RequiresDevice):
    """Device fixture that also carries a fresh msdos disk.

    ``_disk`` is the _ped disk and ``disk`` the ``parted.Disk`` built from
    it.
    """

    def setUp(self):
        """Create a fresh msdos disk on the device and wrap it."""
        super().setUp()
        msdos = _ped.disk_type_get("msdos")
        self._disk = _ped.disk_new_fresh(self._device, msdos)
        self.disk = parted.Disk(PedDisk=self._disk)

    def reopen(self):
        """Replace both disk objects with ones built by ``_ped.disk_new``."""
        reread = _ped.disk_new(self._device)
        self._disk = reread
        self.disk = parted.Disk(PedDisk=reread)


# Base class for any test case that requires a GPT-labeled _ped.Disk or parted.Disk.
class RequiresGPTDisk(RequiresDevice):
    """Device fixture that also carries a fresh gpt disk.

    ``_disk`` is the _ped disk and ``disk`` the ``parted.Disk`` built from
    it.
    """

    def setUp(self):
        """Create a fresh gpt disk on the device and wrap it."""
        super().setUp()
        gpt = _ped.disk_type_get("gpt")
        self._disk = _ped.disk_new_fresh(self._device, gpt)
        self.disk = parted.Disk(PedDisk=self._disk)

    def reopen(self):
        """Replace both disk objects with ones built by ``_ped.disk_new``."""
        reread = _ped.disk_new(self._device)
        self._disk = reread
        self.disk = parted.Disk(PedDisk=reread)


# Base class for any test case that requires a filesystem made and mounted.
class RequiresMount(RequiresDevice):
    """Device fixture with helpers to make and loop-mount a filesystem.

    ``mountpoint`` is None until ``doMount`` has been called.
    """

    def setUp(self):
        """Open the device and register the mount point cleanup."""
        super().setUp()
        self.addCleanup(self.removeMountpoint)
        self.mountpoint = None

    def mkfs(self):
        """Run ``mkfs.ext2`` on the device file."""
        mkfs_cmd = "mkfs.ext2 -F -q %s" % self.path
        os.system(mkfs_cmd)

    def doMount(self):
        """Create a temporary directory and loop-mount the device file there."""
        self.mountpoint = tempfile.mkdtemp()
        mount_cmd = "mount -o loop %s %s" % (self.path, self.mountpoint)
        os.system(mount_cmd)

    def removeMountpoint(self):
        """Unmount and remove the mount point directory, if there is one."""
        where = self.mountpoint
        if not where or not os.path.exists(where):
            return

        os.system("umount %s" % where)
        os.rmdir(where)


# Base class for any test case that requires a _ped.Partition.
class RequiresPartition(RequiresDisk):
    """Disk fixture that also builds a normal ext2 partition as ``_part``."""

    def setUp(self):
        """Build a normal ext2 partition from sector 1 to 100 on the disk."""
        super().setUp()
        ext2 = _ped.file_system_type_get("ext2")
        self._part = _ped.Partition(
            disk=self._disk,
            type=_ped.PARTITION_NORMAL,
            start=1,
            end=100,
            fs_type=ext2,
        )

    def reopen(self):
        """Reopen the disk and point ``_part`` at its second listed partition."""
        super().reopen()
        # Step past the first entry returned by next_partition().
        first = self._disk.next_partition()
        self._part = self._disk.next_partition(first)


# Base class for any test case that requires a _ped.Partition on GPT disk.
class RequiresGPTPartition(RequiresGPTDisk):
    """GPT disk fixture that also builds a normal ext2 partition as ``_part``."""

    def setUp(self):
        """Build a normal ext2 partition from sector 0 to 100 on the disk."""
        super().setUp()
        ext2 = _ped.file_system_type_get("ext2")
        self._part = _ped.Partition(
            disk=self._disk,
            type=_ped.PARTITION_NORMAL,
            start=0,
            end=100,
            fs_type=ext2,
        )

    def reopen(self):
        """Reopen the disk and point ``_part`` at its first listed partition."""
        super().reopen()
        first = self._disk.next_partition()
        self._part = first


# Base class for any test case that requires a hash table of all
# _ped.DiskType objects available
class RequiresDiskTypes(unittest.TestCase):
    """Fixture exposing ``disktype``, a dict of disk type name -> type."""

    def setUp(self):
        """Fill ``disktype`` by walking ``_ped.disk_type_get_next``."""
        super().setUp()
        table = self.disktype = {}
        current = _ped.disk_type_get_next()
        table[current.name] = current

        # Keep asking for the successor of the last type seen; any of the
        # listed exceptions ends the walk.
        while True:
            try:
                current = _ped.disk_type_get_next(current)
                table[current.name] = current
            except (IndexError, TypeError, _ped.UnknownTypeException):
                break


# Base class for any test case that requires a list being built via successive
# calls of some function.  The function must raise IndexError when there's no
# more output to add to the return list.  This class is most useful for all
# those _get_next methods.
class BuildList:
    """Mixin that collects the results of a ``*_get_next`` style function."""

    def getDeviceList(self, func):
        """Return a list of every item produced by repeatedly calling func.

        func is first called with no arguments and afterwards with the item
        it returned last, until it raises IndexError.

        Whether this is the first call is decided by an identity check
        against None, not by truthiness, so a falsy item (0, "", an empty
        container, ...) is handed back to func instead of restarting the
        walk from the beginning.
        """
        lst = []
        prev = None

        while True:
            try:
                prev = func() if prev is None else func(prev)
            except IndexError:
                break

            lst.append(prev)

        return lst