1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
|
import sys
from .__version__ import __version__ as __version__
from contextlib import contextmanager
try:
from flux import current_timeline as time_module
except ImportError:
import time as time_module
from .deadlines import make_deadline as _make_deadline
from .exceptions import TimeoutExpired, IllegalArgumentError, NestedStopIteration
def wait(*args, **kwargs):
    """Block until the polled predicate succeeds and return its value.

    All positional and keyword arguments are forwarded unchanged to
    :func:`iterwait`; this function simply drives that generator to
    completion.

    Returns:
        The last value stored by ``iterwait`` in its result holder, i.e. the
        truthy value returned by the predicate.

    Raises:
        Whatever ``iterwait`` raises.  A ``NestedStopIteration`` coming out of
        the generator is unwrapped through its ``reraise()`` method.
    """
    outcome = _Result()
    try:
        # The generator yields nothing of interest; it only has to be drained.
        for _ in iterwait(*args, result=outcome, **kwargs):
            pass
    except NestedStopIteration as nested:
        nested.reraise()
    return outcome.result
def iterwait(predicate, timeout_seconds=None, sleep_seconds=1, result=None, waiting_for=None,
             on_poll=None, expected_exceptions=()):
    """Poll ``predicate`` repeatedly, yielding control between polls.

    This is a generator: each ``next()`` performs one poll (and the pause that
    follows it).  The generator finishes as soon as the predicate returns a
    truthy value.

    Args:
        predicate: Zero-argument callable that is polled.
        timeout_seconds: Passed to ``_make_deadline`` to build the overall
            deadline; also reported in ``TimeoutExpired``.
        sleep_seconds: Pause specification, interpreted by
            ``_get_sleep_generator``.
        result: Optional holder object; the predicate's return value is stored
            in its ``result`` attribute.  A fresh ``_Result`` is used when
            omitted.
        waiting_for: Description used in ``TimeoutExpired``; defaults to
            ``str(predicate)``.
        on_poll: Optional zero-argument callable invoked once per poll, right
            after the predicate has been tried.
        expected_exceptions: Exception class or tuple of classes that are
            silently ignored when raised by ``predicate``.

    Raises:
        IllegalArgumentError: ``expected_exceptions`` or ``on_poll`` has an
            unsupported type.
        TimeoutExpired: The deadline expired before the predicate succeeded.
        NestedStopIteration: The predicate itself raised ``StopIteration``.
    """
    is_exception_class = (isinstance(expected_exceptions, type)
                          and issubclass(expected_exceptions, Exception))
    if not isinstance(expected_exceptions, tuple) and not is_exception_class:
        raise IllegalArgumentError(
            'expected_exceptions should be tuple or Exception subclass')
    if on_poll is not None and not callable(on_poll):
        raise IllegalArgumentError(
            'on_poll should be callable')
    timeout = _make_deadline(timeout_seconds)
    if result is None:
        result = _Result()
    if waiting_for is None:
        waiting_for = str(predicate)
    sleep_generator = _get_sleep_generator(timeout, sleep_seconds)
    while True:
        with _end_sleeping(next(sleep_generator)) as cancel_sleep:
            # Only the predicate call is guarded: ``expected_exceptions``
            # describes failures of the predicate, not of the poll callback.
            try:
                result.result = predicate()
            except expected_exceptions:
                pass
            except StopIteration:
                # A bare StopIteration must not leak out of a generator body,
                # so it is wrapped together with its traceback information.
                exc_info = sys.exc_info()
                raise NestedStopIteration(exc_info)
            # Runs on every poll, including polls where the predicate raised
            # an expected exception; errors raised by the callback propagate.
            if on_poll is not None:
                on_poll()
            if result.result:
                # Success: skip the pause that would otherwise follow.
                cancel_sleep()
                return
            if timeout.is_expired():
                raise TimeoutExpired(timeout_seconds, waiting_for)
        yield
