1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
|
import asyncio
import os
import threading
import unittest
from datetime import datetime, timedelta
import pytest
from reactivex.scheduler.eventloop import AsyncIOThreadSafeScheduler
# True when the ``CI`` environment variable is set (to any value, including an
# empty string); used below to skip timing-sensitive tests.
CI = "CI" in os.environ
class TestAsyncIOThreadSafeScheduler(unittest.TestCase):
    """Tests for ``AsyncIOThreadSafeScheduler``.

    The ``schedule_action*`` tests schedule (and optionally dispose) work from
    a helper thread while the event loop runs on the test thread.  The
    ``*_same_thread`` / ``*_same_loop`` tests instead schedule and dispose on
    the thread that owns the loop, via ``cancel_same_thread_common``.
    """

    @pytest.mark.skipif(CI, reason="Flaky test in GitHub Actions")
    def test_asyncio_threadsafe_schedule_now(self):
        """``scheduler.now`` is within 2 ms of the loop's own clock."""
        loop = asyncio.get_event_loop()
        scheduler = AsyncIOThreadSafeScheduler(loop)
        diff = scheduler.now - datetime.utcfromtimestamp(loop.time())
        assert abs(diff) < timedelta(milliseconds=2)

    @pytest.mark.skipif(CI, reason="Flaky test in GitHub Actions")
    def test_asyncio_threadsafe_schedule_now_units(self):
        """``scheduler.now`` advances by roughly 0.1 s across a 0.1 s sleep."""
        loop = asyncio.get_event_loop()

        # The body must run as a coroutine on the loop.  A bare ``yield from``
        # in the test method would turn it into a generator function, which
        # unittest calls but never iterates, so the assertion would never run.
        async def go():
            scheduler = AsyncIOThreadSafeScheduler(loop)
            start = scheduler.now
            await asyncio.sleep(0.1)
            elapsed = scheduler.now - start
            assert timedelta(milliseconds=80) < elapsed < timedelta(milliseconds=180)

        loop.run_until_complete(go())

    def test_asyncio_threadsafe_schedule_action(self):
        """An action scheduled from another thread runs on the loop."""
        loop = asyncio.get_event_loop()

        async def go():
            scheduler = AsyncIOThreadSafeScheduler(loop)
            ran = False

            def action(scheduler, state):
                nonlocal ran
                ran = True

            def schedule():
                scheduler.schedule(action)

            threading.Thread(target=schedule).start()
            # Give the loop time to pick up the cross-thread callback.
            await asyncio.sleep(0.1)
            assert ran is True

        loop.run_until_complete(go())

    def test_asyncio_threadsafe_schedule_action_due(self):
        """A relative action scheduled from another thread honours its delay."""
        loop = asyncio.get_event_loop()

        async def go():
            scheduler = AsyncIOThreadSafeScheduler(loop)
            starttime = loop.time()
            endtime = None

            def action(scheduler, state):
                nonlocal endtime
                endtime = loop.time()

            def schedule():
                scheduler.schedule_relative(0.2, action)

            threading.Thread(target=schedule).start()
            # Sleep past the 0.2 s due time so the action has had a chance to run.
            await asyncio.sleep(0.3)
            assert endtime is not None
            diff = endtime - starttime
            assert diff > 0.18

        loop.run_until_complete(go())

    def test_asyncio_threadsafe_schedule_action_cancel(self):
        """An action disposed from another thread before it is due never runs."""
        loop = asyncio.get_event_loop()

        async def go():
            ran = False
            scheduler = AsyncIOThreadSafeScheduler(loop)

            def action(scheduler, state):
                nonlocal ran
                ran = True

            def schedule():
                d = scheduler.schedule_relative(0.05, action)
                d.dispose()

            threading.Thread(target=schedule).start()
            # Wait well past the 0.05 s due time to show the action stayed cancelled.
            await asyncio.sleep(0.3)
            assert ran is False

        loop.run_until_complete(go())

    def cancel_same_thread_common(self, test_body):
        """Run ``test_body`` on a fresh loop's thread and check the cancellation.

        A helper thread creates a new event loop and scheduler, calls
        ``test_body(scheduler, action, update_state)`` and then runs the loop
        for 0.2 s.  Afterwards this method asserts that the dispose step
        completed and that the scheduled action never ran.

        Args:
            test_body: Callable taking ``(scheduler, action, update_state)``.
                It is expected to schedule ``action``, dispose the result, and
                set ``update_state["dispose_completed"]`` to ``True`` once the
                dispose call has returned.
        """
        update_state = {"ran": False, "dispose_completed": False}

        def action(scheduler, state):
            update_state["ran"] = True

        # Make the actual test body run in a daemon thread, so that in case of
        # failure it doesn't hang indefinitely.
        def thread_target():
            loop = asyncio.new_event_loop()
            try:
                scheduler = AsyncIOThreadSafeScheduler(loop)
                test_body(scheduler, action, update_state)

                async def go():
                    await asyncio.sleep(0.2)

                loop.run_until_complete(go())
            finally:
                # The loop is private to this thread; close it so it is not leaked.
                loop.close()

        thread = threading.Thread(target=thread_target)
        thread.daemon = True
        thread.start()
        # Bounded wait: if dispose deadlocks, the assertions below fail instead
        # of the test run hanging.
        thread.join(0.3)
        assert update_state["dispose_completed"] is True
        assert update_state["ran"] is False

    def test_asyncio_threadsafe_cancel_non_relative_same_thread(self):
        """Disposing an immediate schedule before the loop starts cancels it."""

        def test_body(scheduler, action, update_state):
            d = scheduler.schedule(action)
            # Test case when dispose is called on thread on which loop is not
            # yet running, and non-relative schedule is used.
            d.dispose()
            update_state["dispose_completed"] = True

        self.cancel_same_thread_common(test_body)

    def test_asyncio_threadsafe_schedule_action_cancel_same_thread(self):
        """Disposing a relative schedule before the loop starts cancels it."""

        def test_body(scheduler, action, update_state):
            d = scheduler.schedule_relative(0.05, action)
            # Test case when dispose is called on thread on which loop is not
            # yet running, and relative schedule is used.
            d.dispose()
            update_state["dispose_completed"] = True

        self.cancel_same_thread_common(test_body)

    def test_asyncio_threadsafe_schedule_action_cancel_same_loop(self):
        """Disposing from inside a loop callback cancels a relative schedule."""

        def test_body(scheduler, action, update_state):
            d = scheduler.schedule_relative(0.1, action)

            def do_dispose():
                d.dispose()
                update_state["dispose_completed"] = True

            # Test case when dispose is called in loop's callback.
            scheduler._loop.call_soon(do_dispose)

        self.cancel_same_thread_common(test_body)
|