1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
|
from __future__ import annotations
import io
from typing import Any
import numpy as np
import numpy.typing as npt
from ._parser import R_INT_NA, AltRepConstructorMap, Parser
# Bit pattern R uses for a missing real (NA_real_): a NaN whose low 32-bit
# word is 1954. ``float("NA")`` raises ValueError in Python, so the value is
# rebuilt from its bits instead.
_R_FLOAT_NA = np.array(
    [0x7FF00000000007A2],
    dtype=np.uint64,
).view(np.float64)[0]


def _parse_real(text: str) -> float:
    """Convert one ASCII-serialized real number to a float.

    The token ``"NA"`` maps to the NaN carrying R's NA bit pattern; every
    other token (including ``"NaN"``, ``"Inf"`` and ``"-Inf"``) is handed
    to :class:`float`.

    Raises:
        ValueError: If ``text`` is neither ``"NA"`` nor a valid float.
    """
    return _R_FLOAT_NA if text == "NA" else float(text)


class ParserASCII(Parser):
    """Parser for data in ASCII format.

    The input is consumed line by line: each scalar value and each string
    occupies one line of the underlying text stream (complex values take
    two lines, real part first).
    """

    def __init__(
        self,
        data: memoryview,
        *,
        expand_altrep: bool,
        altrep_constructor_dict: AltRepConstructorMap,
    ) -> None:
        """Create a parser reading from ``data``.

        Args:
            data: Buffer with the serialized content; it is wrapped in a
                text stream decoded as ASCII.
            expand_altrep: Forwarded unchanged to the base ``Parser``.
            altrep_constructor_dict: Forwarded unchanged to the base
                ``Parser``.
        """
        super().__init__(
            expand_altrep=expand_altrep,
            altrep_constructor_dict=altrep_constructor_dict,
        )
        self.file = io.TextIOWrapper(io.BytesIO(data), encoding="ascii")

    def _readline(self) -> str:
        r"""Read a line without trailing \n.

        Only an actual newline is stripped, so a final line that is not
        newline-terminated keeps its last character. At end of input an
        empty string is returned.
        """
        line = self.file.readline()
        return line[:-1] if line.endswith("\n") else line

    def _parse_array_values(
        self,
        dtype: npt.DTypeLike,
        length: int,
    ) -> npt.NDArray[Any]:
        """Read ``length`` values of the given dtype, one per line.

        Integer dtypes accept ``"NA"`` (stored as ``R_INT_NA``); floating
        dtypes accept ``"NA"`` (stored as R's NA NaN); complex dtypes read
        two lines per element, real part first, each parsed like a float.

        Args:
            dtype: Element type of the resulting array.
            length: Number of elements to read.

        Returns:
            A one-dimensional array of ``length`` elements.

        Raises:
            ValueError: If a line cannot be converted, or if ``dtype`` is
                not an integer, floating or complex dtype and at least one
                element has to be read.
        """
        array = np.empty(length, dtype=dtype)

        # The dtype does not change between elements: classify it once
        # instead of on every iteration.
        is_integer = np.issubdtype(dtype, np.integer)
        is_real = np.issubdtype(dtype, np.floating)
        is_complex = np.issubdtype(dtype, np.complexfloating)

        value: int | float | complex
        for i in range(length):
            line = self._readline()
            if is_integer:
                value = R_INT_NA if line == "NA" else int(line)
            elif is_real:
                value = _parse_real(line)
            elif is_complex:
                imag_line = self._readline()
                value = complex(_parse_real(line), _parse_real(imag_line))
            else:
                msg = f"Unknown dtype: {dtype}"
                raise ValueError(msg)
            array[i] = value

        return array

    def parse_string(self, length: int) -> bytes:
        """Read one escaped string line and return its raw bytes.

        Args:
            length: Expected number of bytes of the unescaped string.

        Returns:
            The unescaped bytes; decoding them to text is left to the
            caller.

        Raises:
            AssertionError: If the unescaped size differs from ``length``.
        """
        # Non-ascii characters in strings are written using octal byte codes,
        # for example, a string 'aä' (2 chars) in UTF-8 is written as an ascii
        # string r'a\303\244' (9 chars). We want to transform this to a byte
        # string b'a\303\244' (3 bytes) corresponding to the byte
        # representation of the original UTF-8 string.
        # Let's use this string as an example to go through the code below

        # Read the ascii string
        s = self._readline()
        # Now s = r'a\303\244' (9 chars)

        # A question mark is written with the C escape r'\?', which the
        # 'unicode_escape' codec used below does not know (it would keep the
        # backslash). Unescape it by hand. Splitting on escaped backslashes
        # first guarantees that the '?' in r'\\?' (a literal backslash
        # followed by a question mark) is left alone.
        s = r"\\".join(
            chunk.replace(r"\?", "?") for chunk in s.split(r"\\")
        )

        # Convert characters to bytes (all characters are ascii)
        b = s.encode("ascii")
        # Now b = br'a\303\244' (9 bytes)

        # There is a special 'unicode_escape' encoding that does
        # basically two things here:
        # 1) interpret e.g. br'\303' (4 bytes) as a single byte b'\303'
        # 2) decode so-transformed byte string to a string with latin1 encoding
        s = b.decode("unicode_escape")
        # Now s = 'aÃ¤' (3 chars)

        # We don't really want the latter latin1 decoding step done by
        # the previous line of code, so we undo it by encoding in latin1
        # back to bytes
        b = s.encode("latin1")
        # Now b = b'a\303\244' (3 bytes)

        # We return this byte representation here. Later in the code there
        # will be the decoding step from b'a\303\244' to 'aä',
        # that is, s = b.decode('utf8')
        assert len(b) == length, (
            f"Expected a string of {length} bytes, got {len(b)}"
        )
        return b

    def check_complete(self) -> None:
        """Assert that the whole input has been consumed."""
        assert self.file.read(1) == "", "Unparsed data left in the input"
|