# Based on https://github.com/petkaantonov/bluebird/blob/master/src/promise.js
from collections import deque
from threading import local
if False:
from .promise import Promise
from typing import Any, Callable, Optional, Union # flake8: noqa
class Async(local):
    """Per-thread trampoline that queues callbacks and promise settlements.

    Work is appended to one of two deques and a single "tick" (a call to
    ``drain_queues``) is handed to a scheduler. When the tick runs, the
    normal queue is drained first and the late queue afterwards.

    The class derives from ``threading.local``, so every thread that touches
    an instance gets its own queues and flags.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, trampoline_enabled=True):
        # type: (bool) -> None
        """Create empty queues and record whether trampolining is active.

        :param trampoline_enabled: when true, work is queued and run from a
            scheduled tick; when false, work is handed straight to the
            scheduler.
        """
        # True while a drain tick has been scheduled and not yet reset.
        self.is_tick_used = False
        # Callbacks that run only after the normal queue has been drained.
        self.late_queue = deque()  # type: ignore
        # Callbacks and promises that run first within a tick.
        self.normal_queue = deque()  # type: ignore
        # Set once the normal queue has been drained; never cleared here.
        self.have_drained_queues = False
        self.trampoline_enabled = trampoline_enabled

    def enable_trampoline(self):
        # type: () -> None
        """Queue subsequent work instead of calling the scheduler directly."""
        self.trampoline_enabled = True

    def disable_trampoline(self):
        # type: () -> None
        """Hand subsequent work straight to the scheduler."""
        self.trampoline_enabled = False

    def have_items_queued(self):
        # type: () -> bool
        """Return whether a tick is scheduled or a drain has already run."""
        return self.is_tick_used or self.have_drained_queues

    def _async_invoke_later(self, fn, scheduler):
        # type: (Callable, Any) -> None
        """Append ``fn`` to the late queue and make sure a tick is scheduled."""
        self.late_queue.append(fn)
        self.queue_tick(scheduler)

    def _async_invoke(self, fn, scheduler):
        # type: (Callable, Any) -> None
        """Append ``fn`` to the normal queue and make sure a tick is scheduled."""
        self.normal_queue.append(fn)
        self.queue_tick(scheduler)

    def _async_settle_promise(self, promise):
        # type: (Promise) -> None
        """Queue ``promise`` itself; the drain loop calls its ``_settle_promises``."""
        self.normal_queue.append(promise)
        self.queue_tick(promise.scheduler)

    def invoke_later(self, fn, scheduler=None):
        # type: (Callable, Any) -> None
        """Run ``fn`` after the normal queue, or via ``scheduler.call_later``.

        Previously this method referenced an undefined ``scheduler`` name and
        raised ``NameError`` on every call. The scheduler is now an explicit
        argument, mirroring ``invoke``.

        :param fn: zero-argument callable to run.
        :param scheduler: object providing ``call`` and ``call_later``.
        :raises TypeError: if ``scheduler`` is not supplied.
        """
        if scheduler is None:
            raise TypeError("invoke_later() requires a scheduler")
        if self.trampoline_enabled:
            self._async_invoke_later(fn, scheduler)
        else:
            # NOTE(review): 0.1 is passed through as the scheduler's delay
            # argument; presumably seconds -- confirm against the scheduler.
            scheduler.call_later(0.1, fn)

    def invoke(self, fn, scheduler):
        # type: (Callable, Any) -> None
        """Queue ``fn`` on the normal queue, or call it through ``scheduler``.

        :param fn: zero-argument callable to run.
        :param scheduler: object providing ``call``.
        """
        if self.trampoline_enabled:
            self._async_invoke(fn, scheduler)
        else:
            scheduler.call(fn)

    def settle_promises(self, promise):
        # type: (Promise) -> None
        """Queue ``promise`` for settlement, or settle via its own scheduler."""
        if self.trampoline_enabled:
            self._async_settle_promise(promise)
        else:
            promise.scheduler.call(promise._settle_promises)

    def throw_later(self, reason, scheduler):
        # type: (Exception, Any) -> None
        """Ask ``scheduler`` to call a function that raises ``reason``."""

        def fn():
            # type: () -> None
            raise reason

        scheduler.call(fn)

    # Alias: fatal errors are surfaced the same way as deferred throws.
    fatal_error = throw_later

    def drain_queue(self, queue):
        # type: (deque) -> None
        """Pop and run every entry of ``queue`` from the left until it is empty.

        Entries that are ``Promise`` instances have ``_settle_promises`` called
        on them; every other entry is called with no arguments.
        """
        # Imported here rather than at module level (the module-level import
        # is guarded by ``if False``); presumably to avoid a circular import.
        from .promise import Promise

        while queue:
            fn = queue.popleft()
            if isinstance(fn, Promise):
                fn._settle_promises()
                continue
            fn()

    def drain_queue_until_resolved(self, promise):
        # type: (Promise) -> None
        """Drain the normal queue, stopping early once ``promise`` settles.

        If the promise stops being pending before the queue is exhausted the
        method returns immediately and leaves the remaining entries, the tick
        flag and the late queue untouched. Otherwise it finishes the tick the
        same way ``drain_queues`` does.
        """
        from .promise import Promise

        queue = self.normal_queue
        while queue:
            if not promise.is_pending:
                return

            fn = queue.popleft()
            if isinstance(fn, Promise):
                fn._settle_promises()
                continue
            fn()

        self.reset()
        self.have_drained_queues = True
        self.drain_queue(self.late_queue)

    def wait(self, promise, timeout=None):
        # type: (Promise, Optional[float]) -> None
        """Drain pending work for ``promise`` and then defer to its scheduler.

        :param promise: the promise being waited on.
        :param timeout: passed through unchanged to ``scheduler.wait``.
        """
        if not promise.is_pending:
            # We return if the promise is already
            # fulfilled or rejected
            return

        target = promise._target()

        if self.trampoline_enabled:
            if self.is_tick_used:
                # A tick is pending: run queued work now, since it may be
                # what settles the target.
                self.drain_queue_until_resolved(target)

            if not promise.is_pending:
                # We return if the promise is already
                # fulfilled or rejected
                return

        target.scheduler.wait(target, timeout)

    def drain_queues(self):
        # type: () -> None
        """Run one tick: normal queue first, then the late queue.

        The tick flag is cleared before the late queue runs, so a late
        callback that queues more work schedules a fresh tick.
        """
        assert self.is_tick_used
        self.drain_queue(self.normal_queue)
        self.reset()
        self.have_drained_queues = True
        self.drain_queue(self.late_queue)

    def queue_tick(self, scheduler):
        # type: (Any) -> None
        """Schedule ``drain_queues`` on ``scheduler`` unless a tick is pending."""
        if not self.is_tick_used:
            self.is_tick_used = True
            scheduler.call(self.drain_queues)

    def reset(self):
        # type: () -> None
        """Clear the tick flag so that the next queued item schedules a tick."""
        self.is_tick_used = False
|