File: parallel_testsuite.py

package info (click to toggle)
emscripten 2.0.12~dfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 108,440 kB
  • sloc: ansic: 510,324; cpp: 384,763; javascript: 84,341; python: 51,362; sh: 50,019; pascal: 4,159; makefile: 3,409; asm: 2,150; lisp: 1,869; ruby: 488; cs: 142
file content (235 lines) | stat: -rw-r--r-- 7,224 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# Copyright 2017 The Emscripten Authors.  All rights reserved.
# Emscripten is available under two separate licenses, the MIT license and the
# University of Illinois/NCSA Open Source License.  Both these licenses can be
# found in the LICENSE file.

import multiprocessing
import os
import sys
import unittest
import tempfile
import time
import queue

from tools.tempfiles import try_delete


def g_testing_thread(work_queue, result_queue, temp_dir):
  """Worker entry point: run queued tests and report one result per test.

  Tests are pulled from `work_queue` until `get_from_queue` returns None
  (i.e. nothing arrived within its timeout). Each test is pointed at
  `temp_dir`, run against a fresh BufferedParallelTestResult, and that result
  object is put on `result_queue`.
  """
  for test in iter(lambda: get_from_queue(work_queue), None):
    result = BufferedParallelTestResult()
    test.set_temp_dir(temp_dir)
    try:
      test(result)
    except unittest.SkipTest as e:
      # Record the reason as plain text rather than as the exception object.
      result.addSkip(test, str(e))
    except Exception:
      # addError() takes an (exctype, value, traceback) triple, not a bare
      # exception instance; the buffered result unpacks it into three parts.
      result.addError(test, sys.exc_info())
    result_queue.put(result)


class ParallelTestSuite(unittest.BaseTestSuite):
  """Runs a suite of tests in parallel.

  Creates worker processes, manages the task queue, and combines the results.
  """

  def __init__(self, max_cores):
    """Create an empty suite that will use at most `max_cores` workers."""
    super().__init__()
    self.processes = None
    self.result_queue = None
    self.dedicated_temp_dirs = []
    self.max_cores = max_cores

  def run(self, result):
    """Run every test in worker processes and replay outcomes into `result`.

    Returns `result`.
    """
    test_queue = self.create_test_queue()
    self.init_processes(test_queue)
    results = self.collect_results()
    return self.combine_results(result, results)

  def create_test_queue(self):
    """Return a multiprocessing.Queue holding the tests in execution order."""
    test_queue = multiprocessing.Queue()
    for test in self.reversed_tests():
      test_queue.put(test)
    return test_queue

  def reversed_tests(self):
    """A list of this suite's tests in reverse order.

    Many of the tests in test_core are intentionally named so that long tests
    fall toward the end of the alphabet (e.g. test_the_bullet). Tests are
    loaded in alphabetical order, so here we reverse that in order to start
    running longer tasks earlier, which should lead to better core utilization.

    Future work: measure slowness of tests and sort accordingly.
    """
    return sorted(self, key=str)[::-1]

  def init_processes(self, test_queue):
    """Start the worker processes, each with its own temporary directory."""
    use_cores = min(self.max_cores, num_cores())
    print('Using %s parallel test processes' % use_cores)
    self.processes = []
    self.result_queue = multiprocessing.Queue()
    self.dedicated_temp_dirs = [tempfile.mkdtemp() for x in range(use_cores)]
    for temp_dir in self.dedicated_temp_dirs:
      p = multiprocessing.Process(target=g_testing_thread,
                                  args=(test_queue, self.result_queue, temp_dir))
      p.start()
      self.processes.append(p)

  def collect_results(self):
    """Gather buffered results until every worker has exited.

    Returns the list of result objects received, in arrival order. The
    per-worker temporary directories are deleted before returning.
    """
    buffered_results = []
    while self.processes:
      res = get_from_queue(self.result_queue)
      if res is not None:
        buffered_results.append(res)
      else:
        self.clear_finished_processes()
    # A worker may have queued its last result after our final read timed out
    # but before we noticed it had exited. Drain whatever is left so that no
    # result is silently dropped.
    for res in iter(lambda: get_from_queue(self.result_queue), None):
      buffered_results.append(res)
    for temp_dir in self.dedicated_temp_dirs:
      try_delete(temp_dir)
    return buffered_results

  def clear_finished_processes(self):
    """Forget workers that are no longer alive."""
    self.processes = [p for p in self.processes if p.is_alive()]

  def combine_results(self, result, buffered_results):
    """Replay `buffered_results` into `result` and return `result`."""
    print()
    print('DONE: combining results on main thread')
    print()
    # Sort the results back into alphabetical order. Running the tests in
    # parallel causes mis-orderings, this makes the results more readable.
    results = sorted(buffered_results, key=lambda res: str(res.test))
    for r in results:
      r.updateResult(result)
    return result


