1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
|
from core.module import Module, Status
from core.vectors import PhpCode, PythonCode, Os
class Ssh(Module):
    """Execute shell commands through SSH without PTY

    Vector           | Native | Comment
    -----------------|--------|-----------------------------------------
    php              | No     | SSH2 extension
    py.paramiko      | No     |
    py.pexpect       | No     | Rarely installed
    py.subprocess    | Yes    | Stores clear passwd on disk temporarily

    /!\\ When using py.subprocess vector the password is stored in a file at <ASKPASS> /!\\
    As a result the file gets truncated, chmod and then removed. Be careful!
    This file has to be executable, you have to ensure it spawns on a partition
    that allows execution (ie. not mounted with `noexec`).

    Examples:
        :shell_ssh root@127.0.0.1 Sup3rPassw0rd id
        :shell_ssh -vector py.subprocess -askpass /executable/path -port 1337 root@secret.server Sup3rPassw0rd id
    """

    # The payload templates below use ${name} placeholders that are filled from
    # the `format_args` dictionaries passed to the vector list.
    # They are written flush-left so the Python snippets are syntactically valid
    # on their own. NOTE(review): presumably harmless whether or not the vector
    # layer dedents payloads -- confirm against core.vectors.

    # PHP SSH2 extension; prints "ModuleNotFound" when ssh2_connect is missing,
    # which `setup` uses to reject this vector.
    _PHP_PAYLOAD = """
if(function_exists("ssh2_connect")){
    if($c = ssh2_connect("${host}","${port}")){
        if(ssh2_auth_password($c,"${user}","${password}")){
            $s = ssh2_exec($c,"${command}${stderr}");
            stream_set_blocking($s, true);
            echo stream_get_contents($s);
            fclose($s);
        }else{echo "Authentication failed";}
    }else{echo "Could not connect.";}
}else{echo "ModuleNotFound";}
"""

    # Paramiko client; host keys are not verified.
    _PARAMIKO_PAYLOAD = """
import paramiko
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.MissingHostKeyPolicy())
c.connect("${host}", username="${user}", password="${password}", port=int("${port}"))
i, o, e = c.exec_command("${command}${stderr}")
i.close()
for l in iter(o.readline, ""):
    print(l,end="")
"""

    # pexpect/pxssh session; the echoed command line is sliced off the output.
    _PEXPECT_PAYLOAD = """
from pexpect import pxssh
s = pxssh.pxssh()
s.options = dict(StrictHostKeyChecking="no", UserKnownHostsFile="/dev/null")
s.login("${host}", "${user}", "${password}", port=int("${port}"))
c = "${command}"
s.sendline(c)
s.prompt()
print(s.before[len(c) + 2:].decode("utf-8","replace"), end="")
"""

    # System `ssh` binary fed through an SSH_ASKPASS helper script.
    # The helper holds the clear-text password, so it is restricted to its
    # owner before the secret is written, and it is removed in a `finally`
    # so a failing Popen (e.g. no ssh binary) cannot leave it on disk.
    # Known limitation: a password containing a single quote breaks the
    # generated `echo '...'` helper.
    _SUBPROCESS_PAYLOAD = """
import os,subprocess as s
with open("${askpass}", "w") as f:
    os.chmod("${askpass}", 0o700)
    f.write("echo '${password}'")
try:
    p = s.Popen([ "ssh","-T","-tt","-oUserKnownHostsFile=/dev/null","-oStrictHostKeyChecking=no",
        "-p${port}","${user}@${host}","${command}${stderr}"
        ],
        shell=False,
        stdout=s.PIPE, stderr=s.PIPE, stdin=s.PIPE,
        env={
            "DISPLAY": ":99",
            "SSH_ASKPASS": "${askpass}"
        })
    o, e = p.communicate()
finally:
    os.unlink("${askpass}")
print((o if o else e).decode("utf-8","replace"), end="")
"""

    def init(self):
        """Register the module metadata, its four vectors and its arguments."""
        self.register_info(
            {
                'author': [
                    'ZanyMonk'
                ],
                'license': 'GPLv3'
            }
        )

        self.register_vectors(
            [
                PhpCode(self._PHP_PAYLOAD, name="php"),
                PythonCode(self._PARAMIKO_PAYLOAD, "py.paramiko"),
                PythonCode(self._PEXPECT_PAYLOAD, name="py.pexpect", target=Os.NIX),
                PythonCode(self._SUBPROCESS_PAYLOAD, name="py.subprocess"),
            ]
        )

        self.register_arguments([
            {'name': 'address', 'help': 'user@host[:port]'},
            {'name': 'password', 'help': 'User\'s password'},
            {'name': 'command', 'help': 'Shell command', 'nargs': '+'},
            {'name': '-askpass', 'default': '/tmp/.p',
             'help': 'SSH_ASKPASS location (/!\\ will be overwritten and removed /!\\)'},
            {'name': '-port', 'default': 0, 'type': int, 'help': 'SSH server port'},
            {'name': '-stderr', 'default': ' 2>&1'},
            {'name': '-vector', 'choices': self.vectors.get_names()},
        ])

    def setup(self):
        """Probe the vectors and remember the first one that yields a result.

        Each candidate vector is rendered with dummy connection settings
        (127.0.0.1:54321, root/root, `whoami`) and its output is checked by
        `is_valid`. The selected vector name is stored in the `shell_ssh`
        stored arguments and, when no vector was requested, in `self.args`.

        Args:
            self.args: The dictionary of arguments

        Returns:
            Status.RUN when shell_php is running and the probe returned a
            result, Status.FAIL otherwise.
        """
        args_check = {
            'host': '127.0.0.1',
            'user': 'root',
            'password': 'root',
            'port': 54321,
            'command': 'whoami',
            'stderr': '',
            # Probe with the operator-selected helper path so that no file
            # other than the one they agreed to sacrifice gets overwritten.
            'askpass': self.args.get('askpass') or '/tmp/.p',
        }

        def is_valid(result):
            # Stop probing if shell_php is not running ...
            if self.session['shell_php']['status'] != Status.RUN:
                return True
            # ... or accept any output that is not the PHP "extension missing" marker.
            return result and 'ModuleNotFound' not in result

        vector_name, result = self.vectors.find_first_result(
            names=[self.args.get('vector', '')],
            format_args=args_check,
            condition=is_valid
        )

        if self.session['shell_php']['status'] != Status.RUN or not result:
            return Status.FAIL

        self.session['shell_ssh']['stored_args']['vector'] = vector_name
        if not self.args.get('vector'):
            self.args['vector'] = vector_name
        return Status.RUN

    def run(self, **kwargs):
        """Run the requested command over SSH with the selected vector.

        The address argument is split into user, host and port, and the
        command words are joined into one string that is escaped for
        embedding inside the double-quoted literal used by every payload.

        Returns:
            Whatever the vector list returns for the selected vector.
        """
        self.args['user'], self.args['host'], self.args['port'] = self._parse_address(self.args['address'])

        command = ' '.join(self.args['command'])
        # Escape backslashes BEFORE quotes: otherwise a user-typed `\"` becomes
        # `\\"`, which closes the payload's string literal early, and a trailing
        # backslash would swallow the closing quote.
        self.args['command'] = command.replace('\\', '\\\\').replace('"', '\\"')

        return self.vectors.get_result(
            name=self.args['vector'],
            format_args=self.args
        )

    def _parse_address(self, address):
        """Split `[user@]host[:port]` into a `(user, host, port)` tuple.

        Args:
            address: The address string given on the command line.

        Returns:
            user: the part before the first `@`, or the session's `whoami`
                result (empty string if unknown) when the address has no `@`.
            host: the host name without any `:port` suffix.
            port: the `-port` argument when set; otherwise the port embedded
                in the address (kept as a string); otherwise 22.
        """
        user = self.session['system_info']['results'].get('whoami', '')
        host = address
        if '@' in address:
            user, host = address.split('@', 1)

        host, _, trailing_port = host.partition(':')

        # An empty embedded port ("host:") falls through to 22 instead of
        # producing an empty string that the payloads cannot convert.
        port = self.args.get('port') or trailing_port or 22
        return user, host, port
|