1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
|
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Optional
from reactivex import abc, typing
from .newthreadscheduler import NewThreadScheduler
class ThreadPoolScheduler(NewThreadScheduler):
    """Scheduler that hands its work to a ``ThreadPoolExecutor``.

    Each instance creates one executor and passes ``NewThreadScheduler`` a
    thread factory that builds ``ThreadPoolThread`` wrappers around it.
    """

    class ThreadPoolThread(abc.StartableBase):
        """Startable wrapper that runs a target as an executor future."""

        def __init__(
            self,
            executor: ThreadPoolExecutor,
            target: typing.StartableTarget,
        ) -> None:
            """Store the executor and target; nothing is submitted yet.

            Args:
                executor: Pool the target is submitted to by ``start``.
                target: Callable handed to ``executor.submit``.
            """
            self.executor: ThreadPoolExecutor = executor
            self.target: typing.StartableTarget = target
            # Remains None until start() has submitted the target.
            self.future: Optional["Future[Any]"] = None

        def start(self) -> None:
            """Submit the target to the executor and keep the future."""
            submitted = self.executor.submit(self.target)
            self.future = submitted

        def cancel(self) -> None:
            """Request cancellation of the future, if one was created."""
            pending = self.future
            if pending is None:
                # start() has not run, so there is nothing to cancel.
                return
            # The boolean result of Future.cancel() is deliberately unused.
            pending.cancel()

    def __init__(self, max_workers: Optional[int] = None) -> None:
        """Create the backing executor and register the thread factory.

        Args:
            max_workers: Forwarded unchanged to ``ThreadPoolExecutor``.
        """
        # Assigned before the base initializer runs, so the executor exists
        # by the time the factory can be invoked.
        self.executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=max_workers)

        def make_pool_thread(
            target: typing.StartableTarget,
        ) -> ThreadPoolScheduler.ThreadPoolThread:
            # Both attributes are read from self on every call.
            wrapper_cls = self.ThreadPoolThread
            return wrapper_cls(self.executor, target)

        super().__init__(make_pool_thread)
|