1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
# SPDX-FileCopyrightText: All Contributors to the PyTango project
# SPDX-License-Identifier: LGPL-3.0-or-later
import os
# Concurrent imports
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
# Tango imports
from tango.green import AbstractExecutor, get_ident
__all__ = (
"FuturesExecutor",
"get_global_executor",
"set_global_executor",
"_switch_global_executor_to_thread",
)
# Global executor
_MAIN_EXECUTOR = None
_THREAD_EXECUTORS = {}
def _switch_global_executor_to_thread():
"""
internal PyTango function, use only if you sure, what you are doing!
Used for correct behavior of TestDeviceContext
checks, that global executor belongs to the caller thread, and,
if not - creates a new one and saves it as a new global
"""
global _MAIN_EXECUTOR
if _MAIN_EXECUTOR is not None and not _MAIN_EXECUTOR.in_executor_context():
# we save current executor in the known subthread executors to be used later
_THREAD_EXECUTORS[_MAIN_EXECUTOR.get_ident()] = _MAIN_EXECUTOR
_MAIN_EXECUTOR = FuturesExecutor()
def get_global_executor():
global _MAIN_EXECUTOR
if _MAIN_EXECUTOR is None:
_MAIN_EXECUTOR = FuturesExecutor()
# the following patch is used for correct behavior of TestDeviceContext,
# which has two different executors for main and device threads
if not _MAIN_EXECUTOR.in_executor_context():
ident = get_ident(), os.getpid()
if ident in _THREAD_EXECUTORS:
return _THREAD_EXECUTORS[ident]
return _MAIN_EXECUTOR
def set_global_executor(executor):
global _MAIN_EXECUTOR
_MAIN_EXECUTOR = executor
# Futures executor
class FuturesExecutor(AbstractExecutor):
"""Futures tango executor"""
asynchronous = True
default_wait = True
def __init__(self, process=False, max_workers=20):
super().__init__()
cls = ProcessPoolExecutor if process else ThreadPoolExecutor
self.subexecutor = cls(max_workers=max_workers)
def delegate(self, fn, *args, **kwargs):
"""Return the given operation as a concurrent future."""
return self.subexecutor.submit(fn, *args, **kwargs)
def access(self, accessor, timeout=None):
"""Return a result from a single callable."""
return accessor.result(timeout=timeout)
def submit(self, fn, *args, **kwargs):
"""Submit an operation"""
return fn(*args, **kwargs)
def execute(self, fn, *args, **kwargs):
"""Execute an operation and return the result."""
return fn(*args, **kwargs)
|