File: WorkerExecTest.py

package info (click to toggle)
clustershell 1.7.3-2~deb9u1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 1,904 kB
  • sloc: python: 18,634; makefile: 132
file content (151 lines) | stat: -rw-r--r-- 6,310 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env python
# ClusterShell.Worker.ExecWorker test suite
# First version by A. Degremont 2014-07-10

import os
import unittest

from TLib import HOSTNAME, make_temp_file, make_temp_filename, make_temp_dir

from ClusterShell.Worker.Exec import ExecWorker, WorkerError
from ClusterShell.Task import task_self

class ExecTest(unittest.TestCase):
    """Functional tests for ExecWorker (command execution and file copy).

    Every test builds a worker through :meth:`execw`, runs it on the
    current thread's task (``task_self()``) and then inspects that task's
    return codes and buffers.
    """

    @staticmethod
    def _discard(path):
        """Remove `path` (a file or an empty directory) if it exists.

        Meant for ``finally`` clauses: when the copy under test did not
        produce its destination, a plain ``os.unlink()``/``os.rmdir()``
        would raise OSError and replace the assertion failure that actually
        explains what went wrong.
        """
        if os.path.isdir(path) and not os.path.islink(path):
            os.rmdir(path)
        elif os.path.lexists(path):
            os.unlink(path)

    def execw(self, **kwargs):
        """helper method to spawn and run ExecWorker

        All keyword arguments are forwarded unchanged to the ExecWorker
        constructor. The worker is scheduled on ``task_self()``, the task is
        run, and the worker is returned once ``run()`` has returned.
        """
        worker = ExecWorker(**kwargs)
        task_self().schedule(worker)
        task_self().run()
        return worker

    def test_no_nodes(self):
        """test ExecWorker with a simple command without nodes"""
        self.execw(nodes=None, handler=None, command="echo ok")
        # Without any node, no return code is expected to be recorded.
        self.assertEqual(task_self().max_retcode(), None)

    def test_shell_syntax(self):
        """test ExecWorker with a command using shell syntax"""
        # Two commands separated by ';': both outputs must be gathered.
        cmd = "echo -n 1; echo -n 2"
        self.execw(nodes='localhost', handler=None, command=cmd)
        self.assertEqual(task_self().max_retcode(), 0)
        self.assertEqual(task_self().node_buffer('localhost'), '12')

    def test_one_node(self):
        """test ExecWorker with a simple command on localhost"""
        self.execw(nodes='localhost', handler=None, command="echo ok")
        self.assertEqual(task_self().max_retcode(), 0)
        self.assertEqual(task_self().node_buffer('localhost'), 'ok')

    def test_one_node_error(self):
        """test ExecWorker with an error command on localhost"""
        self.execw(nodes='localhost', handler=None, command="false")
        # `false` exits with status 1 and prints nothing.
        self.assertEqual(task_self().max_retcode(), 1)
        self.assertEqual(task_self().node_buffer('localhost'), '')

    def test_timeout(self):
        """test ExecWorker with a timeout"""
        nodes = "localhost,%s" % HOSTNAME
        # The command sleeps longer than the timeout, so both nodes are
        # expected to time out and no return code should be available.
        self.execw(nodes=nodes, handler=None, command="sleep 1", timeout=0.2)
        self.assertEqual(task_self().max_retcode(), None)
        self.assertEqual(task_self().num_timeout(), 2)

    def test_node_placeholder(self):
        """test ExecWorker with several nodes and %h (host)"""
        nodes = "localhost,%s" % HOSTNAME
        self.execw(nodes=nodes, handler=None, command="echo %h")
        self.assertEqual(task_self().max_retcode(), 0)
        # Each node must have echoed its own name.
        self.assertEqual(task_self().node_buffer('localhost'), 'localhost')
        self.assertEqual(task_self().node_buffer(HOSTNAME), HOSTNAME)

    def test_bad_placeholder(self):
        """test ExecWorker with unknown placeholder pattern"""
        # Unknown placeholder letter.
        self.assertRaises(WorkerError, self.execw,
                          nodes="localhost", handler=None, command="echo %x")
        # Lone '%' at the end of the command.
        self.assertRaises(WorkerError, self.execw,
                          nodes="localhost", handler=None, command="echo %")

    def test_rank_placeholder(self):
        """test ExecWorker with several nodes and %n (rank)"""
        nodes = "localhost,%s" % HOSTNAME
        self.execw(nodes=nodes, handler=None, command="echo %n")
        self.assertEqual(task_self().max_retcode(), 0)
        # NOTE(review): relies on iter_buffers() yielding the rank outputs
        # in '0', '1' order -- confirm this ordering is guaranteed.
        self.assertEqual([str(msg) for msg, _ in task_self().iter_buffers()],
                         ['0', '1'])

    def test_copy(self):
        """test copying with an ExecWorker and host placeholder"""
        src = make_temp_file("data")
        dstdir = make_temp_dir()
        dstpath = os.path.join(dstdir, os.path.basename(src.name))
        # Destination name once %h has been replaced by the node name.
        copied = dstpath + '.localhost'
        try:
            pattern = dstpath + ".%h"
            self.execw(nodes='localhost', handler=None, source=src.name,
                       dest=pattern)
            self.assertEqual(task_self().max_retcode(), 0)
            self.assertTrue(os.path.isfile(copied))
        finally:
            # Guarded removal: do not mask a failure above if the copy
            # never created the file.
            self._discard(copied)
            os.rmdir(dstdir)

    def test_copy_preserve(self):
        """test copying with an ExecWorker (preserve=True)"""
        src = make_temp_file("data")
        # Fixed timestamp in the past, applied as both atime and mtime of
        # the source so that a preserved mtime is distinguishable from "now".
        past_time = 443757600
        os.utime(src.name, (past_time, past_time))
        dstpath = make_temp_filename()
        try:
            self.execw(nodes='localhost', handler=None, source=src.name,
                       dest=dstpath, preserve=True)
            self.assertEqual(task_self().max_retcode(), 0)
            # assertEqual, not assertTrue: assertTrue(value, msg) would take
            # past_time as the failure message and accept any non-zero mtime.
            self.assertEqual(os.stat(dstpath).st_mtime, past_time)
        finally:
            self._discard(dstpath)

    def test_copy_directory(self):
        """test copying directory with an ExecWorker"""
        srcdir = make_temp_dir()
        dstdir = make_temp_dir()
        ref1 = make_temp_file("data1", dir=srcdir)
        # Expected layout: <dstdir>/<basename(srcdir)>/<basename(ref1)>
        pathdstsrcdir = os.path.join(dstdir, os.path.basename(srcdir))
        pathdst1 = os.path.join(pathdstsrcdir, os.path.basename(ref1.name))
        try:
            self.execw(nodes='localhost', handler=None, source=srcdir,
                       dest=dstdir)
            self.assertEqual(task_self().max_retcode(), 0)
            self.assertTrue(os.path.isdir(pathdstsrcdir))
            self.assertTrue(os.path.isfile(pathdst1))
            # Close the handle explicitly instead of leaking it.
            copied = open(pathdst1)
            try:
                self.assertEqual(copied.readlines()[0], "data1")
            finally:
                copied.close()
        finally:
            # Guarded removals: the copied tree may not exist if the test
            # failed before or during the copy.
            self._discard(pathdst1)
            self._discard(pathdstsrcdir)
            # Presumably releasing the temp file object removes the file so
            # that srcdir is empty for rmdir -- TODO confirm against TLib.
            del ref1
            os.rmdir(dstdir)
            os.rmdir(srcdir)

    def test_copy_wrong_directory(self):
        """test copying wrong directory with an ExecWorker"""
        srcdir = make_temp_dir()
        # Destination is an existing regular file while the source is a
        # directory, so the copy is expected to fail.
        dst = make_temp_file("data")
        ref1 = make_temp_file("data1", dir=srcdir)
        try:
            self.execw(nodes='localhost', handler=None, source=srcdir,
                       dest=dst.name, stderr=True)
            self.assertEqual(task_self().max_retcode(), 1)
            # An error message must have been captured on stderr.
            self.assertTrue(len(task_self().node_error("localhost")) > 0)
            # The source content must be left untouched.
            self.assertTrue(os.path.isfile(ref1.name))
        finally:
            # Presumably releasing the temp file object removes the file so
            # that srcdir is empty for rmdir -- TODO confirm against TLib.
            del ref1
            os.rmdir(srcdir)

    def test_rcopy_wrong_directory(self):
        """test ExecWorker reverse copying with wrong directory"""
        dstbasedir = make_temp_dir()
        # Path below the temporary directory that is never created.
        dstdir = os.path.join(dstbasedir, "wrong")
        src = make_temp_file("data")
        try:
            self.assertRaises(ValueError, self.execw, nodes='localhost',
                              handler=None, source=src.name, dest=dstdir,
                              stderr=True, reverse=True)
        finally:
            os.rmdir(dstbasedir)