class BufferedParallelTestResult():
  """Picklable result collector handed to a test inside a worker process.

  Provides the unittest.TestResult callbacks used by this module. The single
  recorded outcome is kept in `buffered_result` and can later be replayed
  into a real result object with updateResult().
  """
  def __init__(self):
    # Set by one of the add*() callbacks once the test has an outcome.
    self.buffered_result = None

  @property
  def test(self):
    """The test whose outcome has been recorded."""
    return self.buffered_result.test

  def updateResult(self, result):
    """Replay the recorded outcome into `result`, bracketed by start/stop."""
    finished_test = self.test
    result.startTest(finished_test)
    self.buffered_result.updateResult(result)
    result.stopTest(finished_test)

  def startTest(self, test):
    """Remember when the test started, if a performance counter exists."""
    # Python 2 does not have perf_counter()
    # TODO: remove when we remove Python 2 support
    clock = getattr(time, 'perf_counter', None)
    if clock is not None:
      self.start_time = clock()

  def stopTest(self, test):
    """Store the elapsed time on the recorded outcome."""
    # TODO(sbc): figure out a way to display this duration information again when
    # these results get passed back to the TextTestRunner/TextTestResult.
    clock = getattr(time, 'perf_counter', None)
    if clock is not None:
      self.buffered_result.duration = clock() - self.start_time

  def addSuccess(self, test):
    """Report a pass on stderr and record it."""
    clock = getattr(time, 'perf_counter', None)
    if clock is not None:
      elapsed = clock() - self.start_time
      print(test, '... ok (%.2fs)' % elapsed, file=sys.stderr)
    self.buffered_result = BufferedTestSuccess(test)

  def addSkip(self, test, reason):
    """Report a skip on stderr and record it together with its reason."""
    print(test, "... skipped '%s'" % reason, file=sys.stderr)
    self.buffered_result = BufferedTestSkip(test, reason)

  def addFailure(self, test, err):
    """Report a failure on stderr and record it."""
    print(test, '... FAIL', file=sys.stderr)
    self.buffered_result = BufferedTestFailure(test, err)

  def addError(self, test, err):
    """Report an error on stderr and record it."""
    print(test, '... ERROR', file=sys.stderr)
    self.buffered_result = BufferedTestError(test, err)


class BufferedTestBase():
  """Abstract class that holds test result data, split by type of result."""
  def __init__(self, test, err=None):
    """Store `test` and, when given, a picklable copy of `err`.

    `err` is an (exctype, value, traceback) triple. The traceback is replaced
    by a FakeTraceback so that the object can be pickled; a missing traceback
    is kept as None.
    """
    self.test = test
    # Always defined so that subclasses can read it even when no error was
    # supplied.
    self.error = None
    if err:
      exctype, value, tb = err
      fake_tb = FakeTraceback(tb) if tb is not None else None
      self.error = exctype, value, fake_tb

  def updateResult(self, result):
    """Replay this outcome into `result`; subclasses must override."""
    # Raise explicitly: an `assert` would be stripped under `python -O`,
    # turning this into a silent no-op.
    raise NotImplementedError('Base class should not be used directly')


