File: asynchronous.py

package info (click to toggle)
python-trame-server 3.6.1-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 480 kB
  • sloc: python: 4,075; javascript: 5; sh: 4; makefile: 3
file content (189 lines) | stat: -rw-r--r-- 5,397 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import asyncio
import logging

from . import is_dunder, is_private

__all__ = [
    "StateQueue",
    "create_state_queue_monitor_task",
    "create_task",
    "decorate_task",
    "handle_task_result",
    "task",
]

QUEUE_EXIT = "STOP"


def handle_task_result(task: asyncio.Task) -> None:
    try:
        task.result()
    except asyncio.CancelledError:
        pass  # Task cancellation should not be logged as an error.
    except Exception:  # pylint: disable=broad-except
        logging.exception("Exception raised by task = %r", task)


def create_task(coroutine, loop=None):
    """
    Create a task from a coroutine while also attaching a done callback so any
    exception or error could be caught and reported.

    :param coroutine: A coroutine to execute as an independent task
    :param loop: Optionally provide the loop on which the task should be
                 scheduled on. By default we will use the current running loop.

    :return: The decorated task
    :rtype: asyncio.Task
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    return decorate_task(loop.create_task(coroutine))


def decorate_task(task):
    """
    Decorate a task by attaching a done callback so any exception or error could
    be caught and reported.

    :param task: A coroutine to execute as an independent task
    :type task: asyncio.Task

    :return: The same task object
    :rtype: asyncio.Task
    """
    task.add_done_callback(handle_task_result)
    return task


async def _queue_update_state(server, queue, delay=1):
    _monitor_queue = True
    while _monitor_queue:
        if queue.empty():
            await asyncio.sleep(delay)
        else:
            msg = queue.get_nowait()
            if isinstance(msg, str):
                if msg == QUEUE_EXIT:
                    _monitor_queue = False
            else:
                with server.state:
                    server.state.update(msg)


def create_state_queue_monitor_task(server, queue, delay=1):
    """
    Create and schedule a task to watch over the provided queue
    to update a server state.
    This is especially useful when using a multiprocess executor
    and you want to report progress into your current server.

    :param server: A coroutine to execute as an independent task
    :type server: trame_server.core.Server

    :param queue: A queue instance meant to exchange state from
                  the parallel process to the given server
    :type queue: multiprocessing.Queue

    :param delay: Time to sleep in seconds before processing
                  the queue once emptied
    :type delay: float

    :return: The monitoring task
    :rtype: asyncio.Task
    """
    return create_task(_queue_update_state(server, queue, delay=delay))


class StateQueue:
    """
    Class use to decorate a multiprocessing.Queue inside your external
    process to simulate your server state object.

    :param queue: A queue instance meant to exchange state from the parallel
                  process to the given server
    :type queue: multiprocessing.Queue

    :param auto_flush: Should you manage the state update phase or just
                       propagate as soon as you update a property
    :type auto_flush: Boolean
    """

    def __init__(self, queue, auto_flush=True):
        self._queue = queue
        self._pending_update = {}
        self._pushed_state = {}
        self._auto_flush = auto_flush
        self._ctx_count = 0

    @property
    def queue(self):
        """Provide access to the decorated queue"""
        return self._queue

    def __getitem__(self, key):
        return self._pending_update.get(key, self._pushed_state.get(key))

    def __setitem__(self, key, value):
        self._pending_update[key] = value
        if self._auto_flush:
            self.flush()

    def __getattr__(self, key):
        if is_dunder(key):
            # Forward dunder calls to object
            return getattr(object, key)

        if is_private(key):
            return self.__dict__.get(key)

        return self.__getitem__(key)

    def __setattr__(self, key, value):
        if is_private(key):
            self.__dict__[key] = value
        else:
            self.__setitem__(key, value)

    def update(self, _dict):
        """
        Update the distributed state from a set of key/value pair

        :param _dict: A dict containing one or many key/value pair
        :type _dict: dict
        """
        self._pending_update.update(_dict)
        if self._auto_flush:
            self.flush()

    def flush(self):
        """Explicitly push any local change to the queue."""
        if len(self._pending_update):
            self._queue.put_nowait(self._pending_update)
            self._pushed_state.update(self._pending_update)
            self._pending_update = {}

    def exit(self):
        """Release the monitoring task as we are done with our work"""
        self._queue.put_nowait(QUEUE_EXIT)

    def __enter__(self):
        self._ctx_count += 1
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self._ctx_count -= 1

        if self._ctx_count == 0:
            self.flush()
            self.exit()


def task(func):
    """Function decorator to make its async execution within a task"""

    def wrapper(*args, **kwargs):
        create_task(func(*args, **kwargs))

    return wrapper