File: parser.py

package info (click to toggle)
python-aiocsv 1.4.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 220 kB
  • sloc: python: 899; ansic: 714; makefile: 5
file content (308 lines) | stat: -rw-r--r-- 11,093 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# © Copyright 2020-2025 Mikołaj Kuranowski
# SPDX-License-Identifier: MIT
# pyright: reportUnnecessaryComparison=false

import csv
import sys
from enum import IntEnum, auto
from typing import Any, AsyncIterator, Awaitable, Final, Generator, List, Optional, Sequence, Union

from .protocols import DialectLike, WithAsyncRead


class ParserState(IntEnum):
    """States of the character-by-character CSV parsing state machine."""

    START_RECORD = auto()
    START_FIELD = auto()
    IN_FIELD = auto()
    ESCAPE = auto()
    IN_QUOTED_FIELD = auto()
    ESCAPE_IN_QUOTED = auto()
    QUOTE_IN_QUOTED = auto()
    EAT_NEWLINE = auto()

    def is_end_of_record(self) -> bool:
        """Return True for the two states in which no field is currently open.

        Those are START_RECORD and EAT_NEWLINE.
        """
        return self in (ParserState.START_RECORD, ParserState.EAT_NEWLINE)

    def is_unexpected_at_eof(self) -> bool:
        """Return True if running out of data in this state is an error in strict mode.

        This covers a dangling escape character (inside or outside quotes) and
        a quoted field that was never closed.
        """
        return (
            self is ParserState.ESCAPE
            or self is ParserState.IN_QUOTED_FIELD
            or self is ParserState.ESCAPE_IN_QUOTED
        )


class Decision(IntEnum):
    """Result of feeding one character to the parser, as seen by ``Parser.try_parse``."""

    # The record is not finished yet: the character is consumed and parsing goes on.
    CONTINUE = auto()
    # The record is finished and the character that ended it is consumed.
    DONE = auto()
    # The record is finished, but the character is left at the front of the
    # buffer so that it is processed again as the start of the next record.
    DONE_WITHOUT_CONSUMING = auto()


# Module-level aliases for the quoting modes of the standard csv module, so the
# parser can compare ``dialect.quoting`` against plain names.
QUOTE_MINIMAL = csv.QUOTE_MINIMAL
QUOTE_ALL = csv.QUOTE_ALL
QUOTE_NONNUMERIC = csv.QUOTE_NONNUMERIC
QUOTE_NONE = csv.QUOTE_NONE
if sys.version_info >= (3, 12):
    # These two modes are only read from the csv module on Python 3.12 and newer.
    QUOTE_STRINGS = csv.QUOTE_STRINGS
    QUOTE_NOTNULL = csv.QUOTE_NOTNULL
else:
    # Fallback literals for older interpreters. NOTE(review): presumably these
    # mirror the values csv assigns on 3.12+ - confirm if the csv module changes.
    QUOTE_STRINGS: Final = 4
    QUOTE_NOTNULL: Final = 5


