1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
|
# Copyright (c) 2008 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import with_statement
import sys
import greentest
import weakref
import time
from gevent import sleep, Timeout
# Base time unit, in seconds, from which every timeout and sleep duration in
# the tests below is derived (DELAY, DELAY * 2, DELAY * 3).
DELAY = 0.04
class Error(Exception):
    """Module-local exception type used as a custom timeout exception."""
class Test(greentest.TestCase):
    """Tests for ``Timeout`` used as a context manager."""

    def test_api(self):
        # Walks through the basic ``with Timeout(...)`` scenarios: no expiry,
        # expiry with the default exception, expiry with a custom exception
        # (instance or class), cancellation, silent expiry and a disabled
        # timer.

        # Nothing happens if with-block finishes before the timeout expires
        t = Timeout(DELAY * 2)
        assert not t.pending, repr(t)
        with t:
            assert t.pending, repr(t)
            sleep(DELAY)
        # check if timer was actually cancelled
        assert not t.pending, repr(t)
        sleep(DELAY * 2)

        # An exception will be raised if it's not
        try:
            with Timeout(DELAY) as t:
                sleep(DELAY * 2)
        except Timeout:
            ex = sys.exc_info()[1]
            # the raised exception is the Timeout object itself
            assert ex is t, (ex, t)
        else:
            raise AssertionError('must raise Timeout')

        # You can customize the exception raised:
        try:
            with Timeout(DELAY, IOError("Operation takes way too long")):
                sleep(DELAY * 2)
        except IOError:
            ex = sys.exc_info()[1]
            assert str(ex) == "Operation takes way too long", repr(ex)
        else:
            # without this branch the check would pass even if the timeout
            # never fired
            raise AssertionError('must raise IOError')

        # Providing classes instead of values should be possible too:
        try:
            with Timeout(DELAY, ValueError):
                sleep(DELAY * 2)
        except ValueError:
            pass
        else:
            raise AssertionError('must raise ValueError')

        # Same as above, but the exception class is taken from the exception
        # currently being handled (sys.exc_info()[0]).
        try:
            1 / 0
        except ZeroDivisionError:
            try:
                with Timeout(DELAY, sys.exc_info()[0]):
                    sleep(DELAY * 2)
                    raise AssertionError('should not get there')
                raise AssertionError('should not get there')
            except ZeroDivisionError:
                pass
        else:
            raise AssertionError('should not get there')

        # It's possible to cancel the timer inside the block:
        with Timeout(DELAY) as timer:
            timer.cancel()
            sleep(DELAY * 2)

        # To silence the exception before exiting the block, pass False as
        # second parameter.
        XDELAY = 0.1
        start = time.time()
        with Timeout(XDELAY, False):
            sleep(XDELAY * 2)
        delta = time.time() - start
        # the sleep must have been interrupted before running to completion
        assert delta < XDELAY * 2, delta

        # passing None as seconds disables the timer
        with Timeout(None):
            sleep(DELAY)
        sleep(DELAY)

    def test_ref(self):
        # The custom exception passed to Timeout must not be kept alive once
        # the with-block has finished without the timeout firing.
        import gc  # function-scope import: only this test needs it

        err = Error()
        err_ref = weakref.ref(err)
        with Timeout(DELAY * 2, err):
            sleep(DELAY)
        del err
        # Do not depend on reference counting to reclaim the object
        # immediately; force a collection so the check is also meaningful on
        # interpreters with a different garbage collector.
        gc.collect()
        assert not err_ref(), repr(err_ref())

    def test_nested_timeout(self):
        # Checks which of two nested timers fires, and the ``pending`` state
        # of each timer afterwards.

        # Both timers silent: the outer (shorter) one fires first and ends
        # the outer block, so the raise below is never reached.
        with Timeout(DELAY, False):
            with Timeout(DELAY * 2, False):
                sleep(DELAY * 3)
            raise AssertionError('should not get there')

        # Outer timer is the shorter one: it fires inside the inner block.
        with Timeout(DELAY) as t1:
            with Timeout(DELAY * 2) as t2:
                try:
                    sleep(DELAY * 3)
                except Timeout:
                    ex = sys.exc_info()[1]
                    assert ex is t1, (ex, t1)
                assert not t1.pending, t1
                assert t2.pending, t2
            # leaving the inner block cancels the inner timer
            assert not t2.pending, t2

        # Inner timer is the shorter one: it fires while the outer one is
        # still pending.
        with Timeout(DELAY * 2) as t1:
            with Timeout(DELAY) as t2:
                try:
                    sleep(DELAY * 3)
                except Timeout:
                    ex = sys.exc_info()[1]
                    assert ex is t2, (ex, t2)
                assert t1.pending, t1
                assert not t2.pending, t2
        # leaving the outer block cancels the outer timer
        assert not t1.pending, t1
# Run the tests through greentest's runner when executed as a script.
if __name__ == '__main__':
    greentest.main()
|