File: ssh_test.py

package info (click to toggle)
python-mitogen 0.3.25~a2-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 6,220 kB
  • sloc: python: 21,989; sh: 183; makefile: 74; perl: 19; ansic: 18
file content (249 lines) | stat: -rw-r--r-- 8,571 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
import os
import tempfile

import mitogen.ssh
import mitogen.utils

import testlib
import plain_old_module


class StubSshMixin(testlib.RouterMixin):
    """
    Mix-in that provides :meth:`stub_ssh`, which connects through the stub SSH
    client at ``stubs/stub-ssh.py`` (resolved with ``testlib.data_path()``).
    """
    def stub_ssh(self, STUBSSH_MODE=None, **kwargs):
        """
        Connect with ``self.router.ssh()`` using the stub SSH client, with
        ``STUBSSH_MODE`` exported in :data:`os.environ` for the duration of
        the call.

        :param STUBSSH_MODE:
            Value exported as the ``STUBSSH_MODE`` environment variable. It is
            passed through :func:`str`, so the default of :data:`None` is
            exported as the literal string ``'None'``.
        :param kwargs:
            Extra keyword arguments forwarded to ``self.router.ssh()``.
        :returns:
            Whatever ``self.router.ssh()`` returns.
        """
        # Remember any pre-existing value so it can be put back afterwards,
        # instead of unconditionally deleting the variable and leaking the
        # change into whatever runs after this call.
        previous_mode = os.environ.get('STUBSSH_MODE')
        os.environ['STUBSSH_MODE'] = str(STUBSSH_MODE)
        try:
            return self.router.ssh(
                hostname='hostname',
                username='mitogen__has_sudo',
                python_path='python3',
                ssh_path=testlib.data_path('stubs/stub-ssh.py'),
                **kwargs
            )
        finally:
            if previous_mode is None:
                # pop() rather than del: do not raise KeyError if the variable
                # was already removed while the connection was being made.
                os.environ.pop('STUBSSH_MODE', None)
            else:
                os.environ['STUBSSH_MODE'] = previous_mode


class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
    """
    Smoke test: build an SSH context through the stub SSH client and make one
    call through it.
    """
    def test_okay(self):
        # Connection parameters for the stub client shipped with the tests.
        ssh_kwargs = dict(
            hostname='hostname',
            username='mitogen__has_sudo',
            python_path='python3',
            ssh_path=testlib.data_path('stubs/stub-ssh.py'),
        )
        context = self.router.ssh(**ssh_kwargs)

        # Debugging aids, left disabled:
        #context.call(mitogen.utils.log_to_file, '/tmp/log')
        #context.call(mitogen.utils.disable_site_packages)

        # A single call through the new context shows that it is usable.
        total = context.call(plain_old_module.add, 1, 2)
        self.assertEqual(3, total)


