File: green.py

package info (click to toggle)
pytango 10.1.4-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,304 kB
  • sloc: python: 27,795; cpp: 16,150; sql: 252; sh: 152; makefile: 43
file content (350 lines) | stat: -rw-r--r-- 10,566 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
# SPDX-FileCopyrightText: All Contributors to the PyTango project
# SPDX-License-Identifier: LGPL-3.0-or-later


# Imports
import os
import inspect
import textwrap
import re

from functools import wraps

from threading import get_ident

# Tango imports
from tango._tango import GreenMode
from tango.utils import _forcefully_traced_method

__all__ = (
    "get_green_mode",
    "set_green_mode",
    "green",
    "green_callback",
    "get_executor",
    "get_object_executor",
    "switch_existing_global_executors_to_thread",
)

try:
    import gevent

    del gevent
    _gevent_available = True
except ImportError:
    _gevent_available = False

# Handle current green mode

try:
    _CURRENT_GREEN_MODE = getattr(
        GreenMode, os.environ["PYTANGO_GREEN_MODE"].capitalize()
    )
except Exception:
    _CURRENT_GREEN_MODE = GreenMode.Synchronous


def set_green_mode(green_mode=None):
    """Sets the global default PyTango green mode.

    Advice: Use only in your final application. Don't use this in a python
    library in order not to interfere with the beavior of other libraries
    and/or application where your library is being.

    :param green_mode: the new global default PyTango green mode
    :type green_mode: GreenMode
    """
    global _CURRENT_GREEN_MODE
    # Make sure the green mode is available
    get_executor(green_mode)
    # Set the green mode
    _CURRENT_GREEN_MODE = green_mode


def get_green_mode():
    """Returns the current global default PyTango green mode.

    :returns: the current global default PyTango green mode
    :rtype: GreenMode
    """
    return _CURRENT_GREEN_MODE


# Abstract executor class


class AbstractExecutor:
    asynchronous = NotImplemented
    default_wait = NotImplemented

    def __init__(self):
        self.ident = get_ident(), os.getpid()

    def get_ident(self):
        return self.ident

    def in_executor_context(self):
        return self.ident == (get_ident(), os.getpid())

    def delegate(self, fn, *args, **kwargs):
        """Delegate an operation and return an accessor."""
        if not self.asynchronous:
            raise ValueError("Not supported in synchronous mode")
        raise NotImplementedError

    def access(self, accessor, timeout=None):
        """Return a result from an accessor."""
        if not self.asynchronous:
            raise ValueError("Not supported in synchronous mode")
        raise NotImplementedError

    def submit(self, fn, *args, **kwargs):
        """Submit an operation"""
        if not self.asynchronous:
            return fn(*args, **kwargs)
        raise NotImplementedError

    def execute(self, fn, *args, **kwargs):
        """Execute an operation and return the result."""
        if not self.asynchronous:
            return fn(*args, **kwargs)
        raise NotImplementedError

    def run(self, fn, args=(), kwargs={}, wait=None, timeout=None):
        if wait is None:
            wait = self.default_wait
        # Wait and timeout are not supported in synchronous mode
        if not self.asynchronous and (not wait or timeout):
            raise ValueError("Not supported in synchronous mode")
        # Synchronous (no delegation)
        if not self.asynchronous or not self.in_executor_context():
            return fn(*args, **kwargs)
        # Asynchronous delegation
        accessor = self.delegate(fn, *args, **kwargs)
        if not wait:
            return accessor
        return self.access(accessor, timeout=timeout)


class SynchronousExecutor(AbstractExecutor):
    asynchronous = False
    default_wait = True


# Default synchronous executor


def get_synchronous_executor():
    return _SYNCHRONOUS_EXECUTOR


_SYNCHRONOUS_EXECUTOR = SynchronousExecutor()


# Getters


def get_object_green_mode(obj):
    if hasattr(obj, "get_green_mode"):
        return obj.get_green_mode()
    return get_green_mode()


def get_executor(green_mode=None):
    if green_mode is None:
        green_mode = get_green_mode()
    # Valid green modes
    if green_mode == GreenMode.Synchronous:
        return get_synchronous_executor()
    if green_mode == GreenMode.Gevent:
        from tango import gevent_executor

        return gevent_executor.get_global_executor()
    if green_mode == GreenMode.Futures:
        from tango import futures_executor

        return futures_executor.get_global_executor()
    if green_mode == GreenMode.Asyncio:
        from tango import asyncio_executor

        return asyncio_executor.get_global_executor()
    # Invalid green mode
    raise TypeError("Not a valid green mode")


def switch_existing_global_executors_to_thread():
    """
    checks which global executor existing, and if they are belong to the caller thread
    if not - creates a new executor, linked to thread, and set it as global
    """
    from tango import asyncio_executor
    from tango import futures_executor

    if _gevent_available:
        from tango import gevent_executor
    else:
        gevent_executor = None

    for executor in [asyncio_executor, futures_executor, gevent_executor]:
        if executor:
            executor._switch_global_executor_to_thread()


