File: ssh.py

package info (click to toggle)
weevely 4.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,336 kB
  • sloc: python: 7,732; php: 1,035; sh: 53; makefile: 2
file content (177 lines) | stat: -rw-r--r-- 6,995 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from core.module import Module, Status
from core.vectors import PhpCode, PythonCode, Os


class Ssh(Module):
    """Execute shell commands through SSH without PTY

  Vector          | Native | Comment
 -----------------|--------|-----------------------------------------
  php             |  No    | SSH2 extension
  py.paramiko     |  No    |
  py.pexpect      |  No    | Rarely installed
  py.subprocess   |  Yes   | Stores clear passwd on disk temporarily

/!\\ When using py.subprocess vector the password is stored in a file at <ASKPASS> /!\\
    As a result the file gets truncated, chmod'ed and then removed. Be careful!
    This file has to be executable, so you have to ensure it is created on a partition
    that allows execution (ie. not mounted with `noexec`).

  Examples:
     :shell_ssh root@127.0.0.1 Sup3rPassw0rd id
     :shell_ssh -vector py.subprocess -askpass /executable/path -port 1337 root@secret.server Sup3rPassw0rd id
"""

    def init(self):
        """Register the module metadata, its SSH vectors and its arguments.

        Four vectors are registered: ``php`` (SSH2 extension), ``py.paramiko``,
        ``py.pexpect`` and ``py.subprocess``. Each vector is a code template
        containing ``${...}`` placeholders (host, port, user, password,
        command, stderr, askpass).

        The ``py.subprocess`` vector has to write the clear-text password into
        the ``${askpass}`` helper script. That file is created with mode 0o700
        (owner only), its permissions are tightened before the password is
        written, and it is removed in a ``finally`` clause so it does not
        survive a failure to start or talk to ``ssh``.
        """
        self.register_info(
            {
                'author': [
                    'ZanyMonk'
                ],
                'license': 'GPLv3'
            }
        )

        self.register_vectors(
            [
                PhpCode("""
                    if(function_exists("ssh2_connect")){
                        if($c = ssh2_connect("${host}","${port}")){
                            if(ssh2_auth_password($c,"${user}","${password}")){
                                $s = ssh2_exec($c,"${command}${stderr}");
                                stream_set_blocking($s, true);
                                echo stream_get_contents($s);
                                fclose($s);
                            }else{echo "Authentication failed";}
                        }else{echo "Could not connect.";}
                    }else{echo "ModuleNotFound";}
                    """,
                        name="php",
                        ),
                PythonCode("""
                    import paramiko
                    c = paramiko.SSHClient()
                    c.set_missing_host_key_policy(paramiko.MissingHostKeyPolicy())
                    c.connect("${host}", username="${user}", password="${password}", port=int("${port}"))
                    i, o, e = c.exec_command("${command}${stderr}")
                    i.close()
                    for l in iter(o.readline, ""):
                        print(l,end="")
                    """, "py.paramiko"
                           ),
                PythonCode("""
                    from pexpect import pxssh
                    s = pxssh.pxssh()
                    s.options = dict(StrictHostKeyChecking="no", UserKnownHostsFile="/dev/null")
                    s.login("${host}", "${user}", "${password}", port=int("${port}"))
                    c = "${command}"
                    s.sendline(c)
                    s.prompt()
                    print(s.before[len(c) + 2:].decode("utf-8","replace"), end="")
                    """,
                           name="py.pexpect",
                           target=Os.NIX,
                           ),
                # The askpass script holds the clear-text password: keep it
                # owner-only (0o700 still lets ssh execute it) and always
                # delete it, even when ssh cannot be spawned.
                PythonCode("""
                    import os,subprocess as s
                    with os.fdopen(os.open("${askpass}", os.O_WRONLY|os.O_CREAT|os.O_TRUNC, 0o700), "w") as f:
                        os.chmod("${askpass}", 0o700)
                        f.write("echo '${password}'")
                    try:
                        p = s.Popen([ "ssh","-T","-tt","-oUserKnownHostsFile=/dev/null","-oStrictHostKeyChecking=no",
                                      "-p${port}","${user}@${host}","${command}${stderr}"
                                    ],
                                    shell=False,
                                    stdout=s.PIPE, stderr=s.PIPE, stdin=s.PIPE,
                                    env={
                                        "DISPLAY": ":99",
                                        "SSH_ASKPASS": "${askpass}"
                                    })
                        o, e = p.communicate()
                    finally:
                        os.unlink("${askpass}")
                    print((o if o else e).decode("utf-8","replace"), end="")
                    """,
                           name="py.subprocess",
                           ),
            ])

        self.register_arguments([
            {'name': 'address', 'help': 'user@host[:port]'},
            {'name': 'password', 'help': 'User\'s password'},
            {'name': 'command', 'help': 'Shell command', 'nargs': '+'},
            {'name': '-askpass', 'default': '/tmp/.p',
             'help': 'SSH_ASKPASS location (/!\\ will be overwritten and removed /!\\)'},
            {'name': '-port', 'default': 0, 'type': int, 'help': 'SSH server port'},
            {'name': '-stderr', 'default': ' 2>&1'},
            {'name': '-vector', 'choices': self.vectors.get_names()},
        ])

    def setup(self):
        """Pick the SSH vector to use and report whether the module can run.

        The vector named in ``self.args['vector']`` (or an empty name when
        none was given) is passed to ``self.vectors.find_first_result``
        together with placeholder connection arguments; ``is_valid`` is the
        acceptance condition for a vector's output.

        Args:
            self.args: The dictionary of arguments

        Returns:
            Status.RUN when shell_php is running and a result was obtained;
            in that case the vector name is saved in the session's stored
            arguments and, if no vector was requested, in ``self.args``.
            Status.FAIL otherwise.

        """

        # Placeholder values used only to fill the vector templates here.
        probe_args = {
            'host': '127.0.0.1',
            'user': 'root',
            'password': 'root',
            'port': 54321,
            'command': 'whoami',
            'stderr': '',
            'askpass': '/tmp/.p',
        }

        def is_valid(result):
            # Stop if shell_php is not running
            if self.session['shell_php']['status'] != Status.RUN:
                return True
            # Otherwise accept a non-empty result that is not the PHP
            # vector's "ModuleNotFound" marker.
            return result and 'ModuleNotFound' not in result

        requested_vector = self.args.get('vector', '')
        vector_name, result = self.vectors.find_first_result(
            names=[requested_vector],
            format_args=probe_args,
            condition=is_valid
        )

        # Read the shell_php status only now, after the vectors were tried.
        php_running = self.session['shell_php']['status'] == Status.RUN
        if not (php_running and result):
            return Status.FAIL

        self.session['shell_ssh']['stored_args']['vector'] = vector_name
        if not self.args.get('vector'):
            self.args['vector'] = vector_name
        return Status.RUN

    def run(self, **kwargs):
        self.args['user'], self.args['host'], self.args['port'] = self._parse_address(self.args['address'])

        self.args['command'] = ' '.join(self.args['command']).replace('"', '\\"')

        return self.vectors.get_result(
            name=self.args['vector'],
            format_args=self.args
        )

    def _parse_address(self, address):
        user = self.session['system_info']['results'].get('whoami', '')
        host = address

        if '@' in address:
            user, host = address.split('@', 1)

        trailing_port = 22
        port = self.args.get('port')
        if ':' in host:
            host, trailing_port = host.split(':', 1)

        if not port:  #
            port = trailing_port

        return user, host, port