File: test_bufferedreader.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (116 lines) | stat: -rw-r--r-- 3,565 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import io
from cffi import FFI

import pytest
from hypothesis import strategies as st
from hypothesis import given, assume, settings
from hypothesis.stateful import (
    RuleBasedStateMachine, Bundle, rule, run_state_machine_as_test, precondition)
ffi = FFI()

MAX_READ_SIZE = 1024
MIN_READ_SIZE = 1
MAX_SIZE = 0xfff

@st.composite
def data_and_sizes(draw, reads=st.lists(st.integers(MIN_READ_SIZE, MAX_READ_SIZE))):
    reads = draw(reads)
    total_size = sum(reads)
    assume(0 < total_size < MAX_SIZE)
    data = draw(st.binary(min_size=total_size, max_size=total_size))
    return data, reads

class Stream(io.RawIOBase):
    def __init__(self, data, read_sizes):
        assert sum(read_sizes) == len(data)
        self.data = data
        self.n = 0
        self.read_sizes = iter(read_sizes)
        self.partial_read = 0

    def readinto(self, buf):
        if self.n == len(self.data):
            return 0
        if self.partial_read:
            read_size = self.partial_read
        else:
            read_size = next(self.read_sizes)
        if len(buf) < read_size:
            self.partial_read = read_size - len(buf)
            read_size = len(buf)
        else:
            self.partial_read = 0
        self.update_buffer(buf, self.data[self.n:self.n + read_size])
        self.n += read_size
        return read_size

    def update_buffer(self, buf, data):
        n = len(data)
        buf[:n] = data

    def readable(self):
        return True

class StreamCFFI(Stream):
    def update_buffer(self, buf, data):
        n = len(data)
        ffi.buffer(ffi.from_buffer(buf), n)[:] = data


@pytest.mark.parametrize('StreamCls', [Stream, StreamCFFI])
@given(params=data_and_sizes(), chunk_size=st.integers(MIN_READ_SIZE, 8192))
def test_buf(params, chunk_size, StreamCls):
    data, sizes = params
    stream = StreamCls(data, sizes)
    assert io.BufferedReader(stream, chunk_size).read(len(data)) == data

class StateMachine(RuleBasedStateMachine):
    def __init__(self, stream, reference):
        super().__init__()
        self.stream = stream
        self.reference = reference

    @rule(size=st.integers(MIN_READ_SIZE, MAX_READ_SIZE))
    def read(self, size):
        expected = self.reference.read(size)
        assert self.stream.read(size) == expected

    @rule(size=st.integers(MIN_READ_SIZE, MAX_READ_SIZE))
    def readinto(self, size):
        expected = self.reference.read(size)
        buf = bytearray(size)
        n = self.stream.readinto(buf)
        assert buf[:n] == expected

    @rule()
    def readline(self):
        expected = self.reference.readline(80)
        assert self.stream.readline(80) == expected

@pytest.mark.parametrize('StreamCls', [Stream, StreamCFFI])
@settings(max_examples=30, deadline=None)
@given(params=data_and_sizes(), chunk_size=st.integers(MIN_READ_SIZE, 8192))
def test_stateful(params, chunk_size, StreamCls):
    data, sizes = params
    raw_stream = StreamCls(data, sizes)
    reference = io.BytesIO(data)
    stream = io.BufferedReader(raw_stream, chunk_size)
    sm = StateMachine(stream, reference)
    run_state_machine_as_test(lambda: sm)


def test_buffered_reader_concurrency_bug():
    import subprocess
    import threading
    import sys
    if sys.platform == 'win32':
        cmd = "cmd /c color"
    else:
        cmd = "true"
    process=subprocess.Popen(cmd, stdin=subprocess.PIPE,stdout=subprocess.PIPE)
    threading.Timer(1, process.communicate).start()
    while 1:
        try:
            process.stdout.read(1)
        except ValueError:
            break