File: pool.py

package info (click to toggle)
chromium 139.0.7258.127-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 6,122,068 kB
  • sloc: cpp: 35,100,771; ansic: 7,163,530; javascript: 4,103,002; python: 1,436,920; asm: 946,517; xml: 746,709; pascal: 187,653; perl: 88,691; sh: 88,436; objc: 79,953; sql: 51,488; cs: 44,583; fortran: 24,137; makefile: 22,147; tcl: 15,277; php: 13,980; yacc: 8,984; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (401 lines) | stat: -rw-r--r-- 12,607 bytes parent folder | download | duplicates (14)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
#!/usr/bin/env python3
# Copyright 2014 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import logging
import os
import signal
import threading
import traceback

from contextlib import contextmanager
from multiprocessing import Process, Queue
from queue import Empty



def setup_testing():
  """Swaps the multiprocessing primitives for thread-based ones.

  For testing only: afterwards the module-level names "Queue" and "Process"
  refer to queue.Queue and threading.Thread, which makes coverage work.
  """
  global Process, Queue
  del Queue, Process

  import queue as queue_module
  import threading as threading_module
  Queue = queue_module.Queue
  Process = threading_module.Thread

  # Threads have no pid and cannot be signalled: make os.kill a no-op and
  # give Thread a fake pid property that is always None.
  def ignore_kill(*args):
    return None

  def fake_pid(self):
    return None

  os.kill = ignore_kill
  Process.pid = property(fake_pid)


class AbortException(Exception):
  """Signals an early abort caused by SIGINT, SIGTERM or an internal hard
  timeout."""


class NormalResult:
  """Wraps the return value of a job function that finished without raising.

  The "exception" attribute is always None.
  """

  def __init__(self, result):
    self.exception = None
    self.result = result

class ExceptionResult:
  """Wraps the exception raised by a job function.

  Only the "exception" attribute is set; there is no "result" attribute.
  """

  def __init__(self, exception):
    self.exception = exception


class MaybeResult:
  """Either a heartbeat (no value available yet) or a wrapped result value."""

  def __init__(self, heartbeat, value):
    self.heartbeat, self.value = heartbeat, value

  @staticmethod
  def create_heartbeat():
    """Returns an instance flagged as heartbeat that carries no value."""
    return MaybeResult(heartbeat=True, value=None)

  @staticmethod
  def create_result(value):
    """Returns a non-heartbeat instance wrapping "value"."""
    return MaybeResult(heartbeat=False, value=value)


def Worker(fn, work_queue, done_queue,
           process_context_fn=None, process_context_args=None):
  """Processing loop to be run in a child process.

  Takes argument lists from "work_queue", calls "fn" with them and puts a
  NormalResult (or an ExceptionResult if "fn" raised) on "done_queue". The
  loop ends when the poison pill "STOP" is read, when SIGTERM was received,
  or when "fn" raises AbortException.

  Args:
    fn: Job function, called as fn(*args, **kwargs).
    work_queue: Queue providing argument lists and finally "STOP".
    done_queue: Queue receiving the wrapped results.
    process_context_fn: Optional factory, called once as
        process_context_fn(*process_context_args). Its return value is passed
        to every "fn" call as keyword argument "process_context".
    process_context_args: Arguments for process_context_fn. The factory is
        only called if this is not None.
  """
  # Default SIGTERM handling: only remember the signal so that the loop below
  # stops on the next occasion. The job function "fn" is supposed to register
  # its own handler to avoid blocking, but still chain to this handler on
  # SIGTERM to terminate the loop quickly.
  sigterm_seen = False

  def on_sigterm(signum, frame):
    nonlocal sigterm_seen
    sigterm_seen = True

  signal.signal(signal.SIGTERM, on_sigterm)

  try:
    extra_kwargs = {}
    if process_context_fn and process_context_args is not None:
      extra_kwargs['process_context'] = process_context_fn(
          *process_context_args)

    for job_args in iter(work_queue.get, "STOP"):
      if sigterm_seen:
        # SIGINT, SIGTERM or internal hard timeout caught outside the
        # execution of "fn".
        break
      try:
        outcome = fn(*job_args, **extra_kwargs)
        done_queue.put(NormalResult(outcome))
      except AbortException:
        # SIGINT, SIGTERM or internal hard timeout caught during execution of
        # "fn".
        break
      except Exception as error:
        logging.exception('Unhandled error during worker execution.')
        done_queue.put(ExceptionResult(error))
  except KeyboardInterrupt:
    assert False, 'Unreachable'


