File: poll.py

package info (click to toggle)
systemtap 5.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 47,556 kB
  • sloc: cpp: 81,117; ansic: 54,933; xml: 49,795; exp: 43,595; sh: 11,526; python: 5,003; perl: 2,252; tcl: 1,312; makefile: 1,006; javascript: 149; lisp: 105; awk: 101; asm: 91; java: 70; sed: 16
file content (325 lines) | stat: -rw-r--r-- 10,526 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# Fork of jupyter-ui-poll
# Copyright (C) 2023 Red Hat Inc.
#
# This file is part of systemtap, and is free software.  You can
# redistribute it and/or modify it under the terms of the GNU General
# Public License (GPL); either version 2, or (at your option) any
# later version.
#
# TODO: jupyter-ui-poll assumes the IPython kernel, which has an interactive shell,
# while isystemtap does not, so the changes largely consist of working around that.
# It would probably be better to contribute these changes to the upstream repo and
# remove this file altogether: https://github.com/Kirill888/jupyter-ui-poll/blob/develop/jupyter_ui_poll/_poll.py
import asyncio
import sys
import time
from collections import abc
from functools import singledispatch
from inspect import iscoroutinefunction, isawaitable
from typing import (
    Any,
    AsyncIterable,
    AsyncIterator,
    Callable,
    Generic,
    Iterable,
    Iterator,
    List,
    Optional,
    Tuple,
    TypeVar,
    Union,
)

import zmq
from IPython import get_ipython
from tornado.queues import QueueEmpty

from jupyter_ui_poll._async_thread import AsyncThread

T = TypeVar("T")


