File: test__greenletset.py

package info (click to toggle)
python-gevent 0.13.6-1%2Bnmu3
  • links: PTS
  • area: main
  • in suites: wheezy
  • size: 2,324 kB
  • sloc: python: 13,296; makefile: 95; ansic: 37
file content (130 lines) | stat: -rw-r--r-- 3,217 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import time
import greentest
import gevent
from gevent import pool

DELAY = 0.1


class SpecialError(Exception):
    pass


class Undead(object):

    def __init__(self):
        self.shot_count = 0

    def __call__(self):
        while True:
            try:
                gevent.sleep(1)
            except SpecialError:
                break
            except:
                self.shot_count += 1


class Test(greentest.TestCase):

    def test_basic(self):
        DELAY = 0.05
        s = pool.Group()
        s.spawn(gevent.sleep, DELAY)
        assert len(s) == 1, s
        s.spawn(gevent.sleep, DELAY * 2.)
        assert len(s) == 2, s
        gevent.sleep(DELAY * 3. / 2.)
        assert len(s) == 1, s
        gevent.sleep(DELAY)
        assert not s, s

    def test_waitall(self):
        s = pool.Group()
        s.spawn(gevent.sleep, DELAY)
        s.spawn(gevent.sleep, DELAY * 2)
        assert len(s) == 2, s
        start = time.time()
        s.join(raise_error=True)
        delta = time.time() - start
        assert not s, s
        assert len(s) == 0, s
        assert DELAY * 1.9 <= delta <= DELAY * 2.5, (delta, DELAY)

    def test_kill_block(self):
        s = pool.Group()
        s.spawn(gevent.sleep, DELAY)
        s.spawn(gevent.sleep, DELAY * 2)
        assert len(s) == 2, s
        start = time.time()
        s.kill()
        assert not s, s
        assert len(s) == 0, s
        delta = time.time() - start
        assert delta < DELAY * 0.8, delta

    def test_kill_noblock(self):
        s = pool.Group()
        s.spawn(gevent.sleep, DELAY)
        s.spawn(gevent.sleep, DELAY * 2)
        assert len(s) == 2, s
        s.kill(block=False)
        assert len(s) == 2, s
        gevent.sleep(0)
        assert not s, s
        assert len(s) == 0, s

    def test_kill_fires_once(self):
        u1 = Undead()
        u2 = Undead()
        p1 = gevent.spawn(u1)
        p2 = gevent.spawn(u2)

        def check(count1, count2):
            assert p1, p1
            assert p2, p2
            assert not p1.dead, p1
            assert not p2.dead, p2
            self.assertEqual(u1.shot_count, count1)
            self.assertEqual(u2.shot_count, count2)

        gevent.sleep(0.01)
        s = pool.Group([p1, p2])
        assert len(s) == 2, s
        check(0, 0)
        s.killone(p1, block=False)
        check(0, 0)
        gevent.sleep(0)
        check(1, 0)
        s.killone(p1)
        check(1, 0)
        s.killone(p1)
        check(1, 0)
        s.kill(block=False)
        s.kill(block=False)
        s.kill(block=False)
        check(1, 0)
        gevent.sleep(DELAY)
        check(1, 1)
        X = object()
        kill_result = gevent.with_timeout(DELAY, s.kill, block=True, timeout_value=X)
        assert kill_result is X, repr(kill_result)
        assert len(s) == 2, s
        check(1, 1)

        p1.kill(SpecialError)
        p2.kill(SpecialError)

    def test_killall_subclass(self):
        p1 = GreenletSubclass.spawn(lambda: 1 / 0)
        p2 = GreenletSubclass.spawn(lambda: gevent.sleep(10))
        s = pool.Group([p1, p2])
        s.kill()


class GreenletSubclass(gevent.Greenlet):
    pass


if __name__ == '__main__':
    greentest.main()