#!/usr/bin/env python
# ClusterShell test suite
# Written by S. Thiell 2010-01-16
"""Unit test for ClusterShell in multithreaded environments"""
import random
import sys
import time
import thread
import unittest
sys.path.insert(0, '../lib')
from ClusterShell.Task import *
from ClusterShell.Event import EventHandler
class TaskThreadSuspendTest(unittest.TestCase):
    """Tests of task suspend()/resume() combined with task_wait().

    Every test uses the Task API imported from ClusterShell.Task
    (Task, task_self, task_wait, task_cleanup) and runs real shell
    commands ("sleep", "echo"), so the tests take several seconds each.
    """

    def tearDown(self):
        """Call task_cleanup() after each test."""
        task_cleanup()

    def testSuspendMiscTwoTasks(self):
        """test task suspend/resume (2 tasks)

        Runs one command on the task returned by task_self() and one on
        a separately created Task, suspends and resumes the second task,
        then checks that the second task can still run a new command and
        buffer its output.
        """
        task = task_self()
        task2 = Task()

        # Start the longer command on the second task first.
        task2.shell("sleep 4 && echo thr1")
        task2.resume()

        # Run a shorter command on the current task and check its output
        # through both the task buffer (by key) and the worker itself.
        w = task.shell("sleep 1 && echo thr0", key=0)
        task.resume()
        self.assertEqual(task.key_buffer(0), "thr0")
        self.assertEqual(w.read(), "thr0")

        # The two task objects must be distinct for the test to be
        # meaningful. Use a unittest assertion rather than a bare assert
        # so the check survives "python -O" and reports both operands.
        self.assertNotEqual(task2, task)

        # Suspend the second task, hold it for longer than its command
        # needs to finish (10s vs. "sleep 4"), then let it go on.
        task2.suspend()
        time.sleep(10)
        task2.resume()
        task_wait()

        # The second task must still be usable after suspend/resume.
        task2.shell("echo suspend_test", key=1)
        task2.resume()
        task_wait()
        self.assertEqual(task2.key_buffer(1), "suspend_test")

    def _thread_delayed_unsuspend_func(self, task):
        """thread used to unsuspend task during task_wait()

        Sleeps for a random whole number of seconds between 5 and 10,
        records in self.resumed that the resume is being issued, then
        calls task.resume().

        task -- the task object to resume.
        """
        time_th = int(random.random()*6+5)
        time.sleep(time_th)
        # Set the flag before resuming so that it is already visible to
        # the test thread once the resumed task lets task_wait() return.
        self.resumed = True
        task.resume()

    def testThreadTaskWaitWithSuspend(self):
        """test task_wait() with suspended tasks

        Suspends a task shortly after starting it, starts three more
        tasks, and calls task_wait(). A helper thread resumes the
        suspended task after a random delay. When task_wait() returns,
        either the helper thread has already issued the resume, or the
        suspend() call returned False in the first place.
        """
        task = Task()
        self.resumed = False
        # Helper thread that will resume `task` 5 to 10 seconds from now.
        thread.start_new_thread(self._thread_delayed_unsuspend_func, (task,))

        # Command lasting a random whole number of seconds, 0 to 3.
        time_sh = int(random.random()*4)
        task.shell("sleep %d" % time_sh)
        task.resume()
        time.sleep(1)
        suspended = task.suspend()

        # NOTE(review): the loop is assumed to cover only the three task
        # lines below, with the pause and task_wait() running once after
        # it -- confirm against the upstream ClusterShell test.
        # `task` is rebound here on purpose: the helper thread keeps its
        # own reference to the suspended task created above.
        for i in range(1, 4):
            task = Task()
            task.shell("sleep %d" % i)
            task.resume()

        time.sleep(1)
        task_wait()
        # assertTrue replaces the deprecated assert_ alias.
        self.assertTrue(self.resumed or suspended == False)
if __name__ == '__main__':
    # Executed as a script: run only this module's test case, printing
    # one line per test (verbosity level 2).
    loader = unittest.TestLoader()
    suspend_suite = loader.loadTestsFromTestCase(TaskThreadSuspendTest)
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suspend_suite)
|