1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
import pytest
try:
from hypothesis import given, strategies as st, settings, example
except ImportError:
pytest.skip("hypothesis required")
import os
from pypy.module._io.interp_bytesio import W_BytesIO
from pypy.module._io.interp_textio import (W_TextIOWrapper, DecodeBuffer,
SEEN_CR, SEEN_LF)
# Workaround for slowness, suggested by David MacIver: force Hypothesis to
# initialize some of its lazy machinery up front.  That initialization takes
# a lot of time and would otherwise trip the timer.
st.text().example()
def translate_newlines(text):
    """Return *text* with every line ending rewritten as ``os.linesep``.

    CRLF pairs are collapsed before lone CRs are handled, so a CRLF ending
    produces a single separator rather than two.
    """
    unified = text.replace(u'\r\n', u'\n').replace(u'\r', u'\n')
    # Splitting on LF and re-joining with the platform separator is the same
    # as replacing every LF with os.linesep.
    return os.linesep.join(unified.split(u'\n'))
@st.composite
def st_readline(draw, st_nlines=st.integers(min_value=0, max_value=10)):
    """Hypothesis strategy yielding ``(text, limits)`` pairs for readline tests.

    ``text`` is the concatenation of a drawn number of lines, each made of a
    body without CR/LF characters followed by one of LF, CR or CRLF.
    ``limits`` holds, for each line, a drawn non-negative size limit (at most
    five more than the body length) followed by -1.

    :param draw: the draw function supplied by ``st.composite``.
    :param st_nlines: strategy producing the number of lines to generate.
    """
    pieces = []
    size_limits = []
    body_strategy = st.text(st.characters(blacklist_characters=u'\r\n'))
    ending_strategy = st.sampled_from([u'\n', u'\r', u'\r\n'])
    for _ in range(draw(st_nlines)):
        body = draw(body_strategy)
        terminator = draw(ending_strategy)
        cap = draw(st.integers(min_value=0, max_value=len(body) + 5))
        pieces += [body, terminator]
        # NOTE(review): the -1 is assumed to be appended once per line, i.e.
        # inside the loop -- confirm against the upstream file.
        size_limits += [cap, -1]
    return (u''.join(pieces), size_limits)
def test_newlines_bug(space):
    """Check the decoder's ``seennl`` flags after reading mixed line endings.

    The input has LF line endings and ends with a lone CR; after a full read
    through a wrapper created with ``w_newline=None`` the decoder must report
    both SEEN_LF and SEEN_CR.
    """
    import _io
    raw = W_BytesIO(space)
    raw.descr_init(space, space.newbytes(b"a\nb\nc\r"))
    wrapper = W_TextIOWrapper(space)
    wrapper.descr_init(space, raw, encoding='utf-8',
                       w_errors=space.newtext('surrogatepass'),
                       w_newline=None)
    wrapper.read_w(space)
    expected_flags = SEEN_LF | SEEN_CR
    assert wrapper.w_decoder.seennl == expected_flags
@given(data=st_readline(),
       mode=st.sampled_from(['\r', '\n', '\r\n', '']))
@settings(deadline=None, database=None)
@example(data=(u'\n\r\n', [0, -1, 2, -1, 0, -1]), mode='\r')
def test_readline(space, data, mode):
    """Property test for ``readline_w`` with an explicit newline mode.

    Each returned piece must respect its non-negative size limit, and the
    pieces joined in order must form a prefix of the original text.
    """
    source_text, size_limits = data
    raw = W_BytesIO(space)
    raw.descr_init(space, space.newbytes(source_text.encode('utf-8')))
    wrapper = W_TextIOWrapper(space)
    wrapper.descr_init(space, raw, encoding='utf-8',
                       w_errors=space.newtext('surrogatepass'),
                       w_newline=space.newtext(mode))
    collected = []
    for size_limit in size_limits:
        w_piece = wrapper.readline_w(space, space.newint(size_limit))
        piece = space.utf8_w(w_piece).decode('utf-8')
        # A negative limit places no bound on the returned length.
        assert size_limit < 0 or len(piece) <= size_limit
        if not piece:
            if size_limit:
                # Empty result although the limit allowed characters:
                # stop reading (presumably the stream is exhausted).
                break
            continue
        collected.append(piece)
    assert source_text.startswith(u''.join(collected))
