"""
Asyncio-based hub, originally implemented by Miguel Grinberg.
"""

# The various modules involved in asyncio need to call the original, unpatched
# standard library APIs to work: socket, select, threading, and so on. We
# therefore don't import them on the module level, since that would involve
# their imports getting patched, and instead delay importing them as much as
# possible. Then, we do a little song and dance in Hub.__init__ below so that
# when they're imported they import the original modules (select, socket, etc)
# rather than the patched ones.

import os
import sys

from eventlet.hubs import hub
from eventlet.patcher import _unmonkey_patch_asyncio_all


def is_available():
    """
    Report whether this hub can be used on the current platform.

    Some hubs are platform-specific.  This one only needs ``asyncio``, which
    Python always ships, so the answer is unconditionally ``True``.
    """
    return True


class Hub(hub.BaseHub):
    """
    An Eventlet hub implementation on top of an asyncio event loop.

    Attributes set up by ``__init__``:

    ``loop``
        A freshly created asyncio event loop, also installed as the current
        event loop.  File descriptor listeners are mirrored into it as
        reader/writer callbacks, and ``run()`` executes inside it.
    ``sleep_event``
        An ``asyncio.Event`` that ``run()`` waits on while idle.  It is set
        whenever a timer is added or a file descriptor callback has run, so
        the main loop wakes up before its sleep timeout expires.
    """

    def __init__(self):
        super().__init__()

        # Pre-emptively make sure we're using the right modules: asyncio must
        # end up bound to the original, unpatched standard library modules,
        # which is also why it is imported here rather than at module level.
        _unmonkey_patch_asyncio_all()

        # The presumption is that eventlet is driving the event loop, so we
        # want a new one we control.
        import asyncio

        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.sleep_event = asyncio.Event()

        import asyncio.events
        if hasattr(asyncio.events, "on_fork"):
            # Allow post-fork() child to continue using the same event loop,
            # by swapping the body of asyncio's fork hook for a no-op.
            # This is a terrible idea.
            asyncio.events.on_fork.__code__ = (lambda: None).__code__
        else:
            # On Python 3.9-3.11, there's a thread local we need to reset.
            # Also a terrible idea.
            # The loop is bound as a default argument so the callback keeps
            # referring to this hub's loop when invoked without arguments.
            def re_register_loop(loop=self.loop):
                asyncio.events._set_running_loop(loop)

            os.register_at_fork(after_in_child=re_register_loop)

    def add_timer(self, timer):
        """
        Register a ``Timer``.

        Typically not called directly by users.

        Returns whatever the superclass ``add_timer()`` returns, so that this
        override does not silently drop a value callers may rely on.
        """
        result = super().add_timer(timer)
        # Wake up run() so it recomputes how long it may sleep now that a new
        # timer exists.
        self.sleep_event.set()
        return result

    def _file_cb(self, cb, fileno):
        """
        Callback called by ``asyncio`` when a file descriptor has an event.

        Calls ``cb(fileno)``.  Exceptions listed in ``SYSTEM_EXCEPTIONS``
        propagate to the caller (and the sleep event is then left untouched);
        any other exception is handed to ``squelch_exception()``.  Afterwards
        the sleep event is set so that ``run()`` wakes up.
        """
        try:
            cb(fileno)
        except self.SYSTEM_EXCEPTIONS:
            raise
        except BaseException:
            # Deliberately as broad as a bare ``except``: a failing listener
            # callback is reported through squelch_exception() instead of
            # propagating into the asyncio loop.
            self.squelch_exception(fileno, sys.exc_info())
        self.sleep_event.set()

    def add(self, evtype, fileno, cb, tb, mark_as_closed):
        """
        Add a file descriptor of given event type to the ``Hub``.  See the
        superclass for details.

        Typically not called directly by users.

        All arguments are forwarded unchanged to the superclass ``add()``.
        ``evtype`` equal to ``hub.READ`` registers a reader with the asyncio
        loop; any other value registers a writer.

        Returns the listener object produced by the superclass.

        Raises ``ValueError`` if ``os.fstat(fileno)`` fails with ``OSError``;
        the original error is attached as the cause.
        """
        try:
            os.fstat(fileno)
        except OSError as err:
            raise ValueError("Invalid file descriptor") from err
        # Must be checked before the superclass call records the new
        # listener: the asyncio loop only needs one registration per
        # (evtype, fileno), made when the first listener shows up.
        already_listening = self.listeners[evtype].get(fileno) is not None
        listener = super().add(evtype, fileno, cb, tb, mark_as_closed)
        if not already_listening:
            if evtype == hub.READ:
                self.loop.add_reader(fileno, self._file_cb, cb, fileno)
            else:
                self.loop.add_writer(fileno, self._file_cb, cb, fileno)
        return listener

    def remove(self, listener):
        """
        Remove a listener from the ``Hub``.  See the superclass for details.

        Typically not called directly by users.

        After the superclass has removed ``listener``, the descriptor is
        unregistered from the asyncio loop only if ``self.listeners`` no
        longer holds a listener for the same event type and file descriptor.
        """
        super().remove(listener)
        evtype = listener.evtype
        fileno = listener.fileno
        if not self.listeners[evtype].get(fileno):
            if evtype == hub.READ:
                self.loop.remove_reader(fileno)
            else:
                self.loop.remove_writer(fileno)

    def remove_descriptor(self, fileno):
        """
        Remove a file descriptor from the ``asyncio`` loop.

        Typically not called directly by users.

        The reader and/or writer registration is dropped from the loop for
        each event type that had a listener for ``fileno`` before the
        superclass ``remove_descriptor()`` ran.
        """
        # Snapshot first; presumably the superclass call clears these
        # entries, so looking them up afterwards would be too late -- confirm
        # against the superclass.
        have_read = self.listeners[hub.READ].get(fileno)
        have_write = self.listeners[hub.WRITE].get(fileno)
        super().remove_descriptor(fileno)
        if have_read:
            self.loop.remove_reader(fileno)
        if have_write:
            self.loop.remove_writer(fileno)

    def run(self, *a, **kw):
        """
        Start the ``Hub`` running. See the superclass for details.

        Positional and keyword arguments are accepted but not used here.
        The call blocks inside ``self.loop.run_until_complete()`` until
        ``self.stopping`` becomes true or an exception escapes the loop.

        Raises ``RuntimeError`` if ``self.running`` is already true.
        """
        import asyncio

        async def async_run():
            if self.running:
                raise RuntimeError("Already running!")
            try:
                self.running = True
                self.stopping = False
                while not self.stopping:
                    while self.closed:
                        # We ditch all of these first.
                        self.close_one()
                    self.prepare_timers()
                    if self.debug_blocking:
                        self.block_detect_pre()
                    self.fire_timers(self.clock())
                    if self.debug_blocking:
                        self.block_detect_post()
                    # Prepare again before computing the next wakeup time, so
                    # timers added while firing are taken into account.
                    self.prepare_timers()
                    wakeup_when = self.sleep_until()
                    if wakeup_when is None:
                        sleep_time = self.default_sleep()
                    else:
                        sleep_time = wakeup_when - self.clock()
                    if sleep_time > 0:
                        # Sleep until the next timer is due, or until
                        # add_timer()/_file_cb() sets the event.
                        try:
                            await asyncio.wait_for(self.sleep_event.wait(), sleep_time)
                        except asyncio.TimeoutError:
                            pass
                        self.sleep_event.clear()
                    else:
                        # Something is already due: just yield to the asyncio
                        # loop once so its callbacks can run.
                        await asyncio.sleep(0)
                # Reached only when the loop ends because ``stopping`` became
                # true (there is no ``break`` above); skipped if an exception
                # escapes.  Discard all remaining timers.
                self.timers_canceled = 0
                del self.timers[:]
                del self.next_timers[:]
            finally:
                self.running = False
                self.stopping = False

        self.loop.run_until_complete(async_run())
