#!/usr/bin/env python
import sys
import os
import shutil
import tempfile
import unittest

if sys.platform != 'win32':
    from pexpect import pxssh
from .PexpectTestCase import PexpectTestCase

class SSHTestBase(PexpectTestCase):
    def setUp(self):
        super(SSHTestBase, self).setUp()
        self.tempdir = tempfile.mkdtemp()
        self.orig_path = os.environ.get('PATH')
        os.symlink(self.PYTHONBIN, os.path.join(self.tempdir, 'python'))
        fakessh_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'fakessh'))
        os.environ['PATH'] = self.tempdir + os.pathsep + fakessh_dir + \
                    ((os.pathsep + self.orig_path) if self.orig_path else '')

    def tearDown(self):
        shutil.rmtree(self.tempdir)
        if self.orig_path:
            os.environ['PATH'] = self.orig_path
        else:
            del os.environ['PATH']

class PxsshTestCase(SSHTestBase):
    def test_fake_ssh(self):
        ssh = pxssh.pxssh()
        #ssh.logfile_read = sys.stdout  # DEBUG
        ssh.login('server', 'me', password='s3cret')
        ssh.sendline('ping')
        ssh.expect('pong', timeout=10)
        assert ssh.prompt(timeout=10)
        ssh.logout()

    def test_wrong_pw(self):
        ssh = pxssh.pxssh()
        try:
            ssh.login('server', 'me', password='wr0ng')
        except pxssh.ExceptionPxssh:
            pass
        else:
            assert False, 'Password should have been refused'

    def test_failed_set_unique_prompt(self):
        ssh = pxssh.pxssh()
        ssh.set_unique_prompt = lambda: False
        try:
            ssh.login('server', 'me', password='s3cret',
                      auto_prompt_reset=True)
        except pxssh.ExceptionPxssh:
            pass
        else:
            assert False, 'should have raised exception, pxssh.ExceptionPxssh'

    def test_connection_refused(self):
        ssh = pxssh.pxssh()
        try:
            ssh.login('noserver', 'me', password='s3cret')
        except pxssh.ExceptionPxssh:
            pass
        else:
            assert False, 'should have raised exception, pxssh.ExceptionPxssh'

    def test_ssh_tunnel_string(self):
        ssh = pxssh.pxssh(debug_command_string=True)
        tunnels = { 'local': ['2424:localhost:22'],'remote': ['2525:localhost:22'],
            'dynamic': [8888] }
        confirmation_strings = 0
        confirmation_array = ['-R 2525:localhost:22','-L 2424:localhost:22','-D 8888']
        string = ssh.login('server', 'me', password='s3cret', ssh_tunnels=tunnels)
        for confirmation in confirmation_array:
            if confirmation in string:
                confirmation_strings+=1

        if confirmation_strings!=len(confirmation_array):
            assert False, 'String generated from tunneling is incorrect.'

    def test_remote_ssh_tunnel_string(self):
        ssh = pxssh.pxssh(debug_command_string=True)
        tunnels = { 'local': ['2424:localhost:22'],'remote': ['2525:localhost:22'],
            'dynamic': [8888] }
        confirmation_strings = 0
        confirmation_array = ['-R 2525:localhost:22','-L 2424:localhost:22','-D 8888']
        string = ssh.login('server', 'me', password='s3cret', ssh_tunnels=tunnels, spawn_local_ssh=False)
        for confirmation in confirmation_array:
            if confirmation in string:
                confirmation_strings+=1

        if confirmation_strings!=len(confirmation_array):
            assert False, 'String generated from remote tunneling is incorrect.'

    def test_ssh_config_passing_string(self):
        ssh = pxssh.pxssh(debug_command_string=True)
        temp_file = tempfile.NamedTemporaryFile()
        config_path = temp_file.name
        string = ssh.login('server', 'me', password='s3cret', spawn_local_ssh=False, ssh_config=config_path)
        if not '-F '+config_path in string:
            assert False, 'String generated from SSH config passing is incorrect.'

    def test_username_or_ssh_config(self):
        try:
            ssh = pxssh.pxssh(debug_command_string=True)
            temp_file = tempfile.NamedTemporaryFile()
            config_path = temp_file.name
            string = ssh.login('server')
            raise AssertionError('Should have failed due to missing username and missing ssh_config.')
        except TypeError:
            pass

    def test_ssh_config_user(self):
        ssh = pxssh.pxssh(debug_command_string=True)
        temp_file = tempfile.NamedTemporaryFile()
        config_path = temp_file.name
        temp_file.write(b'HosT server\n'
                        b'UsEr me\n'
                        b'hOSt not-server\n')
        temp_file.seek(0)
        string = ssh.login('server', ssh_config=config_path)

    def test_ssh_config_no_username_empty_config(self):
        ssh = pxssh.pxssh(debug_command_string=True)
        temp_file = tempfile.NamedTemporaryFile()
        config_path = temp_file.name
        try:
            string = ssh.login('server', ssh_config=config_path)
            raise AssertionError('Should have failed due to no Host.')
        except TypeError:
            pass

    def test_ssh_config_wrong_Host(self):
        ssh = pxssh.pxssh(debug_command_string=True)
        temp_file = tempfile.NamedTemporaryFile()
        config_path = temp_file.name
        temp_file.write(b'Host not-server\n'
                        b'Host also-not-server\n')
        temp_file.seek(0)
        try:
            string = ssh.login('server', ssh_config=config_path)
            raise AssertionError('Should have failed due to no matching Host.')
        except TypeError:
            pass

    def test_ssh_config_no_user(self):
        ssh = pxssh.pxssh(debug_command_string=True)
        temp_file = tempfile.NamedTemporaryFile()
        config_path = temp_file.name
        temp_file.write(b'Host server\n'
                        b'Host not-server\n')
        temp_file.seek(0)
        try:
            string = ssh.login('server', ssh_config=config_path)
            raise AssertionError('Should have failed due to no user.')
        except TypeError:
            pass

    def test_ssh_config_empty_user(self):
        ssh = pxssh.pxssh(debug_command_string=True)
        temp_file = tempfile.NamedTemporaryFile()
        config_path = temp_file.name
        temp_file.write(b'Host server\n'
                        b'user   \n'
                        b'Host not-server\n')
        temp_file.seek(0)
        try:
            string = ssh.login('server', ssh_config=config_path)
            raise AssertionError('Should have failed due to empty user.')
        except TypeError:
            pass

    def test_ssh_key_string(self):
        ssh = pxssh.pxssh(debug_command_string=True)
        confirmation_strings = 0
        confirmation_array = [' -A']
        string = ssh.login('server', 'me', password='s3cret', ssh_key=True)
        for confirmation in confirmation_array:
            if confirmation in string:
                confirmation_strings+=1

        if confirmation_strings!=len(confirmation_array):
            assert False, 'String generated from forcing the SSH agent sock is incorrect.'

        confirmation_strings = 0
        temp_file = tempfile.NamedTemporaryFile()
        ssh_key = temp_file.name
        confirmation_array = [' -i '+ssh_key]
        string = ssh.login('server', 'me', password='s3cret', ssh_key=ssh_key)
        for confirmation in confirmation_array:
            if confirmation in string:
                confirmation_strings+=1
        
        if confirmation_strings!=len(confirmation_array):
            assert False, 'String generated from adding an SSH key is incorrect.'

    def test_custom_ssh_cmd_debug(self):
        ssh = pxssh.pxssh(debug_command_string=True)
        cipher_string = '-c aes128-ctr,aes192-ctr,aes256-ctr,arcfour256,arcfour128,' \
            + 'aes128-cbc,3des-cbc,blowfish-cbc,cast128-cbc,aes192-cbc,' \
            + 'aes256-cbc,arcfour'
        confirmation_strings = 0
        confirmation_array = [cipher_string, '-2']
        string = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2')
        for confirmation in confirmation_array:
            if confirmation in string:
                confirmation_strings+=1

        if confirmation_strings!=len(confirmation_array):
            assert False, 'String generated for custom ssh client command is incorrect.'

    def test_custom_ssh_cmd_debug(self):
        ssh = pxssh.pxssh(debug_command_string=True)
        cipher_string = '-c aes128-ctr,aes192-ctr,aes256-ctr,arcfour256,arcfour128,' \
            + 'aes128-cbc,3des-cbc,blowfish-cbc,cast128-cbc,aes192-cbc,' \
            + 'aes256-cbc,arcfour'
        confirmation_strings = 0
        confirmation_array = [cipher_string, '-2']
        string = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2')
        for confirmation in confirmation_array:
            if confirmation in string:
                confirmation_strings+=1

        if confirmation_strings!=len(confirmation_array):
            assert False, 'String generated for custom ssh client command is incorrect.'

    def test_failed_custom_ssh_cmd_debug(self):
        ssh = pxssh.pxssh(debug_command_string=True)
        cipher_string = '-c invalid_cipher'
        confirmation_strings = 0
        confirmation_array = [cipher_string, '-2']
        string = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2')
        for confirmation in confirmation_array:
            if confirmation in string:
                confirmation_strings+=1

        if confirmation_strings!=len(confirmation_array):
            assert False, 'String generated for custom ssh client command is incorrect.'

    def test_custom_ssh_cmd(self):
        try:
            ssh = pxssh.pxssh()
            cipher_string = '-c aes128-ctr,aes192-ctr,aes256-ctr,arcfour256,arcfour128,' \
                + 'aes128-cbc,3des-cbc,blowfish-cbc,cast128-cbc,aes192-cbc,' \
                + 'aes256-cbc,arcfour'
            result = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2')

            ssh.PROMPT = r'Closed connection'
            ssh.sendline('exit')
            ssh.prompt(timeout=5)
            string = str(ssh.before) + str(ssh.after)
    
            if 'Closed connection' not in string:
                assert False, 'should have logged into Mock SSH client and exited'
        except pxssh.ExceptionPxssh as e:
            assert False, 'should not have raised exception, pxssh.ExceptionPxssh'
        else:
            pass

    def test_failed_custom_ssh_cmd(self):
        try:
            ssh = pxssh.pxssh()
            cipher_string = '-c invalid_cipher'
            result = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2')

            ssh.PROMPT = r'Closed connection'
            ssh.sendline('exit')
            ssh.prompt(timeout=5)
            string = str(ssh.before) + str(ssh.after)
    
            if 'Closed connection' not in string:
                assert False, 'should not have completed logging into Mock SSH client and exited'
        except pxssh.ExceptionPxssh as e:
            pass
        else:
            assert False, 'should have raised exception, pxssh.ExceptionPxssh'

    def test_login_bash(self):
        ssh = pxssh.pxssh()
        result = ssh.login('server bash', 'me', password='s3cret')
        ssh.sendline('ping')
        ssh.expect('pong', timeout=10)
        assert ssh.prompt(timeout=10)
        ssh.logout()

    def test_login_zsh(self):
        ssh = pxssh.pxssh()
        result = ssh.login('server zsh', 'me', password='s3cret')
        ssh.sendline('ping')
        ssh.expect('pong', timeout=10)
        assert ssh.prompt(timeout=10)
        ssh.logout()

    def test_login_tcsh(self):
        ssh = pxssh.pxssh()
        result = ssh.login('server tcsh', 'me', password='s3cret')
        ssh.sendline('ping')
        ssh.expect('pong', timeout=10)
        assert ssh.prompt(timeout=10)
        ssh.logout()

if __name__ == '__main__':
    unittest.main()
