File: green.py

package info (click to toggle)
pytango 10.0.2-3
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 10,216 kB
  • sloc: python: 28,206; cpp: 16,380; sql: 255; sh: 82; makefile: 43
file content (243 lines) | stat: -rw-r--r-- 6,761 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# SPDX-FileCopyrightText: All Contributors to the PyTango project
# SPDX-License-Identifier: LGPL-3.0-or-later


# Imports
import os
from functools import wraps

from threading import get_ident

# Tango imports
from tango._tango import GreenMode

__all__ = (
    "get_green_mode",
    "set_green_mode",
    "green",
    "green_callback",
    "get_executor",
    "get_object_executor",
    "switch_existing_global_executors_to_thread",
)

try:
    import gevent

    del gevent
    _gevent_available = True
except ImportError:
    _gevent_available = False

# Handle current green mode

try:
    _CURRENT_GREEN_MODE = getattr(
        GreenMode, os.environ["PYTANGO_GREEN_MODE"].capitalize()
    )
except Exception:
    _CURRENT_GREEN_MODE = GreenMode.Synchronous


def set_green_mode(green_mode=None):
    """Sets the global default PyTango green mode.

    Advice: Use only in your final application. Don't use this in a python
    library in order not to interfere with the beavior of other libraries
    and/or application where your library is being.

    :param green_mode: the new global default PyTango green mode
    :type green_mode: GreenMode
    """
    global _CURRENT_GREEN_MODE
    # Make sure the green mode is available
    get_executor(green_mode)
    # Set the green mode
    _CURRENT_GREEN_MODE = green_mode


def get_green_mode():
    """Returns the current global default PyTango green mode.

    :returns: the current global default PyTango green mode
    :rtype: GreenMode
    """
    return _CURRENT_GREEN_MODE


# Abstract executor class


class AbstractExecutor:
    asynchronous = NotImplemented
    default_wait = NotImplemented

    def __init__(self):
        self.ident = get_ident(), os.getpid()

    def get_ident(self):
        return self.ident

    def in_executor_context(self):
        return self.ident == (get_ident(), os.getpid())

    def delegate(self, fn, *args, **kwargs):
        """Delegate an operation and return an accessor."""
        if not self.asynchronous:
            raise ValueError("Not supported in synchronous mode")
        raise NotImplementedError

    def access(self, accessor, timeout=None):
        """Return a result from an accessor."""
        if not self.asynchronous:
            raise ValueError("Not supported in synchronous mode")
        raise NotImplementedError

    def submit(self, fn, *args, **kwargs):
        """Submit an operation"""
        if not self.asynchronous:
            return fn(*args, **kwargs)
        raise NotImplementedError

    def execute(self, fn, *args, **kwargs):
        """Execute an operation and return the result."""
        if not self.asynchronous:
            return fn(*args, **kwargs)
        raise NotImplementedError

    def run(self, fn, args=(), kwargs={}, wait=None, timeout=None):
        if wait is None:
            wait = self.default_wait
        # Wait and timeout are not supported in synchronous mode
        if not self.asynchronous and (not wait or timeout):
            raise ValueError("Not supported in synchronous mode")
        # Synchronous (no delegation)
        if not self.asynchronous or not self.in_executor_context():
            return fn(*args, **kwargs)
        # Asynchronous delegation
        accessor = self.delegate(fn, *args, **kwargs)
        if not wait:
            return accessor
        return self.access(accessor, timeout=timeout)


class SynchronousExecutor(AbstractExecutor):
    asynchronous = False
    default_wait = True


# Default synchronous executor


def get_synchronous_executor():
    return _SYNCHRONOUS_EXECUTOR


_SYNCHRONOUS_EXECUTOR = SynchronousExecutor()


# Getters


def get_object_green_mode(obj):
    if hasattr(obj, "get_green_mode"):
        return obj.get_green_mode()
    return get_green_mode()


def get_executor(green_mode=None):
    if green_mode is None:
        green_mode = get_green_mode()
    # Valid green modes
    if green_mode == GreenMode.Synchronous:
        return get_synchronous_executor()
    if green_mode == GreenMode.Gevent:
        from tango import gevent_executor

        return gevent_executor.get_global_executor()
    if green_mode == GreenMode.Futures:
        from tango import futures_executor

        return futures_executor.get_global_executor()
    if green_mode == GreenMode.Asyncio:
        from tango import asyncio_executor

        return asyncio_executor.get_global_executor()
    # Invalid green mode
    raise TypeError("Not a valid green mode")


def switch_existing_global_executors_to_thread():
    """
    checks which global executor existing, and if they are belong to the caller thread
    if not - creates a new executor, linked to thread, and set it as global
    """
    from tango import asyncio_executor
    from tango import futures_executor

    if _gevent_available:
        from tango import gevent_executor
    else:
        gevent_executor = None

    for executor in [asyncio_executor, futures_executor, gevent_executor]:
        if executor:
            executor._switch_global_executor_to_thread()


def get_object_executor(obj, green_mode=None):
    """Returns the proper executor for the given object.

    If the object has *_executors* and *_green_mode* members it returns
    the submit callable for the executor corresponding to the green_mode.
    Otherwise it returns the global executor for the given green_mode.

    Note: *None* is a valid object.

    :returns: submit callable"""
    # Get green mode
    if green_mode is None:
        green_mode = get_object_green_mode(obj)
    # Get executor
    executor = None
    if hasattr(obj, "_executors"):
        executor = obj._executors.get(green_mode, None)
    if executor is None:
        executor = get_executor(green_mode)
    # Get submitter
    return executor


# Green modifiers


def green(fn=None, consume_green_mode=True):
    """Make a function green. Can be used as a decorator."""

    def decorator(fn):
        @wraps(fn)
        def greener(obj, *args, **kwargs):
            args = (obj,) + args
            wait = kwargs.pop("wait", None)
            timeout = kwargs.pop("timeout", None)
            access = kwargs.pop if consume_green_mode else kwargs.get
            green_mode = access("green_mode", None)
            executor = get_object_executor(obj, green_mode)
            return executor.run(fn, args, kwargs, wait=wait, timeout=timeout)

        return greener

    if fn is None:
        return decorator
    return decorator(fn)


def green_callback(fn, obj=None, green_mode=None):
    """Return a green verion of the given callback."""
    executor = get_object_executor(obj, green_mode)

    @wraps(fn)
    def greener(*args, **kwargs):
        return executor.submit(fn, *args, **kwargs)

    return greener