1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
|
# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import dataclasses
import functools
import multiprocessing
import sys
import typing
from .package_initializer import package_initializer
class TaskQueue(object):
    """
    Represents a task queue to run tasks with using a worker pool.  Scheduled
    tasks will be executed in parallel, unless the queue was created with
    |single_process=True|, in which case they are executed one by one in the
    calling process.

    Typical usage is to call |post_task| / |post_task_with_workload| any
    number of times and then call |run| exactly once.
    """

    @dataclasses.dataclass
    class _Task(object):
        # Scheduling hint; tasks with a larger workload are started first.
        workload: int
        # The callable to be invoked, and the positional / keyword arguments
        # to invoke it with.
        func: typing.Callable[..., typing.Any]
        args: typing.Sequence[typing.Any]
        kwargs: typing.Dict[str, typing.Any]

    def __init__(self, single_process=False):
        """
        Args:
            single_process: If True, the instance neither creates nor uses a
                child process, so that error messages will be easier to
                read.  This is useful for debugging.
        """
        assert isinstance(single_process, bool)
        if single_process:
            self._single_process = True
            self._pool_size = 1
            self._pool = None
        else:
            self._single_process = False
            self._pool_size = multiprocessing.cpu_count()
            if sys.platform == 'win32':
                # TODO(crbug.com/1190269) - we can't use more than 56
                # cores on Windows or Python3 may hang.
                self._pool_size = min(self._pool_size, 56)
            self._pool = multiprocessing.Pool(self._pool_size,
                                              package_initializer().init)
        self._requested_tasks = []  # List of _Task
        self._did_run = False

    def post_task(self, func, *args, **kwargs):
        """
        Schedules a new task to be executed when |run| method is invoked.  This
        method does not kick any execution, only puts a new task in the queue.

        This task will be scheduled without any workload hint (i.e. with a
        workload of 0).  Use |post_task_with_workload| to influence the order
        this task is scheduled in.
        """
        self.post_task_with_workload(0, func, *args, **kwargs)

    def post_task_with_workload(self, workload, func, *args, **kwargs):
        """
        Schedules a new task to be executed when |run| method is invoked,
        including a hint regarding how long this task is expected to take.  This
        method does not kick any execution, only puts a new task in the queue.

        Tasks with higher workload values will be run first.  This tries to
        reduce the impact of long-running tasks on total wall time on
        multiprocessor systems.

        Must not be called after |run| has been invoked.
        """
        assert not self._did_run
        self._requested_tasks.append(self._Task(workload, func, args, kwargs))

    def run(self, report_progress=None):
        """
        Executes all scheduled tasks.  May be called only once per instance.

        Args:
            report_progress: None, or a callable that takes two arguments,
                total number of worker tasks and number of completed worker
                tasks.
        """
        assert report_progress is None or callable(report_progress)
        assert not self._did_run
        self._did_run = True

        # Start the heaviest tasks first.  The sort is stable, so tasks with
        # the same workload keep the order in which they were posted.
        self._requested_tasks = sorted(self._requested_tasks,
                                       key=lambda task: task.workload,
                                       reverse=True)

        if self._single_process:
            self._run_in_sequence(report_progress)
        else:
            self._run_in_parallel(report_progress)

    def _run_in_sequence(self, report_progress):
        """
        Runs every requested task one after another in the current process.

        |report_progress| may be None, in which case no progress is reported.
        """
        total = len(self._requested_tasks)
        for index, task in enumerate(self._requested_tasks):
            if report_progress is not None:
                report_progress(total, index)
            task.func(*task.args, **task.kwargs)
        if report_progress is not None:
            report_progress(total, total)

    def _run_in_parallel(self, report_progress):
        """
        Submits every requested task to the worker pool and blocks until all
        of them have finished.  If a task raised an exception, that exception
        is re-raised from this method.

        |report_progress| may be None, in which case no progress is reported.
        """
        worker_tasks = []  # List of multiprocessing.pool.AsyncResult
        for task in self._requested_tasks:
            worker_tasks.append(
                self._pool.apply_async(task.func, task.args, task.kwargs))
        # No more tasks will be submitted to the pool.
        self._pool.close()

        def report_worker_task_progress():
            if report_progress is None:
                return
            done_count = sum(1 for worker_task in worker_tasks
                             if worker_task.ready())
            report_progress(len(worker_tasks), done_count)

        timeout_in_sec = 1
        while True:
            report_worker_task_progress()
            for worker_task in worker_tasks:
                if not worker_task.ready():
                    # Wait a little on the first unfinished task, then go
                    # back to the outer loop to refresh the progress report.
                    worker_task.wait(timeout_in_sec)
                    break
                if not worker_task.successful():
                    worker_task.get()  # Let |get()| raise an exception.
                    assert False
            else:
                # Every task is ready and successful.
                break

        self._pool.join()
|