File: TaskAdvancedTest.py

package info (click to toggle)
clustershell 1.6-1
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 1,236 kB
  • sloc: python: 12,990; yacc: 2,844; makefile: 7
file content (137 lines) | stat: -rw-r--r-- 3,904 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python
# ClusterShell (multi-tasks) test suite
# Written by S. Thiell 2009-10-26


"""Unit test for ClusterShell Task (multi)"""

import copy
import sys
import unittest

sys.path.insert(0, '../lib')

from ClusterShell.Task import *


class TaskAdvancedTest(unittest.TestCase):
    """Advanced Task test cases.

    Covers task.run() in its different calling forms, the engine selection
    override done through Task._std_default, and commands scheduled on
    explicitly created Task objects.
    """

    def tearDown(self):
        """Call task_cleanup() after every test."""
        task_cleanup()

    def _check_echo_in_new_task(self, match):
        """Echo *match* from a newly created Task and check the output.

        Shared body of the testTask*NewThread* tests: create a Task,
        schedule "/bin/echo <match>" on it, resume it, wait with
        task_wait() and compare what the worker read with *match*.
        """
        # create a task in a new thread
        task = Task()
        # assertTrue instead of the deprecated assert_ alias; identity
        # check rather than "!= None"
        self.assertTrue(task is not None)

        # schedule a command in that task
        worker = task.shell("/bin/echo %s" % match)

        # run this task
        task.resume()

        # wait for the task to complete
        task_wait()

        # verify that the worker has completed
        self.assertEqual(worker.read(), match)

    def testTaskRun(self):
        """test task.run() behaving like task.resume()"""
        task_self().shell("true")
        task_self().run()

    def testTaskRunTimeout(self):
        """test task.run() behaving like task.resume(timeout)"""
        # timeout given as positional argument
        task_self().shell("sleep 1")
        self.assertRaises(TimeoutError, task_self().run, 0.3)

        # timeout given as keyword argument
        task_self().shell("sleep 1")
        self.assertRaises(TimeoutError, task_self().run, timeout=0.3)

    def testTaskShellRunLocal(self):
        """test task.run() used as a synchronous task.shell() (local)"""
        wrk = task_self().run("false")
        self.assertTrue(wrk)
        self.assertEqual(task_self().max_retcode(), 1)

        # Timeout passed the same way as for task.shell().
        wrk = task_self().run("sleep 1", timeout=0.3)
        self.assertTrue(wrk)
        self.assertEqual(task_self().num_timeout(), 1)

    def testTaskShellRunDistant(self):
        """test task.run() used as a synchronous task.shell() (distant)"""
        wrk = task_self().run("false", nodes="localhost")
        self.assertTrue(wrk)
        self.assertEqual(wrk.node_retcode("localhost"), 1)

    def testTaskEngineUserSelection(self):
        """test task engine user selection hack"""
        task_terminate()
        saved_engine = Task._std_default['engine']
        try:
            # Uh ho! It's a test case, not an example!
            Task._std_default['engine'] = 'select'
            self.assertEqual(task_self().info('engine'), 'select')
        finally:
            # Put the class-level default back even when the assertion
            # fails, so the override does not leak into later tests.
            Task._std_default['engine'] = saved_engine
            task_terminate()

    def testTaskEngineWrongUserSelection(self):
        """test task engine wrong user selection hack"""
        saved_engine = Task._std_default['engine']
        try:
            task_terminate()
            # Uh ho! It's a test case, not an example!
            Task._std_default['engine'] = 'foobar'
            # Check for KeyError in case of wrong engine request
            self.assertRaises(KeyError, task_self)
        finally:
            # Restore whatever engine default was in place before the test.
            Task._std_default['engine'] = saved_engine

        task_terminate()

    def testTaskNewThread1(self):
        """test task in new thread 1"""
        self._check_echo_in_new_task("test")

    def testTaskInNewThread2(self):
        """test task in new thread 2"""
        self._check_echo_in_new_task("again")

    def testTaskInNewThread3(self):
        """test task in new thread 3"""
        self._check_echo_in_new_task("once again")


if __name__ == '__main__':
    # Run only this module's test case, printing one line per test.
    suite = unittest.TestLoader().loadTestsFromTestCase(TaskAdvancedTest)
    result = unittest.TextTestRunner(verbosity=2).run(suite)
    # Propagate failures/errors through the exit status so callers
    # (shell scripts, CI) can detect a failed run.
    sys.exit(0 if result.wasSuccessful() else 1)