@contextmanager
def without_sig():
  """Ignores SIGINT and SIGTERM while the wrapped code runs.

  The handlers that were installed before are restored on exit, also when the
  wrapped code raises.
  """
  saved_handlers = [
      (signum, signal.signal(signum, signal.SIG_IGN))
      for signum in (signal.SIGINT, signal.SIGTERM)
  ]
  try:
    yield
  finally:
    for signum, previous_handler in saved_handlers:
      signal.signal(signum, previous_handler)


@contextmanager
def drain_queue_async(queue):
  """Drains a queue in a background thread until the wrapped code unblocks.

  This can be used to unblock joining a child process that might still write
  to the queue. The join should be wrapped by this context manager.

  The background thread is stopped and joined when the wrapped code finishes,
  also if it raises. Otherwise the non-daemon thread would keep polling the
  queue forever.

  Args:
    queue: Queue whose elements are removed and discarded.
  """
  stop_draining = threading.Event()

  def empty_queue():
    elem_count = 0
    while not stop_draining.is_set():
      try:
        # The short timeout keeps the thread responsive to stop_draining.
        while True:
          queue.get(True, 0.1)
          elem_count += 1
          # Cap the number of log lines for very full queues.
          if elem_count < 200:
            logging.info('Drained an element from queue.')
      except Empty:
        pass
      except Exception:
        # Best effort: log and keep draining.
        logging.exception('Error draining queue.')

  emptier = threading.Thread(target=empty_queue)
  emptier.start()
  try:
    yield
  finally:
    stop_draining.set()
    emptier.join()


class ContextPool:
  """Base class of the execution pools.

  Only keeps the "abort_now" flag; init(), add_jobs() and results() do
  nothing here and return None.
  """

  def __init__(self):
    self.abort_now = False

  def init(self, num_workers, heartbeat_timeout=1, notify_function=None):
    """Delayed initialization.

    The parameters described below are not accessible yet at context creation
    time.

    Args:
      num_workers: Number of worker processes to run in parallel.
      heartbeat_timeout: Timeout in seconds for waiting for results. Each time
          the timeout is reached, a heartbeat is signalled and timeout is reset.
      notify_function: Callable called to signal some events like termination.
          The event name is passed as string.
    """

  def add_jobs(self, jobs):
    """Does nothing in the base class."""

  def results(self, requirement):
    """Does nothing in the base class."""

  def abort(self):
    """Sets the "abort_now" flag."""
    self.abort_now = True


# Context object handed to every job.run() call. Its single field is filled
# with the "requirement" value passed to a pool's results() method.
ProcessContext = collections.namedtuple(
    'ProcessContext', ('result_reduction',))