class KernelWrapper:
    """Temporarily take over a kernel's ``execute_request`` handler.

    While a wrapper is installed, incoming ``execute_request`` messages are
    queued instead of executed, so that ``do_one_iteration()`` can be used to
    process other kernel messages from inside a running cell.  The queued
    requests are replayed once the wrapper is torn down.

    The wrapper is usable both as a synchronous and as an asynchronous
    context manager; in both cases entering it yields a ``poll(n)`` callable.
    """

    # Wrapper currently installed on the kernel, if any (managed by ``get``
    # and cleared by ``_post_run_cell_hook``).
    _current: Optional["KernelWrapper"] = None

    def __init__(self, shell, loop, ker=None) -> None:
        """Install the wrapper on a kernel.

        :param shell:
          IPython interactive shell, or ``None``.  Its ``.kernel`` is used
          when present.
        :param loop:
          Event loop on which the replay of queued requests is scheduled.
        :param ker:
          Kernel to use when ``shell`` is missing or has no ``.kernel``.
        """
        if shell and hasattr(shell, "kernel"):
            kernel = shell.kernel
            self._shell = shell
        else:
            kernel = ker
            self._shell = None

        self._kernel = kernel
        self._loop = loop
        # (ident, parent header) of the cell being executed right now; used to
        # point output back at that cell after other messages were handled.
        self._original_parent = (
            kernel._parent_ident,
            kernel.get_parent()  # ipykernel 6+
            if hasattr(kernel, "get_parent")
            else kernel._parent_header,  # ipykernel < 6
        )
        # Queued (stream, ident, parent) execute requests awaiting replay.
        self._events: List[Tuple[Any, Any, Any]] = []
        self._backup_execute_request = kernel.shell_handlers["execute_request"]
        self._aproc = None

        if iscoroutinefunction(self._backup_execute_request):  # ipykernel 6+
            kernel.shell_handlers["execute_request"] = self._execute_request_async
        else:
            # ipykernel < 6
            kernel.shell_handlers["execute_request"] = self._execute_request

        # Only register the hook on a shell we keep a reference to: the hook
        # unregisters itself through ``self._shell``, and ``__exit__`` runs the
        # cleanup directly whenever ``self._shell`` is unset.  Registering on a
        # shell we do not track would leave the hook installed forever and
        # run the cleanup (and replay) more than once.
        if self._shell is not None:
            self._shell.events.register("post_run_cell", self._post_run_cell_hook)

    def restore(self):
        """Put the original ``execute_request`` handler back (idempotent)."""
        if self._backup_execute_request is not None:
            self._kernel.shell_handlers[
                "execute_request"
            ] = self._backup_execute_request
            self._backup_execute_request = None

    def _reset_output(self):
        """Point the kernel's parent message back at the original cell."""
        self._kernel.set_parent(*self._original_parent)

    def _execute_request(self, stream, ident, parent):
        """Replacement handler: queue the request instead of executing it."""
        # store away execute request for later and reset io back to the original cell
        self._events.append((stream, ident, parent))
        self._reset_output()

    async def _execute_request_async(self, stream, ident, parent):
        """Coroutine flavour of ``_execute_request`` (ipykernel 6+)."""
        self._execute_request(stream, ident, parent)

    async def replay(self):
        """Restore the original handler and execute every queued request."""
        kernel = self._kernel
        self.restore()

        sys.stdout.flush()
        sys.stderr.flush()
        shell_stream = getattr(
            kernel, "shell_stream", None
        )  # ipykernel 6 vs 5 differences

        # Take ownership of the queue so that a second call to replay() can
        # never execute the same requests again.
        pending, self._events = self._events, []

        for stream, ident, parent in pending:
            kernel.set_parent(ident, parent)
            if kernel._aborting:
                kernel._send_abort_reply(stream, parent, ident)
            else:
                rr = kernel.execute_request(stream, ident, parent)
                if isawaitable(rr):
                    await rr

                # replicate shell_dispatch behaviour
                sys.stdout.flush()
                sys.stderr.flush()
                if shell_stream is not None:  # 6+
                    kernel._publish_status("idle", "shell")
                    shell_stream.flush(zmq.POLLOUT)
                else:
                    kernel._publish_status("idle")

    async def do_one_iteration(self):
        """Run one ``kernel.do_one_iteration()`` and reset output afterwards."""
        try:
            rr = self._kernel.do_one_iteration()
            if isawaitable(rr):
                await rr
        except QueueEmpty:
            # it's probably a bug in ipykernel,
            # .do_one_iteration() should not throw
            return
        finally:
            # reset stdio back to original cell
            self._reset_output()

    def _post_run_cell_hook(self, *args, **kw):
        """Tear the wrapper down and schedule replay of queued requests.

        Called by the shell's ``post_run_cell`` event when a shell is tracked,
        or directly from ``__exit__`` otherwise.  Arguments are ignored.
        """
        if self._shell:
            self._shell.events.unregister("post_run_cell", self._post_run_cell_hook)
        self.restore()
        KernelWrapper._current = None
        asyncio.ensure_future(self.replay(), loop=self._loop)

    async def _poll_async(self, n=1):
        """Run ``do_one_iteration()`` ``n`` times."""
        for _ in range(n):
            await self.do_one_iteration()

    async def __aenter__(self):
        """Return the coroutine function used to poll: ``await poll(n)``."""
        return self._poll_async

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # NOTE(review): unlike __exit__, nothing is cleaned up here; teardown
        # is left to the post_run_cell hook.  Without a tracked shell no hook
        # is registered, so confirm whether async use is expected in that case.
        pass

    def __enter__(self):
        """Start the helper thread and return a synchronous ``poll(n)``.

        :raises ValueError: if the wrapper is already entered.
        """
        if self._aproc is not None:
            raise ValueError("Nesting not supported")
        self._aproc = AsyncThread()
        return self._aproc.wrap(self._poll_async)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the helper thread; tear down directly when there is no shell."""
        try:
            if not self._shell:
                # No shell means no post_run_cell event will ever fire, so
                # the cleanup has to happen here.
                self._post_run_cell_hook()
        finally:
            # Always stop the helper thread, even if the cleanup above failed.
            self._aproc.terminate()
            self._aproc = None

    @staticmethod
    def get(kernel=None) -> "KernelWrapper":
        """Return the active wrapper, creating and installing one if needed.

        :param kernel:
          Kernel passed as ``ker`` when a new wrapper has to be created; it is
          ignored when a wrapper is already active.
        """
        if KernelWrapper._current is None:
            KernelWrapper._current = KernelWrapper(
                get_ipython(), asyncio.get_event_loop(), ker=kernel
            )
        return KernelWrapper._current


class IteratorWrapperAsync(abc.AsyncIterable, Generic[T]):
    """Async iterable that polls the kernel before yielding each item.

    Wraps an asynchronous iterable; before every item is passed on, ``n``
    kernel iterations are processed through the active ``KernelWrapper``.
    """

    def __init__(
        self, its: AsyncIterable[T], n: int = 1,
    ):
        self._n = n
        self._its = its

    def __aiter__(self) -> AsyncIterator[T]:
        # The wrapper is looked up as soon as iteration is requested, not
        # lazily on the first ``__anext__``.
        wrapper = KernelWrapper.get()
        source = self._its
        batch = self._n

        async def _pump() -> AsyncIterator[T]:
            async with wrapper as poll:
                async for item in source:
                    await poll(batch)
                    yield item

        return _pump()


class IteratorWrapper(abc.Iterable, Generic[T]):
    """Iterable that polls the kernel before yielding each item.

    Wraps a regular iterable and can be consumed with either ``for`` or
    ``async for``; before every item is passed on, ``n`` kernel iterations
    are processed through the active ``KernelWrapper``.
    """

    def __init__(
        self, its: Iterable[T], n: int = 1,
    ):
        """
        :param its: Iterable to pass through
        :param n: Number of kernel iterations to run before each item
        """
        self._its = its
        self._n = n

    def __iter__(self) -> Iterator[T]:
        def _loop(kernel: KernelWrapper, its: Iterable[T], n: int) -> Iterator[T]:
            # The ``with`` block already runs the wrapper's ``__exit__`` when
            # the generator is closed early or an error propagates, so no
            # explicit exception handling is needed here.
            with kernel as poll:
                for x in its:
                    poll(n)
                    yield x

        return _loop(KernelWrapper.get(), self._its, self._n)

    def __aiter__(self) -> AsyncIterator[T]:
        async def _loop(
            kernel: KernelWrapper, its: Iterable[T], n: int
        ) -> AsyncIterator[T]:
            async with kernel as poll:
                for x in its:
                    await poll(n)
                    yield x

        return _loop(KernelWrapper.get(), self._its, self._n)


def ui_events(kernel=None):
    """
    Return a context manager whose value is a function that processes UI
    events, for use while a long task is running inside a Jupyter cell.

    .. code-block:: python

       with ui_events() as ui_poll:
          while some_condition:
             ui_poll(10)  # Process up to 10 UI events if any happened
             do_some_more_compute()

    Async mode is also supported:

    .. code-block:: python

       async with ui_events() as ui_poll:
          while some_condition:
             await ui_poll(10)  # Process up to 10 UI events if any happened
             do_some_more_compute()

    How it works:

    #. Call ``kernel.do_one_iteration()`` taking care of IO redirects
    #. Intercept ``execute_request`` IPython kernel events and delay their execution
    #. Schedule replay of any blocked ``execute_request`` events when
       cell execution is finished.

    :param kernel:
      Kernel object forwarded to ``KernelWrapper.get``
    :returns:
      The active ``KernelWrapper``
    """
    wrapper = KernelWrapper.get(kernel)
    return wrapper


@singledispatch
def with_ui_events(its, n: int = 1):
    """
    Process kernel UI events while iterating over a long sequence.

    The iterable returned for a regular iterable input works in both sync and
    async contexts.

    .. code-block:: python

       for x in with_ui_events(some_data_stream, n=10):
          do_things_with(x)

       async for x in with_ui_events(some_data_stream, n=10):
          await do_things_with(x)

    This is basically equivalent to:

    .. code-block:: python

       with ui_events() as poll:
         for x in some_data_stream:
             poll(10)
             do_things_with(x)

    This generic fallback is reached only for inputs with no registered
    implementation and always raises.

    :param its:
      Iterator to pass through, this should be either
      :class:`~collections.abc.Iterable` or :class:`~collections.abc.AsyncIterable`
    :param n:
      Number of events to process in between items
    :returns:
      :class:`~collections.abc.AsyncIterable` when input is
      :class:`~collections.abc.AsyncIterable`
    :returns:
      Object that implements both :class:`~collections.abc.Iterable` and
      :class:`~collections.abc.AsyncIterable` interfaces when input is normal
      :class:`~collections.abc.Iterable`
    :raises TypeError:
      When ``its`` is of a type with no registered implementation
    """
    message = "Expect Iterable[T]|AsyncIterable[T]"
    raise TypeError(message)


@with_ui_events.register(abc.Iterable)
def with_ui_events_sync(its: Iterable[T], n: int = 1) -> IteratorWrapper[T]:
    """``with_ui_events`` implementation for regular iterables."""
    wrapped: IteratorWrapper[T] = IteratorWrapper(its, n=n)
    return wrapped


@with_ui_events.register(abc.AsyncIterable)
def with_ui_events_async(its: AsyncIterable[T], n: int = 1) -> AsyncIterable[T]:
    """``with_ui_events`` implementation for asynchronous iterables."""
    wrapped: AsyncIterable[T] = IteratorWrapperAsync(its, n=n)
    return wrapped


def run_ui_poll_loop(
    f: Callable[[], Optional[T]], sleep: float = 0.02, n: int = 1
) -> T:
    """
    Keep calling ``f()`` until it returns a value that is not ``None``,
    processing widget events between calls.

    Cells below in the notebook stay blocked while this runs, but jupyter
    widgets remain interactive.

    :param f:
       Function to periodically call (``f()`` should not block for long)
    :param sleep:
       Seconds to sleep before each call to ``f()`` (default is 1/50);
       ``None`` disables sleeping
    :param n:
       Number of events to process per iteration
    :returns:
       First non-``None`` value returned from ``f()``
    """

    def _results(
        func: Callable[[], Optional[T]], pause: float
    ) -> Iterator[Optional[T]]:
        # Yield every outcome of func(), finishing after the first one that
        # is not None.
        while True:
            if pause is not None:
                time.sleep(pause)

            outcome = func()
            yield outcome
            if outcome is not None:
                return

    for result in with_ui_events(_results(f, sleep), n):
        if result is None:
            continue
        break

    assert result is not None
    return result