File: ftpfs.py

package info (click to toggle)
python-fs 2.4.16-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,944 kB
  • sloc: python: 13,048; makefile: 226; sh: 3
file content (57 lines) | stat: -rw-r--r-- 1,597 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# coding: utf-8
"""`FTPFS` opener definition.
"""

from __future__ import absolute_import, print_function, unicode_literals

import typing

from ..errors import CreateFailed
from .base import Opener
from .registry import registry

if typing.TYPE_CHECKING:
    from typing import Text, Union

    from ..ftpfs import FTPFS  # noqa: F401
    from ..subfs import SubFS
    from .parse import ParseResult


@registry.install
class FTPOpener(Opener):
    """`FTPFS` opener.

    Registered for the ``ftp`` and ``ftps`` protocols.
    """

    protocols = ["ftp", "ftps"]

    @CreateFailed.catch_all
    def open_fs(
        self,
        fs_url,  # type: Text
        parse_result,  # type: ParseResult
        writeable,  # type: bool
        create,  # type: bool
        cwd,  # type: Text
    ):
        # type: (...) -> Union[FTPFS, SubFS[FTPFS]]
        """Open an `FTPFS` described by a parsed FS URL.

        The resource part of the URL is read as ``host[:port][/dir_path]``.
        The ``proxy`` and ``timeout`` URL parameters are forwarded to
        `FTPFS` (``timeout`` defaults to ``10``), and TLS is requested
        when the protocol is ``ftps``.

        Arguments:
            fs_url: The original FS URL (not used by this opener).
            parse_result: The parsed URL; ``resource``, ``username``,
                ``password``, ``params`` and ``protocol`` are read.
            writeable: Not used by this opener.
            create: If true and the URL names a directory, that directory
                is created (``makedirs(..., recreate=True)``) before it
                is opened.
            cwd: Not used by this opener.

        Returns:
            The `FTPFS` itself when the URL has no directory part,
            otherwise the result of opening that directory with the
            `ClosingSubFS` factory.

        Note:
            Errors raised here pass through ``CreateFailed.catch_all``;
            presumably they surface to the caller as `CreateFailed`
            (decorator not visible in this module -- confirm in
            ``fs.errors``).

        """
        from ..ftpfs import FTPFS
        from ..subfs import ClosingSubFS

        ftp_host, _, dir_path = parse_result.resource.partition("/")
        ftp_host, _, ftp_port = ftp_host.partition(":")
        # A missing or non-numeric port falls back to the default FTP port.
        ftp_port = int(ftp_port) if ftp_port.isdigit() else 21
        ftp_fs = FTPFS(
            ftp_host,
            port=ftp_port,
            user=parse_result.username,
            passwd=parse_result.password,
            proxy=parse_result.params.get("proxy"),
            timeout=int(parse_result.params.get("timeout", "10")),
            tls=parse_result.protocol == "ftps",
        )
        if not dir_path:
            return ftp_fs

        try:
            if create:
                ftp_fs.makedirs(dir_path, recreate=True)
            return ftp_fs.opendir(dir_path, factory=ClosingSubFS)
        except Exception:
            # On this path the caller never receives ``ftp_fs``, so nothing
            # else could close it; release it here before propagating.
            ftp_fs.close()
            raise