class Parser:
    """Incremental pure-Python CSV parser exposed as an async iterator of records.

    Text is requested from ``reader`` in chunks of up to 4096 characters and is
    fed, one character at a time, through a state machine (``ParserState``)
    configured by ``dialect``.

    A single object plays every role of the async-iteration protocol: it is the
    async iterator (``__aiter__``), the awaitable returned by ``__anext__``
    and the iterator returned by ``__await__``. ``__next__`` therefore drives
    the awaiting by hand: it re-yields whatever the pending read yields and
    finishes by raising ``StopIteration(record)``.
    """

    def __init__(self, reader: WithAsyncRead, dialect: DialectLike) -> None:
        """Create a parser pulling text from ``reader`` and parsing it per ``dialect``.

        The field size limit is captured from ``csv.field_size_limit()`` once,
        at construction time.
        """
        self.dialect = dialect
        self.reader = reader

        # Iterator driving the in-flight ``reader.read()`` call, if there is one.
        self.current_read: Optional[Generator[Any, None, str]] = None
        # Characters received from the reader that have not been consumed yet.
        self.buffer: str = ""
        # Set once a read returned no data.
        self.eof: bool = False
        # Number of line breaks consumed so far ("\r\n" counts once).
        self.line_num: int = 0

        self.state = ParserState.START_RECORD
        self.record_so_far: List[str] = []
        self.field_so_far: List[str] = []
        self.field_limit: int = csv.field_size_limit()
        self.field_was_quoted: bool = False
        self.last_char_was_cr: bool = False

    # AsyncIterator[List[str]] interface

    def __aiter__(self) -> AsyncIterator[List[str]]:
        """Return the parser itself as the async iterator."""
        return self

    def __anext__(self) -> Awaitable[List[str]]:
        """Return the parser itself as the awaitable producing the next record."""
        return self

    # Awaitable[List[str]] interface

    def __await__(self) -> Generator[Any, None, List[str]]:
        """Return the parser itself as the iterator that drives the await."""
        return self  # type: ignore

    # Generator[Any, None, List[str]] interface

    def __iter__(self) -> Generator[Any, None, List[str]]:
        """Return the parser itself; required for it to act as an iterator."""
        return self  # type: ignore

    def __next__(self) -> Any:
        """Advance the pending await by one step.

        Returns:
            Whatever the pending ``reader.read()`` yields, to be passed up to
            the event loop.

        Raises:
            StopIteration: carrying the parsed record once one is complete.
            StopAsyncIteration: when the input is exhausted.
            csv.Error: propagated from the parsing methods.
        """
        # Loop until a record has been successfully parsed or EOF has been hit
        record: Optional[List[str]] = None
        while record is None and (self.buffer or not self.eof):
            # No pending read and no data available - initiate one
            if not self.buffer and self.current_read is None:
                self.current_read = self.reader.read(4096).__await__()

            # Await on the pending read
            if self.current_read is not None:
                try:
                    return next(self.current_read)
                except StopIteration as e:
                    assert not self.buffer, "a read was pending even though data was available"
                    self.current_read.close()
                    self.current_read = None
                    self.buffer = e.value
                    self.eof = not e.value
                except BaseException:
                    # The read has failed and its iterator is finished. Forget it,
                    # so that a later await starts a fresh read instead of
                    # resuming a dead one (which would be misread as EOF or fail
                    # with an unrelated error).
                    self.current_read = None
                    raise

            # Advance parsing
            record = self.try_parse()

        # Generate a row, or stop iteration altogether
        if record is None:
            raise StopAsyncIteration
        raise StopIteration(record)

    # Straightforward parser interface

    def try_parse(self) -> Optional[List[str]]:
        """Consume buffered characters until a record is complete.

        Returns:
            The next record, or None if more data is needed (or if the input
            has ended with no record pending).

        Raises:
            csv.Error: for malformed input in strict mode, or when a field
                exceeds the field size limit. Characters consumed before the
                error are removed from the buffer.
        """
        decision = Decision.CONTINUE
        data = self.buffer
        end = len(data) if data else 0
        pos = 0

        try:
            while decision is Decision.CONTINUE and pos < end:
                c = data[pos]
                decision = self.process_char(c)
                if decision is not Decision.DONE_WITHOUT_CONSUMING:
                    # Lines are counted only for consumed characters; a character
                    # left in the buffer is seen again by the next call and
                    # would otherwise be counted twice.
                    self._count_line(c)
                    pos += 1
        finally:
            # Drop the consumed prefix with a single slice (also when an error
            # interrupted the loop) rather than re-slicing after every character.
            if pos:
                self.buffer = data[pos:]

        record_complete = decision is not Decision.CONTINUE
        # At EOF anything other than START_RECORD means a record is pending.
        # That includes EAT_NEWLINE: the record was ended by a "\r" which
        # happened to be the very last character of the input.
        record_pending_at_eof = self.eof and self.state is not ParserState.START_RECORD
        if not (record_complete or record_pending_at_eof):
            return None

        if self.dialect.strict and self.state.is_unexpected_at_eof():
            raise csv.Error("unexpected end of data")
        self.add_field_at_eof()
        # Leave the state machine at a record boundary, so that a record flushed
        # at EOF is not produced a second time.
        self.state = ParserState.START_RECORD
        return self.extract_record()

    def process_char(self, c: str) -> Decision:
        """Dispatch ``c`` to the handler of the current state.

        Raises:
            RuntimeError: if the current state has no handler.
        """
        if self.state == ParserState.START_RECORD:
            return self.process_char_in_start_record(c)
        elif self.state == ParserState.START_FIELD:
            return self.process_char_in_start_field(c)
        elif self.state == ParserState.ESCAPE:
            return self.process_char_in_escape(c)
        elif self.state == ParserState.IN_FIELD:
            return self.process_char_in_field(c)
        elif self.state == ParserState.IN_QUOTED_FIELD:
            return self.process_char_in_quoted_field(c)
        elif self.state == ParserState.ESCAPE_IN_QUOTED:
            return self.process_char_in_escape_in_quoted(c)
        elif self.state == ParserState.QUOTE_IN_QUOTED:
            return self.process_char_in_quote_in_quoted(c)
        elif self.state == ParserState.EAT_NEWLINE:
            return self.process_char_in_eat_newline(c)
        else:
            raise RuntimeError(f"unhandled parser state: {self.state}")

    def process_char_in_start_record(self, c: str) -> Decision:
        """Handle the first character of a record.

        A line break here produces an empty record (no fields are saved);
        anything else is handled as the start of a field.
        """
        if c == "\r":
            self.state = ParserState.EAT_NEWLINE
            return Decision.CONTINUE
        elif c == "\n":
            self.state = ParserState.START_RECORD
            return Decision.DONE
        else:
            return self.process_char_in_start_field(c)

    def process_char_in_start_field(self, c: str) -> Decision:
        """Handle the first character of a field."""
        if c == "\r":
            self.save_field()
            self.state = ParserState.EAT_NEWLINE
        elif c == "\n":
            self.save_field()
            self.state = ParserState.START_RECORD
            return Decision.DONE
        elif c == self.dialect.quotechar and self.dialect.quoting != QUOTE_NONE:
            self.field_was_quoted = True
            self.state = ParserState.IN_QUOTED_FIELD
        elif c == self.dialect.escapechar:
            self.state = ParserState.ESCAPE
        # XXX: skipinitialspace handling is done in save_field()
        elif c == self.dialect.delimiter:
            self.save_field()
            self.state = ParserState.START_FIELD
        else:
            self.add_char(c)
            self.state = ParserState.IN_FIELD
        return Decision.CONTINUE

    def process_char_in_escape(self, c: str) -> Decision:
        """Add the character following an escape character to the field verbatim."""
        self.add_char(c)
        self.state = ParserState.IN_FIELD
        return Decision.CONTINUE

    def process_char_in_field(self, c: str) -> Decision:
        """Handle a character inside an unquoted field."""
        if c == "\r":
            self.save_field()
            self.state = ParserState.EAT_NEWLINE
        elif c == "\n":
            self.save_field()
            self.state = ParserState.START_RECORD
            return Decision.DONE
        elif c == self.dialect.escapechar:
            self.state = ParserState.ESCAPE
        elif c == self.dialect.delimiter:
            self.save_field()
            self.state = ParserState.START_FIELD
        else:
            self.add_char(c)
        return Decision.CONTINUE

    def process_char_in_quoted_field(self, c: str) -> Decision:
        """Handle a character inside a quoted field.

        Line breaks and delimiters are ordinary data here; only the escape
        character and the quote character are special.
        """
        if c == self.dialect.escapechar:
            self.state = ParserState.ESCAPE_IN_QUOTED
        elif c == self.dialect.quotechar and self.dialect.quoting != QUOTE_NONE:
            # XXX: Is this check for quoting necessary?
            if self.dialect.doublequote:
                # Either a closing quote or the first half of a doubled quote -
                # the next character decides.
                self.state = ParserState.QUOTE_IN_QUOTED
            else:
                self.state = ParserState.IN_FIELD
        else:
            self.add_char(c)
        return Decision.CONTINUE

    def process_char_in_escape_in_quoted(self, c: str) -> Decision:
        """Add the character following an escape inside quotes to the field verbatim."""
        self.add_char(c)
        self.state = ParserState.IN_QUOTED_FIELD
        return Decision.CONTINUE

    def process_char_in_quote_in_quoted(self, c: str) -> Decision:
        """Handle the character following a quote seen inside a quoted field.

        A second quote is a literal quote character; a delimiter or a line break
        ends the field. Any other character is appended to the field in
        non-strict mode (parsing continues as in an unquoted field).

        Raises:
            csv.Error: in strict mode, if any other character follows the quote.
        """
        if c == self.dialect.quotechar and self.dialect.quoting != QUOTE_NONE:
            # XXX: Is this check for quoting necessary?
            self.add_char(c)
            self.state = ParserState.IN_QUOTED_FIELD
        elif c == self.dialect.delimiter:
            self.save_field()
            self.state = ParserState.START_FIELD
        elif c == "\r":
            self.save_field()
            self.state = ParserState.EAT_NEWLINE
        elif c == "\n":
            self.save_field()
            self.state = ParserState.START_RECORD
            return Decision.DONE
        elif not self.dialect.strict:
            self.add_char(c)
            self.state = ParserState.IN_FIELD
        else:
            raise csv.Error(
                f"{self.dialect.delimiter!r} expected after {self.dialect.quotechar!r}",
            )
        return Decision.CONTINUE

    def process_char_in_eat_newline(self, c: str) -> Decision:
        """Finish a record whose line break started with a carriage return.

        A "\n" completes a "\r\n" pair and is consumed; any other character
        belongs to the next record and is left in the buffer.
        """
        self.state = ParserState.START_RECORD
        return Decision.DONE if c == "\n" else Decision.DONE_WITHOUT_CONSUMING

    def add_char(self, c: str) -> None:
        """Append ``c`` to the current field.

        Raises:
            csv.Error: if the field already holds ``field_limit`` characters.
        """
        if len(self.field_so_far) == self.field_limit:
            raise csv.Error(f"field larger than field limit ({self.field_limit})")
        self.field_so_far.append(c)

    def save_field(self) -> None:
        """Finish the current field and append it to the current record.

        Leading whitespace is removed if the dialect asks for it, and unquoted
        fields are converted according to the dialect's quoting mode (to a float
        or to None). ``float()`` raises ValueError for non-numeric text.
        """
        field: Union[str, float, None]
        if self.dialect.skipinitialspace:
            field = "".join(self.field_so_far[self.find_first_non_space(self.field_so_far) :])
        else:
            field = "".join(self.field_so_far)

        # Handle unquoted fields for special quote modes
        if self.dialect.quoting == QUOTE_NONNUMERIC and not self.field_was_quoted:
            field = float(field) if field else ""
        elif self.dialect.quoting == QUOTE_STRINGS and not self.field_was_quoted:
            field = float(field) if field else None
        elif self.dialect.quoting == QUOTE_NOTNULL and not self.field_was_quoted:
            field = field if field else None

        self.field_was_quoted = False
        self.record_so_far.append(field)  # type: ignore
        self.field_so_far.clear()

    def add_field_at_eof(self) -> None:
        """Save the field that is still open when a record is being finished.

        Nothing is saved in START_RECORD and EAT_NEWLINE, where the last field
        (if any) has been saved already.
        """
        # Decide if self.record_so_far needs to be added at an EOF
        if self.state in (ParserState.ESCAPE, ParserState.ESCAPE_IN_QUOTED):
            # A dangling escape character escapes an implied newline
            self.add_char("\n")
        if not self.state.is_end_of_record():
            self.save_field()

    def extract_record(self) -> List[str]:
        """Return a copy of the record built so far and reset it for the next one."""
        r = self.record_so_far.copy()
        self.record_so_far.clear()
        return r

    def get_char_and_increment_line_num(self) -> str:
        """Return the first buffered character (without removing it), counting lines.

        Kept for backward compatibility: ``try_parse`` walks the buffer with
        its own cursor and counts lines through ``_count_line``.
        """
        c = self.buffer[0]
        self._count_line(c)
        return c

    def _count_line(self, c: str) -> None:
        """Update ``line_num`` for ``c``, counting a "\r\n" pair as a single line."""
        if c == "\r":
            self.line_num += 1
            self.last_char_was_cr = True
        elif c == "\n":
            # The "\n" of a "\r\n" pair was already counted with its "\r"
            if not self.last_char_was_cr:
                self.line_num += 1
            self.last_char_was_cr = False
        else:
            self.last_char_was_cr = False

    @staticmethod
    def find_first_non_space(x: Sequence[str]) -> int:
        """Return the index of the first non-whitespace item of ``x``.

        Returns ``len(x)`` if ``x`` is empty or consists only of whitespace.
        """
        for i, c in enumerate(x):
            if not c.isspace():
                return i
        return len(x)