1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
import logging
from datetime import datetime
from typing import Any, Optional, TypeVar
from reactivex import abc, typing
from reactivex.disposable import (
CompositeDisposable,
Disposable,
SingleAssignmentDisposable,
)
from ..periodicscheduler import PeriodicScheduler
_TState = TypeVar("_TState")
log = logging.getLogger("Rx")
class IOLoopScheduler(PeriodicScheduler):
"""A scheduler that schedules work via the Tornado I/O main event loop.
Note, as of Tornado 6, this is just a wrapper around the asyncio loop.
http://tornado.readthedocs.org/en/latest/ioloop.html"""
def __init__(self, loop: Any) -> None:
"""Create a new IOLoopScheduler.
Args:
loop: The ioloop to use; typically, you would get this by
tornado import ioloop; ioloop.IOLoop.current()
"""
super().__init__()
self._loop = loop
def schedule(
self, action: typing.ScheduledAction[_TState], state: Optional[_TState] = None
) -> abc.DisposableBase:
"""Schedules an action to be executed.
Args:
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""
sad = SingleAssignmentDisposable()
disposed = False
def interval() -> None:
if not disposed:
sad.disposable = self.invoke_action(action, state=state)
self._loop.add_callback(interval)
def dispose() -> None:
nonlocal disposed
disposed = True
return CompositeDisposable(sad, Disposable(dispose))
def schedule_relative(
self,
duetime: typing.RelativeTime,
action: typing.ScheduledAction[_TState],
state: Optional[_TState] = None,
) -> abc.DisposableBase:
"""Schedules an action to be executed after duetime.
Args:
duetime: Relative time after which to execute the action.
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""
seconds = self.to_seconds(duetime)
if seconds <= 0.0:
return self.schedule(action, state=state)
sad = SingleAssignmentDisposable()
def interval() -> None:
sad.disposable = self.invoke_action(action, state=state)
log.debug("timeout: %s", seconds)
timer = self._loop.call_later(seconds, interval)
def dispose() -> None:
self._loop.remove_timeout(timer)
self._loop.remove_timeout(timer)
return CompositeDisposable(sad, Disposable(dispose))
def schedule_absolute(
self,
duetime: typing.AbsoluteTime,
action: typing.ScheduledAction[_TState],
state: Optional[_TState] = None,
) -> abc.DisposableBase:
"""Schedules an action to be executed at duetime.
Args:
duetime: Absolute time at which to execute the action.
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""
duetime = self.to_datetime(duetime)
return self.schedule_relative(duetime - self.now, action, state=state)
@property
def now(self) -> datetime:
"""Represents a notion of time for this scheduler. Tasks being
scheduled on a scheduler will adhere to the time denoted by this
property.
Returns:
The scheduler's current time, as a datetime instance.
"""
return self.to_datetime(self._loop.time())
|