File: MisusageTest.py

package info (click to toggle)
clustershell 1.7.3-2~deb9u1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 1,904 kB
  • sloc: python: 18,634; makefile: 132
file content (53 lines) | stat: -rw-r--r-- 1,791 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#!/usr/bin/env python
# ClusterShell test suite
# Written by S. Thiell


"""Unit test for ClusterShell common library misusages"""

import sys
import unittest

sys.path.insert(0, '../lib')

from TLib import HOSTNAME
from ClusterShell.Event import EventHandler
from ClusterShell.Worker.Popen import WorkerPopen
from ClusterShell.Worker.Ssh import WorkerSsh
from ClusterShell.Worker.Worker import WorkerError
from ClusterShell.Task import Task, task_self, AlreadyRunningError


class MisusageTest(unittest.TestCase):

    def testTaskResumedTwice(self):
        """test library misusage (task_self resumed twice)"""
        class ResumeAgainHandler(EventHandler):
            def ev_read(self, worker):
                worker.task.resume()
        task = task_self()
        task.shell("/bin/echo OK", handler=ResumeAgainHandler())
        self.assertRaises(AlreadyRunningError, task.resume)

    def testWorkerNotScheduledLocal(self):
        """test library misusage (local worker not scheduled)"""
        task = task_self()
        worker = WorkerPopen(command="/bin/hostname")
        task.resume()
        self.assertRaises(WorkerError, worker.read)

    def testWorkerNotScheduledDistant(self):
        """test library misusage (distant worker not scheduled)"""
        task = task_self()
        worker = WorkerSsh(HOSTNAME, command="/bin/hostname", handler=None, timeout=0)
        self.assert_(worker != None)
        task.resume()
        self.assertRaises(WorkerError, worker.node_buffer, HOSTNAME)

    def testTaskScheduleTwice(self):
        """test task worker schedule twice error"""
        task = task_self()
        self.assert_(task != None)
        worker = task.shell("/bin/echo itsme")
        self.assertRaises(WorkerError, task.schedule, worker)
        task.abort()