@given(data=st_readline())
@settings(deadline=None, database=None)
@example(data=(u'\n\r\n', [0, -1, 2, -1, 0, -1]))
def test_readline_none(space, data):
    """Property test for ``readline_w`` on a wrapper created with newline None.

    Each returned piece must respect its non-negative size limit, and the
    pieces joined in order must form a prefix of the text with CRLF and CR
    both rewritten to LF.
    """
    source_text, size_limits = data
    raw = W_BytesIO(space)
    raw.descr_init(space, space.newbytes(source_text.encode('utf-8')))
    wrapper = W_TextIOWrapper(space)
    wrapper.descr_init(space, raw, encoding='utf-8',
                       w_errors=space.newtext('surrogatepass'),
                       w_newline=space.w_None)
    collected = []
    for size_limit in size_limits:
        w_piece = wrapper.readline_w(space, space.newint(size_limit))
        piece = space.utf8_w(w_piece).decode('utf-8')
        # A negative limit places no bound on the returned length.
        assert size_limit < 0 or len(piece) <= size_limit
        if not piece:
            if size_limit:
                # Empty result although the limit allowed characters:
                # stop reading (presumably the stream is exhausted).
                break
            continue
        collected.append(piece)
    without_crlf = source_text.replace("\r\n", "\n")
    expected = without_crlf.replace("\r", "\n")
    assert expected.startswith(u''.join(collected))
@given(st.text())
def test_read_buffer(text):
    """Reading a whole DecodeBuffer with ``get_chars(-1)`` returns everything.

    The returned bytes must decode back to *text*, the reported size must
    equal ``len(text)``, and the buffer must be exhausted afterwards.
    """
    utf8_bytes = text.encode('utf8')
    decode_buf = DecodeBuffer(utf8_bytes, len(text))
    raw_chars, n_chars = decode_buf.get_chars(-1)
    assert raw_chars.decode('utf8') == text
    assert len(text) == n_chars
    assert decode_buf.exhausted()
@given(st.text(), st.lists(st.integers(min_value=0)))
@example(u'\x80', [1])
def test_readn_buffer(text, sizes):
    """Reading a DecodeBuffer in chunks of the requested sizes.

    For every request the reported size must match the decoded length; the
    chunk must have exactly the requested length unless the buffer is
    exhausted after the read, in which case it may be shorter.  All chunks
    joined must equal the first ``sum(sizes)`` characters of *text*.
    """
    decode_buf = DecodeBuffer(text.encode('utf8'), len(text))
    pieces = []
    for requested in sizes:
        raw_chars, reported = decode_buf.get_chars(requested)
        piece = raw_chars.decode('utf8')
        got = len(piece)
        assert reported == got
        if decode_buf.exhausted():
            # The last read may come up short.
            assert got <= requested
        else:
            assert got == requested
        pieces.append(piece)
    assert ''.join(pieces) == text[:sum(sizes)]
@given(st.text())
@example(u'\x800')
def test_next_char(text):
    """Stepping through a DecodeBuffer with ``next_char`` yields the text.

    Characters are collected until ``next_char`` raises StopIteration; the
    buffer must then be exhausted and the decoded characters joined together
    must equal *text*.
    """
    decode_buf = DecodeBuffer(text.encode('utf8'), len(text))
    decoded = []
    while True:
        try:
            raw_char = decode_buf.next_char()
        except StopIteration:
            break
        decoded.append(raw_char.decode('utf8'))
    assert decode_buf.exhausted()
    assert u''.join(decoded) == text
|