import errno
import fcntl
import os
import platform
import signal
import sys
import time
import unittest

try:
    from unittest import mock
except ImportError:
    import mock
import testlib

import mitogen.core
import mitogen.parent

try:
    file
except NameError:
    from io import FileIO as file


def wait_for_child(pid, timeout=1.0):
    deadline = mitogen.core.now() + timeout
    while timeout < mitogen.core.now():
        try:
            target_pid, status = os.waitpid(pid, os.WNOHANG)
            if target_pid == pid:
                return
        except OSError:
            e = sys.exc_info()[1]
            if e.args[0] == errno.ECHILD:
                return

        time.sleep(0.05)

    assert False, "wait_for_child() timed out"


@mitogen.core.takes_econtext
def call_func_in_sibling(ctx, econtext, sync_sender):
    recv = ctx.call_async(time.sleep, 99999)
    sync_sender.send(None)
    recv.get().unpickle()


def wait_for_empty_output_queue(sync_recv, context):
    # wait for sender to submit their RPC. Since the RPC is sent first, the
    # message sent to this sender cannot arrive until we've routed the RPC.
    sync_recv.get()

    router = context.router
    broker = router.broker
    while True:
        # Now wait for the RPC to exit the output queue.
        stream = router.stream_by_id(context.context_id)
        if broker.defer_sync(lambda: stream.protocol.pending_bytes()) == 0:
            return
        time.sleep(0.1)


class GetDefaultRemoteNameTest(testlib.TestCase):
    func = staticmethod(mitogen.parent.get_default_remote_name)

    @mock.patch('os.getpid')
    @mock.patch('getpass.getuser')
    @mock.patch('socket.gethostname')
    def test_slashes(self, mock_gethostname, mock_getuser, mock_getpid):
        # Ensure slashes appearing in the remote name are replaced with
        # underscores.
        mock_gethostname.return_value = 'box'
        mock_getuser.return_value = 'ECORP\\Administrator'
        mock_getpid.return_value = 123
        self.assertEqual("ECORP_Administrator@box:123", self.func())


class ReturncodeToStrTest(testlib.TestCase):
    func = staticmethod(mitogen.parent.returncode_to_str)

    def test_return_zero(self):
        self.assertEqual(self.func(0), 'exited with return code 0')

    def test_return_one(self):
        self.assertEqual(self.func(1), 'exited with return code 1')

    def test_sigkill(self):
        self.assertEqual(self.func(-signal.SIGKILL),
            'exited due to signal %s (SIGKILL)' % (int(signal.SIGKILL),)
        )

    # can't test SIGSTOP without POSIX sessions rabbithole


class ReapChildTest(testlib.RouterMixin, testlib.TestCase):
    def test_connect_timeout(self):
        # Ensure the child process is reaped if the connection times out.
        options = mitogen.parent.Options(
            old_router=self.router,
            max_message_size=self.router.max_message_size,
            python_path=testlib.data_path('python_never_responds.py'),
            connect_timeout=0.5,
        )

        conn = mitogen.parent.Connection(options, router=self.router)
        self.assertRaises(mitogen.core.TimeoutError,
            lambda: conn.connect(context=mitogen.core.Context(None, 1234))
        )
        wait_for_child(conn.proc.pid)
        e = self.assertRaises(OSError,
            lambda: os.kill(conn.proc.pid, 0)
        )
        self.assertEqual(e.args[0], errno.ESRCH)


class StreamErrorTest(testlib.RouterMixin, testlib.TestCase):
    def test_direct_eof(self):
        e = self.assertRaises(mitogen.core.StreamError,
            lambda: self.router.local(
                python_path='true',
                connect_timeout=3,
            )
        )
        prefix = mitogen.parent.Connection.eof_error_msg
        self.assertTrue(e.args[0].startswith(prefix))

    def test_via_eof(self):
        # Verify FD leakage does not keep failed process open.
        local = self.router.local()
        e = self.assertRaises(mitogen.core.StreamError,
            lambda: self.router.local(
                via=local,
                python_path='echo',
                connect_timeout=3,
            )
        )
        expect = mitogen.parent.Connection.eof_error_msg
        self.assertIn(expect, e.args[0])

    def test_direct_enoent(self):
        e = self.assertRaises(mitogen.core.StreamError,
            lambda: self.router.local(
                python_path='derp',
                connect_timeout=3,
            )
        )
        prefix = 'Child start failed: [Errno 2] No such file or directory'
        self.assertTrue(e.args[0].startswith(prefix))

    def test_via_enoent(self):
        local = self.router.local()
        e = self.assertRaises(mitogen.core.StreamError,
            lambda: self.router.local(
                via=local,
                python_path='derp',
                connect_timeout=3,
            )
        )
        s = 'Child start failed: [Errno 2] No such file or directory'
        self.assertIn(s, e.args[0])


class ContextTest(testlib.RouterMixin, testlib.TestCase):
    def test_context_shutdown(self):
        local = self.router.local()
        pid = local.call(os.getpid)
        local.shutdown(wait=True)
        wait_for_child(pid)
        self.assertRaises(OSError, lambda: os.kill(pid, 0))


