1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401
|
#!/usr/bin/env python3
# Copyright 2014 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import logging
import os
import signal
import threading
import traceback
from contextlib import contextmanager
from multiprocessing import Process, Queue
from queue import Empty
def setup_testing():
  """Switches the pool to threads instead of processes (tests only).

  Rebinds the module-level ``Queue`` and ``Process`` names to ``queue.Queue``
  and ``threading.Thread`` so workers run in-process, which makes coverage
  work.
  """
  global Queue, Process
  del Queue, Process
  from queue import Queue
  from threading import Thread as Process
  # Threads have no pid and must not receive real signals: give Thread a fake
  # pid attribute that always reads as None, and turn os.kill into a no-op.
  Process.pid = property(lambda self: None)
  os.kill = lambda *args: None
class AbortException(Exception):
  """Signals an early abort on SIGINT, SIGTERM or an internal hard timeout.

  A job raising it makes the worker leave its processing loop instead of
  reporting a job failure.
  """
class NormalResult():
  """Outcome of a job that returned normally.

  ``result`` holds the job's return value; ``exception`` is always None.
  """

  def __init__(self, result):
    self.exception = None
    self.result = result
class ExceptionResult():
  """Outcome of a job that raised an exception.

  Exposes the same attributes as NormalResult so a consumer can handle either
  kind uniformly: ``exception`` is the raised exception and ``result`` is None.
  """

  def __init__(self, exception):
    self.exception = exception
    # Keep ``.result`` present so code reading it on any queued result object
    # (e.g. when the exception object happens to be falsy) never hits
    # AttributeError.
    self.result = None
class MaybeResult():
  """Item yielded by the pools: either a heartbeat or a wrapped job result.

  ``heartbeat`` is True for a heartbeat, which carries ``value`` None.
  Otherwise ``value`` is the job's return value.
  """

  def __init__(self, heartbeat, value):
    self.heartbeat = heartbeat
    self.value = value

  @staticmethod
  def create_heartbeat():
    """Builds a heartbeat marker that carries no value."""
    return MaybeResult(heartbeat=True, value=None)

  @staticmethod
  def create_result(value):
    """Wraps an actual job result."""
    return MaybeResult(heartbeat=False, value=value)
def Worker(fn, work_queue, done_queue,
           process_context_fn=None, process_context_args=None):
  """Processing loop run by each pool worker (normally a child process).

  Takes argument lists from ``work_queue`` one at a time, calls
  ``fn(*args, **kwargs)`` and puts a NormalResult on ``done_queue``, or an
  ExceptionResult if ``fn`` raised. The loop ends on any of these events:
  - the poison pill "STOP" is dequeued;
  - a SIGTERM was seen before the next item starts;
  - ``fn`` raises AbortException.

  Args:
    fn: Function applied to each work item.
    work_queue: Queue yielding argument lists, terminated by "STOP".
    done_queue: Queue receiving one result object per processed item.
    process_context_fn: Optional factory called once per worker. Its return
      value is passed to every ``fn`` call as the ``process_context`` keyword.
      It is only used when ``process_context_args`` is not None.
    process_context_args: Positional arguments for ``process_context_fn``.
  """
  # SIGTERM only raises a flag that is checked before each new item. Job
  # functions are supposed to register their own handler (to avoid blocking)
  # and chain to this one, so the loop still terminates quickly.
  stop_requested = False

  def on_sigterm(signum, frame):
    nonlocal stop_requested
    stop_requested = True

  signal.signal(signal.SIGTERM, on_sigterm)

  try:
    fn_kwargs = {}
    if process_context_fn and process_context_args is not None:
      fn_kwargs['process_context'] = process_context_fn(*process_context_args)
    while True:
      args = work_queue.get()
      if args == "STOP":
        break
      if stop_requested:
        # SIGINT, SIGTERM or internal hard timeout caught outside the
        # execution of "fn".
        break
      try:
        done_queue.put(NormalResult(fn(*args, **fn_kwargs)))
      except AbortException:
        # SIGINT, SIGTERM or internal hard timeout caught during execution
        # of "fn".
        break
      except Exception as error:
        logging.exception('Unhandled error during worker execution.')
        done_queue.put(ExceptionResult(error))
  except KeyboardInterrupt:
    assert False, 'Unreachable'