class DefaultExecutionPool(ContextPool):
  """Distributes tasks to a number of worker processes.

  New tasks can be added dynamically even after the workers have been started.
  Requirement: Tasks can only be added from the parent process, e.g. while
  consuming the results generator.
  """

  # Factor to calculate the maximum number of items in the work/done queue.
  # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
  BUFFER_FACTOR = 4

  def __init__(self, os_context=None):
    """
    Args:
      os_context: Object whose terminate_process(process) method is called for
          every worker process when the pool terminates after abort().
          NOTE(review): with the default None, terminating after abort() fails
          in _terminate_processes - confirm callers always pass a context.
    """
    super().__init__()
    self.os_context = os_context
    self.processes = []
    self.terminated = False

    # Same defaults as init(). Without them imap_unordered() and _terminate()
    # fail with an AttributeError if init() was never called.
    self.num_workers = 1
    self.heartbeat_timeout = 1
    self.notify = lambda x: x

    # Invariant: processing_count >= #work_queue + #done_queue. It is greater
    # when a worker takes an item from the work_queue and before the result is
    # submitted to the done_queue. It is equal when no worker is working,
    # e.g. when all workers have finished, and when no results are processed.
    # Count is only accessed by the parent process. Only the parent process is
    # allowed to remove items from the done_queue and to add items to the
    # work_queue.
    self.processing_count = 0

    # Disable sigint and sigterm to prevent subprocesses from capturing the
    # signals.
    with without_sig():
      self.work_queue = Queue()
      self.done_queue = Queue()

  def init(self, num_workers=1, heartbeat_timeout=1, notify_function=None):
    """
    Args:
      num_workers: Number of worker processes to run in parallel.
      heartbeat_timeout: Timeout in seconds for waiting for results. Each time
          the timeout is reached, a heartbeat is signalled and timeout is reset.
      notify_function: Callable called to signal some events like termination.
          The event name is passed as string. Defaults to a no-op.
    """
    self.num_workers = num_workers
    self.heartbeat_timeout = heartbeat_timeout
    self.notify = notify_function or (lambda x: x)

  def add_jobs(self, jobs):
    """Adds every job in "jobs" to the work queue.

    Each job becomes a work item of its own: a one-element argument list,
    which the worker unpacks into the "job" parameter of run_job(). Putting
    the whole sequence on the queue as a single item only works for exactly
    one job.
    """
    for job in jobs:
      self.add([job])

  def results(self, requirement):
    """Starts the workers and returns a generator of MaybeResult objects.

    The generator starts with an empty input; the work consists of the jobs
    added with add_jobs(). Each worker creates one ProcessContext from
    "requirement" and passes it to run_job() with every job.
    """
    return self.imap_unordered(
        fn=run_job,
        gen=[],
        process_context_fn=ProcessContext,
        process_context_args=[requirement],
    )

  def imap_unordered(self, fn, gen,
                     process_context_fn=None, process_context_args=None):
    """Maps function "fn" to items in generator "gen" on the worker processes
    in an arbitrary order. The items are expected to be lists of arguments to
    the function. Returns a results iterator. A result value of type
    MaybeResult either indicates a heartbeat of the runner, i.e. indicating
    that the runner is still waiting for the result to be computed, or it wraps
    the real result.

    The pool is terminated when the iteration ends. Nothing is yielded if the
    pool was terminated before.

    Args:
      fn: Function executed by the workers for every item.
      gen: Iterable of argument lists for "fn".
      process_context_fn: Function executed once by each worker. Expected to
          return a process-context object. If present, this object is passed
          as additional argument to each call to fn.
      process_context_args: List of arguments for the invocation of
          process_context_fn. All arguments will be pickled and sent beyond the
          process boundary.
    Raises:
      Exception: After termination, if reading a result failed, e.g. because
          a worker reported an exception.
    """
    if self.terminated:
      return
    try:
      internal_error = False
      gen = iter(gen)
      self.advance = self._advance_more

      # Disable sigint and sigterm to prevent subprocesses from capturing the
      # signals.
      with without_sig():
        for _ in range(self.num_workers):
          p = Process(target=Worker, args=(fn,
                                          self.work_queue,
                                          self.done_queue,
                                          process_context_fn,
                                          process_context_args))
          p.start()
          self.processes.append(p)

      self.advance(gen)
      while self.processing_count > 0:
        while True:
          try:
            # Read from result queue in a responsive fashion. If available,
            # this will return a normal result immediately or a heartbeat on
            # heartbeat timeout (default 1 second).
            result = self._get_result_from_queue()
          except:
            # TODO(machenbach): Handle a few known types of internal errors
            # gracefully, e.g. missing test files.
            logging.exception('Internal error in a worker process.')
            internal_error = True
            continue
          finally:
            # Runs after every read attempt, successful or not. Returning
            # here ends the generator and discards the current result.
            if self.abort_now:
              # SIGINT, SIGTERM or internal hard timeout.
              return

          yield result
          break

        self.advance(gen)
    except KeyboardInterrupt:
      assert False, 'Unreachable'
    except Exception:
      logging.exception('Unhandled error during pool execution.')
    finally:
      self._terminate()

    if internal_error:
      raise Exception('Internal error in a worker process.')

  def _advance_more(self, gen):
    """Moves items from "gen" to the work queue until the buffer limit of
    num_workers * BUFFER_FACTOR items in flight is reached.

    Once "gen" is exhausted, self.advance is switched to _advance_empty.
    """
    while self.processing_count < self.num_workers * self.BUFFER_FACTOR:
      try:
        self.work_queue.put(next(gen))
        self.processing_count += 1
      except StopIteration:
        self.advance = self._advance_empty
        break

  def _advance_empty(self, gen):
    """No-op replacement for _advance_more after "gen" is exhausted."""
    pass

  def add(self, args):
    """Adds an item to the work queue. Can be called dynamically while
    processing the results from imap_unordered.

    Args:
      args: Argument list for one call of the mapped function.
    """
    assert not self.terminated

    self.work_queue.put(args)
    self.processing_count += 1

  def abort(self):
    """Schedules abort on next queue read.

    This is safe to call when handling SIGINT, SIGTERM or when an internal
    hard timeout is reached.
    """
    self.abort_now = True

  def _terminate_processes(self):
    """Asks the os_context to terminate every worker process."""
    for p in self.processes:
      self.os_context.terminate_process(p)

  def _terminate(self):
    """Terminates execution and cleans up the queues.

    If abort() was called before termination, this also terminates the
    subprocesses and doesn't wait for ongoing tests.
    """
    if self.terminated:
      return
    self.terminated = True

    # Drain out work queue from tests
    try:
      while True:
        self.work_queue.get(True, 0.1)
    except Empty:
      pass

    # Make sure all processes stop
    for _ in self.processes:
      # During normal tear down the workers block on get(). Feed a poison pill
      # per worker to make them stop.
      self.work_queue.put("STOP")

    # Send a SIGTERM to all workers. They will gracefully terminate their
    # processing loop and if the signal is caught during job execution they
    # will try to terminate the ongoing test processes quickly.
    if self.abort_now:
      self._terminate_processes()

    self.notify("Joining workers")
    # Keep the done queue drained while joining, as workers might still write
    # results to it.
    with drain_queue_async(self.done_queue):
      for p in self.processes:
        p.join()

    self.notify("Pool terminated")

  def _get_result_from_queue(self):
    """Attempts to get the next result from the queue.

    Returns: A wrapped result if one was available within heartbeat timeout,
        a heartbeat result otherwise.
    Raises:
        Exception: If an exception occurred when processing the task on the
            worker side, it is reraised here.
    """
    # Only the queue read is guarded. An Empty exception raised by a job on
    # the worker side must be reraised below like any other job error, not
    # mistaken for a heartbeat timeout.
    try:
      result = self.done_queue.get(timeout=self.heartbeat_timeout)
    except Empty:
      return MaybeResult.create_heartbeat()

    self.processing_count -= 1
    if result.exception:
      raise result.exception
    return MaybeResult.create_result(result.result)


class SingleThreadedExecutionPool(ContextPool):
  """Pool that runs all jobs one by one in the calling thread."""

  def __init__(self):
    super().__init__()
    self.work_queue = []

  def add_jobs(self, jobs):
    """Appends all items of "jobs" to the pending work."""
    self.work_queue.extend(jobs)

  def results(self, requirement):
    """Generator running the pending jobs and yielding their wrapped results.

    Jobs are taken from the end of the pending list. The generator finishes
    when no job is left or when abort() was called.
    """
    while True:
      if not self.work_queue or self.abort_now:
        return
      next_job = self.work_queue.pop()
      outcome = next_job.run(ProcessContext(requirement))
      yield MaybeResult.create_result(outcome)


# Kept as a module-level function for multiprocessing, because pickling a
# static method doesn't work on Windows.
def run_job(job, process_context):
  """Runs "job" with "process_context" and returns whatever job.run returns."""
  outcome = job.run(process_context)
  return outcome