File: test_interp_textio.py

package info (click to toggle)
pypy3 7.3.11%2Bdfsg-2%2Bdeb12u3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 201,024 kB
  • sloc: python: 1,950,308; ansic: 517,580; sh: 21,417; asm: 14,419; cpp: 4,263; makefile: 4,228; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 11; awk: 4
file content (134 lines) | stat: -rw-r--r-- 4,431 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import pytest
try:
    from hypothesis import given, strategies as st, settings, example
except ImportError:
    pytest.skip("hypothesis required")
import os
from pypy.module._io.interp_bytesio import W_BytesIO
from pypy.module._io.interp_textio import (W_TextIOWrapper, DecodeBuffer,
        SEEN_CR, SEEN_LF)

# Workaround for slowness, suggested by David MacIver: draw a single example
# at import time so that hypothesis initializes its lazy internals up front.
# That initialization takes a lot of time and would otherwise trip the timer
# while a test is running.
st.text().example()

def translate_newlines(text):
    """Return *text* with every line ending rewritten as ``os.linesep``.

    CRLF pairs are collapsed first so that each counts as a single line
    ending; any remaining lone CR is then treated exactly like LF.
    """
    normalized = text.replace(u'\r\n', u'\n').replace(u'\r', u'\n')
    # Splitting on LF and re-joining with the platform separator is the same
    # as replacing every LF with that separator.
    return os.linesep.join(normalized.split(u'\n'))

@st.composite
def st_readline(draw, st_nlines=st.integers(min_value=0, max_value=10)):
    """Hypothesis strategy producing a ``(text, limits)`` pair.

    The number of lines is drawn from *st_nlines*.  Every line is a text
    containing neither CR nor LF, followed by one ending chosen among LF, CR
    and CRLF; ``text`` is the concatenation of all lines and endings.

    For each line two entries are appended to ``limits``: an integer drawn
    from ``0 .. len(line) + 5`` and then ``-1``.
    """
    line_body = st.text(st.characters(blacklist_characters=u'\r\n'))
    line_ending = st.sampled_from([u'\n', u'\r', u'\r\n'])

    pieces = []
    limits = []
    line_count = draw(st_nlines)
    for _ in range(line_count):
        # Draw order (body, ending, limit) is kept fixed on purpose.
        body = draw(line_body)
        terminator = draw(line_ending)
        pieces += [body, terminator]
        bounded = draw(st.integers(min_value=0, max_value=len(body) + 5))
        limits += [bounded, -1]
    return (u''.join(pieces), limits)

def test_newlines_bug(space):
    """Reading LF-terminated lines plus a trailing CR records both kinds.

    After a full read of ``a\\nb\\nc\\r`` through a text wrapper created with
    ``w_newline=None``, the decoder's ``seennl`` must be
    ``SEEN_LF | SEEN_CR``.
    """
    # NOTE(review): this import is not referenced below -- confirm whether it
    # is still needed.
    import _io
    w_raw = W_BytesIO(space)
    w_raw.descr_init(space, space.newbytes(b"a\nb\nc\r"))
    w_wrapper = W_TextIOWrapper(space)
    w_errors = space.newtext('surrogatepass')
    w_wrapper.descr_init(space, w_raw, encoding='utf-8', w_errors=w_errors,
                         w_newline=None)
    w_wrapper.read_w(space)
    expected_flags = SEEN_LF | SEEN_CR
    assert w_wrapper.w_decoder.seennl == expected_flags

@given(data=st_readline(),
       mode=st.sampled_from(['\r', '\n', '\r\n', '']))
