File: test_dummy_threading.py

package info (click to toggle)
python3.4 3.4.2-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 80,356 kB
  • ctags: 100,540
  • sloc: python: 459,698; ansic: 381,519; sh: 17,599; asm: 14,322; makefile: 2,209; objc: 761; lisp: 502; exp: 499; cpp: 353; pascal: 80; xml: 73; csh: 21
file content (64 lines) | stat: -rw-r--r-- 1,807 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from test import support
import unittest
import dummy_threading as _threading
import time

class DummyThreadingTestCase(unittest.TestCase):
    """Exercise Thread, BoundedSemaphore and RLock from ``_threading``.

    ``setUp`` publishes three module-level globals that the worker threads
    share: ``sema`` (a BoundedSemaphore created with value 3), ``mutex`` (an
    RLock guarding the counter) and ``running`` (the number of workers
    currently between their increment and decrement of the counter).
    """

    class TestThread(_threading.Thread):
        """Worker that takes a semaphore slot and bumps ``running`` while
        it holds the slot."""

        def run(self):
            """Acquire a slot, count ourselves in, sleep, count ourselves out.

            ``sema`` and ``mutex`` are used as context managers so that they
            are released even if something inside the guarded section raises.
            """
            global running
            # Uncomment if testing another module, such as the real 'threading'
            # module.
            #delay = random.random() * 2
            delay = 0
            if support.verbose:
                print('task', self.name, 'will run for', delay, 'sec')
            with sema:
                # The counter is only ever touched while holding ``mutex``.
                with mutex:
                    running += 1
                    if support.verbose:
                        print(running, 'tasks are running')
                time.sleep(delay)
                if support.verbose:
                    print('task', self.name, 'done')
                with mutex:
                    running -= 1
                    if support.verbose:
                        print(self.name, 'is finished.', running,
                              'tasks are running')

    def setUp(self):
        """Create fresh shared synchronisation objects for each test."""
        self.numtasks = 10
        self.threads = []
        # The workers look these names up as module globals, so they have to
        # be (re)bound at module level rather than stored on ``self``.
        global sema, mutex, running
        sema = _threading.BoundedSemaphore(value=3)
        mutex = _threading.RLock()
        running = 0

    def test_tasks(self):
        """Start ``numtasks`` workers, join them all, and check the counter."""
        for i in range(self.numtasks):
            t = self.TestThread(name="<thread %d>" % i)
            self.threads.append(t)
            t.start()

        if support.verbose:
            print('waiting for all tasks to complete')
        for t in self.threads:
            t.join()
        if support.verbose:
            print('all tasks done')

        # Every worker decrements the counter it incremented, so once all of
        # them have been joined the counter must be back at zero.
        self.assertEqual(running, 0)

def test_main():
    """Run every test case defined in this module via ``support.run_unittest``."""
    test_classes = (DummyThreadingTestCase,)
    support.run_unittest(*test_classes)


# Run the tests only when this file is executed as a script, not on import.
if __name__ == '__main__':
    test_main()