1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
import time
import greentest
import gevent
from gevent import pool
# Base sleep interval, in seconds, passed to gevent.sleep() and used to scale
# the timing bounds in the tests below (test_basic overrides it locally).
DELAY = 0.1
class SpecialError(Exception):
    """Exception used by the tests to make an Undead callable stop looping."""
class Undead(object):
    """Callable greenlet body that survives every exception but SpecialError.

    Each call loops on ``gevent.sleep(1)``.  Any exception raised into the
    sleep other than ``SpecialError`` is swallowed and counted in
    ``shot_count``; ``SpecialError`` ends the loop and lets the call return.
    """

    def __init__(self):
        # Number of non-SpecialError exceptions absorbed so far.
        self.shot_count = 0

    def __call__(self):
        """Sleep in a loop until SpecialError is raised into the sleep."""
        while True:
            try:
                gevent.sleep(1)
            except SpecialError:
                # The only way out of the loop.
                break
            except BaseException:
                # Catching everything is the point of this helper; spelling
                # out BaseException (same catch set as a bare ``except:``)
                # makes that explicit.
                self.shot_count += 1
class Test(greentest.TestCase):
    """Tests for spawning, joining and killing greenlets through pool.Group."""

    def test_basic(self):
        """len(Group) drops as each spawned sleeper finishes."""
        # Local value shadows the module-level DELAY for this test only.
        DELAY = 0.05
        s = pool.Group()
        s.spawn(gevent.sleep, DELAY)
        assert len(s) == 1, s
        s.spawn(gevent.sleep, DELAY * 2.)
        assert len(s) == 2, s
        # After 1.5 * DELAY the first sleeper is expected to be gone while
        # the second (2 * DELAY) is still in the group.
        gevent.sleep(DELAY * 3. / 2.)
        assert len(s) == 1, s
        gevent.sleep(DELAY)
        assert not s, s

    def test_waitall(self):
        """join() returns once the longest sleeper (2 * DELAY) is done."""
        s = pool.Group()
        s.spawn(gevent.sleep, DELAY)
        s.spawn(gevent.sleep, DELAY * 2)
        assert len(s) == 2, s
        start = time.time()
        s.join(raise_error=True)
        delta = time.time() - start
        assert not s, s
        assert len(s) == 0, s
        # Elapsed time must be close to the longest sleep.
        assert DELAY * 1.9 <= delta <= DELAY * 2.5, (delta, DELAY)

    def test_kill_block(self):
        """kill() empties the group well before the sleepers would finish."""
        s = pool.Group()
        s.spawn(gevent.sleep, DELAY)
        s.spawn(gevent.sleep, DELAY * 2)
        assert len(s) == 2, s
        start = time.time()
        s.kill()
        assert not s, s
        assert len(s) == 0, s
        delta = time.time() - start
        assert delta < DELAY * 0.8, delta

    def test_kill_noblock(self):
        """kill(block=False) takes effect only after yielding to the loop."""
        s = pool.Group()
        s.spawn(gevent.sleep, DELAY)
        s.spawn(gevent.sleep, DELAY * 2)
        assert len(s) == 2, s
        s.kill(block=False)
        # Nothing has been removed yet: the kill has only been requested.
        assert len(s) == 2, s
        gevent.sleep(0)
        assert not s, s
        assert len(s) == 0, s

    def test_kill_fires_once(self):
        """Repeated kill requests reach each Undead greenlet only once."""
        u1 = Undead()
        u2 = Undead()
        p1 = gevent.spawn(u1)
        p2 = gevent.spawn(u2)

        def check(count1, count2):
            # Both greenlets must still be alive, with exactly the given
            # number of absorbed exceptions each.
            assert p1, p1
            assert p2, p2
            assert not p1.dead, p1
            assert not p2.dead, p2
            self.assertEqual(u1.shot_count, count1)
            self.assertEqual(u2.shot_count, count2)

        try:
            # Presumably gives both greenlets a chance to start running.
            gevent.sleep(0.01)
            s = pool.Group([p1, p2])
            assert len(s) == 2, s
            check(0, 0)
            # A non-blocking kill is not delivered until the loop runs.
            s.killone(p1, block=False)
            check(0, 0)
            gevent.sleep(0)
            check(1, 0)
            # Further kills of p1 must not be delivered again.
            s.killone(p1)
            check(1, 0)
            s.killone(p1)
            check(1, 0)
            # Several non-blocking group kills: p2 is hit once, p1 not again.
            s.kill(block=False)
            s.kill(block=False)
            s.kill(block=False)
            check(1, 0)
            gevent.sleep(DELAY)
            check(1, 1)
            # A blocking kill cannot complete while the greenlets refuse to
            # die, so with_timeout() must hand back the sentinel.
            X = object()
            kill_result = gevent.with_timeout(DELAY, s.kill, block=True, timeout_value=X)
            assert kill_result is X, repr(kill_result)
            assert len(s) == 2, s
            check(1, 1)
        finally:
            # SpecialError is the only exception that ends Undead's loop.
            # Doing this in ``finally`` keeps a failed assertion above from
            # leaking two greenlets that would otherwise loop forever.
            p1.kill(SpecialError)
            p2.kill(SpecialError)

    def test_killall_subclass(self):
        """Group.kill() accepts greenlets created from a Greenlet subclass."""
        # One greenlet whose body would raise ZeroDivisionError, one that
        # would sleep for 10 seconds; kill() is expected to return cleanly.
        p1 = GreenletSubclass.spawn(lambda: 1 / 0)
        p2 = GreenletSubclass.spawn(lambda: gevent.sleep(10))
        s = pool.Group([p1, p2])
        s.kill()
class GreenletSubclass(gevent.Greenlet):
    """Greenlet subclass with no overrides, used by test_killall_subclass."""
# Script entry point: hand control to greentest's runner.
if __name__ == '__main__':
    greentest.main()
|