@contextmanager
def without_sig():
  """Ignores SIGINT and SIGTERM while the ``with`` body runs.

  The previously installed handlers are restored on exit, even if the body
  raises. This is useful, for example, so that subprocesses created inside
  the body don't capture these signals.
  """
  saved_handlers = {
      signal.SIGINT: signal.signal(signal.SIGINT, signal.SIG_IGN),
      signal.SIGTERM: signal.signal(signal.SIGTERM, signal.SIG_IGN),
  }
  try:
    yield
  finally:
    for signum, previous in saved_handlers.items():
      signal.signal(signum, previous)
@contextmanager
def drain_queue_async(queue):
  """Drains a queue in a background thread until the wrapped code unblocks.

  This can be used to unblock joining a child process that might still write
  to the queue. The join should be wrapped by this context manager. The
  draining thread is stopped and joined on exit, also when the wrapped code
  raises, so it can never outlive the ``with`` statement.

  Args:
    queue: Queue to drain; drained elements are discarded.
  """
  stop_draining = threading.Event()

  def empty_queue():
    elem_count = 0
    while not stop_draining.is_set():
      try:
        while True:
          queue.get(True, 0.1)
          elem_count += 1
          # Only log the first elements to avoid flooding the log.
          if elem_count < 200:
            logging.info('Drained an element from queue.')
      except Empty:
        pass
      except Exception:
        logging.exception('Error draining queue.')

  emptier = threading.Thread(target=empty_queue)
  emptier.start()
  try:
    yield
  finally:
    # Without the finally, an exception in the wrapped code would leave this
    # non-daemon thread running forever and block interpreter shutdown.
    stop_draining.set()
    emptier.join()
class ContextPool():
  """Common interface of the execution pools.

  Subclasses provide init(), add_jobs() and results(). The base versions do
  nothing and return None. abort() only records the request in ``abort_now``.
  """

  def __init__(self):
    # Set by abort(); subclasses poll it to stop early.
    self.abort_now = False

  def init(self, num_workers, heartbeat_timeout=1, notify_function=None):
    """Delayed initialization.

    At context creation time these parameters are not yet available.

    Args:
      num_workers: Number of worker processes to run in parallel.
      heartbeat_timeout: Timeout in seconds for waiting for results. Each time
        the timeout is reached, a heartbeat is signalled and timeout is reset.
      notify_function: Callable called to signal some events like termination.
        The event name is passed as string.
    """

  def add_jobs(self, jobs):
    """Queues ``jobs`` for execution. No-op in the base class."""

  def results(self, requirement):
    """Returns an iterator over results. Returns None in the base class."""

  def abort(self):
    """Requests that processing stops at the next opportunity."""
    self.abort_now = True
