File: uvloop.py

package info (click to toggle)
apt-mirror2 15-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,388 kB
  • sloc: python: 5,337; sh: 23; makefile: 14
file content (112 lines) | stat: -rw-r--r-- 3,876 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# SPDX-License-Identifier: GPL-3.0-or-later

# This file is mostly copied from the uvloop source with small changes.
# https://github.com/MagicStack/uvloop/blob/3c3bbeff3418c60b14f793a6541ad01d8036b706/uvloop/__init__.py#L117
# uvloop is dual-licensed under MIT and Apache 2.0 licenses.
# Copyright (C) 2016-present the uvloop authors and contributors.
# Copyright (C) 2024 Yuri Konotopov

import asyncio
import sys
from collections.abc import Callable, Coroutine
from typing import TYPE_CHECKING, Any, TypeVar

_T = TypeVar("_T")

try:
    import uvloop
    from uvloop import Loop

    # Copied from uvloop
    def _cancel_all_tasks(loop: asyncio.AbstractEventLoop) -> None:
        """Cancel every unfinished task on ``loop`` and wait for them to end.

        After the tasks have settled, any task that finished with an
        exception instead of being cancelled is reported through the
        loop's exception handler.

        Mirrors the shutdown helper used by ``asyncio.run()`` in CPython.
        """
        pending = asyncio.all_tasks(loop)

        if pending:
            for job in pending:
                job.cancel()

            # ``return_exceptions=True`` keeps one failing task from aborting
            # the wait for the remaining ones.
            settled = asyncio.gather(*pending, return_exceptions=True)
            loop.run_until_complete(settled)

            for job in pending:
                if not job.cancelled() and job.exception() is not None:
                    context = {
                        "message": "unhandled exception during asyncio.run() shutdown",
                        "exception": job.exception(),
                        "task": job,
                    }
                    loop.call_exception_handler(context)

    if TYPE_CHECKING:
        # pylint: disable=unused-argument

        def run(  # type: ignore
            main: Coroutine[Any, Any, _T],
            *,
            loop_factory: Callable[[], Loop] | None = uvloop.new_event_loop,
            debug: bool | None = None,
        ) -> _T:  # type: ignore
            """The preferred way of running a coroutine with uvloop.

            This is a signature-only stub: it is defined under
            ``TYPE_CHECKING`` so static analysers see a precise, generic
            signature, while the executable implementation lives in the
            ``else`` branch below.

            Args:
                main: The coroutine to run; its result type is the return
                    type of this function.
                loop_factory: Zero-argument callable producing the event
                    loop; defaults to ``uvloop.new_event_loop``.
                debug: Optional debug-mode flag for the event loop.

            Returns:
                Whatever ``main`` returns.
            """

        # pylint: enable=unused-argument
    else:

        def run(
            main: Coroutine[Any, Any, Any],
            *,
            loop_factory: Callable[
                [], asyncio.AbstractEventLoop
            ] = uvloop.new_event_loop,
            **run_kwargs: Any,
        ):
            """Run ``main`` to completion on a uvloop event loop.

            The strategy depends on the interpreter version:

            * Python <= 3.10: the loop is created, driven and torn down by
              hand, following what ``asyncio.run()`` does.
            * Python 3.11: ``asyncio.Runner`` is used, since it accepts
              ``loop_factory``.
            * Python >= 3.12: ``asyncio.run()`` is called with
              ``loop_factory``.

            Args:
                main: The coroutine to execute.
                loop_factory: Zero-argument callable that creates the event
                    loop. It must produce a ``uvloop.Loop`` (or subclass).
                **run_kwargs: Extra keyword arguments such as ``debug``.
                    They are forwarded to ``asyncio.Runner`` /
                    ``asyncio.run()`` on 3.11+. On older interpreters only
                    ``debug`` is honoured and other keys are ignored.

            Returns:
                The value returned by ``main``.

            Raises:
                TypeError: If the running loop is not a uvloop loop.
            """

            async def wrapper():
                # If `loop_factory` is provided we want it to return
                # either uvloop.Loop or a subtype of it, assuming the user
                # is using `uvloop.run()` intentionally.
                loop = asyncio.get_running_loop()
                if not isinstance(loop, Loop):
                    raise TypeError("uvloop.run() uses a non-uvloop event loop")
                return await main

            vi = sys.version_info[:2]

            if vi <= (3, 10):
                loop = loop_factory()
                try:
                    asyncio.set_event_loop(loop)
                    # The newer branches pass ``debug`` through to asyncio;
                    # here it has to be applied explicitly or it would be
                    # silently dropped.
                    debug = run_kwargs.get("debug")
                    if debug is not None:
                        loop.set_debug(debug)
                    return loop.run_until_complete(wrapper())
                finally:
                    try:
                        _cancel_all_tasks(loop)
                        loop.run_until_complete(loop.shutdown_asyncgens())
                        # Not every loop implementation provides this method,
                        # so only call it when it exists.
                        if hasattr(loop, "shutdown_default_executor"):
                            loop.run_until_complete(loop.shutdown_default_executor())
                    finally:
                        # Always detach and close the loop, even when the
                        # cleanup steps above fail.
                        asyncio.set_event_loop(None)
                        loop.close()

            elif vi == (3, 11):
                with asyncio.Runner(  # pylint: disable=no-member  # type: ignore
                    loop_factory=loop_factory, **run_kwargs
                ) as runner:  # type: ignore
                    return runner.run(wrapper())  # type: ignore

            else:
                return asyncio.run(  # pylint: disable=unexpected-keyword-arg
                    wrapper(),
                    loop_factory=loop_factory,  # type: ignore
                    **run_kwargs,
                )

    UVLOOP_AVAILABLE = True

except ImportError:
    UVLOOP_AVAILABLE = False  # type: ignore

    def run(main: Coroutine[Any, Any, Any], **run_kwargs: Any) -> Any:
        """Run ``main`` with plain ``asyncio`` when uvloop is not installed.

        Accepts the same keyword arguments as the uvloop-backed ``run`` so
        call sites such as ``run(coro, debug=True)`` behave the same whether
        or not uvloop is available.

        Args:
            main: The coroutine to execute.
            **run_kwargs: Keyword arguments forwarded unchanged to
                ``asyncio.run()`` (for example ``debug``).

        Returns:
            The value returned by ``main``.
        """
        return asyncio.run(main, **run_kwargs)