File: thread.py

package info (click to toggle)
python-bellows 0.40.5-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 996 kB
  • sloc: python: 13,630; sh: 7; makefile: 5
file content (122 lines) | stat: -rw-r--r-- 4,145 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import asyncio
from concurrent.futures import ThreadPoolExecutor
import functools
import logging
import sys

LOGGER = logging.getLogger(__name__)


class EventLoopThread:
    """Run a parallel event loop in a separate thread."""

    def __init__(self):
        self.loop = None
        self.thread_complete = None

    def run_coroutine_threadsafe(self, coroutine):
        current_loop = asyncio.get_event_loop()
        future = asyncio.run_coroutine_threadsafe(coroutine, self.loop)
        return asyncio.wrap_future(future, loop=current_loop)

    def _thread_main(self, init_task):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        try:
            self.loop.run_until_complete(init_task)
            self.loop.run_forever()
        finally:
            self.loop.close()
            self.loop = None

    async def start(self):
        current_loop = asyncio.get_event_loop()
        if self.loop is not None and not self.loop.is_closed():
            return

        executor_opts = {"max_workers": 1}
        if sys.version_info[:2] >= (3, 6):
            executor_opts["thread_name_prefix"] = __name__
        executor = ThreadPoolExecutor(**executor_opts)

        thread_started_future = current_loop.create_future()

        async def init_task():
            current_loop.call_soon_threadsafe(thread_started_future.set_result, None)

        # Use current loop so current loop has a reference to the long-running thread
        # as one of its tasks
        thread_complete = current_loop.run_in_executor(
            executor, self._thread_main, init_task()
        )
        self.thread_complete = thread_complete
        current_loop.call_soon(executor.shutdown, False)
        await thread_started_future
        return thread_complete

    def force_stop(self):
        if self.loop is None:
            return

        def cancel_tasks_and_stop_loop():
            tasks = asyncio.all_tasks(loop=self.loop)

            for task in tasks:
                self.loop.call_soon_threadsafe(task.cancel)

            gather = asyncio.gather(*tasks, return_exceptions=True)
            gather.add_done_callback(
                lambda _: self.loop.call_soon_threadsafe(self.loop.stop)
            )

        self.loop.call_soon_threadsafe(cancel_tasks_and_stop_loop)


class ThreadsafeProxy:
    """Proxy class which enforces threadsafe non-blocking calls
    This class can be used to wrap an object to ensure any calls
    using that object's methods are done on a particular event loop
    """

    def __init__(self, obj, obj_loop):
        self._obj = obj
        self._obj_loop = obj_loop

    def __getattr__(self, name):
        func = getattr(self._obj, name)
        if not callable(func):
            raise TypeError(
                "Can only use ThreadsafeProxy with callable attributes: {}.{}".format(
                    self._obj.__class__.__name__, name
                )
            )

        def func_wrapper(*args, **kwargs):
            loop = self._obj_loop
            curr_loop = asyncio.get_running_loop()
            call = functools.partial(func, *args, **kwargs)
            if loop == curr_loop:
                return call()
            if loop.is_closed():
                # Disconnected
                LOGGER.warning("Attempted to use a closed event loop")
                return
            if asyncio.iscoroutinefunction(func):
                future = asyncio.run_coroutine_threadsafe(call(), loop)
                return asyncio.wrap_future(future, loop=curr_loop)
            else:

                def check_result_wrapper():
                    result = call()
                    if result is not None:
                        raise TypeError(
                            (
                                "ThreadsafeProxy can only wrap functions with no return"
                                "value \nUse an async method to return values: {}.{}"
                            ).format(self._obj.__class__.__name__, name)
                        )

                loop.call_soon_threadsafe(check_result_wrapper)

        return func_wrapper