1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
|
#!/usr/bin/env python
import io
from iterableio import RawIterableReader, open_iterable
import pytest
@pytest.mark.parametrize("mode, buffering, encoding, errors, newline",[
# bad modes
("", -1, None, None, None),
("abc", -1, None, None, None),
("rtb", -1, None, None, None),
("rt", 0, None, None, None), # need buffering
("rt", "bad int", None, None, None), # invalid buffering int
# can't provide text decoding params in binary mode
("rb", 0, "utf-8", None, None),
("rb", 0, None, "ignore", None),
("rb", 0, None, None, "\n"),
])
def test_invalid_input(mode, buffering, encoding, errors, newline):
"""Test that invalid params are caught"""
with pytest.raises((ValueError, TypeError, LookupError)):
open_iterable([], mode, buffering, encoding, errors, newline)
@pytest.mark.parametrize("buffering", (0, -1, 1))
def test_reading(buffering):
def gen():
yield from (
b'\x01\x02\x03\x04\x05',
b"abcde",
b"fghij",
b"klmno",
b"qrstu",
b"vwxyz",
b'\x06\x07\x08\x09\x10',
)
_data = b"".join(gen())
with open_iterable(gen(), "rb", buffering=buffering) as i:
assert i.readable()
assert not i.seekable()
assert not i.writable()
cnt = 0
for amt in (0, 1, 2, 3, 4, 5, 10, 1, 1, 0):
d = i.read(amt)
assert len(d) == amt
assert d == _data[cnt:cnt+amt]
cnt += amt
assert i.tell() == cnt
assert i.read() == _data[cnt:]
assert i.read() == b""
assert i.tell() == len(_data)
def test_returned_class():
"""Test that the correct class is returned depending on the mode and buffering spec"""
assert isinstance(open_iterable([], "rb", buffering=0), RawIterableReader)
assert isinstance(open_iterable([], "rb", buffering=-1), io.BufferedReader)
assert isinstance(open_iterable([], "rb", buffering=1), io.BufferedReader)
assert isinstance(open_iterable([], "rt", buffering=-1), io.TextIOWrapper)
assert isinstance(open_iterable([], "rt", buffering=1), io.TextIOWrapper)
@pytest.mark.parametrize("mode, buffering",[
("rb", 0),
("rb", -1),
("rt", -1),
])
def test_contextmgr_close(mode, buffering):
with open_iterable([], mode, buffering) as i:
assert not i.closed
assert i.closed
@pytest.mark.parametrize("mode, buffering",[
("rb", 0),
("rb", -1),
("rt", -1),
])
def test_unreadable_after_close(mode, buffering):
i = open_iterable([b"12345"], mode, buffering)
assert not i.read(0)
assert i.read(1) in (b"1", "1")
assert not i.closed
i.close()
assert i.closed
with pytest.raises(ValueError, match="closed"):
i.read()
with pytest.raises(ValueError, match="closed"):
i.tell()
def test_yield_empty_bytes():
"""Test that a generator is only 'done' when it stops yielding, not when it yields empty bytes"""
def gen():
yield from (
b"1",
b"", b"", b"", b"", b"", b"", b"",
b"2", b"3",
b"", b"", b"", b"", b"", b"",
b"4",
)
i = RawIterableReader(gen())
out = []
while True:
b = i.read(1)
if not b:
break
out.append(b)
assert len(out) == 4
assert b"".join(out) == b"1234"
def test_read_text():
def gen():
# 9 lines yielded in non-line chunks
yield from (
x.encode("utf-8") for x in (
"this is a line\n",
"",
"",
"_a",
"another line\n",
"another line1\n",
"another line2\n",
"another line_",
"a",
"aaaaaaa\nbbbbbbbb",
"_",
"1",
"2",
"3",
"4",
"5",
"_line line line another line actually\n",
"another line\n",
"ending line\n",
"actual ending line no trailing newline",
)
)
real = "".join(x.decode("utf-8") for x in gen())
# read across chunks and lines
with open_iterable(gen(), encoding="utf-8") as i:
assert i.read(10) == real[:10]
assert i.read(10) == real[10:20]
with open_iterable(gen(), encoding="utf-8") as i:
lines = list(i)
with open_iterable(gen(), encoding="utf-8") as i:
assert lines == i.readlines()
assert len(lines) == len(real.splitlines()) == 9
assert "".join(lines) == real
|