class OpenPtyTest(testlib.TestCase):
    func = staticmethod(mitogen.parent.openpty)

    def test_pty_returned(self):
        master_fp, slave_fp = self.func()
        try:
            self.assertTrue(master_fp.isatty())
            self.assertIsInstance(master_fp, file)
            self.assertTrue(slave_fp.isatty())
            self.assertIsInstance(slave_fp, file)
        finally:
            master_fp.close()
            slave_fp.close()

    @mock.patch('os.openpty')
    def test_max_reached(self, openpty):
        openpty.side_effect = OSError(errno.ENXIO)
        e = self.assertRaises(mitogen.core.StreamError,
                              lambda: self.func())
        msg = mitogen.parent.OPENPTY_MSG % (openpty.side_effect,)
        self.assertEqual(e.args[0], msg)

    @unittest.skipIf(condition=(os.uname()[0] != 'Linux'),
                      reason='Fallback only supported on Linux')
    @unittest.skipIf(condition=(platform.machine() == 'ppc64le'),
                      reason='Workaround broken on ppc64el')
    @mock.patch('os.openpty')
    def test_broken_linux_fallback(self, openpty):
        openpty.side_effect = OSError(errno.EPERM)
        master_fp, slave_fp = self.func()
        try:
            st = os.fstat(master_fp.fileno())
            self.assertEqual(5, os.major(st.st_rdev))
            flags = fcntl.fcntl(master_fp.fileno(), fcntl.F_GETFL)
            self.assertTrue(flags & os.O_RDWR)

            st = os.fstat(slave_fp.fileno())
            self.assertEqual(136, os.major(st.st_rdev))
            flags = fcntl.fcntl(slave_fp.fileno(), fcntl.F_GETFL)
            self.assertTrue(flags & os.O_RDWR)
        finally:
            master_fp.close()
            slave_fp.close()


class DisconnectTest(testlib.RouterMixin, testlib.TestCase):
    def test_child_disconnected(self):
        # Easy mode: process notices its own directly connected child is
        # disconnected.
        c1 = self.router.local()
        recv = c1.call_async(time.sleep, 9999)
        c1.shutdown(wait=True)
        e = self.assertRaises(mitogen.core.ChannelError,
            lambda: recv.get())
        self.assertEqual(e.args[0], self.router.respondent_disconnect_msg)

    def test_indirect_child_disconnected(self):
        # Achievement unlocked: process notices an indirectly connected child
        # is disconnected.
        c1 = self.router.local()
        c2 = self.router.local(via=c1)
        recv = c2.call_async(time.sleep, 9999)
        c2.shutdown(wait=True)
        e = self.assertRaises(mitogen.core.ChannelError,
            lambda: recv.get())
        self.assertEqual(e.args[0], self.router.respondent_disconnect_msg)

    def test_indirect_child_intermediary_disconnected(self):
        # Battlefield promotion: process notices indirect child disconnected
        # due to an intermediary child disconnecting.
        c1 = self.router.local()
        c2 = self.router.local(via=c1)
        recv = c2.call_async(time.sleep, 9999)
        c1.shutdown(wait=True)
        e = self.assertRaises(mitogen.core.ChannelError,
            lambda: recv.get())
        self.assertEqual(e.args[0], self.router.respondent_disconnect_msg)

    def test_near_sibling_disconnected(self):
        # Hard mode: child notices sibling connected to same parent has
        # disconnected.
        c1 = self.router.local()
        c2 = self.router.local()

        # Let c1 call functions in c2.
        self.router.stream_by_id(c1.context_id).protocol.auth_id = mitogen.context_id
        c1.call(mitogen.parent.upgrade_router)

        sync_recv = mitogen.core.Receiver(self.router)
        recv = c1.call_async(call_func_in_sibling, c2,
            sync_sender=sync_recv.to_sender())

        wait_for_empty_output_queue(sync_recv, c2)
        c2.shutdown(wait=True)

        e = self.assertRaises(mitogen.core.CallError,
            lambda: recv.get().unpickle())
        s = 'mitogen.core.ChannelError: ' + self.router.respondent_disconnect_msg
        self.assertTrue(e.args[0].startswith(s), str(e))

    def test_far_sibling_disconnected(self):
        # God mode: child of child notices child of child of parent has
        # disconnected.
        c1 = self.router.local(name='c1')
        c11 = self.router.local(name='c11', via=c1)

        c2 = self.router.local(name='c2')
        c22 = self.router.local(name='c22', via=c2)

        # Let c1 call functions in c2.
        self.router.stream_by_id(c1.context_id).protocol.auth_id = mitogen.context_id
        c11.call(mitogen.parent.upgrade_router)

        sync_recv = mitogen.core.Receiver(self.router)
        recv = c11.call_async(call_func_in_sibling, c22,
            sync_sender=sync_recv.to_sender())

        wait_for_empty_output_queue(sync_recv, c22)
        c22.shutdown(wait=True)

        e = self.assertRaises(mitogen.core.CallError,
            lambda: recv.get().unpickle())
        s = 'mitogen.core.ChannelError: ' + self.router.respondent_disconnect_msg
        self.assertTrue(e.args[0].startswith(s))