class SshMixin(testlib.DockerMixin):
    """
    SSH connection tests run through ``self.docker_ssh()``. Concrete test
    classes supply a ``dockerized_ssh`` class attribute describing the daemon
    under test.
    """
    def test_debug_decoding(self):
        # ensure filter_debug_logs() decodes the logged string.
        log_capture = testlib.LogCapturer()
        log_capture.start()
        try:
            context = self.docker_ssh(
                username='mitogen__has_sudo',
                password='has_sudo_password',
                ssh_debug_level=3,
            )
        finally:
            # Always stop capturing, even if the connection attempt failed.
            captured = log_capture.stop()

        expected_line = "%s: debug1: Reading configuration data" % (
            context.name,
        )
        self.assertIn(expected_line, captured)

    def test_bash_permission_denied(self):
        # issue #271: only match Permission Denied at start of line.
        # There is no explicit assertion; the test passes when the connection
        # attempt below completes without raising.
        login = dict(
            username='mitogen__permdenied',
            password='permdenied_password',
            ssh_debug_level=3,
        )
        context = self.docker_ssh(**login)

    def test_stream_name(self):
        context = self.docker_ssh(
            username='mitogen__has_sudo',
            password='has_sudo_password',
        )
        daemon = self.dockerized_ssh
        expected_name = 'ssh.%s:%s' % (daemon.host, daemon.port)
        self.assertEqual(expected_name, context.name)

    def test_via_stream_name(self):
        ssh_context = self.docker_ssh(
            username='mitogen__has_sudo_nopw',
            password='has_sudo_nopw_password',
        )
        # Stack a sudo context on top of the SSH one; its name is expected to
        # extend the SSH stream name.
        sudo_context = self.router.sudo(via=ssh_context)

        daemon = self.dockerized_ssh
        expected_name = 'ssh.%s:%s.sudo.root' % (daemon.host, daemon.port)
        self.assertEqual(expected_name, sudo_context.name)

    def test_password_required(self):
        def connect_without_password():
            return self.docker_ssh(
                username='mitogen__has_sudo',
            )

        error = self.assertRaises(
            mitogen.ssh.PasswordError,
            connect_without_password,
        )
        self.assertEqual(error.args[0], mitogen.ssh.password_required_msg)

    def test_password_incorrect(self):
        def connect_with_bad_password():
            return self.docker_ssh(
                username='mitogen__has_sudo',
                password='badpw',
            )

        error = self.assertRaises(
            mitogen.ssh.PasswordError,
            connect_with_bad_password,
        )
        self.assertEqual(error.args[0], mitogen.ssh.password_incorrect_msg)

    def test_password_specified(self):
        context = self.docker_ssh(
            username='mitogen__has_sudo',
            password='has_sudo_password',
        )

        sentinel = context.call(plain_old_module.get_sentinel_value)
        self.assertEqual('i-am-mitogen-test-docker-image\n', sentinel)

    def test_pubkey_required(self):
        def connect_without_key():
            return self.docker_ssh(
                username='mitogen__has_sudo_pubkey',
            )

        error = self.assertRaises(
            mitogen.ssh.PasswordError,
            connect_without_key,
        )
        self.assertEqual(error.args[0], mitogen.ssh.password_required_msg)

    def test_pubkey_specified(self):
        key_path = testlib.data_path('docker/mitogen__has_sudo_pubkey.key')
        context = self.docker_ssh(
            username='mitogen__has_sudo_pubkey',
            identity_file=key_path,
        )
        sentinel = context.call(plain_old_module.get_sentinel_value)
        self.assertEqual('i-am-mitogen-test-docker-image\n', sentinel)

    def test_enforce_unknown_host_key(self):
        # An empty temporary file stands in for the known hosts file.
        fp = tempfile.NamedTemporaryFile()
        ssh_args = self.docker_ssh_default_kwargs.get('ssh_args', [])
        try:
            def connect_enforcing():
                return self.docker_ssh(
                    username='mitogen__has_sudo_pubkey',
                    password='has_sudo_password',
                    ssh_args=ssh_args + ['-o', 'UserKnownHostsFile %s' % fp.name],
                    check_host_keys='enforce',
                )

            error = self.assertRaises(
                mitogen.ssh.HostKeyError,
                connect_enforcing,
            )
            self.assertEqual(error.args[0], mitogen.ssh.hostkey_failed_msg)
        finally:
            fp.close()

    def test_accept_enforce_host_keys(self):
        fp = tempfile.NamedTemporaryFile()
        ssh_args = self.docker_ssh_default_kwargs.get('ssh_args', [])
        try:
            known_hosts_option = 'UserKnownHostsFile %s' % fp.name

            # First connection: 'accept' mode against the temporary file.
            context = self.docker_ssh(
                username='mitogen__has_sudo',
                password='has_sudo_password',
                ssh_args=ssh_args + ['-o', known_hosts_option],
                check_host_keys='accept',
            )
            context.shutdown(wait=True)

            fp.seek(0)
            # Lame test, but we're about to use enforce mode anyway, which
            # verifies the file contents.
            recorded = fp.read()
            self.assertGreater(len(recorded), 0)

            # Second connection: 'enforce' mode against the same file.
            context = self.docker_ssh(
                username='mitogen__has_sudo',
                password='has_sudo_password',
                ssh_args=ssh_args + ['-o', known_hosts_option],
                check_host_keys='enforce',
            )
            context.shutdown(wait=True)
        finally:
            fp.close()