@settings(deadline=None, database=None)
@example(data=(u'\n\r\n', [0, -1, 2, -1, 0, -1]), mode='\r')
def test_readline(space, data, mode):
    """Check readline_w() limits with an explicit newline *mode*.

    The generated text is wrapped in a W_TextIOWrapper over a W_BytesIO and
    read back with one readline_w() call per generated limit.  Each result
    must respect its non-negative limit, and the concatenation of all
    non-empty results must be a prefix of the original text.
    """
    source_text, limits = data
    w_bytes = W_BytesIO(space)
    w_bytes.descr_init(space, space.newbytes(source_text.encode('utf-8')))
    w_reader = W_TextIOWrapper(space)
    w_reader.descr_init(
        space, w_bytes,
        encoding='utf-8', w_errors=space.newtext('surrogatepass'),
        w_newline=space.newtext(mode))

    pieces = []
    for limit in limits:
        w_piece = w_reader.readline_w(space, space.newint(limit))
        piece = space.utf8_w(w_piece).decode('utf-8')
        # Only non-negative limits are checked against the result length.
        assert limit < 0 or len(piece) <= limit
        if piece:
            pieces.append(piece)
            continue
        if limit != 0:
            # Empty result although the limit was non-zero: stop reading
            # (presumably the end of the input was reached).
            break
    assert source_text.startswith(u''.join(pieces))

@given(data=st_readline())
@settings(deadline=None, database=None)
@example(data=(u'\n\r\n', [0, -1, 2, -1, 0, -1]))
def test_readline_none(space, data):
    """Check readline_w() limits when the wrapper is built with newline None.

    Works like the explicit-mode readline test, except that the results are
    compared against the input text with CRLF and CR both rewritten to LF.
    """
    source_text, limits = data
    w_bytes = W_BytesIO(space)
    w_bytes.descr_init(space, space.newbytes(source_text.encode('utf-8')))
    w_reader = W_TextIOWrapper(space)
    w_reader.descr_init(
        space, w_bytes,
        encoding='utf-8', w_errors=space.newtext('surrogatepass'),
        w_newline=space.w_None)

    pieces = []
    for limit in limits:
        w_piece = w_reader.readline_w(space, space.newint(limit))
        piece = space.utf8_w(w_piece).decode('utf-8')
        # Only non-negative limits are checked against the result length.
        assert limit < 0 or len(piece) <= limit
        if piece:
            pieces.append(piece)
            continue
        if limit != 0:
            # Empty result although the limit was non-zero: stop reading
            # (presumably the end of the input was reached).
            break

    without_crlf = source_text.replace("\r\n", "\n")
    expected = without_crlf.replace("\r", "\n")
    assert expected.startswith(u''.join(pieces))

@given(st.text())
def test_read_buffer(text):
    """get_chars(-1) returns the whole content and exhausts the buffer.

    The buffer is built from the UTF-8 encoding of *text* together with its
    length in characters; the returned bytes must decode back to *text* and
    the returned size must equal that length.
    """
    encoded = text.encode('utf8')
    n_chars = len(text)
    buf = DecodeBuffer(encoded, n_chars)
    raw, reported = buf.get_chars(-1)
    assert raw.decode('utf8') == text
    assert n_chars == reported
    assert buf.exhausted()

@given(st.text(), st.lists(st.integers(min_value=0)))
@example(u'\x80', [1])
def test_readn_buffer(text, sizes):
    """Repeated get_chars(n) calls return consecutive pieces of the text.

    For every requested size the reported size must match the decoded
    length.  While the buffer is not exhausted each piece has exactly the
    requested length; once it is exhausted a piece may be shorter.  All
    pieces joined together equal the first ``sum(sizes)`` characters.
    """
    buf = DecodeBuffer(text.encode('utf8'), len(text))
    pieces = []
    for requested in sizes:
        raw, reported = buf.get_chars(requested)
        piece = raw.decode('utf8')
        assert reported == len(piece)
        if buf.exhausted():
            assert len(piece) <= requested
        else:
            assert len(piece) == requested
        pieces.append(piece)
    total_requested = sum(sizes)
    assert ''.join(pieces) == text[:total_requested]

@given(st.text())
@example(u'\x800')
def test_next_char(text):
    """next_char() yields the text one character at a time.

    Characters are pulled until next_char() raises StopIteration; the buffer
    must then be exhausted and the decoded characters must rebuild *text*.
    """
    buf = DecodeBuffer(text.encode('utf8'), len(text))
    decoded = []
    while True:
        try:
            raw = buf.next_char()
        except StopIteration:
            break
        decoded.append(raw.decode('utf8'))
    assert buf.exhausted()
    assert u''.join(decoded) == text