File: _ftp_parse.py

package info (click to toggle)
python-fs 2.4.16-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,944 kB
  • sloc: python: 13,048; makefile: 226; sh: 3
file content (201 lines) | stat: -rw-r--r-- 5,098 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
from __future__ import absolute_import, print_function, unicode_literals

import re
import time
import unicodedata
from datetime import datetime

try:
    from datetime import timezone
except ImportError:
    from ._tzcompat import timezone  # type: ignore

from .enums import ResourceType
from .permissions import Permissions

# Timezone-aware (UTC) datetime for timestamp 0, i.e. the Unix epoch.
# `_parse_time` subtracts it from a parsed datetime to get epoch seconds.
EPOCH_DT = datetime.fromtimestamp(0, timezone.utc)


# Matches one line of a Unix-style (``ls -l``) FTP LIST response.
#
# Positional groups, in the order `decode_linux` unpacks them:
#   1. entry type character (one of ``-dlpscbD``)
#   2. nine permission characters, optionally followed by ``.`` or ``+``
#   3. link count (digits)
#   4. user (uid) column
#   5. group (gid) column
#   6. size (digits)
#   7. modification time: three word characters, a 1-2 digit day, then a
#      final token made of word characters and colons (a year or HH:MM)
#   8. the remainder of the line, used as the entry name
RE_LINUX = re.compile(
    r"""
    ^
    ([-dlpscbD])
    ([r-][w-][xsS-][r-][w-][xsS-][r-][w-][xtT-][\.\+]?)
    \s+?
    (\d+)
    \s+?
    ([A-Za-z0-9][A-Za-z0-9\-\.\_\@]*\$?)
    \s+?
    ([A-Za-z0-9][A-Za-z0-9\-\.\_\@]*\$?)
    \s+?
    (\d+)
    \s+?
    (\w{3}\s+\d{1,2}\s+[\w:]+)
    \s+
    (.*?)
    $
    """,
    re.VERBOSE,
)


# Matches one line of a Windows NT style FTP LIST response.
#
# Named groups read by `decode_windowsnt`:
#   modified_date - first whitespace-delimited token (the date)
#   modified_time - second token (the time, optionally ending in AM/PM)
#   size          - either the literal ``<DIR>`` or a run of digits
#   name          - everything after the size column
RE_WINDOWSNT = re.compile(
    r"""
    ^
    (?P<modified_date>\S+)
    \s+
    (?P<modified_time>\S+(AM|PM)?)
    \s+
    (?P<size>(<DIR>|\d+))
    \s+
    (?P<name>.*)
    $
    """,
    re.VERBOSE,
)


def get_decoders():
    """Return the supported LIST line formats as ``(regex, decoder)`` pairs.

    The list is ordered: the Unix-style format is listed first, followed by
    the Windows NT format. Each decoder is called as ``decoder(line, match)``.
    """
    return [
        (RE_LINUX, decode_linux),
        (RE_WINDOWSNT, decode_windowsnt),
    ]


def parse(lines):
    """Decode every recognisable line of an FTP LIST response.

    Arguments:
        lines: An iterable of text lines from the listing.

    Returns:
        list: One raw info dict per line that `parse_line` could decode, in
        input order. Blank (whitespace-only) lines are skipped without being
        parsed, and lines that no decoder recognises are dropped.

    """
    decoded = (parse_line(entry) for entry in lines if entry.strip())
    return [raw_info for raw_info in decoded if raw_info is not None]


def parse_line(line):
    """Decode a single LIST line with the first decoder whose regex matches.

    Arguments:
        line (str): One line of the listing.

    Returns:
        The value returned by the matching decoder, or `None` when none of
        the regexes from `get_decoders` match the line.

    """
    for pattern, decoder in get_decoders():
        found = pattern.match(line)
        if found is None:
            continue
        return decoder(line, found)
    return None


def _parse_time(t, formats):
    """Convert a listing timestamp into seconds since the Unix epoch.

    Each entry of ``formats`` is tried in order with `time.strptime`; the
    first format that matches is used. The parsed date and time fields are
    interpreted as UTC, and seconds are discarded.

    Listings commonly omit the year for recent entries (for example
    ``"Mar 11 16:59"``). When the matching format has no year directive the
    year is inferred: the current local year is used, unless that would
    place the timestamp more than one day in the future, in which case the
    previous year is used. The one day of slack absorbs clock and timezone
    differences between client and server.

    Arguments:
        t (str): The timestamp text taken from the listing.
        formats (list): `time.strptime` format strings to try, in order.

    Returns:
        float: Seconds since the epoch, or `None` if no format matches or
        the date is not valid in either candidate year (e.g. ``Feb 29``
        when neither the current nor the previous year is a leap year).

    """
    # Directives through which strptime can obtain a year.
    year_directives = ("%Y", "%y", "%G", "%c", "%x")

    for frmt in formats:
        has_year = any(directive in frmt for directive in year_directives)
        try:
            if has_year:
                _t = time.strptime(t, frmt)
            else:
                # strptime defaults a missing year to 1900, which is not a
                # leap year, so "Feb 29 ..." would be rejected. Parse against
                # a placeholder leap year; the real year is inferred below.
                _t = time.strptime("2000 " + t, "%Y " + frmt)
            break
        except ValueError:
            continue
    else:
        return None

    month = _t.tm_mon
    day = _t.tm_mday
    hour = _t.tm_hour
    minutes = _t.tm_min

    if has_year:
        year = _t.tm_year
    else:
        now = time.localtime()
        # Only use fields down to the minute; tm_sec may be 60/61, which
        # datetime() does not accept.
        current = datetime(*now[:5])
        year = None
        for candidate in (now.tm_year, now.tm_year - 1):
            try:
                stamp = datetime(candidate, month, day, hour, minutes)
            except ValueError:
                # The date does not exist in this year (Feb 29).
                continue
            if year is None:
                # Remember the first valid year in case none is "not future".
                year = candidate
            if (stamp - current).total_seconds() <= 86400:
                year = candidate
                break
        if year is None:
            return None

    dt = datetime(year, month, day, hour, minutes, tzinfo=timezone.utc)

    epoch_time = (dt - EPOCH_DT).total_seconds()
    return epoch_time


