1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
|
# ClusterShell.Worker.ExecWorker test suite
# First version by A. Degremont 2014-07-10
"""Unit test for ExecWorker"""
import os
import unittest
from TLib import HOSTNAME, make_temp_file, make_temp_filename, make_temp_dir
from ClusterShell.Event import EventHandler
from ClusterShell.Worker.Exec import ExecWorker, WorkerError
from ClusterShell.Task import task_self
class ExecTest(unittest.TestCase):
    """Test cases for ExecWorker.

    Each test schedules an ExecWorker through ``task_self()`` (see the
    ``execw`` helper) and then checks the return codes, buffers and
    timeout counters reported by ``task_self()``.
    """

    def execw(self, **kwargs):
        """helper method to spawn and run ExecWorker

        All keyword arguments are passed unchanged to the ExecWorker
        constructor. The worker is scheduled and run via ``task_self()``
        and then returned to the caller.
        """
        worker = ExecWorker(**kwargs)
        task_self().schedule(worker)
        task_self().run()
        return worker

    def test_no_nodes(self):
        """test ExecWorker with a simple command without nodes"""
        self.execw(nodes=None, handler=None, command="echo ok")
        # Without any node, no return code is expected to be recorded.
        self.assertEqual(task_self().max_retcode(), None)

    def test_shell_syntax(self):
        """test ExecWorker with a command using shell syntax"""
        cmd = "echo -n 1; echo -n 2"
        self.execw(nodes='localhost', handler=None, command=cmd)
        self.assertEqual(task_self().max_retcode(), 0)
        # Both commands of the ';' sequence must have produced output.
        self.assertEqual(task_self().node_buffer('localhost'), b'12')

    def test_one_node(self):
        """test ExecWorker with a simple command on localhost"""
        self.execw(nodes='localhost', handler=None, command="echo ok")
        self.assertEqual(task_self().max_retcode(), 0)
        self.assertEqual(task_self().node_buffer('localhost'), b'ok')

    def test_one_node_error(self):
        """test ExecWorker with an error command on localhost"""
        self.execw(nodes='localhost', handler=None, command="false")
        self.assertEqual(task_self().max_retcode(), 1)
        self.assertEqual(task_self().node_buffer('localhost'), b'')

    @unittest.skipIf(HOSTNAME == 'localhost',
                     "does not work with hostname set to 'localhost'")
    def test_timeout(self):
        """test ExecWorker with a timeout"""
        # Two distinct node names are needed, hence the skip when
        # HOSTNAME is itself 'localhost'.
        nodes = "localhost,%s" % HOSTNAME
        self.execw(nodes=nodes, handler=None, command="sleep 1", timeout=0.2)
        # The command outlasts the timeout: no return code is expected and
        # both nodes must be counted as timed out.
        self.assertEqual(task_self().max_retcode(), None)
        self.assertEqual(task_self().num_timeout(), 2)

    def test_node_placeholder(self):
        """test ExecWorker with several nodes and %h (host)"""
        nodes = "localhost,%s" % HOSTNAME
        self.execw(nodes=nodes, handler=None, command="echo %h")
        self.assertEqual(task_self().max_retcode(), 0)
        self.assertEqual(task_self().node_buffer('localhost'), b'localhost')
        self.assertEqual(task_self().node_buffer(HOSTNAME),
                         HOSTNAME.encode('utf-8'))

    def test_bad_placeholder(self):
        """test ExecWorker with unknown placeholder pattern"""
        # Unknown placeholder letter.
        self.assertRaises(WorkerError, self.execw,
                          nodes="localhost", handler=None, command="echo %x")
        # Lone '%' at the end of the command.
        self.assertRaises(WorkerError, self.execw,
                          nodes="localhost", handler=None, command="echo %")

    @unittest.skipIf(HOSTNAME == 'localhost',
                     "does not work with hostname set to 'localhost'")
    def test_rank_placeholder(self):
        """test ExecWorker with several nodes and %n (rank)"""
        nodes = "localhost,%s" % HOSTNAME
        self.execw(nodes=nodes, handler=None, command="echo %n")
        self.assertEqual(task_self().max_retcode(), 0)
        # Compare as sets: which node gets which rank is not checked here.
        ranks = {bytes(msg) for msg, _ in task_self().iter_buffers()}
        self.assertEqual(ranks, {b'0', b'1'})

    def test_copy(self):
        """test copying with an ExecWorker and host placeholder"""
        src = make_temp_file(b"data")
        dstdir = make_temp_dir()
        dstpath = os.path.join(dstdir.name, os.path.basename(src.name))
        # Expected destination once %h is replaced with the node name.
        copied = dstpath + '.localhost'
        try:
            pattern = dstpath + ".%h"
            self.execw(nodes='localhost', handler=None, source=src.name,
                       dest=pattern)
            self.assertEqual(task_self().max_retcode(), 0)
            self.assertTrue(os.path.isfile(copied))
        finally:
            # Only remove the copy if it exists, so that a failed copy is
            # reported by the assertions above and not by the cleanup.
            if os.path.exists(copied):
                os.unlink(copied)
            dstdir.cleanup()
            src.close()

    def test_copy_preserve(self):
        """test copying with an ExecWorker (preserve=True)"""
        src = make_temp_file(b"data")
        # Arbitrary timestamp far in the past, so that a copy which does
        # not preserve times cannot match it by accident.
        past_time = 443757600
        os.utime(src.name, (past_time, past_time))
        dstpath = make_temp_filename()
        try:
            self.execw(nodes='localhost', handler=None, source=src.name,
                       dest=dstpath, preserve=True)
            self.assertEqual(task_self().max_retcode(), 0)
            # assertEqual is required here: assertTrue(value, msg) would
            # treat past_time as the failure message and always pass.
            self.assertEqual(os.stat(dstpath).st_mtime, past_time)
        finally:
            os.unlink(dstpath)
            src.close()

    def test_copy_directory(self):
        """test copying directory with an ExecWorker"""
        srcdir = make_temp_dir()
        dstdir = make_temp_dir()
        ref1 = make_temp_file(b"data1", dir=srcdir.name)
        # The source directory is expected to be recreated, under its own
        # name, inside the destination directory.
        pathdstsrcdir = os.path.join(dstdir.name,
                                     os.path.basename(srcdir.name))
        pathdst1 = os.path.join(pathdstsrcdir, os.path.basename(ref1.name))
        try:
            self.execw(nodes='localhost', handler=None, source=srcdir.name,
                       dest=dstdir.name)
            self.assertEqual(task_self().max_retcode(), 0)
            self.assertTrue(os.path.isdir(pathdstsrcdir))
            self.assertTrue(os.path.isfile(pathdst1))
            with open(pathdst1) as dst1:
                self.assertEqual(dst1.readlines()[0], "data1")
        finally:
            # Guarded cleanup: if the copy failed, these paths are missing
            # and an unconditional removal would hide the real failure and
            # skip the remaining cleanup steps.
            if os.path.exists(pathdst1):
                os.unlink(pathdst1)
            if os.path.isdir(pathdstsrcdir):
                os.rmdir(pathdstsrcdir)
            ref1.close()
            dstdir.cleanup()
            srcdir.cleanup()

    def test_copy_wrong_directory(self):
        """test copying wrong directory with an ExecWorker"""
        srcdir = make_temp_dir()
        # Destination is an existing regular file while source is a
        # directory: the copy is expected to fail.
        dst = make_temp_file(b"data")
        ref1 = make_temp_file(b"data1", dir=srcdir.name)
        try:
            self.execw(nodes='localhost', handler=None, source=srcdir.name,
                       dest=dst.name, stderr=True)
            self.assertEqual(task_self().max_retcode(), 1)
            self.assertTrue(len(task_self().node_error("localhost")) > 0)
            # The source file must be left untouched by the failed copy.
            self.assertTrue(os.path.isfile(ref1.name))
        finally:
            ref1.close()
            dst.close()
            srcdir.cleanup()

    def test_rcopy_wrong_directory(self):
        """test ExecWorker reverse copying with wrong directory"""
        with make_temp_dir() as dstbasedirname:
            # Path inside the temporary directory that is never created.
            dstdir = os.path.join(dstbasedirname, "wrong")
            src = make_temp_file(b"data")
            try:
                self.assertRaises(ValueError, self.execw, nodes='localhost',
                                  handler=None, source=src.name, dest=dstdir,
                                  stderr=True, reverse=True)
            finally:
                src.close()

    def test_abort_on_read(self):
        """test ExecWorker.abort() on read"""
        class TestH(EventHandler):
            """Event handler aborting the worker when output is read."""

            def ev_read(self, worker, node, sname, msg):
                worker.abort()
                worker.abort()  # safe but no effect

        # 'tail -f /dev/null' never exits by itself: the test only
        # completes if abort() really stops the worker.
        self.execw(nodes='localhost', handler=TestH(),
                   command="echo ok; tail -f /dev/null")
        self.assertEqual(task_self().max_retcode(), None)
        self.assertEqual(task_self().node_buffer('localhost'), b'ok')

    def test_abort_on_close(self):
        """test ExecWorker.abort() on close"""
        class TestH(EventHandler):
            """Event handler aborting the worker from its close event."""

            def ev_close(self, worker, timedout):
                worker.abort()
                worker.abort()  # safe but no effect

        self.execw(nodes='localhost', handler=TestH(),
                   command="echo ok; sleep .1")
        # The results asserted below must be unaffected by the abort()
        # calls made from ev_close.
        self.assertEqual(task_self().max_retcode(), 0)
        self.assertEqual(task_self().node_buffer('localhost'), b'ok')
|