import os
import tempfile

import mitogen.ssh
import mitogen.utils

import testlib
import plain_old_module


class StubSshMixin(testlib.RouterMixin):
    """
    Mix-in that provides :meth:`stub_ssh` executing the stub 'ssh.py'.
    """
    def stub_ssh(self, STUBSSH_MODE=None, **kwargs):
        """
        Connect through ``stubs/stub-ssh.py`` in place of a real SSH client.

        :param STUBSSH_MODE:
            Exported, after conversion with :func:`str`, as the
            ``STUBSSH_MODE`` environment variable while the connection is
            being established. Note the default ``None`` is therefore
            exported as the literal string ``'None'``.
        :param kwargs:
            Extra keyword arguments forwarded to ``self.router.ssh()``.
        :returns:
            Whatever ``self.router.ssh()`` returns.
        """
        # Remember any value already present so the caller's environment is
        # put back exactly as it was, rather than being unconditionally
        # deleted once the connection attempt finishes.
        previous = os.environ.get('STUBSSH_MODE')
        os.environ['STUBSSH_MODE'] = str(STUBSSH_MODE)
        try:
            return self.router.ssh(
                hostname='hostname',
                username='mitogen__has_sudo',
                python_path='python3',
                ssh_path=testlib.data_path('stubs/stub-ssh.py'),
                **kwargs
            )
        finally:
            if previous is not None:
                os.environ['STUBSSH_MODE'] = previous
            elif 'STUBSSH_MODE' in os.environ:
                del os.environ['STUBSSH_MODE']


class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
    # Smoke test: connect using the stub SSH script, then invoke one function
    # through the resulting context.
    def test_okay(self):
        stub_path = testlib.data_path('stubs/stub-ssh.py')
        context = self.router.ssh(
            ssh_path=stub_path,
            python_path='python3',
            username='mitogen__has_sudo',
            hostname='hostname',
        )
        # Debugging hint: mitogen.utils.log_to_file (with a path such as
        # '/tmp/log') or mitogen.utils.disable_site_packages can be invoked
        # via context.call() at this point.
        total = context.call(plain_old_module.add, 1, 2)
        self.assertEqual(3, total)


class SshMixin(testlib.DockerMixin):
    # Tests in this mix-in connect with self.docker_ssh() and read the
    # expected host/port from self.dockerized_ssh; both are expected to be
    # supplied by the base mix-in and the concrete test class respectively.

    def test_debug_decoding(self):
        # ensure filter_debug_logs() decodes the logged string.
        log_capture = testlib.LogCapturer()
        log_capture.start()
        try:
            context = self.docker_ssh(
                ssh_debug_level=3,
                password='has_sudo_password',
                username='mitogen__has_sudo',
            )
        finally:
            # Stop capturing even when the connection attempt raised.
            captured = log_capture.stop()

        wanted = "%s: debug1: Reading configuration data" % (context.name,)
        self.assertIn(wanted, captured)

    def test_bash_permission_denied(self):
        # issue #271: only match Permission Denied at start of line.
        # There is nothing to assert; the test passes when connecting does
        # not raise.
        self.docker_ssh(
            ssh_debug_level=3,
            password='permdenied_password',
            username='mitogen__permdenied',
        )

    def test_stream_name(self):
        context = self.docker_ssh(
            password='has_sudo_password',
            username='mitogen__has_sudo',
        )
        daemon = self.dockerized_ssh
        expected_name = 'ssh.%s:%s' % (daemon.host, daemon.port)
        self.assertEqual(expected_name, context.name)

    def test_via_stream_name(self):
        ssh_context = self.docker_ssh(
            password='has_sudo_nopw_password',
            username='mitogen__has_sudo_nopw',
        )
        sudo_context = self.router.sudo(via=ssh_context)

        daemon = self.dockerized_ssh
        expected_name = 'ssh.%s:%s.sudo.root' % (daemon.host, daemon.port)
        self.assertEqual(expected_name, sudo_context.name)

    def test_password_required(self):
        def connect():
            return self.docker_ssh(username='mitogen__has_sudo')

        # The assertRaises() used here hands back the raised exception.
        e = self.assertRaises(mitogen.ssh.PasswordError, connect)
        self.assertEqual(e.args[0], mitogen.ssh.password_required_msg)

    def test_password_incorrect(self):
        def connect():
            return self.docker_ssh(
                username='mitogen__has_sudo',
                password='badpw',
            )

        e = self.assertRaises(mitogen.ssh.PasswordError, connect)
        self.assertEqual(e.args[0], mitogen.ssh.password_incorrect_msg)

    def test_password_specified(self):
        context = self.docker_ssh(
            password='has_sudo_password',
            username='mitogen__has_sudo',
        )

        sentinel = context.call(plain_old_module.get_sentinel_value)
        self.assertEqual('i-am-mitogen-test-docker-image\n', sentinel)

    def test_pubkey_required(self):
        def connect():
            return self.docker_ssh(username='mitogen__has_sudo_pubkey')

        e = self.assertRaises(mitogen.ssh.PasswordError, connect)
        self.assertEqual(e.args[0], mitogen.ssh.password_required_msg)

    def test_pubkey_specified(self):
        key_path = testlib.data_path('docker/mitogen__has_sudo_pubkey.key')
        context = self.docker_ssh(
            username='mitogen__has_sudo_pubkey',
            identity_file=key_path,
        )
        sentinel = context.call(plain_old_module.get_sentinel_value)
        self.assertEqual('i-am-mitogen-test-docker-image\n', sentinel)

    def test_enforce_unknown_host_key(self):
        # Point ssh at a freshly created, empty known-hosts file and expect
        # 'enforce' mode to fail with HostKeyError.
        known_hosts = tempfile.NamedTemporaryFile()
        base_args = self.docker_ssh_default_kwargs.get('ssh_args', [])

        def connect():
            hosts_opt = ['-o', 'UserKnownHostsFile %s' % known_hosts.name]
            return self.docker_ssh(
                username='mitogen__has_sudo_pubkey',
                password='has_sudo_password',
                ssh_args=base_args + hosts_opt,
                check_host_keys='enforce',
            )

        try:
            e = self.assertRaises(mitogen.ssh.HostKeyError, connect)
            self.assertEqual(e.args[0], mitogen.ssh.hostkey_failed_msg)
        finally:
            known_hosts.close()

    def test_accept_enforce_host_keys(self):
        # Connect once in 'accept' mode, check something was written to the
        # known-hosts file, then connect again in 'enforce' mode using the
        # same file.
        known_hosts = tempfile.NamedTemporaryFile()
        base_args = self.docker_ssh_default_kwargs.get('ssh_args', [])

        def connect_then_shutdown(mode):
            hosts_opt = ['-o', 'UserKnownHostsFile %s' % known_hosts.name]
            context = self.docker_ssh(
                username='mitogen__has_sudo',
                password='has_sudo_password',
                ssh_args=base_args + hosts_opt,
                check_host_keys=mode,
            )
            context.shutdown(wait=True)

        try:
            connect_then_shutdown('accept')

            known_hosts.seek(0)
            # Lame test, but we're about to use enforce mode anyway, which
            # verifies the file contents.
            self.assertGreater(len(known_hosts.read()), 0)

            connect_then_shutdown('enforce')
        finally:
            known_hosts.close()