def get_object_executor(obj, green_mode=None):
    """Returns the proper executor for the given object.

    If the object has *_executors* and *_green_mode* members it returns
    the submit callable for the executor corresponding to the green_mode.
    Otherwise it returns the global executor for the given green_mode.

    Note: *None* is a valid object.

    :returns: submit callable"""
    # Get green mode
    if green_mode is None:
        green_mode = get_object_green_mode(obj)
    # Get executor
    executor = None
    if hasattr(obj, "_executors"):
        executor = obj._executors.get(green_mode, None)
    if executor is None:
        executor = get_executor(green_mode)
    # Get submitter
    return executor


# Green modifiers


def green(fn=None, consume_green_mode=True, update_signature_and_docstring=False):
    """Make a function green. Can be used as a decorator."""

    def decorator(fn):
        @wraps(fn)
        def greener(obj, *args, **kwargs):
            args = (obj,) + args
            wait = kwargs.pop("wait", None)
            timeout = kwargs.pop("timeout", None)
            access = kwargs.pop if consume_green_mode else kwargs.get
            green_mode = access("green_mode", None)
            executor = get_object_executor(obj, green_mode)
            return executor.run(fn, args, kwargs, wait=wait, timeout=timeout)

        sig = inspect.signature(fn)

        if update_signature_and_docstring:

            # Build green parameters
            green_mode_param = inspect.Parameter(
                "green_mode",
                kind=inspect.Parameter.KEYWORD_ONLY,
                default=None,
                annotation=None,
            )

            wait_param = inspect.Parameter(
                "wait",
                kind=inspect.Parameter.KEYWORD_ONLY,
                default=None,
                annotation=None,
            )

            timeout_param = inspect.Parameter(
                "timeout",
                kind=inspect.Parameter.KEYWORD_ONLY,
                default=None,
                annotation=None,
            )

            # Append it to the existing parameters
            old_params = list(sig.parameters.values())
            add_kwargs = False
            if old_params[-1].kind == inspect.Parameter.VAR_KEYWORD:
                old_params = old_params[:-1]
                add_kwargs = True

            if "green_mode" in [param.name for param in old_params]:
                new_params = old_params + [wait_param, timeout_param]
            else:
                new_params = old_params + [green_mode_param, wait_param, timeout_param]

            if add_kwargs:
                new_params += [
                    inspect.Parameter("kwargs", kind=inspect.Parameter.VAR_KEYWORD)
                ]

            new_sig = sig.replace(parameters=new_params)

            if greener.__doc__ is not None:
                fill_green_doc(greener)

            greener.__signature__ = new_sig

        else:
            greener.__signature__ = sig

        return greener

    if fn is None:
        return decorator
    return decorator(fn)


def green_callback(fn, obj=None, green_mode=None):
    """Return a green verion of the given callback."""
    executor = get_object_executor(obj, green_mode)

    @wraps(fn)
    def greener(*args, **kwargs):
        return executor.submit(_forcefully_traced_method(fn), *args, **kwargs)

    return greener


__GREEN_KWARGS__ = "green_mode=None, wait=True, timeout=None"
__GREEN_KWARGS_DESCRIPTION__ = """
:param green_mode: Defaults to the current tango GreenMode. Refer to :meth:`~tango.DeviceProxy.get_green_mode` and :meth:`~tango.DeviceProxy.set_green_mode` for more details.
:type green_mode: :obj:`tango.GreenMode`, optional

:param wait: Specifies whether to wait for the result. If `green_mode` is *Synchronous*, this parameter is ignored as the operation always waits for the result. This parameter is also ignored when `green_mode` is Synchronous.
:type wait: bool, optional

:param timeout: The number of seconds to wait for the result. If set to `None`, there is no limit on the wait time. This parameter is ignored when `green_mode` is Synchronous or when `wait` is False.
:type timeout: float, optional
"""
__GREEN_RAISES__ = """
:throws: :obj:`TimeoutError`: (green_mode == Futures) If the future didn't finish executing before the given timeout.
:throws: :obj:`Timeout`: (green_mode == Gevent) If the async result didn't finish executing before the given timeout.
"""


def fill_green_doc(method):
    """
    Replace the __GREEN_KWARGS__ __GREEN_KWARGS_DESCRIPTION__ placeholders in `doc`
    preserving the placeholder’s indentation.
    """

    dedented = textwrap.dedent(method.__doc__)
    dedented = dedented.replace("__GREEN_KWARGS__", __GREEN_KWARGS__)

    m = re.search(
        r"^(?P<indent>[ \t]*)__GREEN_KWARGS_DESCRIPTION__", dedented, flags=re.MULTILINE
    )
    if not m:
        return

    indent = m.group("indent")

    indented_desc = textwrap.indent(__GREEN_KWARGS_DESCRIPTION__.strip("\n"), indent)
    indented_raises = textwrap.indent(__GREEN_RAISES__.strip("\n"), indent)

    dedented = re.sub(
        r"^[ \t]*__GREEN_KWARGS_DESCRIPTION__",
        indented_desc,
        dedented,
        flags=re.MULTILINE,
    )

    method.__doc__ = re.sub(
        r"^[ \t]*__GREEN_RAISES__", indented_raises, dedented, flags=re.MULTILINE
    )