File: _ascii.py

package info (click to toggle)
python-rdata 0.11.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 740 kB
  • sloc: python: 2,388; makefile: 22
file content (98 lines) | stat: -rw-r--r-- 3,159 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from __future__ import annotations

import io
from typing import Any

import numpy as np
import numpy.typing as npt

from ._parser import R_INT_NA, AltRepConstructorMap, Parser


class ParserASCII(Parser):
    r"""Parser for data in ASCII format.

    The input is consumed line by line: every scalar value occupies one
    line (a complex value occupies two), and strings are written on a single
    line with C-style backslash escapes for non-printable bytes.
    """

    # Bit pattern R uses for a missing double (NA_real_): a NaN whose low
    # 32-bit word is 1954.  Built through an integer view so the payload is
    # exactly the one found in binary-format files.
    _R_FLOAT_NA_VALUE = np.array(
        [0x7FF00000000007A2],
        dtype=np.uint64,
    ).view(np.float64)[0]

    def __init__(
        self,
        data: memoryview,
        *,
        expand_altrep: bool,
        altrep_constructor_dict: AltRepConstructorMap,
    ) -> None:
        """Wrap ``data`` in an ASCII text stream that is read line by line.

        Args:
            data: Raw bytes of the serialized object.
            expand_altrep: Forwarded unchanged to the base parser.
            altrep_constructor_dict: Forwarded unchanged to the base parser.
        """
        super().__init__(
            expand_altrep=expand_altrep,
            altrep_constructor_dict=altrep_constructor_dict,
        )
        self.file = io.TextIOWrapper(io.BytesIO(data), encoding="ascii")

    def _readline(self) -> str:
        r"""Read a line without trailing \n.

        Only an actual line terminator is removed, so a final line that is
        not newline-terminated keeps its last character.  At end of input an
        empty string is returned.
        """
        line = self.file.readline()
        return line[:-1] if line.endswith("\n") else line

    def _read_int_value(self) -> int:
        """Read one integer line, mapping ``NA`` to ``R_INT_NA``."""
        line = self._readline()
        return R_INT_NA if line == "NA" else int(line)

    def _read_float_value(self) -> float:
        """Read one double line, mapping ``NA`` to R's missing-value NaN.

        ``NaN``, ``Inf`` and ``-Inf`` are understood by ``float`` directly;
        ``NA`` is not, so it is translated explicitly.
        """
        line = self._readline()
        return self._R_FLOAT_NA_VALUE if line == "NA" else float(line)

    def _read_complex_value(self) -> complex:
        """Read a complex value stored as two consecutive double lines."""
        real = self._read_float_value()
        imag = self._read_float_value()
        # NOTE(review): the NA payload of either part is expected to survive
        # the trip through ``complex`` on common platforms - confirm if NA
        # detection on complex arrays matters to callers.
        return complex(real, imag)

    def _parse_array_values(
            self,
            dtype: npt.DTypeLike,
            length: int,
    ) -> npt.NDArray[Any]:
        """Read ``length`` values of type ``dtype`` into a new array.

        Args:
            dtype: Element type; must be an integer, floating or complex
                floating dtype.
            length: Number of elements to read.

        Returns:
            One-dimensional array with the parsed values.

        Raises:
            ValueError: If ``dtype`` is of an unsupported kind (and at least
                one element has to be read), or if a line cannot be
                converted to a number.
        """
        array = np.empty(length, dtype=dtype)

        # Nothing to read: return before inspecting the dtype, so that an
        # empty request never fails on the dtype check (as before).
        if length <= 0:
            return array

        # The dtype kind is the same for every element, so choose the
        # reader once instead of re-testing it on each iteration.
        if np.issubdtype(dtype, np.integer):
            read_value = self._read_int_value
        elif np.issubdtype(dtype, np.floating):
            read_value = self._read_float_value
        elif np.issubdtype(dtype, np.complexfloating):
            read_value = self._read_complex_value
        else:
            msg = f"Unknown dtype: {dtype}"
            raise ValueError(msg)

        for i in range(length):
            array[i] = read_value()

        return array

    def parse_string(self, length: int) -> bytes:
        r"""Read one escaped string line and return its raw bytes.

        Args:
            length: Expected number of bytes of the decoded string.

        Returns:
            The bytes of the original string, still undecoded (the caller
            applies the proper text encoding later).

        Raises:
            AssertionError: If the decoded string does not have ``length``
                bytes.
        """
        # Non-ascii characters in strings are written using octal byte codes,
        # for example, a string 'aä' (2 chars) in UTF-8 is written as an ascii
        # string r'a\303\244' (9 chars). We want to transform this to a byte
        # string b'a\303\244' (3 bytes) corresponding to the byte
        # representation of the original UTF-8 string.
        # This string is used as the running example below.

        # Read the ascii string and turn it into bytes (all chars are ascii)
        escaped = self._readline().encode("ascii")
        # Now escaped = br'a\303\244' (9 bytes)

        # A question mark is written as the C escape r'\?', which the
        # 'unicode_escape' codec does not know (it would keep the
        # backslash). Unescape it here. Splitting on escaped backslashes
        # first guarantees that the backslash in front of '?' really starts
        # an escape and is not the second half of r'\\'.
        if b"\\?" in escaped:
            escaped = b"\\\\".join(
                piece.replace(b"\\?", b"?")
                for piece in escaped.split(b"\\\\")
            )

        # The special 'unicode_escape' encoding does two things here:
        # 1) interpret e.g. br'\303' (4 bytes) as a single byte b'\303'
        # 2) decode the so-transformed byte string using latin1
        text = escaped.decode("unicode_escape")
        # Now text = 'aÃ¤' (3 chars)

        # The latin1 decoding of step 2 is not wanted, so undo it by
        # encoding back to bytes with latin1
        raw = text.encode("latin1")
        # Now raw = b'a\303\244' (3 bytes)

        # Raised explicitly (instead of using ``assert``) so the check is
        # not stripped under ``python -O``; the exception type is unchanged.
        if len(raw) != length:
            msg = f"Expected a string of {length} bytes, got {len(raw)}"
            raise AssertionError(msg)

        # Later in the code there will be the decoding step from
        # b'a\303\244' to 'aä', that is, raw.decode('utf8')
        return raw

    def check_complete(self) -> None:
        """Verify that the whole input has been consumed.

        Raises:
            AssertionError: If unread data remains in the stream.
        """
        # Raised explicitly (instead of using ``assert``) so the check is
        # not stripped under ``python -O``; the exception type is unchanged.
        if self.file.read(1) != "":
            msg = "Unexpected trailing data after the parsed object"
            raise AssertionError(msg)