File: TaskTimeoutTest.py

package info (click to toggle)
clustershell 1.9.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,228 kB
  • sloc: python: 20,978; makefile: 149
file content (27 lines) | stat: -rw-r--r-- 728 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# ClusterShell (local) test suite
# Written by S. Thiell

"""Unit test for ClusterShell Task/Worker timeout support"""

import unittest

from ClusterShell.Task import task_self


class TaskTimeoutTest(unittest.TestCase):

    def testWorkersTimeoutBuffers(self):
        """test worker buffers with timeout"""
        task = task_self()

        worker = task.shell('echo some buffer; echo here...; sleep 10', timeout=4)

        task.resume()
        self.assertEqual(worker.read(), b"""some buffer
here...""")
        test = 1
        for buf, keys in task.iter_buffers():
            test = 0
            self.assertEqual(buf, b"""some buffer
here...""")
        self.assertEqual(test, 0, "task.iter_buffers() did not work")