class BufferedTestSuccess(BufferedTestBase):
  """Buffered outcome for a test that passed."""

  def updateResult(self, result):
    # Replay the pass on the given result object.
    passed_test = self.test
    result.addSuccess(passed_test)


class BufferedTestSkip(BufferedTestBase):
  """Buffered outcome for a skipped test, together with the skip reason."""

  def __init__(self, test, reason):
    # Unlike the error-carrying outcomes, a skip stores a reason rather than
    # exception info, so the base initialiser is not used.
    self.test, self.reason = test, reason

  def updateResult(self, result):
    # Replay the skip on the given result object.
    skipped_test, why = self.test, self.reason
    result.addSkip(skipped_test, why)


class BufferedTestFailure(BufferedTestBase):
  """Buffered outcome for a test that failed."""

  def updateResult(self, result):
    # Replay the failure, with its stored error info, on the result object.
    failed_test, error_info = self.test, self.error
    result.addFailure(failed_test, error_info)


class BufferedTestError(BufferedTestBase):
  """Buffered outcome for a test that raised an unexpected error."""

  def updateResult(self, result):
    # Replay the error, with its stored error info, on the result object.
    errored_test, error_info = self.test, self.error
    result.addError(errored_test, error_info)


class FakeTraceback():
  """A fake version of a traceback object that is picklable across processes.

  Python's traceback objects contain hidden stack information that isn't able
  to be pickled. Further, traceback objects aren't constructable from Python,
  so we need a dummy object that fulfills its interface.

  The fields we expose are exactly those which are used by
  unittest.TextTestResult to show a text representation of a traceback. Any
  other use is not intended.
  """

  def __init__(self, tb):
    """Copy the fields needed for formatting from the real traceback `tb`."""
    self.tb_frame = FakeFrame(tb.tb_frame)
    self.tb_lineno = tb.tb_lineno
    # Python 3.11+ reads tb_lasti while walking a traceback in order to look
    # up source positions, so it has to be carried over as well.
    self.tb_lasti = tb.tb_lasti
    # Copy the rest of the chain; the innermost entry has no successor.
    self.tb_next = FakeTraceback(tb.tb_next) if tb.tb_next is not None else None


class FakeFrame():
  """Picklable stand-in for a frame object, built from a real frame."""

  def __init__(self, f):
    # f.f_globals is not picklable, not used in stack traces, and needs to be iterable
    self.f_globals = []
    code = f.f_code
    self.f_code = FakeCode(code)


class FakeCode():
  """Picklable stand-in for a code object, built from a real code object."""

  def __init__(self, co):
    """Copy the file name, function name and source positions from `co`."""
    self.co_filename = co.co_filename
    self.co_name = co.co_name
    # co.co_positions() (Python 3.11+) returns an iterator, which cannot be
    # pickled. Flatten it to a list so it survives the trip between processes.
    # Older interpreters have no such method, so store an empty list there.
    self.positions = list(co.co_positions()) if hasattr(co, 'co_positions') else []

  def co_positions(self):
    """Return an iterator over the stored source positions."""
    return iter(self.positions)


def num_cores():
  """Return the number of cores to use for running tests.

  A non-empty PARALLEL_SUITE_EMCC_CORES environment variable takes priority,
  then a non-empty EMCC_CORES; otherwise the machine's CPU count is used.
  """
  for variable in ('PARALLEL_SUITE_EMCC_CORES', 'EMCC_CORES'):
    configured = os.environ.get(variable)
    if configured:
      return int(configured)
  return multiprocessing.cpu_count()


def get_from_queue(q):
  """Return the next item from `q`, or None if nothing arrives in 0.1 seconds."""
  try:
    item = q.get(True, 0.1)
  except queue.Empty:
    item = None
  return item