# Create one concrete SshTest<Distro> class per whitespace-separated entry in
# testlib.DISTRO_SPECS and bind it in the module namespace under its
# generated name. The loop variables intentionally stay module-level names.
for distro_spec in testlib.DISTRO_SPECS.split():
    dockerized_ssh = testlib.DockerizedSshDaemon(distro_spec)
    klass_name = 'SshTest%s' % (dockerized_ssh.distro.capitalize(),)
    klass = type(
        klass_name,
        (SshMixin, testlib.TestCase),
        dict(dockerized_ssh=dockerized_ssh),
    )
    globals().update({klass_name: klass})


class BannerMixin(testlib.DockerMixin):
    # Verify the ability to disambiguate random spam appearing in the SSHd's
    # login banner from a legitimate password prompt.
    def test_verbose_enabled(self):
        # Connect with ssh_debug_level=3 and check the context still gets the
        # expected 'ssh.<host>:<port>' name.
        context = self.docker_ssh(
            ssh_debug_level=3,
            password='has_sudo_password',
            username='mitogen__has_sudo',
        )
        daemon = self.dockerized_ssh
        expected_name = 'ssh.%s:%s' % (daemon.host, daemon.port)
        self.assertEqual(expected_name, context.name)
        context.shutdown(wait=True)


# Create one concrete BannerTest<Distro> class per whitespace-separated entry
# in testlib.DISTRO_SPECS and bind it in the module namespace under its
# generated name. The loop variables intentionally stay module-level names.
for distro_spec in testlib.DISTRO_SPECS.split():
    dockerized_ssh = testlib.DockerizedSshDaemon(distro_spec)
    klass_name = 'BannerTest%s' % (dockerized_ssh.distro.capitalize(),)
    klass = type(
        klass_name,
        (BannerMixin, testlib.TestCase),
        dict(dockerized_ssh=dockerized_ssh),
    )
    globals().update({klass_name: klass})


class StubPermissionDeniedTest(StubSshMixin, testlib.TestCase):
    # Each test runs the stub SSH script in one of its 'permdenied_*' modes
    # and expects the connection attempt to raise PasswordError.

    def test_classic_prompt(self):
        def connect():
            return self.stub_ssh(STUBSSH_MODE='permdenied_classic')

        self.assertRaises(mitogen.ssh.PasswordError, connect)

    def test_openssh_75_prompt(self):
        def connect():
            return self.stub_ssh(STUBSSH_MODE='permdenied_75')

        self.assertRaises(mitogen.ssh.PasswordError, connect)


class StubCheckHostKeysTest(StubSshMixin, testlib.TestCase):
    # Each test connects through the stub SSH script and then reads the
    # STDERR_WAS_TTY environment variable through the new context: '1' is
    # expected in the cases marked required=true, None otherwise.

    def _stderr_was_tty(self, **kwargs):
        # Connect via stub_ssh(**kwargs) and return the value of
        # os.getenv('STDERR_WAS_TTY') as evaluated through the context.
        context = self.stub_ssh(**kwargs)
        return context.call(os.getenv, 'STDERR_WAS_TTY')

    def test_check_host_keys_accept(self):
        # required=true, host_key_checking=accept
        observed = self._stderr_was_tty(
            STUBSSH_MODE='ask',
            check_host_keys='accept',
        )
        self.assertEqual('1', observed)

    def test_check_host_keys_enforce(self):
        # required=false, host_key_checking=enforce
        observed = self._stderr_was_tty(check_host_keys='enforce')
        self.assertEqual(None, observed)

    def test_check_host_keys_ignore(self):
        # required=false, host_key_checking=ignore
        observed = self._stderr_was_tty(check_host_keys='ignore')
        self.assertEqual(None, observed)

    def test_password_present(self):
        # required=true, password is not None
        observed = self._stderr_was_tty(
            check_host_keys='ignore',
            password='willick',
        )
        self.assertEqual('1', observed)
