1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
# ClusterShell (local) test suite
# Written by S. Thiell
"""Unit test for ClusterShell Task/Worker timeout support"""
import unittest
from ClusterShell.Task import task_self
class TaskTimeoutTest(unittest.TestCase):
    """Checks on worker output buffers when a worker timeout is set."""

    def testWorkersTimeoutBuffers(self):
        """test worker buffers with timeout

        Runs a shell command that prints two lines and then sleeps for
        10 seconds, under a 4-second worker timeout, and checks that the
        printed output can be read back from both the worker and the task.
        """
        # Both echo lines joined by a newline, with no trailing newline.
        expected = b"some buffer\nhere..."

        task = task_self()
        worker = task.shell(
            'echo some buffer; echo here...; sleep 10', timeout=4)
        task.resume()

        # Output as seen through the worker object itself.
        self.assertEqual(worker.read(), expected)

        # Output as seen through the task-level buffer iterator. The flag is
        # cleared only when the iterator yields at least one buffer, so an
        # empty iteration is reported as a failure below.
        nothing_yielded = 1
        for buf, _keys in task.iter_buffers():
            nothing_yielded = 0
            self.assertEqual(buf, expected)
        self.assertEqual(nothing_yielded, 0, "task.iter_buffers() did not work")
|