1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
|
# Copyright 2016-2018 Dirk Thomas
# Licensed under the Apache License, Version 2.0
import asyncio
from collections import OrderedDict
import os
import signal
import sys
from threading import Thread
import time
from types import SimpleNamespace
from colcon_core.executor import Job
from colcon_core.executor import OnError
from colcon_core.subprocess import SIGINT_RESULT
from colcon_parallel_executor.executor.parallel \
import ParallelExecutorExtension
import pytest
ran_jobs = []
class Job1(Job):
def __init__(self):
super().__init__(
identifier='job1', dependencies=set(), task=None,
task_context=None)
async def __call__(self, *args, **kwargs):
global ran_jobs
ran_jobs.append(self.identifier)
class Job2(Job):
def __init__(self):
super().__init__(
identifier='job2', dependencies=set(), task=None,
task_context=None)
async def __call__(self, *args, **kwargs):
return 2
class Job3(Job):
def __init__(self):
super().__init__(
identifier='job3', dependencies=set(), task=None,
task_context=None)
async def __call__(self, *args, **kwargs):
raise RuntimeError('custom exception')
class Job4(Job):
def __init__(self):
super().__init__(
identifier='job4', dependencies=set(), task=None,
task_context=None)
async def __call__(self, *args, **kwargs):
global ran_jobs
try:
await asyncio.sleep(0.05)
except asyncio.CancelledError:
return SIGINT_RESULT
ran_jobs.append(self.identifier)
class Job5(Job):
def __init__(self):
super().__init__(
identifier='job5', dependencies=set(), task=None,
task_context=None)
async def __call__(self, *args, **kwargs):
return 5
class Job6(Job):
def __init__(self):
super().__init__(
identifier='job6', dependencies=('job2', ), task=None,
task_context=None)
async def __call__(self, *args, **kwargs):
global ran_jobs
ran_jobs.append(self.identifier)
class Job7(Job):
def __init__(self):
super().__init__(
identifier='job7', dependencies=('job1', ), task=None,
task_context=None)
async def __call__(self, *args, **kwargs):
global ran_jobs
ran_jobs.append(self.identifier)
def test_parallel():
global ran_jobs
extension = ParallelExecutorExtension()
args = SimpleNamespace(parallel_workers=2)
jobs = OrderedDict()
jobs['one'] = Job1()
# success
rc = extension.execute(args, jobs)
assert rc == 0
assert ran_jobs == ['job1']
ran_jobs.clear()
# return error code
jobs['two'] = Job2()
jobs['four'] = Job4()
rc = extension.execute(args, jobs)
assert rc == 2
assert ran_jobs == ['job1']
ran_jobs.clear()
rc = extension.execute(args, jobs, on_error=OnError.skip_pending)
assert rc == 2
assert ran_jobs == ['job1']
ran_jobs.clear()
# return error code, but with more workers
args.parallel_workers = 3
rc = extension.execute(args, jobs)
assert rc == 2
assert ran_jobs == ['job1']
ran_jobs.clear()
rc = extension.execute(args, jobs, on_error=OnError.skip_pending)
assert rc == 2
assert ran_jobs == ['job1', 'job4']
ran_jobs.clear()
# continue after error, keeping first error code
jobs['five'] = Job5()
rc = extension.execute(args, jobs, on_error=OnError.continue_)
assert rc == 2
assert ran_jobs == ['job1', 'job4']
ran_jobs.clear()
# continue but skip downstream
jobs['six'] = Job6()
jobs['seven'] = Job7()
rc = extension.execute(args, jobs, on_error=OnError.skip_downstream)
assert rc == 2
assert ran_jobs == ['job1', 'job7', 'job4']
ran_jobs.clear()
# exception
jobs['two'] = Job3()
rc = extension.execute(args, jobs)
assert isinstance(rc, RuntimeError)
assert ran_jobs == ['job1']
ran_jobs.clear()
class Job8(Job):
def __init__(self):
super().__init__(
identifier='job8', dependencies=set(), task=None,
task_context=None)
async def __call__(self, *args, **kwargs):
global ran_jobs
await asyncio.sleep(3)
ran_jobs.append(self.identifier)
@pytest.fixture
def restore_sigint_handler():
handler = signal.getsignal(signal.SIGINT)
yield
signal.signal(signal.SIGINT, handler)
def test_parallel_keyboard_interrupt(restore_sigint_handler):
global ran_jobs
if sys.platform == 'win32':
pytest.skip(
'Skipping keyboard interrupt test since the signal will cause '
'pytest to return failure even if no tests fail.')
extension = ParallelExecutorExtension()
args = SimpleNamespace(parallel_workers=3)
jobs = OrderedDict()
jobs['one'] = Job1()
jobs['aborted'] = Job8()
jobs['four'] = Job4()
def delayed_sigint():
time.sleep(0.1)
# Note: a real Ctrl-C would signal the whole process group
os.kill(
os.getpid(),
signal.SIGINT if sys.platform != 'win32' else signal.CTRL_C_EVENT)
if sys.platform == 'win32':
os.kill(os.getpid(), signal.CTRL_C_EVENT)
thread = Thread(target=delayed_sigint)
thread.start()
try:
rc = extension.execute(args, jobs)
finally:
thread.join()
assert rc == signal.SIGINT
ran_jobs.clear()
|