File: tempdirectory.py

package info (click to toggle)
python-testfixtures 9.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,036 kB
  • sloc: python: 10,373; makefile: 76; sh: 9
file content (404 lines) | stat: -rw-r--r-- 14,006 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
import atexit
import os
import warnings
from pathlib import Path
from re import compile
from tempfile import mkdtemp
from types import TracebackType
from typing import Sequence, Callable, TypeAlias, Self

from testfixtures.comparison import compare
from testfixtures.utils import wrap
from .rmtree import rmtree

PathStrings: TypeAlias = str | Sequence[str]


class TempDirectory:
    """
    A class representing a temporary directory on disk.

    :param ignore: A sequence of strings containing regular expression
                   patterns that match filenames that should be
                   ignored by the :class:`TempDirectory` listing and
                   checking methods.

    :param create: If `True`, the temporary directory will be created
                   as part of class instantiation.

    :param path: If passed, this should be a string containing an
                 absolute path to use as the temporary directory. When
                 passed, :class:`TempDirectory` will not create a new
                 directory to use.

    :param encoding: A default encoding to use for :meth:`read` and
                     :meth:`write` operations when the ``encoding`` parameter
                     is not passed to those methods.

    :param cwd: If ``True``, set the current working directory to be that of the temporary directory
                when used as a decorator or context manager.
    """

    instances = set['TempDirectory']()
    atexit_setup = False

    #: The absolute path of the :class:`TempDirectory` on disk
    path = None

    def __init__(
            self,
            path: str | Path | None = None,
            *,
            ignore: Sequence[str] = (),
            create: bool | None = None,
            encoding: str | None = None,
            cwd: bool = False,
    ):
        self.ignore = []
        for regex in ignore:
            self.ignore.append(compile(regex))
        self.path = str(path) if path else None
        self.encoding = encoding
        self.cwd = cwd
        self.original_cwd: str | None = None
        self.dont_remove = bool(path)
        if create or (path is None and create is None):
            self.create()

    @classmethod
    def atexit(cls) -> None:
        if cls.instances:
            warnings.warn(
                'TempDirectory instances not cleaned up by shutdown:\n'
                '%s' % ('\n'.join(i.path for i in cls.instances if i.path))
                )

    def create(self) -> 'TempDirectory':
        """
        Create a temporary directory for this instance to use if one
        has not already been created.
        """
        if self.path:
            return self
        self.path = mkdtemp()
        self.instances.add(self)
        if not self.__class__.atexit_setup:
            atexit.register(self.atexit)
            self.__class__.atexit_setup = True
        if self.cwd:
            self.original_cwd = os.getcwd()
            os.chdir(self.path)
        return self

    def cleanup(self) -> None:
        """
        Delete the temporary directory and anything in it.
        This :class:`TempDirectory` cannot be used again unless
        :meth:`create` is called.
        """
        if self.cwd and self.original_cwd:
            os.chdir(self.original_cwd)
            self.original_cwd = None
        if self.path and os.path.exists(self.path) and not self.dont_remove:
            rmtree(self.path)
            del self.path
        if self in self.instances:
            self.instances.remove(self)

    @classmethod
    def cleanup_all(cls) -> None:
        """
        Delete all temporary directories associated with all
        :class:`TempDirectory` objects.
        """
        for i in tuple(cls.instances):
            i.cleanup()

    def actual(
            self,
            path: PathStrings | None = None,
            recursive: bool = False,
            files_only: bool = False,
            followlinks: bool = False,
    ) -> list[str]:
        path = self._join(path)

        result: list[str] = []
        if recursive:
            for dirpath, dirnames, filenames in os.walk(path, followlinks=followlinks):
                dirpath = '/'.join(dirpath[len(path)+1:].split(os.sep))
                if dirpath:
                    dirpath += '/'

                for dirname in dirnames:
                    if not files_only:
                        result.append(dirpath+dirname+'/')

                for name in sorted(filenames):
                    result.append(dirpath+name)
        else:
            for n in os.listdir(path):
                result.append(n)

        filtered = []
        for result_path in sorted(result):
            ignore = False
            for regex in self.ignore:
                if regex.search(result_path):
                    ignore = True
                    break
            if ignore:
                continue
            filtered.append(result_path)
        return filtered

    def listdir(self, path: PathStrings | None = None, recursive: bool = False) -> None:
        """
        Print the contents of the specified directory.

        :param path: The path to list, which can be:

                     * `None`, indicating the root of the temporary
                       directory should be listed.

                     * A tuple of strings, indicating that the
                       elements of the tuple should be used as directory
                       names to traverse from the root of the
                       temporary directory to find the directory to be
                       listed.

                     * A forward-slash separated string, indicating
                       the directory or subdirectory that should be
                       traversed to from the temporary directory and
                       listed.

        :param recursive: If `True`, the directory specified will have
                          its subdirectories recursively listed too.
        """
        actual = self.actual(path, recursive)
        if not actual:
            print('No files or directories found.')
        for n in actual:
            print(n)

    def compare(
            self,
            expected: Sequence[str],
            path: PathStrings | None = None,
            files_only: bool = False,
            recursive: bool = True,
            followlinks: bool = False,
    ) -> None:
        """
        Compare the expected contents with the actual contents of the temporary
        directory. An :class:`AssertionError` will be raised if they are not the
        same.

        :param expected: A sequence of strings containing the paths
                         expected in the directory. These paths should
                         be forward-slash separated and relative to
                         the root of the temporary directory.

        :param path: The path to use as the root for the comparison,
                     relative to the root of the temporary directory.
                     This can either be:

                     * A tuple of strings, making up the relative path.

                     * A forward-slash separated string.

                     If it is not provided, the root of the temporary
                     directory will be used.

        :param files_only: If specified, directories will be excluded from
                           the list of actual paths used in the comparison.

        :param recursive: If ``False``, only the direct contents of
                          the directory specified by ``path`` will be included
                          in the actual contents used for comparison.

        :param followlinks: If ``True``, symlinks and hard links
                            will be followed when recursively building up
                            the actual list of directory contents.
        """

        __tracebackhide__ = True
    
        compare(expected=sorted(expected),
                actual=tuple(self.actual(
                    path, recursive, files_only, followlinks
                )),
                recursive=False)

    def _join(self, parts: str | Sequence[str] | None) -> str:
        if self.path is None:
            raise RuntimeError('Instantiated with create=False and .create() not called')
        # make things platform independent
        if parts is None:
            return self.path
        if isinstance(parts, str):
            parts = parts.split('/')
        relative = os.sep.join(parts).rstrip(os.sep)
        if relative.startswith(os.sep):
            if relative.startswith(self.path):
                return relative
            raise ValueError(
                'Attempt to read or write outside the temporary Directory'
                )
        return os.path.join(self.path, relative)

    def makedir(self, dirpath: PathStrings) -> str:
        """
        Make an empty directory at the specified path within the
        temporary directory. Any intermediate subdirectories that do
        not exist will also be created.

        :param dirpath: The directory to create, which can be:

                        * A tuple of strings.

                        * A forward-slash separated string.

        :returns: The absolute path of the created directory.
        """
        thepath = self._join(dirpath)
        os.makedirs(thepath)
        return thepath

    def write(self, filepath: PathStrings, data: str | bytes, encoding: str | None = None) -> str:
        """
        Write the supplied data to a file at the specified path within
        the temporary directory. Any subdirectories specified that do
        not exist will also be created.

        The file will always be written in binary mode. The data supplied must
        either be bytes or an encoding must be supplied to convert the string
        into bytes.

        :param filepath: The path to the file to create, which can be:

                         * A tuple of strings.

                         * A forward-slash separated string.

        :param data:

          :class:`bytes` containing the data to be written, or a :class:`str`
          if ``encoding`` has been supplied.

        :param encoding: The encoding to be used if data is not bytes. Should
                         not be passed if data is already bytes.

        :returns: The absolute path of the file written.
        """
        filepath_parts = filepath.split('/') if isinstance(filepath, str) else filepath
        if len(filepath_parts) > 1:
            dirpath = self._join(filepath_parts[:-1])
            if not os.path.exists(dirpath):
                os.makedirs(dirpath)
        thepath = self._join(filepath_parts)
        encoding = encoding or self.encoding
        if encoding is not None:
            if isinstance(data, bytes):
                raise TypeError('Cannot specify encoding when data is bytes')
            data = data.encode(encoding)
        elif isinstance(data, str):
            data = data.encode()
        with open(thepath, 'wb') as f:
            f.write(data)
        return thepath

    def as_string(self, path: str | Sequence[str] | None = None) -> str:
        """
        Return the full path on disk that corresponds to the path
        relative to the temporary directory that is passed in.

        :param path: The path to the file to create, which can be:

                     * A tuple of strings.

                     * A forward-slash separated string.

        :returns: A string containing the absolute path.

        """
        return self._join(path)

    #: .. deprecated:: 7
    #:
    #:   Use :meth:`as_string` instead.
    getpath = as_string

    def as_path(self, path: PathStrings | None = None) -> Path:
        """
        Return the :class:`~pathlib.Path` that corresponds to the path
        relative to the temporary directory that is passed in.

        :param path: The path to the file to create, which can be:

                     * A tuple of strings.

                     * A forward-slash separated string.
        """
        return Path(self._join(path))

    def __truediv__(self, other: str) -> Path:
        return self.as_path() / other

    def read(self, filepath: PathStrings, encoding: str | None = None) -> str | bytes:
        """
        Reads the file at the specified path within the temporary
        directory.

        The file is always read in binary mode. Bytes will be returned unless
        an encoding is supplied, in which case a unicode string of the decoded
        data will be returned.

        :param filepath: The path to the file to read, which can be:

                         * A tuple of strings.

                         * A forward-slash separated string.

        :param encoding: The encoding used to decode the data in the file.

        :returns:

          The contents of the file as a :class:`str` or :class:`bytes`, if ``encoding``
          is not specified.
        """
        with open(self._join(filepath), 'rb') as f:
            data = f.read()
        encoding = encoding or self.encoding
        if encoding is not None:
            return data.decode(encoding)
        return data

    def __enter__(self) -> Self:
        return self

    def __exit__(
            self,
            exc_type: type[BaseException] | None,
            value: BaseException | None,
            traceback: TracebackType | None,
    ) -> None:
        self.cleanup()


def tempdir(
        path: str | Path | None = None,
        *,
        ignore: Sequence[str] = (),
        encoding: str | None = None,
        cwd: bool = False,
) -> Callable[[Callable], Callable]:
    """
    A decorator for making a :class:`TempDirectory` available for the
    duration of a test function.

    All arguments and parameters are passed through to the
    :class:`TempDirectory` constructor.
    """
    l = TempDirectory(path, ignore=ignore, encoding=encoding, cwd=cwd, create=False)
    return wrap(l.create, l.cleanup)