File: ftp.py

package info (click to toggle)
universal-pathlib 0.3.10-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,656 kB
  • sloc: python: 20,552; makefile: 5
file content (67 lines) | stat: -rw-r--r-- 1,878 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from __future__ import annotations

import sys
from ftplib import error_perm as FTPPermanentError  # nosec B402
from typing import TYPE_CHECKING

from upath.core import UPath
from upath.types import UNSET_DEFAULT
from upath.types import JoinablePathLike

if TYPE_CHECKING:
    from typing import Any
    from typing import Literal

    if sys.version_info >= (3, 11):
        from typing import Self
        from typing import Unpack
    else:
        from typing_extensions import Self
        from typing_extensions import Unpack

    from upath._chain import FSSpecChainParser
    from upath.types import WritablePathLike
    from upath.types.storage_options import FTPStorageOptions

__all__ = ["FTPPath"]


class FTPPath(UPath):
    __slots__ = ()

    if TYPE_CHECKING:

        def __init__(
            self,
            *args: JoinablePathLike,
            protocol: Literal["ftp"] | None = ...,
            chain_parser: FSSpecChainParser = ...,
            **storage_options: Unpack[FTPStorageOptions],
        ) -> None: ...

    def mkdir(
        self,
        mode: int = 0o777,
        parents: bool = False,
        exist_ok: bool = False,
    ) -> None:
        try:
            return super().mkdir(mode, parents, exist_ok)
        except FTPPermanentError as e:
            if e.args[0].startswith("550") and exist_ok:
                return
            raise FileExistsError(str(self)) from e

    def rename(
        self,
        target: WritablePathLike,
        *,  # note: non-standard compared to pathlib
        recursive: bool = UNSET_DEFAULT,
        maxdepth: int | None = UNSET_DEFAULT,
        **kwargs: Any,
    ) -> Self:
        t = super().rename(target, recursive=recursive, maxdepth=maxdepth, **kwargs)
        self_dir = self.parent.path
        t.fs.invalidate_cache(self_dir)
        self.fs.invalidate_cache(self_dir)
        return t