File: futures_executor.py

package info (click to toggle)
pytango 10.0.2-3
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 10,216 kB
  • sloc: python: 28,206; cpp: 16,380; sql: 255; sh: 82; makefile: 43
file content (88 lines) | stat: -rw-r--r-- 2,592 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# SPDX-FileCopyrightText: All Contributors to the PyTango project
# SPDX-License-Identifier: LGPL-3.0-or-later


import os

# Concurrent imports
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

# Tango imports
from tango.green import AbstractExecutor, get_ident

__all__ = (
    "FuturesExecutor",
    "get_global_executor",
    "set_global_executor",
    "_switch_global_executor_to_thread",
)

# Global executor

_MAIN_EXECUTOR = None
_THREAD_EXECUTORS = {}


def _switch_global_executor_to_thread():
    """
    internal PyTango function, use only if you sure, what you are doing!
    Used for correct behavior of TestDeviceContext
    checks, that global executor belongs to the caller thread, and,
    if not - creates a new one and saves it as a new global
    """
    global _MAIN_EXECUTOR
    if _MAIN_EXECUTOR is not None and not _MAIN_EXECUTOR.in_executor_context():
        # we save current executor in the known subthread executors to be used later
        _THREAD_EXECUTORS[_MAIN_EXECUTOR.get_ident()] = _MAIN_EXECUTOR
        _MAIN_EXECUTOR = FuturesExecutor()


def get_global_executor():
    global _MAIN_EXECUTOR
    if _MAIN_EXECUTOR is None:
        _MAIN_EXECUTOR = FuturesExecutor()

    # the following patch is used for correct behavior of TestDeviceContext,
    # which has two different executors for main and device threads
    if not _MAIN_EXECUTOR.in_executor_context():
        ident = get_ident(), os.getpid()
        if ident in _THREAD_EXECUTORS:
            return _THREAD_EXECUTORS[ident]

    return _MAIN_EXECUTOR


def set_global_executor(executor):
    global _MAIN_EXECUTOR
    _MAIN_EXECUTOR = executor


# Futures executor


class FuturesExecutor(AbstractExecutor):
    """Futures tango executor"""

    asynchronous = True
    default_wait = True

    def __init__(self, process=False, max_workers=20):
        super().__init__()
        cls = ProcessPoolExecutor if process else ThreadPoolExecutor
        self.subexecutor = cls(max_workers=max_workers)

    def delegate(self, fn, *args, **kwargs):
        """Return the given operation as a concurrent future."""
        return self.subexecutor.submit(fn, *args, **kwargs)

    def access(self, accessor, timeout=None):
        """Return a result from a single callable."""
        return accessor.result(timeout=timeout)

    def submit(self, fn, *args, **kwargs):
        """Submit an operation"""
        return fn(*args, **kwargs)

    def execute(self, fn, *args, **kwargs):
        """Execute an operation and return the result."""
        return fn(*args, **kwargs)