# Per-worker context handed to each job as ``process_context``. The pools
# build it from the ``requirement`` passed to results().
ProcessContext = collections.namedtuple('ProcessContext', 'result_reduction')
class DefaultExecutionPool(ContextPool):
  """Distributes tasks to a number of worker processes.

  New tasks can be added dynamically even after the workers have been started.
  Requirement: Tasks can only be added from the parent process, e.g. while
  consuming the results generator.
  """

  # Factor to calculate the maximum number of items in the work/done queue.
  # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
  BUFFER_FACTOR = 4

  def __init__(self, os_context=None):
    super(DefaultExecutionPool, self).__init__()
    # Optional helper used to terminate workers on abort. When None, workers
    # are sent SIGTERM directly (see _terminate_processes).
    self.os_context = os_context
    self.processes = []
    self.terminated = False
    # Invariant: processing_count >= #work_queue + #done_queue. It is greater
    # when a worker takes an item from the work_queue and before the result is
    # submitted to the done_queue. It is equal when no worker is working,
    # e.g. when all workers have finished, and when no results are processed.
    # Count is only accessed by the parent process. Only the parent process is
    # allowed to remove items from the done_queue and to add items to the
    # work_queue.
    self.processing_count = 0
    # Same defaults as init(). They are set here so that a pool whose init()
    # was never called still works. In particular, teardown (which calls
    # self.notify) must not fail with AttributeError.
    self.num_workers = 1
    self.heartbeat_timeout = 1
    self.notify = lambda x: x
    # Disable sigint and sigterm to prevent subprocesses from capturing the
    # signals.
    with without_sig():
      self.work_queue = Queue()
      self.done_queue = Queue()

  def init(self, num_workers=1, heartbeat_timeout=1, notify_function=None):
    """Configures the pool before results are consumed.

    Args:
      num_workers: Number of worker processes to run in parallel.
      heartbeat_timeout: Timeout in seconds for waiting for results. Each time
        the timeout is reached, a heartbeat is signalled and timeout is reset.
      notify_function: Callable called to signal some events like termination.
        The event name is passed as string.
    """
    self.num_workers = num_workers
    self.heartbeat_timeout = heartbeat_timeout
    self.notify = notify_function or (lambda x: x)

  def add_jobs(self, jobs):
    """Queues every job in ``jobs`` as its own work item.

    Each job becomes a one-element argument list for the worker function
    (run_job). This matches SingleThreadedExecutionPool, which also treats
    ``jobs`` as a sequence of independent jobs. Previously the whole sequence
    was a single work item, which only worked for exactly one job.
    """
    for job in jobs:
      self.add([job])

  def results(self, requirement):
    """Returns a generator of MaybeResult objects for the queued jobs.

    Workers are started on first iteration. Each executes jobs via run_job
    with a ProcessContext built from ``requirement``.
    """
    return self.imap_unordered(
        fn=run_job,
        gen=[],
        process_context_fn=ProcessContext,
        process_context_args=[requirement],
    )

  def imap_unordered(self, fn, gen,
                     process_context_fn=None, process_context_args=None):
    """Maps function "fn" to items in generator "gen" on the worker processes.

    Items are processed in an arbitrary order. They are expected to be lists
    of arguments to the function. Returns a results iterator. A yielded
    MaybeResult is one of two kinds:
    - a heartbeat, indicating that the runner is still waiting for a result;
    - a wrapper around the real result.

    Args:
      process_context_fn: Function executed once by each worker. Expected to
        return a process-context object. If present, this object is passed
        as additional argument to each call to fn.
      process_context_args: List of arguments for the invocation of
        process_context_fn. All arguments will be pickled and sent beyond the
        process boundary.

    Raises:
      Exception: After teardown, if any worker reported an error.
    """
    if self.terminated:
      return
    internal_error = False
    try:
      gen = iter(gen)
      self.advance = self._advance_more

      # Disable sigint and sigterm to prevent subprocesses from capturing the
      # signals.
      with without_sig():
        for _ in range(self.num_workers):
          p = Process(target=Worker, args=(fn,
                                           self.work_queue,
                                           self.done_queue,
                                           process_context_fn,
                                           process_context_args))
          p.start()
          self.processes.append(p)

      self.advance(gen)
      while self.processing_count > 0:
        while True:
          try:
            # Read from result queue in a responsive fashion. If available,
            # this will return a normal result immediately or a heartbeat on
            # heartbeat timeout (default 1 second).
            result = self._get_result_from_queue()
          except Exception:
            # TODO(machenbach): Handle a few known types of internal errors
            # gracefully, e.g. missing test files.
            logging.exception('Internal error in a worker process.')
            internal_error = True
            # _get_result_from_queue never returns None, so None marks
            # "nothing to yield, read the next result".
            result = None
          if self.abort_now:
            # SIGINT, SIGTERM or internal hard timeout.
            return
          if result is not None:
            yield result
            break

        self.advance(gen)
    except KeyboardInterrupt:
      assert False, 'Unreachable'
    except Exception:
      logging.exception('Unhandled error during pool execution.')
    finally:
      self._terminate()
      if internal_error:
        raise Exception('Internal error in a worker process.')

  def _advance_more(self, gen):
    """Feeds items from gen until num_workers * BUFFER_FACTOR are in flight.

    Switches self.advance to _advance_empty once gen is exhausted.
    """
    while self.processing_count < self.num_workers * self.BUFFER_FACTOR:
      try:
        self.work_queue.put(next(gen))
        self.processing_count += 1
      except StopIteration:
        self.advance = self._advance_empty
        break

  def _advance_empty(self, gen):
    """No-op replacement for _advance_more after gen is exhausted."""
    pass

  def add(self, args):
    """Adds an item to the work queue.

    Can be called dynamically while processing the results from
    imap_unordered.
    """
    assert not self.terminated
    self.work_queue.put(args)
    self.processing_count += 1

  def abort(self):
    """Schedules abort on next queue read.

    This is safe to call when handling SIGINT, SIGTERM or when an internal
    hard timeout is reached.
    """
    self.abort_now = True

  def _terminate_processes(self):
    """Asks every worker process to terminate.

    Delegates to os_context when one was supplied. Otherwise it sends SIGTERM
    directly, which the worker's SIGTERM handler turns into a loop stop.
    """
    for p in self.processes:
      if self.os_context is not None:
        self.os_context.terminate_process(p)
      else:
        os.kill(p.pid, signal.SIGTERM)

  def _terminate(self):
    """Terminates execution and cleans up the queues.

    If abort() was called before termination, this also terminates the
    subprocesses and doesn't wait for ongoing tests.
    """
    if self.terminated:
      return
    self.terminated = True

    # Drain out work queue from tests
    try:
      while True:
        self.work_queue.get(True, 0.1)
    except Empty:
      pass

    # Make sure all processes stop
    for _ in self.processes:
      # During normal tear down the workers block on get(). Feed a poison pill
      # per worker to make them stop.
      self.work_queue.put("STOP")

    # Send a SIGTERM to all workers. They will gracefully terminate their
    # processing loop and if the signal is caught during job execution they
    # will try to terminate the ongoing test processes quickly.
    if self.abort_now:
      self._terminate_processes()

    self.notify("Joining workers")
    with drain_queue_async(self.done_queue):
      for p in self.processes:
        p.join()

    self.notify("Pool terminated")

  def _get_result_from_queue(self):
    """Attempts to get the next result from the queue.

    Returns: A wrapped result if one was available within heartbeat timeout,
      a heartbeat result otherwise.
    Raises:
      Exception: If an exception occurred when processing the task on the
        worker side, it is reraised here.
    """
    try:
      result = self.done_queue.get(timeout=self.heartbeat_timeout)
    except Empty:
      return MaybeResult.create_heartbeat()
    # Kept outside the try so that a worker-side queue.Empty is re-raised
    # instead of being mistaken for a heartbeat timeout.
    self.processing_count -= 1
    if result.exception is not None:
      raise result.exception
    return MaybeResult.create_result(result.result)
class SingleThreadedExecutionPool(ContextPool):
  """Runs jobs one after another in the calling process, without workers.

  Pending jobs are taken from the end of the list, so they run in LIFO order.
  """

  def __init__(self):
    super().__init__()
    self.work_queue = []

  def add_jobs(self, jobs):
    """Appends ``jobs`` to the pending list."""
    self.work_queue.extend(jobs)

  def results(self, requirement):
    """Runs pending jobs, yielding one wrapped result per job.

    Stops when no jobs are left or abort() has been called.
    """
    while not self.abort_now and self.work_queue:
      next_job = self.work_queue.pop()
      outcome = next_job.run(ProcessContext(requirement))
      yield MaybeResult.create_result(outcome)
# Defined at module level rather than as a static method: multiprocessing
# must pickle it, and pickling a static method doesn't work on Windows.
def run_job(job, process_context):
  """Executes ``job`` with the worker's ``process_context`` and returns the outcome."""
  run = job.run
  return run(process_context)
|