File: iterableio.py

package info (click to toggle)
python-iterable-io 1.0.0-3
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 92 kB
  • sloc: python: 228; makefile: 4
file content (168 lines) | stat: -rw-r--r-- 6,624 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/env python

import io


class RawIterableReader(io.RawIOBase):
    """An io.RawIOBase implementation for an iterable of bytes

    In most cases, this class should not be used directly. See the included
    `open_iterable` function for a high-level interface.
    """

    def __init__(self, iterable):
        """Wrap `iterable`, whose items are expected to be bytes-like chunks"""
        super().__init__()
        self._iter = iter(iterable)
        # Bytes already pulled from the iterator but not yet handed to a caller
        self._extra = bytearray()
        # Count of all bytes pulled from the iterator so far
        self._total = 0

    def readable(self):
        """Report that this stream can be read from"""
        return True

    def close(self):
        """Drop the reference to the iterator and mark the stream as closed"""
        self._iter = None
        super().close()

    def tell(self):
        """The total number of bytes that have been read

        Raises ValueError if the stream has been closed.
        """
        self._checkClosed()
        # Bytes still sitting in the internal buffer have been consumed from
        # the iterator but not yet delivered, so they do not count as read.
        return self._total - len(self._extra)

    def readinto(self, b):
        """Read bytes into a pre-allocated, writable bytes-like object b

        Chunks are pulled from the iterator until enough bytes are available
        to fill `b` completely, or until the iterator is exhausted.

        Returns the number of bytes read, 0 indicates EOF (or a zero-length
        `b`). Raises ValueError if the stream has been closed.
        """
        self._checkClosed()
        # View the target as a flat run of bytes so that any writable
        # bytes-like object (bytearray, memoryview, array.array, ...) works
        # and so that sizes are measured in bytes rather than in items.
        with memoryview(b) as raw_view, raw_view.cast("B") as target:
            wanted = len(target)
            if self._iter is not None:
                while len(self._extra) < wanted:
                    try:
                        chunk = next(self._iter)
                    except StopIteration:
                        # Remember exhaustion so the iterator is never
                        # advanced again on later calls.
                        self._iter = None
                        break
                    # Append first: if the chunk is not bytes-like this
                    # raises before the running total is touched.
                    self._extra += chunk
                    self._total += len(chunk)

            count = min(wanted, len(self._extra))
            target[:count] = self._extra[:count]

        # Trim the delivered prefix in place instead of re-slicing, which
        # would copy every remaining byte on each call.
        del self._extra[:count]
        return count


def open_iterable(iterable, mode="r", buffering=-1, encoding=None, errors=None, newline=None):
    r"""Open an iterable of bytes to read from it using a file-like interface

    The `iterable` must be an iterable of bytes.

    mode is an optional string that specifies the mode in which the iterable
    is opened. It defaults to 'r' which means open for reading in text mode.
    In text mode, if encoding is not specified the encoding used is platform
    dependent. (For reading raw bytes use binary mode and leave encoding
    unspecified.) Only reading is supported. The available modes are:

    ========= ===============================================================
    Character Meaning
    --------- ---------------------------------------------------------------
    'r'       open for reading (required)
    'b'       binary mode
    't'       text mode (default)
    ========= ===============================================================

    Iterables opened in binary mode (appending 'b' to the mode argument) return
    contents as bytes objects without any decoding. In text mode (the default),
    the contents of the iterable are returned as strings, the bytes having been
    first decoded using a platform-dependent encoding or using the specified
    encoding if given.

    buffering is an optional integer used to set the buffering policy. Pass 0
    to switch buffering off (only allowed in binary mode), and an integer > 0
    to indicate the size of a fixed-size chunk buffer. When no buffering
    argument is given (or it is negative), `io.DEFAULT_BUFFER_SIZE` will be
    used.

    encoding is the str name of the encoding used to decode the contents of
    the iterable. This should only be used in text mode. The default encoding
    is platform dependent, but any encoding supported by Python can be
    passed. See the codecs module for the list of supported encodings.

    errors is an optional string that specifies how decoding errors are to
    be handled---this argument should not be used in binary mode. Pass
    'strict' to raise a ValueError exception if there is an encoding error
    (the default of None has the same effect), or pass 'ignore' to ignore
    errors. Note that ignoring encoding errors can lead to data loss.
    See the documentation for codecs.register for a list of the permitted
    encoding error strings.

    newline is a string controlling how universal newlines works (it only
    applies to text mode). It can be None, '', '\n', '\r', and '\r\n'. It works
    as follows:

    * If newline is None, universal newlines mode is enabled. Lines in the
      input can end in '\n', '\r', or '\r\n', and these are translated into
      '\n' before being returned to the caller. If it is '', universal
      newline mode is enabled, but line endings are returned to the caller
      untranslated. If it has any of the other legal values, input lines are
      only terminated by the given string, and the line ending is returned
      to the caller untranslated.

    open_iterable() returns a file object whose type depends on the mode, and
    through which the standard file operations such as read() are performed.
    When open_iterable() is used to open an iterable in a text mode ('rt'), it
    returns an io.TextIOWrapper. When used to open an iterable in a binary
    mode, the returned class varies: For unbuffered access, a RawIterableReader
    is returned and in buffered mode it returns an io.BufferedReader.

    Raises ValueError if mode contains unknown or repeated characters, does
    not include 'r', combines 't' with 'b', if encoding, errors or newline is
    given in binary mode, or if buffering is 0 in text mode.
    """
    # This function is modeled after `io.open`, found in `Lib/_pyio.py`

    modes = set(mode)
    # Reject unknown characters as well as repeated ones (e.g. "rr").
    if modes - set("rtb") or len(mode) > len(modes):
        raise ValueError("invalid mode: '{}'".format(mode))

    reading = "r" in modes
    binary = "b" in modes
    # Text mode is implied whenever binary mode was not requested.
    text = "t" in modes or (reading and not binary)

    if not reading:
        raise ValueError("Must specify read mode")
    if text and binary:
        raise ValueError("can't have text and binary mode at once")
    if binary and encoding is not None:
        raise ValueError("binary mode doesn't take an encoding argument")
    if binary and errors is not None:
        raise ValueError("binary mode doesn't take an errors argument")
    if binary and newline is not None:
        raise ValueError("binary mode doesn't take a newline argument")
    if text and buffering == 0:
        raise ValueError("can't have unbuffered text I/O")

    ret = RawIterableReader(iterable)
    try:
        if buffering == 0:
            # unbuffered binary mode
            return ret

        if buffering < 0:
            buffering = io.DEFAULT_BUFFER_SIZE

        ret = io.BufferedReader(ret, buffering)

        if binary:
            # buffered binary mode
            return ret

        # buffered text mode
        ret = io.TextIOWrapper(ret, encoding, errors, newline)
        ret.mode = mode
        return ret
    except BaseException:
        # Whatever layer has been built so far owns the layers beneath it, so
        # closing the outermost one releases everything before re-raising.
        ret.close()
        raise