1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
|
# These modules provides a set of functions to execute actions on a host.
# If the host is local, it's done direcly; if it's remote we log in via SSH.
import os
import shutil
import subprocess
import logging
from ZeekControl import py3zeek
from ZeekControl import ssh_runner
from ZeekControl import util
# Copy src to dstdir, preserving permission bits and file type. The src
# file type can be symlink, regular file, or directory (directories are copied
# recursively). If the target pathname already exists, it is not clobbered.
def install(src, dstdir, cmdout):
if not os.path.lexists(src):
cmdout.error("pathname not found: %s" % src)
return False
dst = os.path.join(dstdir, os.path.basename(src))
if os.path.lexists(dst):
# Do not clobber existing files/dirs (this is not an error)
return True
logging.debug("cp %s %s", src, dstdir)
try:
if os.path.islink(src):
target = os.readlink(src)
os.symlink(target, dst)
elif os.path.isfile(src):
shutil.copy2(src, dstdir)
elif os.path.isdir(src):
shutil.copytree(src, dst, symlinks=True)
else:
cmdout.error("failed to copy %s: not a file, dir, or symlink" % src)
return False
except OSError:
# Python 2.6 has a bug where this may fail on NFS. So we just
# ignore errors.
pass
except IOError as err:
cmdout.error("failed to copy: %s" % err)
return False
return True
# rsyncs paths from localhost to destination hosts.
def sync(nodes, paths, cmdout):
result = True
cmds = []
for n in nodes:
args = ['-rRl', '--delete', '--rsh="ssh -o BatchMode=yes -o LogLevel=error -o ConnectTimeout=30"']
dst = ["%s:/" % util.format_rsync_addr(n.addr)]
args += paths + dst
cmdline = "rsync %s" % " ".join(args)
cmds += [(n, cmdline, "", None)]
for (id, success, output) in run_localcmds(cmds):
if not success:
cmdout.error("rsync to %s failed: %s" % (id.addr, output))
result = False
return result
# Runs command locally and returns tuple (success, output)
# with success being true if the command terminated with exit code 0,
# and output is a string containing the combined stdout/stderr output of the
# command.
# The "env" is a space-separated string of environment variables to set,
# and "inputtext" is a string to send to stdin.
def run_localcmd(cmd, env=None, inputtext=None):
proc = _run_localcmd_init("single", cmd, env)
return _run_localcmd_wait(proc, inputtext)
# Same as run_localcmd() but runs a set of local commands in parallel.
# Cmds is a list of (id, cmd, envs, inputtext) tuples, where id is
# an arbitrary cookie identifying each command.
# Returns a list of (id, success, output) tuples.
def run_localcmds(cmds):
results = []
running = []
for (id, cmd, envs, inputtext) in cmds:
proc = _run_localcmd_init(id, cmd, envs)
running += [(id, proc, inputtext)]
for (id, proc, inputtext) in running:
success, output = _run_localcmd_wait(proc, inputtext)
results += [(id, success, output)]
return results
def _run_localcmd_init(id, cmd, env):
if env:
cmdline = env + " " + cmd
else:
cmdline = cmd
logging.debug(cmdline)
# os.setsid makes sure that the child process doesn't receive our CTRL-Cs.
proc = subprocess.Popen([cmdline], stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
close_fds=True, shell=True, preexec_fn=os.setsid)
return proc
def _run_localcmd_wait(proc, inputtext):
if py3zeek.using_py3 and inputtext:
inputtext = inputtext.encode()
# Note: "output" is combined stdout/stderr output.
output, _ = proc.communicate(inputtext)
rc = proc.returncode
if py3zeek.using_py3:
output = output.decode()
logging.debug("exit status: %d", rc)
return (rc == 0, output)
# FIXME: This is an ugly hack. The __del__ method produces
# strange unhandled exceptions in the child at termination
# of the main process. Not sure if disabling the cleanup
# altogether is a good thing but right now that's the
# only fix I can come up with.
def _emptyDel(self):
pass
subprocess.Popen.__del__ = _emptyDel
class Executor:
def __init__(self, config):
self.config = config
self.sshrunner = ssh_runner.MultiMasterManager(config.localaddrs)
def finish(self):
self.sshrunner.shutdown_all()
# Run commands in parallel on one or more hosts.
#
# cmds: a list of the form: [ (node, cmd, args), ... ]
# where "cmd" is a string, "args" is a list of strings.
# shell: if True, then the "cmd" (and "args") will be interpreted by a
# shell.
# helper: if True, then the "cmd" will be modified to specify the full
# path to the zeekctl helper script.
#
# Returns a list of results: [(node, success, output), ...]
# where "success" is a boolean (True if command's exit status was zero),
# and "output" is a string containing the command's stdout followed by
# stderr, or an error message if no result was received (this could occur
# upon failure to communicate with remote host, or if the command being
# executed did not finish before the timeout).
def run_cmds(self, cmds, shell=False, helper=False):
results = []
if not cmds:
return results
dd = {}
hostlist = []
for nodecmd in cmds:
host = nodecmd[0].addr
if host not in dd:
dd[host] = []
hostlist.append(host)
dd[host].append(nodecmd)
nodecmdlist = []
for host in hostlist:
for zeeknode, cmd, args in dd[host]:
if helper:
cmdargs = [os.path.join(self.config.helperdir, cmd)]
else:
cmdargs = [cmd]
if shell:
if args:
cmdargs = ["%s %s" % (cmdargs[0], " ".join(args))]
else:
cmdargs += args
nodecmdlist.append((zeeknode.addr, cmdargs))
logging.debug("%s: %s", zeeknode.host, " ".join(cmdargs))
for host, result in self.sshrunner.exec_multihost_commands(nodecmdlist, shell, self.config.commandtimeout):
nodecmd = dd[host].pop(0)
zeeknode = nodecmd[0]
if not isinstance(result, Exception):
res = result[0]
out = result[1]
err = result[2]
results.append((zeeknode, res == 0, out + err))
logging.debug("%s: exit code %d", zeeknode.host, res)
else:
results.append((zeeknode, False, str(result)))
return results
# Run shell commands in parallel on one or more hosts.
# cmdlines: a list of the form [ (node, cmdline), ... ]
# where "cmdline" is a string to be interpreted by the shell
#
# Return value is same as run_cmds.
def run_shell_cmds(self, cmdlines):
cmds = [(node, cmdline, []) for node, cmdline in cmdlines]
return self.run_cmds(cmds, shell=True)
# A convenience function that calls run_cmds.
def run_helper(self, cmds, shell=False):
return self.run_cmds(cmds, shell, True)
# A convenience function that calls run_cmds.
# dirs: a list of the form [ (node, dir), ... ]
#
# Returns a list of the form: [ (node, success, output), ... ]
# where "success" is a boolean (true if specified directory was created
# or already exists).
def mkdirs(self, dirs):
results = []
cmds = []
for (node, dir) in dirs:
cmds += [(node, "mkdir", ["-p", dir])]
for (node, success, output) in self.run_cmds(cmds):
results += [(node, success, output)]
return results
# A convenience function that calls run_cmds to remove directories
# on one or more hosts.
# dirs: a list of the form [ (node, dir), ... ]
#
# Returns a list of the form: [ (node, success, output), ... ]
# where "success" is a boolean (true if specified directory was removed
# or does not exist).
def rmdirs(self, dirs):
results = []
cmds = []
for (node, dir) in dirs:
cmds += [(node, "if [ -d %s ]; then rm -rf %s ; fi" % (dir, dir), [])]
for (node, success, output) in self.run_cmds(cmds, shell=True):
results += [(node, success, output)]
return results
def host_status(self):
return self.sshrunner.host_status()
|