def _decode_linux_time(mtime):
    """Convert the date column of a Unix-style listing via `_parse_time`.

    Two layouts are tried in order: month/day/year, then
    month/day/hour:minute. Returns whatever `_parse_time` returns for them.
    """
    linux_formats = ["%b %d %Y", "%b %d %H:%M"]
    return _parse_time(mtime, formats=linux_formats)


def decode_linux(line, match):
    """Build a raw info dict from a Unix-style LIST line.

    Arguments:
        line (str): The original listing line, stored under ``ftp.ls``.
        match: The `RE_LINUX` match object for ``line``.

    Returns:
        dict: Raw info with the ``basic``, ``details``, ``access`` and
        ``ftp`` namespaces. ``details.modified`` is only present when the
        date column could be parsed.

    """
    kind, mode, _links, owner, group, byte_count, stamp, name = match.groups()

    is_link = kind == "l"
    # Symbolic links are reported as directories.
    is_dir = is_link or kind == "d"
    if is_link:
        # Keep only the link's own name; the text after "->" is discarded.
        name = name.partition("->")[0].strip()

    permissions = Permissions.parse(mode)
    modified = _decode_linux_time(stamp)
    name = unicodedata.normalize("NFC", name)

    details = {
        "size": int(byte_count),
        "type": int(ResourceType.directory if is_dir else ResourceType.file),
    }
    if modified is not None:
        details["modified"] = modified

    access = {
        "permissions": permissions.dump(),
        "user": owner,
        "group": group,
    }

    return {
        "basic": {"name": name, "is_dir": is_dir},
        "details": details,
        "access": access,
        "ftp": {"ls": line},
    }


def _decode_windowsnt_time(mtime):
    """Convert a Windows NT listing date and time via `_parse_time`.

    The 12-hour layout with an AM/PM suffix is tried first, then the
    24-hour layout. Returns whatever `_parse_time` returns for them.
    """
    windowsnt_formats = ["%d-%m-%y %I:%M%p", "%d-%m-%y %H:%M"]
    return _parse_time(mtime, formats=windowsnt_formats)


def decode_windowsnt(line, match):
    """Build a raw info dict from a Windows NT style LIST line.

    An entry whose size column is ``<DIR>`` is a directory and gets no
    ``size``; any other entry is a file whose size is the integer value of
    that column. ``modified`` is only set when the date and time columns
    could be parsed.

    Arguments:
        line (str): The original listing line, stored under ``ftp.ls``.
        match: The `RE_WINDOWSNT` match object for ``line``.

    Returns:
        dict: Raw info with the ``basic``, ``details`` and ``ftp``
        namespaces.

    Examples:
        Decode a directory line::

            >>> line = "11-02-18  02:12PM       <DIR>          images"
            >>> match = RE_WINDOWSNT.match(line)
            >>> pprint(decode_windowsnt(line, match))
            {'basic': {'is_dir': True, 'name': 'images'},
             'details': {'modified': 1518358320.0, 'type': 1},
             'ftp': {'ls': '11-02-18  02:12PM       <DIR>          images'}}

        Decode a file line::

            >>> line = "11-02-18  03:33PM                 9276 logo.gif"
            >>> match = RE_WINDOWSNT.match(line)
            >>> pprint(decode_windowsnt(line, match))
            {'basic': {'is_dir': False, 'name': 'logo.gif'},
             'details': {'modified': 1518363180.0, 'size': 9276, 'type': 2},
             'ftp': {'ls': '11-02-18  03:33PM                 9276 logo.gif'}}

        Alternatively, the time might also be present in 24-hour format::

            >>> line = "11-02-18  15:33                   9276 logo.gif"
            >>> match = RE_WINDOWSNT.match(line)
            >>> decode_windowsnt(line, match)["details"]["modified"]
            1518363180.0

    """
    fields = match.groupdict()
    size_field = fields["size"]
    is_dir = size_field == "<DIR>"

    details = {
        "type": int(ResourceType.directory if is_dir else ResourceType.file),
    }
    if not is_dir:
        details["size"] = int(size_field)

    # The helper expects the date and time separated by a single space.
    stamp = " ".join((fields["modified_date"], fields["modified_time"]))
    modified = _decode_windowsnt_time(stamp)
    if modified is not None:
        details["modified"] = modified

    return {
        "basic": {"name": fields["name"], "is_dir": is_dir},
        "details": details,
        "ftp": {"ls": line},
    }