File: MisusageTest.py

package info (click to toggle)
clustershell 1.9.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,228 kB
  • sloc: python: 20,978; makefile: 149
file content (46 lines) | stat: -rw-r--r-- 1,667 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# ClusterShell test suite
# Written by S. Thiell

"""Unit test for ClusterShell common library misusages"""

import unittest

from TLib import HOSTNAME
from ClusterShell.Event import EventHandler
from ClusterShell.Worker.Popen import WorkerPopen
from ClusterShell.Worker.Ssh import WorkerSsh
from ClusterShell.Worker.Worker import WorkerError
from ClusterShell.Task import task_self, AlreadyRunningError


class MisusageTest(unittest.TestCase):
    """Checks that misusing the task/worker API raises the expected errors."""

    def testTaskResumedTwice(self):
        """test library misusage (task_self resumed twice)

        The read handler resumes its worker's task a second time; the
        outer resume() call is expected to raise AlreadyRunningError.
        """
        class NestedResumeHandler(EventHandler):
            def ev_read(self, worker, node, sname, msg):
                # Resume the task from inside one of its own read events.
                worker.task.resume()

        current = task_self()
        current.shell("/bin/echo OK", handler=NestedResumeHandler())
        with self.assertRaises(AlreadyRunningError):
            current.resume()

    def testWorkerNotScheduledLocal(self):
        """test library misusage (local worker not scheduled)

        A WorkerPopen is built but never handed to the task, so reading
        from it after resume() is expected to raise WorkerError.
        """
        current = task_self()
        orphan = WorkerPopen(command="/bin/hostname")
        current.resume()
        with self.assertRaises(WorkerError):
            orphan.read()

    def testWorkerNotScheduledDistant(self):
        """test library misusage (distant worker not scheduled)

        Same scenario as the local case but with a WorkerSsh: asking for
        a node buffer on the unscheduled worker must raise WorkerError.
        """
        current = task_self()
        orphan = WorkerSsh(
            HOSTNAME,
            command="/bin/hostname",
            handler=None,
            timeout=0,
        )
        current.resume()
        with self.assertRaises(WorkerError):
            orphan.node_buffer(HOSTNAME)

    def testTaskScheduleTwice(self):
        """test task worker schedule twice error

        task.shell() returns a worker; passing that same worker to
        task.schedule() afterwards is expected to raise WorkerError.
        """
        current = task_self()
        shell_worker = current.shell("/bin/echo itsme")
        with self.assertRaises(WorkerError):
            current.schedule(shell_worker)
        # Abort the task once the check is done (the command is never run
        # by this test).
        current.abort()