File: gevent_executor.py

package info (click to toggle)
pytango 10.0.2-3
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 10,216 kB
  • sloc: python: 28,206; cpp: 16,380; sql: 255; sh: 82; makefile: 43
file content (198 lines) | stat: -rw-r--r-- 5,581 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# SPDX-FileCopyrightText: All Contributors to the PyTango project
# SPDX-License-Identifier: LGPL-3.0-or-later

# Imports
import sys
import os
import functools
from collections import namedtuple

# Gevent imports
import gevent.event
import gevent.queue
import gevent.monkey
import gevent.threadpool

# Bypass gevent monkey patching
ThreadSafeEvent = gevent.monkey.get_original("threading", "Event")

# Tango imports
from tango.green import AbstractExecutor, get_ident

__all__ = (
    "GeventExecutor",
    "get_global_executor",
    "set_global_executor",
    "_switch_global_executor_to_thread",
)

# Global executor

_MAIN_EXECUTOR = None
_THREAD_POOL = None
_THREAD_EXECUTORS = {}


def _switch_global_executor_to_thread():
    """
    internal PyTango function, use only if you sure, what you are doing!
    Used for correct behavior of TestDeviceContext
    checks, that global executor belongs to the caller thread, and,
    if not - creates a new one and saves it as a new global
    """
    global _MAIN_EXECUTOR
    if _MAIN_EXECUTOR is not None and not _MAIN_EXECUTOR.in_executor_context():
        # we save current executor in the known subthread executors to be used later
        _THREAD_EXECUTORS[_MAIN_EXECUTOR.get_ident()] = _MAIN_EXECUTOR
        _MAIN_EXECUTOR = GeventExecutor(subexecutor=ThreadPool(maxsize=100))


def get_global_executor():
    global _MAIN_EXECUTOR
    if _MAIN_EXECUTOR is None:
        _MAIN_EXECUTOR = GeventExecutor()

    # the following patch is used for correct behavior of TestDeviceContext,
    # which has two different executors for main and device threads
    if not _MAIN_EXECUTOR.in_executor_context():
        ident = get_ident(), os.getpid()
        if ident in _THREAD_EXECUTORS:
            return _THREAD_EXECUTORS[ident]

    return _MAIN_EXECUTOR


def set_global_executor(executor):
    global _MAIN_EXECUTOR
    _MAIN_EXECUTOR = executor


def get_global_threadpool():
    global _THREAD_POOL
    if _THREAD_POOL is None:
        _THREAD_POOL = ThreadPool(maxsize=10**4)
    return _THREAD_POOL


ExceptionInfo = namedtuple("ExceptionInfo", "type value traceback")


def wrap_error(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            return ExceptionInfo(*sys.exc_info())

    return wrapper


def unwrap_error(source):
    destination = gevent.event.AsyncResult()

    def link(source):
        if isinstance(source.value, ExceptionInfo):
            try:
                destination.set_exception(source.value.value, exc_info=source.value)
            # Gevent 1.0 compatibility
            except TypeError:
                destination.set_exception(source.value.value)
            return
        destination(source)

    source.rawlink(link)
    return destination


class ThreadPool(gevent.threadpool.ThreadPool):
    def spawn(self, fn, *args, **kwargs):
        wrapped = wrap_error(fn)
        raw = super().spawn(wrapped, *args, **kwargs)
        return unwrap_error(raw)


# Gevent task and event loop


class GeventTask:
    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.value = None
        self.exception = None
        self.done = ThreadSafeEvent()
        self.started = ThreadSafeEvent()

    def run(self):
        self.started.set()
        try:
            self.value = self.func(*self.args, **self.kwargs)
        except Exception:
            self.exception = sys.exc_info()
        finally:
            self.done.set()

    def spawn(self):
        return gevent.spawn(self.run)

    def result(self):
        self.done.wait()
        if self.exception:
            raise self.exception[1]
        return self.value


# Gevent executor


class GeventExecutor(AbstractExecutor):
    """Gevent tango executor"""

    asynchronous = True
    default_wait = True

    def __init__(self, loop=None, subexecutor=None):
        super().__init__()
        if loop is None:
            loop = gevent.get_hub().loop
        if subexecutor is None:
            subexecutor = get_global_threadpool()
        self.loop = loop
        self.subexecutor = subexecutor

    def delegate(self, fn, *args, **kwargs):
        """Return the given operation as a gevent future."""
        return self.subexecutor.spawn(fn, *args, **kwargs)

    def access(self, accessor, timeout=None):
        """Return a result from an gevent future."""
        return accessor.get(timeout=timeout)

    def create_watcher(self):
        try:
            return self.loop.async_()
        except AttributeError:
            return getattr(self.loop, "async")()

    def submit(self, fn, *args, **kwargs):
        task = GeventTask(fn, *args, **kwargs)
        watcher = self.create_watcher()
        watcher.start(task.spawn)
        watcher.send()
        task.started.wait()
        # The watcher has to be stopped in order to be garbage-collected.
        # This step is crucial since the watcher holds a reference to the
        # `task.spawn` method which itself holds a reference to the task.
        # It's also important to wait for the task to be spawned before
        # stopping the watcher, otherwise the task won't run.
        watcher.stop()
        return task

    def execute(self, fn, *args, **kwargs):
        """Execute an operation and return the result."""
        if self.in_executor_context():
            return fn(*args, **kwargs)
        task = self.submit(fn, *args, **kwargs)
        return task.result()