File: wrapfs.py

package info (click to toggle)
python-fs 2.4.16-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 1,384 kB
  • sloc: python: 13,196; makefile: 226; sh: 3
file content (547 lines) | stat: -rw-r--r-- 17,457 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
"""Base class for filesystem wrappers.
"""

from __future__ import unicode_literals

import typing

import six

from . import errors
from .base import FS
from .copy import copy_dir, copy_file
from .error_tools import unwrap_errors
from .info import Info
from .path import abspath, join, normpath

if typing.TYPE_CHECKING:
    from typing import (
        IO,
        Any,
        AnyStr,
        BinaryIO,
        Callable,
        Collection,
        Iterable,
        Iterator,
        List,
        Mapping,
        Optional,
        Text,
        Tuple,
        Union,
    )

    from datetime import datetime
    from threading import RLock

    from .enums import ResourceType
    from .info import RawInfo
    from .permissions import Permissions
    from .subfs import SubFS
    from .walk import BoundWalker

    _T = typing.TypeVar("_T", bound="FS")
    _OpendirFactory = Callable[[_T, Text], SubFS[_T]]


_F = typing.TypeVar("_F", bound="FS", covariant=True)
_W = typing.TypeVar("_W", bound="WrapFS[FS]")


@six.python_2_unicode_compatible
class WrapFS(FS, typing.Generic[_F]):
    """A proxy for a filesystem object.

    This class exposes an filesystem interface, where the data is
    stored on another filesystem(s), and is the basis for
    `~fs.subfs.SubFS` and other *virtual* filesystems.

    """

    wrap_name = None  # type: Optional[Text]

    def __init__(self, wrap_fs):  # noqa: D107
        # type: (_F) -> None
        self._wrap_fs = wrap_fs
        super(WrapFS, self).__init__()

    def __repr__(self):
        # type: () -> Text
        return "{}({!r})".format(self.__class__.__name__, self._wrap_fs)

    def __str__(self):
        # type: () -> Text
        wraps = []
        _fs = self  # type: Union[FS, WrapFS[FS]]
        while hasattr(_fs, "_wrap_fs"):
            wrap_name = getattr(_fs, "wrap_name", None)
            if wrap_name is not None:
                wraps.append(wrap_name)
            _fs = _fs._wrap_fs  # type: ignore
        if wraps:
            _str = "{}({})".format(_fs, ", ".join(wraps[::-1]))
        else:
            _str = "{}".format(_fs)
        return _str

    def delegate_path(self, path):
        # type: (Text) -> Tuple[_F, Text]
        """Encode a path for proxied filesystem.

        Arguments:
            path (str): A path on the filesystem.

        Returns:
            (FS, str): a tuple of ``(<filesystem>, <new_path>)``

        """
        return self._wrap_fs, path

    def delegate_fs(self):
        # type: () -> _F
        """Get the proxied filesystem.

        This method should return a filesystem for methods not
        associated with a path, e.g. `~fs.base.FS.getmeta`.

        """
        return self._wrap_fs

    def appendbytes(self, path, data):
        # type: (Text, bytes) -> None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            return _fs.appendbytes(_path, data)

    def appendtext(
        self,
        path,  # type: Text
        text,  # type: Text
        encoding="utf-8",  # type: Text
        errors=None,  # type: Optional[Text]
        newline="",  # type: Text
    ):
        # type: (...) -> None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            return _fs.appendtext(
                _path, text, encoding=encoding, errors=errors, newline=newline
            )

    def getinfo(self, path, namespaces=None):
        # type: (Text, Optional[Collection[Text]]) -> Info
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            raw_info = _fs.getinfo(_path, namespaces=namespaces).raw
        if abspath(normpath(path)) == "/":
            raw_info = dict(raw_info)
            raw_info["basic"]["name"] = ""  # type: ignore
        return Info(raw_info)

    def listdir(self, path):
        # type: (Text) -> List[Text]
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            dir_list = _fs.listdir(_path)
        return dir_list

    def lock(self):
        # type: () -> RLock
        self.check()
        _fs = self.delegate_fs()
        return _fs.lock()

    def makedir(
        self,
        path,  # type: Text
        permissions=None,  # type: Optional[Permissions]
        recreate=False,  # type: bool
    ):
        # type: (...) -> SubFS[FS]
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            return _fs.makedir(_path, permissions=permissions, recreate=recreate)

    def move(self, src_path, dst_path, overwrite=False, preserve_time=False):
        # type: (Text, Text, bool, bool) -> None
        _fs, _src_path = self.delegate_path(src_path)
        _, _dst_path = self.delegate_path(dst_path)
        with unwrap_errors({_src_path: src_path, _dst_path: dst_path}):
            _fs.move(
                _src_path, _dst_path, overwrite=overwrite, preserve_time=preserve_time
            )

    def movedir(self, src_path, dst_path, create=False, preserve_time=False):
        # type: (Text, Text, bool, bool) -> None
        _fs, _src_path = self.delegate_path(src_path)
        _, _dst_path = self.delegate_path(dst_path)
        with unwrap_errors({_src_path: src_path, _dst_path: dst_path}):
            _fs.movedir(
                _src_path, _dst_path, create=create, preserve_time=preserve_time
            )

    def openbin(self, path, mode="r", buffering=-1, **options):
        # type: (Text, Text, int, **Any) -> BinaryIO
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            bin_file = _fs.openbin(_path, mode=mode, buffering=-1, **options)
        return bin_file

    def remove(self, path):
        # type: (Text) -> None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.remove(_path)

    def removedir(self, path):
        # type: (Text) -> None
        self.check()
        _path = abspath(normpath(path))
        if _path == "/":
            raise errors.RemoveRootError()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.removedir(_path)

    def removetree(self, dir_path):
        # type: (Text) -> None
        self.check()
        _path = abspath(normpath(dir_path))
        _delegate_fs, _delegate_path = self.delegate_path(dir_path)
        with unwrap_errors(dir_path):
            if _path == "/":
                # with root path, we must remove the contents but
                # not the directory itself, so we can't just directly
                # delegate
                for info in _delegate_fs.scandir(_delegate_path):
                    info_path = join(_delegate_path, info.name)
                    if info.is_dir:
                        _delegate_fs.removetree(info_path)
                    else:
                        _delegate_fs.remove(info_path)
            else:
                _delegate_fs.removetree(_delegate_path)

    def scandir(
        self,
        path,  # type: Text
        namespaces=None,  # type: Optional[Collection[Text]]
        page=None,  # type: Optional[Tuple[int, int]]
    ):
        # type: (...) -> Iterator[Info]
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            for info in _fs.scandir(_path, namespaces=namespaces, page=page):
                yield info

    def setinfo(self, path, info):
        # type: (Text, RawInfo) -> None
        self.check()
        _fs, _path = self.delegate_path(path)
        return _fs.setinfo(_path, info)

    def settimes(self, path, accessed=None, modified=None):
        # type: (Text, Optional[datetime], Optional[datetime]) -> None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.settimes(_path, accessed=accessed, modified=modified)

    def touch(self, path):
        # type: (Text) -> None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.touch(_path)

    def copy(self, src_path, dst_path, overwrite=False, preserve_time=False):
        # type: (Text, Text, bool, bool) -> None
        src_fs, _src_path = self.delegate_path(src_path)
        dst_fs, _dst_path = self.delegate_path(dst_path)
        with unwrap_errors({_src_path: src_path, _dst_path: dst_path}):
            if not overwrite and dst_fs.exists(_dst_path):
                raise errors.DestinationExists(_dst_path)
            copy_file(src_fs, _src_path, dst_fs, _dst_path, preserve_time=preserve_time)

    def copydir(self, src_path, dst_path, create=False, preserve_time=False):
        # type: (Text, Text, bool, bool) -> None
        src_fs, _src_path = self.delegate_path(src_path)
        dst_fs, _dst_path = self.delegate_path(dst_path)
        with unwrap_errors({_src_path: src_path, _dst_path: dst_path}):
            if not create and not dst_fs.exists(_dst_path):
                raise errors.ResourceNotFound(dst_path)
            if not src_fs.getinfo(_src_path).is_dir:
                raise errors.DirectoryExpected(src_path)
            copy_dir(src_fs, _src_path, dst_fs, _dst_path, preserve_time=preserve_time)

    def create(self, path, wipe=False):
        # type: (Text, bool) -> bool
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            return _fs.create(_path, wipe=wipe)

    def desc(self, path):
        # type: (Text) -> Text
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            desc = _fs.desc(_path)
        return desc

    def download(self, path, file, chunk_size=None, **options):
        # type: (Text, BinaryIO, Optional[int], **Any) -> None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.download(_path, file, chunk_size=chunk_size, **options)

    def exists(self, path):
        # type: (Text) -> bool
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            exists = _fs.exists(_path)
        return exists

    def filterdir(
        self,
        path,  # type: Text
        files=None,  # type: Optional[Iterable[Text]]
        dirs=None,  # type: Optional[Iterable[Text]]
        exclude_dirs=None,  # type: Optional[Iterable[Text]]
        exclude_files=None,  # type: Optional[Iterable[Text]]
        namespaces=None,  # type: Optional[Collection[Text]]
        page=None,  # type: Optional[Tuple[int, int]]
    ):
        # type: (...) -> Iterator[Info]
        self.check()
        _fs, _path = self.delegate_path(path)
        iter_files = iter(
            _fs.filterdir(
                _path,
                exclude_dirs=exclude_dirs,
                exclude_files=exclude_files,
                files=files,
                dirs=dirs,
                namespaces=namespaces,
                page=page,
            )
        )
        with unwrap_errors(path):
            for info in iter_files:
                yield info

    def readbytes(self, path):
        # type: (Text) -> bytes
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _bytes = _fs.readbytes(_path)
        return _bytes

    def readtext(
        self,
        path,  # type: Text
        encoding=None,  # type: Optional[Text]
        errors=None,  # type: Optional[Text]
        newline="",  # type: Text
    ):
        # type: (...) -> Text
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _text = _fs.readtext(
                _path, encoding=encoding, errors=errors, newline=newline
            )
        return _text

    def getmeta(self, namespace="standard"):
        # type: (Text) -> Mapping[Text, object]
        self.check()
        meta = self.delegate_fs().getmeta(namespace=namespace)
        return meta

    def getsize(self, path):
        # type: (Text) -> int
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            size = _fs.getsize(_path)
        return size

    def getsyspath(self, path):
        # type: (Text) -> Text
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            sys_path = _fs.getsyspath(_path)
        return sys_path

    def gettype(self, path):
        # type: (Text) -> ResourceType
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _type = _fs.gettype(_path)
        return _type

    def geturl(self, path, purpose="download"):
        # type: (Text, Text) -> Text
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            return _fs.geturl(_path, purpose=purpose)

    def hassyspath(self, path):
        # type: (Text) -> bool
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            has_sys_path = _fs.hassyspath(_path)
        return has_sys_path

    def hasurl(self, path, purpose="download"):
        # type: (Text, Text) -> bool
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            has_url = _fs.hasurl(_path, purpose=purpose)
        return has_url

    def isdir(self, path):
        # type: (Text) -> bool
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _isdir = _fs.isdir(_path)
        return _isdir

    def isfile(self, path):
        # type: (Text) -> bool
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _isfile = _fs.isfile(_path)
        return _isfile

    def islink(self, path):
        # type: (Text) -> bool
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _islink = _fs.islink(_path)
        return _islink

    def makedirs(
        self,
        path,  # type: Text
        permissions=None,  # type: Optional[Permissions]
        recreate=False,  # type: bool
    ):
        # type: (...) -> SubFS[FS]
        self.check()
        _fs, _path = self.delegate_path(path)
        return _fs.makedirs(_path, permissions=permissions, recreate=recreate)

    # FIXME(@althonos): line_buffering is not a FS.open declared argument
    def open(
        self,
        path,  # type: Text
        mode="r",  # type: Text
        buffering=-1,  # type: int
        encoding=None,  # type: Optional[Text]
        errors=None,  # type: Optional[Text]
        newline="",  # type: Text
        line_buffering=False,  # type: bool
        **options  # type: Any
    ):
        # type: (...) -> IO[AnyStr]
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            open_file = _fs.open(
                _path,
                mode=mode,
                buffering=buffering,
                encoding=encoding,
                errors=errors,
                newline=newline,
                line_buffering=line_buffering,
                **options
            )
        return open_file

    def opendir(
        self,  # type: _W
        path,  # type: Text
        factory=None,  # type: Optional[_OpendirFactory]
    ):
        # type: (...) -> SubFS[_W]
        from .subfs import SubFS

        factory = factory or SubFS
        if not self.getinfo(path).is_dir:
            raise errors.DirectoryExpected(path=path)
        with unwrap_errors(path):
            return factory(self, path)

    def writebytes(self, path, contents):
        # type: (Text, bytes) -> None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.writebytes(_path, contents)

    def upload(self, path, file, chunk_size=None, **options):
        # type: (Text, BinaryIO, Optional[int], **Any) -> None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.upload(_path, file, chunk_size=chunk_size, **options)

    def writefile(
        self,
        path,  # type: Text
        file,  # type: IO[AnyStr]
        encoding=None,  # type: Optional[Text]
        errors=None,  # type: Optional[Text]
        newline="",  # type: Text
    ):
        # type: (...) -> None
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.writefile(
                _path, file, encoding=encoding, errors=errors, newline=newline
            )

    def validatepath(self, path):
        # type: (Text) -> Text
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            _fs.validatepath(_path)
        path = abspath(normpath(path))
        return path

    def hash(self, path, name):
        # type: (Text, Text) -> Text
        self.check()
        _fs, _path = self.delegate_path(path)
        with unwrap_errors(path):
            return _fs.hash(_path, name)

    @property
    def walk(self):
        # type: () -> BoundWalker
        return self._wrap_fs.walker_class.bind(self)