File: TaskThreadJoinTest.py

package info (click to toggle)
clustershell 1.6-1
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 1,236 kB
  • sloc: python: 12,990; yacc: 2,844; makefile: 7
file content (129 lines) | stat: -rw-r--r-- 3,835 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python
# ClusterShell test suite
# Written by S. Thiell 2010-01-16


"""Unit test for ClusterShell task's join feature in multithreaded
environments"""

import sys
import time
import unittest

sys.path.insert(0, '../lib')

from ClusterShell.Task import *
from ClusterShell.Event import EventHandler


class TaskThreadJoinTest(unittest.TestCase):

    def tearDown(self):
        task_cleanup()

    def testThreadTaskWaitWhenRunning(self):
        """test task_wait() when workers are running"""

        for i in range(1, 5):
            task = Task()
            task.shell("sleep %d" % i)
            task.resume()

        task_wait()


    def testThreadTaskWaitWhenSomeFinished(self):
        """test task_wait() when some workers finished"""

        for i in range(1, 5):
            task = Task()
            task.shell("sleep %d" % i)
            task.resume()

        time.sleep(2)
        task_wait()


    def testThreadTaskWaitWhenAllFinished(self):
        """test task_wait() when all workers finished"""

        for i in range(1, 3):
            task = Task()
            task.shell("sleep %d" % i)
            task.resume()

        time.sleep(4)
        task_wait()

    def testThreadSimpleTaskSupervisor(self):
        """test task methods from another thread"""
        #print "PASS 1"
        task = Task()
        task.shell("sleep 3")
        task.shell("echo testing", key=1)
        task.resume()
        task.join()
        self.assertEqual(task.key_buffer(1), "testing")
        #print "PASS 2"
        task.shell("echo ok", key=2)
        task.resume()
        task.join()
        #print "PASS 3"
        self.assertEqual(task.key_buffer(2), "ok")
        task.shell("sleep 1 && echo done", key=3)
        task.resume()
        task.join()
        #print "PASS 4"
        self.assertEqual(task.key_buffer(3), "done")
        task.abort()

    def testThreadTaskBuffers(self):
        """test task data access methods after join()"""
        task = Task()
        # test data access from main thread

        # test stderr separated
        task.set_default("stderr", True)
        task.shell("echo foobar", key="OUT")
        task.shell("echo raboof 1>&2", key="ERR")
        task.resume()
        task.join()
        self.assertEqual(task.key_buffer("OUT"), "foobar")
        self.assertEqual(task.key_error("OUT"), "")
        self.assertEqual(task.key_buffer("ERR"), "")
        self.assertEqual(task.key_error("ERR"), "raboof")

        # test stderr merged
        task.set_default("stderr", False)
        task.shell("echo foobar", key="OUT")
        task.shell("echo raboof 1>&2", key="ERR")
        task.resume()
        task.join()
        self.assertEqual(task.key_buffer("OUT"), "foobar")
        self.assertEqual(task.key_error("OUT"), "")
        self.assertEqual(task.key_buffer("ERR"), "raboof")
        self.assertEqual(task.key_error("ERR"), "")

    def testThreadTaskUnhandledException(self):
        """test task unhandled exception in thread"""
        class TestUnhandledException(Exception):
            """test exception"""
        class RaiseOnRead(EventHandler):
            def ev_read(self, worker):
                raise TestUnhandledException("you should see this exception")

        task = Task()
        # test data access from main thread
        task.shell("echo raisefoobar", key=1, handler=RaiseOnRead())
        task.resume()
        task.join()
        self.assertEqual(task.key_buffer(1), "raisefoobar")
        time.sleep(1) # for pretty display, because unhandled exception
                      # traceback may be sent to stderr after the join()
        self.assert_(not task.running())


if __name__ == '__main__':
    suite = unittest.TestLoader().loadTestsFromTestCase(TaskThreadJoinTest)
    unittest.TextTestRunner(verbosity=2).run(suite)