# Build one concrete SshMixin test case per entry in testlib.DISTRO_SPECS and
# publish it in this module under the name 'SshTest<Distro>'. The loop
# variables are ordinary module globals, so their names are kept as-is.
for distro_spec in testlib.DISTRO_SPECS.split():
    dockerized_ssh = testlib.DockerizedSshDaemon(distro_spec)
    klass_name = 'SshTest%s' % (dockerized_ssh.distro.capitalize(),)
    klass = type(
        klass_name,
        (SshMixin, testlib.TestCase),
        dict(dockerized_ssh=dockerized_ssh),
    )
    globals().update({klass_name: klass})


class BannerMixin(testlib.DockerMixin):
    # Verify the ability to disambiguate random spam appearing in the SSHd's
    # login banner from a legitimate password prompt.
    def test_verbose_enabled(self):
        login = dict(
            username='mitogen__has_sudo',
            password='has_sudo_password',
            ssh_debug_level=3,
        )
        context = self.docker_ssh(**login)

        # The connection succeeded if the context carries the expected
        # 'ssh.<host>:<port>' name.
        daemon = self.dockerized_ssh
        expected_name = 'ssh.%s:%s' % (daemon.host, daemon.port)
        self.assertEqual(expected_name, context.name)
        context.shutdown(wait=True)


# Build one concrete BannerMixin test case per entry in testlib.DISTRO_SPECS
# and publish it in this module under the name 'BannerTest<Distro>'. The loop
# variables are ordinary module globals, so their names are kept as-is.
for distro_spec in testlib.DISTRO_SPECS.split():
    dockerized_ssh = testlib.DockerizedSshDaemon(distro_spec)
    klass_name = 'BannerTest%s' % (dockerized_ssh.distro.capitalize(),)
    klass = type(
        klass_name,
        (BannerMixin, testlib.TestCase),
        dict(dockerized_ssh=dockerized_ssh),
    )
    globals().update({klass_name: klass})


class StubPermissionDeniedTest(StubSshMixin, testlib.TestCase):
    """
    Connecting through the stub SSH client in either 'permdenied' mode must
    raise :class:`mitogen.ssh.PasswordError`.
    """
    def test_classic_prompt(self):
        def connect():
            return self.stub_ssh(STUBSSH_MODE='permdenied_classic')

        self.assertRaises(mitogen.ssh.PasswordError, connect)

    def test_openssh_75_prompt(self):
        def connect():
            return self.stub_ssh(STUBSSH_MODE='permdenied_75')

        self.assertRaises(mitogen.ssh.PasswordError, connect)


class StubCheckHostKeysTest(StubSshMixin, testlib.TestCase):
    """
    Each test connects through the stub SSH client and then reads the
    ``STDERR_WAS_TTY`` environment variable via ``context.call(os.getenv)``.
    """
    def test_check_host_keys_accept(self):
        # required=true, host_key_checking=accept
        context = self.stub_ssh(STUBSSH_MODE='ask', check_host_keys='accept')
        stderr_was_tty = context.call(os.getenv, 'STDERR_WAS_TTY')
        self.assertEqual('1', stderr_was_tty)

    def test_check_host_keys_enforce(self):
        # required=false, host_key_checking=enforce
        context = self.stub_ssh(check_host_keys='enforce')
        stderr_was_tty = context.call(os.getenv, 'STDERR_WAS_TTY')
        self.assertEqual(None, stderr_was_tty)

    def test_check_host_keys_ignore(self):
        # required=false, host_key_checking=ignore
        context = self.stub_ssh(check_host_keys='ignore')
        stderr_was_tty = context.call(os.getenv, 'STDERR_WAS_TTY')
        self.assertEqual(None, stderr_was_tty)

    def test_password_present(self):
        # required=true, password is not None
        context = self.stub_ssh(check_host_keys='ignore', password='willick')
        stderr_was_tty = context.call(os.getenv, 'STDERR_WAS_TTY')
        self.assertEqual('1', stderr_was_tty)