File: trololio.py

package info (click to toggle)
trololio 1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, sid
  • size: 88 kB
  • sloc: python: 110; makefile: 3
file content (147 lines) | stat: -rw-r--r-- 3,971 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from imp import find_module, load_module
import os


# Public names re-exported by this module, whichever backend gets loaded.
__all__ = [
    # Backend flags and the backend module itself
    'ASYNCIO',
    'TROLLIUS',
    'asyncio',

    # Coroutine helpers using the trollius spelling
    'coroutine',
    'From',
    'Return',

    # OSError and socket.error exceptions
    'BlockingIOError',
    'BrokenPipeError',
    'ChildProcessError',
    'ConnectionAbortedError',
    'ConnectionRefusedError',
    'ConnectionResetError',
    'FileNotFoundError',
    'InterruptedError',
    'PermissionError',

    # SSLError
    'BACKPORT_SSL_ERRORS',
    'SSLEOFError',
    'SSLWantReadError',
    'SSLWantWriteError',

    # SSLContext
    'BACKPORT_SSL_CONTEXT',
    'SSLContext'
]


# Locate the backend to wrap.  The candidates are probed in list order, so
# trollius wins when both are installed.  The first one found is loaded
# under the alias 'trololio.asyncio' and bound to the module-level name
# `asyncio`; ASYNCIO / TROLLIUS record which backend that was.
for _mod_name in ['trollius', 'asyncio']:
    try:
        _mod_file, _mod_pathname, _mod_description = find_module(_mod_name)
    except ImportError:
        # Not installed: remember the miss and try the next candidate.
        asyncio = None
    else:
        ASYNCIO = _mod_name == 'asyncio'
        TROLLIUS = not ASYNCIO

        try:
            asyncio = load_module(
                'trololio.asyncio', _mod_file,
                _mod_pathname, _mod_description
            )
        finally:
            # find_module() hands ownership of the open file to the caller;
            # it must be closed even when load_module() raises, otherwise
            # the handle leaks on a failed import.
            if _mod_file is not None:
                _mod_file.close()

        break

if asyncio is None:
    # Neither backend could be found.
    # Generate a nice traceback
    import trollius


# Keep the two libraries' debug switches in step: when exactly one of
# PYTHONASYNCIODEBUG / TROLLIUSDEBUG holds '0' or '1', copy that value to
# the other variable.  If both (or neither) hold such a value, the
# environment is left untouched.
_asyncio_debug = os.environ.get('PYTHONASYNCIODEBUG') in ('0', '1')
_trollius_debug = os.environ.get('TROLLIUSDEBUG') in ('0', '1')

if _asyncio_debug != _trollius_debug:
    if _asyncio_debug:
        os.environ['TROLLIUSDEBUG'] = os.environ['PYTHONASYNCIODEBUG']
    else:
        os.environ['PYTHONASYNCIODEBUG'] = os.environ['TROLLIUSDEBUG']


if TROLLIUS:
    from trololio.asyncio import (
        coroutine, From, Return,

        # OSError and socket.error exceptions
        BlockingIOError, BrokenPipeError, ChildProcessError,
        ConnectionAbortedError, ConnectionRefusedError, ConnectionResetError,
        FileNotFoundError, InterruptedError, PermissionError,

        # SSLError
        BACKPORT_SSL_ERRORS,
        SSLEOFError, SSLWantReadError, SSLWantWriteError,

        # SSLContext
        BACKPORT_SSL_CONTEXT,
        SSLContext
    )

else:
    # trollius/coroutines.py
    def From(obj):
        """Return *obj* unchanged.

        Stand-in for the trollius helper of the same name, so code written
        as ``yield From(x)`` also runs on this branch.
        """
        return obj

    class Return(Exception):
        """Exception raised by a coroutine to hand back its result.

        The result is stored on ``value``: ``None`` when raised without
        arguments, the argument itself when raised with exactly one, and
        the whole argument tuple otherwise.
        """

        def __init__(self, *args):
            if len(args) == 1:
                # A single argument is stored as-is, not as a 1-tuple.
                self.value = args[0]
            else:
                # No arguments -> None; two or more -> the tuple itself.
                self.value = args if args else None

    from builtins import (
        # OSError and socket.error exceptions
        BlockingIOError, BrokenPipeError, ChildProcessError,
        ConnectionAbortedError, ConnectionRefusedError, ConnectionResetError,
        FileNotFoundError, InterruptedError, PermissionError,
    )

    # On this branch the SSL names are imported straight from the standard
    # `ssl` module, so both backport flags are reported as False.
    BACKPORT_SSL_CONTEXT = BACKPORT_SSL_ERRORS = False

    from ssl import (
        # SSLError
        SSLEOFError, SSLWantReadError, SSLWantWriteError,

        # SSLContext
        SSLContext
    )

    from textwrap import dedent

    # exec is required because of `yield from` SyntaxError in Py2
    # exec on definition instead of on every yield for less overhead
    exec(dedent(
        """
        from functools import wraps
        from inspect import isgeneratorfunction

        def coroutine(func):
            '''Turn *func* into an asyncio coroutine that accepts trollius syntax.

            A generator function may ``yield From(awaitable)`` to wait for a
            result and ``raise Return(value)`` (or plainly ``return value``)
            to finish.  A non-generator function is handed to
            ``asyncio.coroutine`` as-is.  In both cases a raised ``Return``
            is converted into the coroutine's return value.
            '''
            if not isgeneratorfunction(func):
                coro = asyncio.coroutine(func)
            else:
                @wraps(func)
                def coro(*args, **kwargs):
                    gen = func(*args, **kwargs)
                    # `step` is how the user generator gets resumed next:
                    # gen.send with a result, or gen.throw with an error.
                    step, value = gen.send, None

                    try:
                        while True:
                            try:
                                awaited = step(value)
                            except StopIteration as stop:
                                # Catch it here: a StopIteration escaping a
                                # generator body is a RuntimeError (PEP 479).
                                return stop.value

                            try:
                                value = yield from awaited
                            except Exception as exc:
                                # Let the user code handle the failure at
                                # its own `yield` expression.
                                step, value = gen.throw, exc
                            else:
                                step = gen.send
                    finally:
                        # No-op when already finished; otherwise runs the
                        # user generator's cleanup (e.g. on cancellation).
                        gen.close()

            @wraps(coro)
            def return_handler(*args, **kwargs):
                try:
                    res = yield from coro(*args, **kwargs)
                except Return as res:
                    return res.value
                else:
                    return res

            return asyncio.coroutine(return_handler)
        """
    ))

    del dedent

# Remove the import-time helpers so that only the public API stays in the
# module namespace.
del find_module, load_module, os
del _mod_description, _mod_file, _mod_name, _mod_pathname
del _asyncio_debug, _trollius_debug