File: pool_test.py

package info (click to toggle)
chromium 138.0.7204.183-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 6,071,908 kB
  • sloc: cpp: 34,937,088; ansic: 7,176,967; javascript: 4,110,704; python: 1,419,953; asm: 946,768; xml: 739,971; pascal: 187,324; sh: 89,623; perl: 88,663; objc: 79,944; sql: 50,304; cs: 41,786; fortran: 24,137; makefile: 21,806; php: 13,980; tcl: 13,166; yacc: 8,925; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (82 lines) | stat: -rwxr-xr-x 2,339 bytes parent folder | download | duplicates (14)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#!/usr/bin/env python3
# Copyright 2014 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import os
import sys
import unittest

from queue import Empty, Full, Queue

# Needed because the test runner contains relative imports.
# TOOLS_PATH is the directory two levels above the directory that holds this
# file (three dirname() calls on the file's absolute path). It is appended to
# sys.path before the `testrunner` import below; presumably that is the
# directory containing the `testrunner` package -- confirm against the tree.
TOOLS_PATH = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(TOOLS_PATH)

from testrunner.local.pool import DefaultExecutionPool, drain_queue_async


def Run(x):
  """Job function handed to the pool by the tests below.

  Returns its argument unchanged, except for the value 10, which raises on
  purpose so the tests can observe how a failing job is reported.

  Args:
    x: The job input.

  Returns:
    `x` itself when it is not equal to 10.

  Raises:
    Exception: When `x` equals 10.
  """
  if x != 10:
    return x
  raise Exception("Expected exception triggered by test.")


class PoolTest(unittest.TestCase):
  """Tests for DefaultExecutionPool.imap_unordered on a pool set up via init(3)."""

  @staticmethod
  def _values(pool, stop):
    """Yield the value of each non-heartbeat result for inputs 0..stop-1.

    Every input n is submitted as the single-element argument list [n] and
    processed with Run. Exceptions raised by the pool's iterator propagate
    to whoever is consuming this generator.
    """
    jobs = [[n] for n in range(stop)]
    for item in pool.imap_unordered(Run, jobs):
      # Any result can be a heartbeat due to timings.
      if not item.heartbeat:
        yield item.value

  def testNormal(self):
    """Inputs 0..9 all come back when no job raises."""
    pool = DefaultExecutionPool()
    pool.init(3)
    seen = set(self._values(pool, 10))
    self.assertEqual(set(range(10)), seen)

  def testException(self):
    """The failing input 10 surfaces as an exception; other values arrive."""
    seen = set()
    pool = DefaultExecutionPool()
    pool.init(3)
    with self.assertRaises(Exception):
      for value in self._values(pool, 12):
        # Item 10 will not appear in results due to an internal exception.
        seen.add(value)
    self.assertEqual(set(range(12)) - {10}, seen)

  def testAdd(self):
    """Jobs added through pool.add() while iterating are processed too."""
    seen = set()
    pool = DefaultExecutionPool()
    pool.init(3)
    for value in self._values(pool, 10):
      seen.add(value)
      # Every value below 30 schedules one follow-up job 20 higher, so
      # 0..9 lead to 20..29, which in turn lead to 40..49.
      if value < 30:
        pool.add([value + 20])
    expected = {base + offset for offset in (0, 20, 40) for base in range(10)}
    self.assertEqual(expected, seen)


class QueueTest(unittest.TestCase):
  """Tests drain_queue_async against a bounded standard-library Queue."""

  def testDrainQueueAsync(self):
    """A put on a full queue is expected to succeed inside the drain context.

    The queue holds at most one item. Outside the context a second put must
    time out with Full; inside it the put is expected to return, and after
    the context exits the queue is expected to be empty.
    """
    bounded = Queue(maxsize=1)
    bounded.put('foo')

    # The only slot is occupied, so this put cannot complete in time.
    with self.assertRaises(Full):
      bounded.put('bar', timeout=0.01)

    # No timeout here: the test relies on the context making room.
    with drain_queue_async(bounded):
      bounded.put('bar')

    # A non-blocking get must find nothing left behind.
    with self.assertRaises(Empty):
      bounded.get_nowait()


# Run the test cases defined in this module when the file is executed
# directly as a script (it is not triggered on import).
if __name__ == '__main__':
  unittest.main()