File: timeout_test.py

package info (click to toggle)
python-eventlet 0.9.16-3
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 1,884 kB
  • sloc: python: 16,306; makefile: 97
file content (54 lines) | stat: -rw-r--r-- 1,783 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from tests import LimitedTestCase
from eventlet import timeout
from eventlet import greenthread
# Base interval that the tests below pass (directly or multiplied) as the
# timeout limit to with_timeout() and as the duration to greenthread.sleep().
# Presumably kept small so the suite finishes quickly.
DELAY = 0.01

class TestDirectRaise(LimitedTestCase):
    """Tests for Timeout objects that are raised directly with ``raise``
    or formatted with repr()/str()."""

    def test_direct_raise_class(self):
        """Raising the Timeout class yields an instance that is not pending."""
        # ``except ... as`` parses on Python 2.6+ and 3.x, whereas the older
        # ``except X, t`` form is a SyntaxError on Python 3.
        try:
            raise timeout.Timeout
        except timeout.Timeout as t:
            assert not t.pending, repr(t)

    def test_direct_raise_instance(self):
        """Raising a Timeout instance delivers that same object, not pending."""
        tm = timeout.Timeout()
        try:
            raise tm
        except timeout.Timeout as t:
            assert tm is t, (tm, t)
            assert not t.pending, repr(t)

    def test_repr(self):
        """repr() and str() work for several ways of constructing a Timeout."""
        # just verify these don't crash
        tm = timeout.Timeout(1)
        try:
            greenthread.sleep(0)
            repr(tm)
            str(tm)
        finally:
            # Always cancel, even if the formatting above raises, so the
            # timeout created here is not left outstanding for later tests.
            tm.cancel()
        # First argument None, second argument an exception class.
        tm = timeout.Timeout(None, RuntimeError)
        repr(tm)
        str(tm)
        # First argument None, second argument False.
        tm = timeout.Timeout(None, False)
        repr(tm)
        str(tm)

class TestWithTimeout(LimitedTestCase):
    """Tests for timeout.with_timeout()."""

    def test_with_timeout(self):
        """Check with_timeout() with and without ``timeout_value``."""
        # The sleep lasts ten times the limit and no timeout_value is given:
        # the call is expected to raise Timeout.
        self.assertRaises(timeout.Timeout, timeout.with_timeout,
                          DELAY, greenthread.sleep, DELAY*10)
        X = object()
        # Same overrun, but with timeout_value: that exact object is expected
        # back instead of an exception.
        r = timeout.with_timeout(DELAY, greenthread.sleep, DELAY*10,
                                 timeout_value=X)
        # assertTrue replaces the deprecated assert_ alias (same semantics).
        self.assertTrue(r is X, (r, X))
        # The sleep finishes well inside the limit: the result is expected to
        # be sleep's own return value (None), not timeout_value.
        r = timeout.with_timeout(DELAY*10, greenthread.sleep,
                                 DELAY, timeout_value=X)
        self.assertTrue(r is None, r)

    def test_with_outer_timer(self):
        """An outer, shorter with_timeout() is not absorbed by an inner one."""
        def longer_timeout():
            # this should not catch the outer timeout's exception
            return timeout.with_timeout(DELAY * 10,
                                        greenthread.sleep, DELAY * 20,
                                        timeout_value='b')
        # The outer limit (DELAY) is shorter than the inner one (DELAY * 10),
        # so Timeout is expected to reach the caller instead of the inner
        # call returning its timeout_value.
        self.assertRaises(timeout.Timeout,
            timeout.with_timeout, DELAY, longer_timeout)