1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
|
from __future__ import with_statement
from contextlib import contextmanager
import gevent
from gevent.event import Event
from time import time
SMALL = 0.1
FUZZY = SMALL / 2
# setting up signal does not affect join()
gevent.signal(1, lambda: None) # wouldn't work on windows
@contextmanager
def expected_time(expected, fuzzy=None):
if fuzzy is None:
fuzzy = expected / 2.
start = time()
yield
elapsed = time() - start
assert expected - fuzzy <= elapsed <= expected + fuzzy, 'Expected: %r; elapsed: %r' % (expected, elapsed)
def no_time(fuzzy=0.001):
return expected_time(0, fuzzy=fuzzy)
for _a in xrange(2):
# exiting because the spawned greenlet finished execution (spawn (=callback) variant)
for _ in xrange(2):
x = gevent.spawn(lambda: 5)
with no_time(SMALL):
result = gevent.wait(timeout=10)
assert result is True, repr(result)
assert x.dead, x
assert x.value == 5, x
# exiting because the spawned greenlet finished execution (spawn_later (=timer) variant)
for _ in xrange(2):
x = gevent.spawn_later(SMALL, lambda: 5)
with expected_time(SMALL):
result = gevent.wait(timeout=10)
assert result is True, repr(result)
assert x.dead, x
# exiting because of timeout (the spawned greenlet still runs)
for _ in xrange(2):
x = gevent.spawn_later(10, lambda: 5)
with expected_time(SMALL):
result = gevent.wait(timeout=SMALL)
assert result is False, repr(result)
assert not x.dead, x
x.kill()
with no_time():
result = gevent.wait()
assert result is True
# exiting because of event (the spawned greenlet still runs)
for _ in xrange(2):
x = gevent.spawn_later(10, lambda: 5)
event = Event()
event_set = gevent.spawn_later(SMALL, event.set)
with expected_time(SMALL):
result = gevent.wait([event])
assert result == [event], repr(result)
assert not x.dead, x
assert event_set.dead
assert event.is_set()
x.kill()
with no_time():
result = gevent.wait()
assert result is True
# checking "ref=False" argument
for _ in xrange(2):
gevent.get_hub().loop.timer(10, ref=False).start(lambda: None)
with no_time():
result = gevent.wait()
assert result is True
# checking "ref=False" attribute
for _d in xrange(2):
w = gevent.get_hub().loop.timer(10)
w.start(lambda: None)
w.ref = False
with no_time():
result = gevent.wait()
assert result is True
|