@contextmanager
def _end_sleeping(total_seconds):
    """Context manager that pads its body with a trailing sleep.

    A deadline is created from ``total_seconds`` on entry.  The managed block
    receives a ``_SleepToggle``; calling it cancels the trailing sleep.  On
    normal exit, unless cancelled, the remaining time until the deadline
    (never less than zero) is slept through ``time_module.sleep``.
    """
    until = _make_deadline(total_seconds)
    toggle = _SleepToggle()
    yield toggle
    if not toggle.enabled:
        # The body asked for the pause to be skipped.
        return
    remaining = until.get_num_seconds_remaining()
    time_module.sleep(max(0, remaining))
class _SleepToggle(object):
    """Callable flag: starts enabled, and calling the instance disables it."""

    # Class-level default; an instance only gets its own attribute once it
    # has been called.
    enabled = True

    def __call__(self):
        """Switch the flag off for this instance."""
        self.enabled = False
class _Result(object):
    """Mutable holder whose ``result`` attribute carries a polled value."""

    # Default until a value is assigned on the instance.
    result = None
def _get_sleep_generator(timeout, sleep_seconds):
    """Yield, forever, the number of seconds to pause before the next poll.

    Args:
        timeout: Object exposing ``get_num_seconds_remaining()``, which
            returns either ``None`` (no limit) or the seconds left.
        sleep_seconds: One of

            * a single number -- pause that long every time;
            * ``(start,)`` -- begin at ``start`` and double each time, with
              no upper bound;
            * ``(start, end)`` -- begin at ``start`` and double each time,
              capped at ``end``;
            * ``(start, end, multiplier)`` -- as above with an explicit
              growth factor; ``end`` may be ``None`` for no cap.

    Yields:
        The next pause length, clamped to be non-negative and, when the
        timeout reports a remaining time, to at most that remaining time.
    """
    if not isinstance(sleep_seconds, (tuple, list)):
        # Constant pause: start == end and a neutral multiplier.
        sleep_seconds = (sleep_seconds, sleep_seconds, 1)
    elif len(sleep_seconds) == 1:
        # Only a starting value: exponential back-off without a cap.
        sleep_seconds = (sleep_seconds[0], None, 2)
    elif len(sleep_seconds) == 2:
        # Start and cap given: default to doubling.
        sleep_seconds = (sleep_seconds[0], sleep_seconds[1], 2)
    current_sleep, end_sleep, multiplier = sleep_seconds
    while True:
        time_remaining = timeout.get_num_seconds_remaining()
        if time_remaining is not None:
            # Never pause past the overall deadline.
            current_sleep = min(time_remaining, current_sleep)
        current_sleep = max(0, current_sleep)
        yield current_sleep
        current_sleep *= multiplier
        if end_sleep is not None:
            current_sleep = min(end_sleep, current_sleep)
class Aggregate(object):
    """Base class for callables that combine several predicates."""

    def __init__(self, predicates):
        """Store a private list copy of ``predicates`` (any iterable)."""
        super(Aggregate, self).__init__()
        # Copy into a list so the stored sequence is independent of the
        # caller's iterable.
        self.predicates = list(predicates)
class ANY(Aggregate):
    """Aggregate predicate that succeeds when at least one member succeeds."""

    def __call__(self):
        """Return ``True`` at the first truthy predicate, else ``False``.

        Predicates are called in stored order; evaluation stops at the first
        one that returns a truthy value.
        """
        for predicate in self.predicates:
            if predicate():
                return True
        return False
class ALL(Aggregate):
    """Aggregate predicate that succeeds once every member has succeeded.

    The instance is stateful: a predicate that returns a truthy value is
    removed from ``self.predicates`` and is not called again on later polls.
    """

    def __call__(self):
        """Call the remaining predicates and report whether none are left.

        Returns:
            ``True`` when ``self.predicates`` is empty after this pass,
            ``False`` otherwise.
        """
        # Walk from the last position to the first so that deleting an entry
        # does not shift the positions still to be visited.
        for position in reversed(range(len(self.predicates))):
            if self.predicates[position]():
                del self.predicates[position]
        return not self.predicates
|