File: async_.py

package info (click to toggle)
python-promise 2.3.0-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 400 kB
  • sloc: python: 2,681; sh: 13; makefile: 4
file content (135 lines) | stat: -rw-r--r-- 3,959 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# Based on https://github.com/petkaantonov/bluebird/blob/master/src/promise.js
from collections import deque
from threading import local

if False:
    from .promise import Promise
    from typing import Any, Callable, Optional, Union  # flake8: noqa


class Async(local):
    def __init__(self, trampoline_enabled=True):
        self.is_tick_used = False
        self.late_queue = deque()  # type: ignore
        self.normal_queue = deque()  # type: ignore
        self.have_drained_queues = False
        self.trampoline_enabled = trampoline_enabled

    def enable_trampoline(self):
        self.trampoline_enabled = True

    def disable_trampoline(self):
        self.trampoline_enabled = False

    def have_items_queued(self):
        return self.is_tick_used or self.have_drained_queues

    def _async_invoke_later(self, fn, scheduler):
        self.late_queue.append(fn)
        self.queue_tick(scheduler)

    def _async_invoke(self, fn, scheduler):
        # type: (Callable, Any) -> None
        self.normal_queue.append(fn)
        self.queue_tick(scheduler)

    def _async_settle_promise(self, promise):
        # type: (Promise) -> None
        self.normal_queue.append(promise)
        self.queue_tick(promise.scheduler)

    def invoke_later(self, fn):
        if self.trampoline_enabled:
            self._async_invoke_later(fn, scheduler)
        else:
            scheduler.call_later(0.1, fn)

    def invoke(self, fn, scheduler):
        # type: (Callable, Any) -> None
        if self.trampoline_enabled:
            self._async_invoke(fn, scheduler)
        else:
            scheduler.call(fn)

    def settle_promises(self, promise):
        # type: (Promise) -> None
        if self.trampoline_enabled:
            self._async_settle_promise(promise)
        else:
            promise.scheduler.call(promise._settle_promises)

    def throw_later(self, reason, scheduler):
        # type: (Exception, Any) -> None
        def fn():
            # type: () -> None
            raise reason

        scheduler.call(fn)

    fatal_error = throw_later

    def drain_queue(self, queue):
        # type: (deque) -> None
        from .promise import Promise

        while queue:
            fn = queue.popleft()
            if isinstance(fn, Promise):
                fn._settle_promises()
                continue
            fn()

    def drain_queue_until_resolved(self, promise):
        # type: (Promise) -> None
        from .promise import Promise

        queue = self.normal_queue
        while queue:
            if not promise.is_pending:
                return
            fn = queue.popleft()
            if isinstance(fn, Promise):
                fn._settle_promises()
                continue
            fn()

        self.reset()
        self.have_drained_queues = True
        self.drain_queue(self.late_queue)

    def wait(self, promise, timeout=None):
        # type: (Promise, Optional[float]) -> None
        if not promise.is_pending:
            # We return if the promise is already
            # fulfilled or rejected
            return

        target = promise._target()

        if self.trampoline_enabled:
            if self.is_tick_used:
                self.drain_queue_until_resolved(target)

            if not promise.is_pending:
                # We return if the promise is already
                # fulfilled or rejected
                return
        target.scheduler.wait(target, timeout)

    def drain_queues(self):
        # type: () -> None
        assert self.is_tick_used
        self.drain_queue(self.normal_queue)
        self.reset()
        self.have_drained_queues = True
        self.drain_queue(self.late_queue)

    def queue_tick(self, scheduler):
        # type: (Any) -> None
        if not self.is_tick_used:
            self.is_tick_used = True
            scheduler.call(self.drain_queues)

    def reset(self):
        # type: () -> None
        self.is_tick_used = False