# Tests for the pipe read/write helpers exposed by gevent's os module.
from os import pipe
from gevent import os
from greentest import TestCase, main
from gevent import Greenlet, joinall
try:
import fcntl
except ImportError:
fcntl = None
try:
import errno
except ImportError:
errno = None
class TestOS_tp(TestCase):
    """Exercise ``os.tp_read`` / ``os.tp_write`` over a pipe.

    A producer greenlet writes ``nbytes`` bytes into a pipe while the
    consumer greenlet is only started one second later.  The test can
    only finish if a writer stuck on a full pipe does not stall the
    whole process.

    Subclasses may override :meth:`pipe`, :meth:`read` and :meth:`write`
    to run the same scenario against a different pair of functions.
    """

    # NOTE(review): presumably the per-test timeout (in seconds) honoured
    # by greentest.TestCase -- confirm against greentest.
    __timeout__ = 5

    def pipe(self):
        """Return the ``(read_fd, write_fd)`` pair created by ``os.pipe``."""
        return pipe()

    def read(self, *args):
        """Forward all arguments to ``os.tp_read`` and return its result."""
        return os.tp_read(*args)

    def write(self, *args):
        """Forward all arguments to ``os.tp_write`` and return its result."""
        return os.tp_write(*args)

    def test_if_pipe_blocks(self):
        """Check that filling a pipe does not block the entire process."""
        # The module-level name ``os`` is ``gevent.os``; take ``close``
        # from the standard library explicitly.
        from os import close as close_fd

        r, w = self.pipe()
        # set nbytes such that for sure it is > maximum pipe buffer
        nbytes = 1000000
        chunk_size = 4096
        block = 'x' * chunk_size
        buf = buffer(block)
        # Lack of "nonlocal" keyword in Python 2.x:
        bytesread = [0]
        byteswritten = [0]

        def produce():
            while byteswritten[0] != nbytes:
                bytesleft = nbytes - byteswritten[0]
                byteswritten[0] += self.write(
                    w, buf[:min(bytesleft, chunk_size)])

        def consume():
            while bytesread[0] != nbytes:
                bytesleft = nbytes - bytesread[0]
                bytesread[0] += len(
                    self.read(r, min(bytesleft, chunk_size)))

        producer = Greenlet(produce)
        consumer = Greenlet(consume)
        try:
            producer.start()
            consumer.start_later(1)
            # If patching was not successful, the producer will have filled
            # the pipe before the consumer starts, and would block the entire
            # process. Therefore the next line would never finish.
            joinall([producer, consumer])
        finally:
            # Stop any greenlet that is still running (e.g. after a
            # timeout) before its descriptors are closed underneath it.
            producer.kill()
            consumer.kill()
            # Always release both ends of the pipe so repeated runs do not
            # leak file descriptors.
            close_fd(r)
            close_fd(w)

        # assertEqual (rather than a bare ``assert``) still runs under
        # ``python -O`` and reports both values on failure.
        self.assertEqual(bytesread[0], nbytes)
        self.assertEqual(bytesread[0], byteswritten[0])
if hasattr(os, 'make_nonblocking'):

    class TestOS_nb(TestOS_tp):
        """Run the inherited pipe scenario via ``os.nb_read`` / ``os.nb_write``.

        Only defined when the imported ``os`` module offers
        ``make_nonblocking``.
        """

        def pipe(self):
            """Return a pipe whose two ends went through ``os.make_nonblocking``."""
            read_fd, write_fd = pipe()
            # Same order as before: read end first, then write end.
            for fd in (read_fd, write_fd):
                os.make_nonblocking(fd)
            return read_fd, write_fd

        def read(self, *args):
            """Forward all arguments to ``os.nb_read`` and return its result."""
            result = os.nb_read(*args)
            return result

        def write(self, *args):
            """Forward all arguments to ``os.nb_write`` and return its result."""
            result = os.nb_write(*args)
            return result
# Run the test suite through greentest's ``main`` when executed as a script.
if __name__ == '__main__':
    main()
|