"""PEP 3156 event loop based on GLib."""

import asyncio
import os
import signal
import socket
import sys
import threading
import warnings
import weakref
from asyncio import CancelledError, constants, events, sslproto, tasks

try:
    from gi.repository import Gio, GLib
except ImportError:  # pragma: no cover
    GLib = None
    Gio = None


from . import transports

if hasattr(os, "set_blocking"):

    def _set_nonblocking(fd):
        os.set_blocking(fd, False)

elif sys.platform == "win32":

    def _set_nonblocking(fd):
        pass

else:
    import fcntl

    def _set_nonblocking(fd):
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(fd, fcntl.F_SETFL, flags)


__all__ = ["GLibEventLoop", "GLibEventLoopPolicy"]


# The Windows `asyncio` implementation doesn't actually use this, but
# `glib` abstracts so nicely over this that we can use it on any platform
if sys.platform == "win32":

    class AbstractChildWatcher:
        pass

else:
    from asyncio.unix_events import AbstractChildWatcher


with warnings.catch_warnings():
    warnings.simplefilter("ignore", DeprecationWarning)

    class GLibChildWatcher(AbstractChildWatcher):
        def __init__(self):
            self._sources = {}
            self._handles = {}

        # On windows on has to open a process handle for the given PID number
        # before it's possible to use GLib's `child_watch_add` on it
        if sys.platform == "win32":

            def _create_handle_for_pid(self, pid):
                import _winapi

                return _winapi.OpenProcess(0x00100400, 0, pid)

            def _close_process_handle(self, handle):
                import _winapi

                _winapi.CloseHandle(handle)

        else:

            def _create_handle_for_pid(self, pid):
                return pid

            def _close_process_handle(self, pid):
                return None

        def attach_loop(self, loop):
            # just ignored
            pass

        def add_child_handler(self, pid, callback, *args):
            self.remove_child_handler(pid)

            handle = self._create_handle_for_pid(pid)
            source = GLib.child_watch_add(0, handle, self.__callback__)
            self._sources[pid] = source, callback, args, handle
            self._handles[handle] = pid

        def remove_child_handler(self, pid):
            try:
                source, callback, args, handle = self._sources.pop(pid)
                assert self._handles.pop(handle) == pid
            except KeyError:
                return False

            self._close_process_handle(handle)
            GLib.source_remove(source)
            return True

        def close(self):
            for source, callback, args, handle in self._sources.values():
                self._close_process_handle(handle)
                GLib.source_remove(source)
            self._sources = {}
            self._handles = {}

        def __enter__(self):
            return self

        def __exit__(self, a, b, c):
            pass

        def __callback__(self, handle, status):
            try:
                pid = self._handles.pop(handle)
                source, callback, args, handle = self._sources.pop(pid)
            except KeyError:
                return

            self._close_process_handle(handle)
            GLib.source_remove(source)

            if hasattr(os, "WIFSIGNALED") and os.WIFSIGNALED(status):
                returncode = -os.WTERMSIG(status)
            elif hasattr(os, "WIFEXITED") and os.WIFEXITED(status):
                returncode = os.WEXITSTATUS(status)

                # FIXME: Hack for adjusting invalid status returned by GLIB
                #    Looks like there is a bug in glib or in pygobject
                if returncode > 128:
                    returncode = 128 - returncode
            else:
                returncode = status

            callback(pid, returncode, *args)


class GLibHandle(events.Handle):
    __slots__ = ("_source", "_repeat", "_context")

    def __init__(self, *, loop, source, repeat, callback, args, context=None):
        super().__init__(callback, args, loop)

        if sys.version_info[:2] >= (3, 7) and context is None:
            import contextvars

            context = contextvars.copy_context()
        self._context = context
        self._source = source
        self._repeat = repeat
        loop._handlers.add(self)
        source.set_callback(self.__callback__, self)
        source.attach(loop._context)

    def cancel(self):
        super().cancel()
        self._source.destroy()
        self._loop._handlers.discard(self)

    def __callback__(self, ignore_self):
        # __callback__ is called within the MainContext object, which is
        # important in case that code includes a `Gtk.main()` or some such.
        # Otherwise what happens is the loop is started recursively, but the
        # callbacks don't finish firing, so they can't be rescheduled.
        self._run()
        if not self._repeat:
            self._source.destroy()
            self._loop._handlers.discard(self)

        return self._repeat


if sys.platform == "win32":

    class GLibBaseEventLoopPlatformExt:
        def __init__(self):
            pass

        def close(self):
            pass

else:
    from asyncio import unix_events

    class GLibBaseEventLoopPlatformExt(unix_events.SelectorEventLoop):
        """Semi-hack that allows us to leverage the existing implementation of
        Unix domain sockets without having to actually implement a selector
        based event loop.

        Note that both `__init__` and `close` DO NOT and SHOULD NOT ever
        call their parent implementation!
        """

        def __init__(self):
            self._sighandlers = {}
            self._unix_server_sockets = {}

        def close(self):
            for sig in list(self._sighandlers):
                self.remove_signal_handler(sig)

        def add_signal_handler(self, sig, callback, *args):
            self.remove_signal_handler(sig)

            s = GLib.unix_signal_source_new(sig)
            if s is None:
                # Show custom error messages for signal that are uncatchable
                if sig == signal.SIGKILL:
                    raise RuntimeError("cannot catch SIGKILL")
                elif sig == signal.SIGSTOP:
                    raise RuntimeError("cannot catch SIGSTOP")
                else:
                    raise ValueError("signal not supported")

            assert sig not in self._sighandlers

            self._sighandlers[sig] = GLibHandle(
                loop=self, source=s, repeat=True, callback=callback, args=args
            )

        def remove_signal_handler(self, sig):
            try:
                self._sighandlers.pop(sig).cancel()
                return True
            except KeyError:
                return False


class _BaseEventLoop(asyncio.BaseEventLoop):
    """Extra inheritance step that needs to be inserted so that we only ever
    indirectly inherit from `asyncio.BaseEventLoop`.

    This is necessary as the Unix implementation will also indirectly
    inherit from that class (thereby creating diamond inheritance).
    Python permits and fully supports diamond inheritance so this is not
    a problem. However it is, on the other hand, not permitted to
    inherit from a class both directly *and* indirectly – hence we add
    this intermediate class to make sure that can never happen (see
    https://stackoverflow.com/q/29214888 for a minimal example a
    forbidden inheritance tree) and
    https://www.python.org/download/releases/2.3/mro/ for some extensive
    documentation of the allowed inheritance structures in python.
    """


class GLibBaseEventLoop(_BaseEventLoop, GLibBaseEventLoopPlatformExt):
    def __init__(self, context=None):
        self._handlers = set()

        self._accept_futures = {}
        self._context = context or GLib.MainContext()
        self._selector = self
        self._transports = weakref.WeakValueDictionary()
        self._readers = {}
        self._writers = {}

        self._channels = weakref.WeakValueDictionary()

        _BaseEventLoop.__init__(self)
        GLibBaseEventLoopPlatformExt.__init__(self)

    def close(self):
        for future in self._accept_futures.values():
            future.cancel()
        self._accept_futures.clear()

        for s in list(self._handlers):
            s.cancel()
        self._handlers.clear()

        GLibBaseEventLoopPlatformExt.close(self)
        _BaseEventLoop.close(self)

    def select(self, timeout=None):
        self._context.acquire()
        try:
            if timeout is None:
                self._context.iteration(True)
            elif timeout <= 0:
                self._context.iteration(False)
            else:
                # Schedule fake callback that will trigger an event and cause the loop to terminate
                # after the given number of seconds
                handle = GLibHandle(
                    loop=self,
                    source=GLib.Timeout(timeout * 1000),
                    repeat=False,
                    callback=lambda: None,
                    args=(),
                )
                try:
                    self._context.iteration(True)
                finally:
                    handle.cancel()
            return ()  # Available events are dispatched immediately and not returned
        finally:
            self._context.release()

    def _make_socket_transport(
        self, sock, protocol, waiter=None, *, extra=None, server=None
    ):
        """Create socket transport."""
        return transports.SocketTransport(self, sock, protocol, waiter, extra, server)

    def _make_ssl_transport(
        self,
        rawsock,
        protocol,
        sslcontext,
        waiter=None,
        *,
        server_side=False,
        server_hostname=None,
        extra=None,
        server=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None,
    ):
        """Create SSL transport."""
        # sslproto._is_sslproto_available was removed from asyncio, starting from Python 3.7.
        if (
            hasattr(sslproto, "_is_sslproto_available")
            and not sslproto._is_sslproto_available()
        ):
            raise NotImplementedError(
                "Proactor event loop requires Python 3.5"
                " or newer (ssl.MemoryBIO) to support "
                "SSL"
            )
        extra_protocol_kwargs = {}
        # Support for the ssl_handshake_timeout keyword argument was added in Python 3.7.
        if sys.version_info[:2] >= (3, 7):
            extra_protocol_kwargs["ssl_handshake_timeout"] = ssl_handshake_timeout
        # Support for the ssl_shutdown_timeout keyword argument was added in Python 3.11.
        if sys.version_info[:2] >= (3, 11):
            extra_protocol_kwargs["ssl_shutdown_timeout"] = ssl_shutdown_timeout

        ssl_protocol = sslproto.SSLProtocol(
            self,
            protocol,
            sslcontext,
            waiter,
            server_side,
            server_hostname,
            **extra_protocol_kwargs,
        )
        transports.SocketTransport(
            self, rawsock, ssl_protocol, extra=extra, server=server
        )
        return ssl_protocol._app_transport

    def _make_datagram_transport(
        self, sock, protocol, address=None, waiter=None, extra=None
    ):
        """Create datagram transport."""
        return transports.DatagramTransport(
            self, sock, protocol, address, waiter, extra
        )

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None, extra=None):
        """Create read pipe transport."""
        channel = self._channel_from_fileobj(pipe)
        return transports.PipeReadTransport(self, channel, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None, extra=None):
        """Create write pipe transport."""
        channel = self._channel_from_fileobj(pipe)
        return transports.PipeWriteTransport(self, channel, protocol, waiter, extra)

    async def _make_subprocess_transport(
        self,
        protocol,
        args,
        shell,
        stdin,
        stdout,
        stderr,
        bufsize,
        extra=None,
        **kwargs,
    ):
        """Create subprocess transport."""
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            watcher = events.get_child_watcher()

        with watcher:
            waiter = asyncio.Future(loop=self)
            transport = transports.SubprocessTransport(
                self,
                protocol,
                args,
                shell,
                stdin,
                stdout,
                stderr,
                bufsize,
                waiter=waiter,
                extra=extra,
                **kwargs,
            )

            watcher.add_child_handler(
                transport.get_pid(), self._child_watcher_callback, transport
            )
            try:
                await waiter
            except Exception as exc:
                err = exc
            else:
                err = None
            if err is not None:
                transport.close()
                await transport._wait()
                raise err

        return transport

    def _child_watcher_callback(self, pid, returncode, transport):
        self.call_soon_threadsafe(transport._process_exited, returncode)

    def _write_to_self(self):
        self._context.wakeup()

    def _process_events(self, event_list):
        """Process selector events."""
        pass  # This is already done in `.select()`

    def _start_serving(
        self,
        protocol_factory,
        sock,
        sslcontext=None,
        server=None,
        backlog=100,
        ssl_handshake_timeout=getattr(constants, "SSL_HANDSHAKE_TIMEOUT", 60.0),
        ssl_shutdown_timeout=getattr(constants, "SSL_SHUTDOWN_TIMEOUT", 60.0),
    ):
        self._transports[sock.fileno()] = server

        def server_loop(f=None):
            try:
                if f is not None:
                    (conn, addr) = f.result()
                    protocol = protocol_factory()
                    if sslcontext is not None:
                        self._make_ssl_transport(
                            conn,
                            protocol,
                            sslcontext,
                            server_side=True,
                            extra={"peername": addr},
                            server=server,
                            ssl_handshake_timeout=ssl_handshake_timeout,
                            ssl_shutdown_timeout=ssl_shutdown_timeout,
                        )
                    else:
                        self._make_socket_transport(
                            conn, protocol, extra={"peername": addr}, server=server
                        )
                if self.is_closed():
                    return
                f = self.sock_accept(sock)
            except OSError as exc:
                if sock.fileno() != -1:
                    self.call_exception_handler(
                        {
                            "message": "Accept failed on a socket",
                            "exception": exc,
                            "socket": sock,
                        }
                    )
                    sock.close()
            except CancelledError:
                sock.close()
            else:
                self._accept_futures[sock.fileno()] = f
                f.add_done_callback(server_loop)

        self.call_soon(server_loop)

    def _stop_serving(self, sock):
        if sock.fileno() in self._accept_futures:
            self._accept_futures[sock.fileno()].cancel()
        sock.close()

    def _check_not_coroutine(self, callback, name):
        """Check whether the given callback is a coroutine or not."""
        from asyncio import coroutines

        if coroutines.iscoroutine(callback) or coroutines.iscoroutinefunction(callback):
            raise TypeError(f"coroutines cannot be used with {name}()")

    def _ensure_fd_no_transport(self, fd):
        """Ensure that the given file descriptor is NOT used by any transport.

        Adding another reader to a fd that is already being waited for
        causes a hang on Windows.
        """
        try:
            transport = self._transports[fd]
        except KeyError:
            pass
        else:
            if not hasattr(transport, "is_closing") or not transport.is_closing():
                raise RuntimeError(
                    "File descriptor {!r} is used by transport {!r}".format(
                        fd, transport
                    )
                )

    def _channel_from_socket(self, sock):
        """Create GLib IOChannel for the given file object.

        This function will cache weak references to `GLib.Channel` objects
        it previously has created to prevent weird issues that can occur
        when two GLib channels point to the same underlying socket resource.

        On windows this will only work for network sockets.
        """
        fd = self._fileobj_to_fd(sock)

        sock_id = id(sock)
        try:
            channel = self._channels[sock_id]
        except KeyError:
            if sys.platform == "win32":
                channel = GLib.IOChannel.win32_new_socket(fd)
            else:
                channel = GLib.IOChannel.unix_new(fd)

            # disabling buffering requires setting the encoding to None
            channel.set_encoding(None)
            channel.set_buffered(False)

            self._channels[sock_id] = channel
        return channel

    def _channel_from_fileobj(self, fileobj):
        """Create GLib IOChannel for the given file object.

        On windows this will only work for files and pipes returned
        GLib's C library.
        """
        fd = self._fileobj_to_fd(fileobj)

        # pipes have been shown to be blocking here, so we'll do someone
        # else's job for them.
        _set_nonblocking(fd)

        if sys.platform == "win32":
            channel = GLib.IOChannel.win32_new_fd(fd)
        else:
            channel = GLib.IOChannel.unix_new(fd)

        # disabling buffering requires setting the encoding to None
        channel.set_encoding(None)
        channel.set_buffered(False)
        return channel

    def _fileobj_to_fd(self, fileobj):
        """Obtain the raw file descriptor number for the given file object."""
        if isinstance(fileobj, int):
            fd = fileobj
        else:
            try:
                fd = int(fileobj.fileno())
            except (AttributeError, TypeError, ValueError):
                raise ValueError(f"Invalid file object: {fileobj!r}")
        if fd < 0:
            raise ValueError(f"Invalid file descriptor: {fd}")
        return fd

    def _delayed(self, source, callback=None, *args):
        """Create a future that will complete after the given GLib Source
        object has become ready and the data it tracks has been processed."""
        future = None

        def handle_ready(*args):
            try:
                if callback:
                    (done, result) = callback(*args)
                else:
                    (done, result) = (True, None)

                if done:
                    future.set_result(result)
                    future.handle.cancel()
            except Exception as error:
                if not future.cancelled():
                    future.set_exception(error)
                future.handle.cancel()

        # Create future and properly wire up it's cancellation with the
        # handle's cancellation machinery
        future = asyncio.Future(loop=self)
        future.handle = GLibHandle(
            loop=self, source=source, repeat=True, callback=handle_ready, args=args
        )
        return future

    def _socket_handle_errors(self, sock):
        """Raise exceptions for error states (SOL_ERROR) on the given socket
        object."""
        errno = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if errno != 0:
            if sys.platform == "win32":
                msg = socket.errorTab.get(errno, f"Error {errno}")
                raise OSError(errno, f"[WinError {errno}] {msg}", None, errno)
            else:
                raise OSError(errno, os.strerror(errno))

    ###############################
    # Low-level socket operations #
    ###############################
    def sock_connect(self, sock, address):
        # Request connection on socket (it is expected that `sock` is already non-blocking)
        try:
            sock.connect(address)
        except BlockingIOError:
            pass

        # Create glib IOChannel for socket and wait for it to become writable
        channel = self._channel_from_socket(sock)
        source = GLib.io_create_watch(channel, GLib.IO_OUT)

        def sock_finish_connect(sock):
            self._socket_handle_errors(sock)
            return (True, sock)

        return self._delayed(source, sock_finish_connect, sock)

    def sock_accept(self, sock):
        channel = self._channel_from_socket(sock)
        source = GLib.io_create_watch(channel, GLib.IO_IN)

        def sock_connection_received(sock):
            return (True, sock.accept())

        async def accept_coro(future, conn):
            # Coroutine closing the accept socket if the future is cancelled
            try:
                return await future
            except CancelledError:
                sock.close()
                raise

        future = self._delayed(source, sock_connection_received, sock)
        return self.create_task(accept_coro(future, sock))

    def sock_recv(self, sock, nbytes, flags=0):
        channel = self._channel_from_socket(sock)

        def read_func(channel, nbytes):
            if not sock._closed:
                return sock.recv(nbytes, flags)

        return self._channel_read(channel, nbytes, read_func)

    def sock_recv_into(self, sock, buf, flags=0):
        channel = self._channel_from_socket(sock)

        def read_func(channel, nbytes):
            if not sock._closed:
                return sock.recv_into(buf, flags)

        return self._channel_read(channel, len(buf), read_func)

    def sock_recvfrom(self, sock, nbytes, flags=0):
        channel = self._channel_from_socket(sock)

        def read_func(channel, nbytes):
            if not sock._closed:
                return sock.recvfrom(nbytes, flags)

        return self._channel_read(channel, nbytes, read_func)

    def sock_sendall(self, sock, buf, flags=0):
        channel = self._channel_from_socket(sock)

        def write_func(channel, buf):
            if not sock._closed:
                return sock.send(buf, flags)

        return self._channel_write(channel, buf, write_func)

    def sock_sendallto(self, sock, buf, addr, flags=0):
        channel = self._channel_from_socket(sock)

        def write_func(channel, buf):
            if not sock._closed:
                return sock.sendto(buf, flags, addr)

        return self._channel_write(channel, buf, write_func)

    #####################################
    # Low-level GLib.Channel operations #
    #####################################
    def _channel_read(self, channel, nbytes, read_func=None):
        if read_func is None:

            def read_func(channel, nbytes):
                return channel.read(nbytes)

        source = GLib.io_create_watch(channel, GLib.IO_IN | GLib.IO_HUP)

        def channel_readable(read_func, channel, nbytes):
            return (True, read_func(channel, nbytes))

        return self._delayed(source, channel_readable, read_func, channel, nbytes)

    def _channel_write(self, channel, buf, write_func=None):
        if write_func is None:
            # note: channel.write doesn't raise BlockingIOError, instead it
            # returns 0
            # gi.overrides.GLib.write has an isinstance(buf, bytes) check, so
            # we can't give it a bytearray or a memoryview.
            def write_func(channel, buf):
                return channel.write(bytes(buf))

        buflen = len(buf)

        # Fast-path: If there is enough room in the OS buffer all data can be written synchronously
        try:
            nbytes = write_func(channel, buf)
        except BlockingIOError:
            nbytes = 0
        else:
            if nbytes >= len(buf):
                # All data was written synchronously in one go
                result = asyncio.Future(loop=self)
                result.set_result(nbytes)
                return result

        # Chop off the initially transmitted data and store result
        # as a bytearray for easier future modification
        buf = bytearray(buf[nbytes:])

        # Send the remaining data asynchronously as the socket becomes writable
        source = GLib.io_create_watch(channel, GLib.IO_OUT)

        def channel_writable(buflen, write_func, channel, buf):
            nbytes = write_func(channel, buf)
            if nbytes >= len(buf):
                return (True, buflen)
            else:
                del buf[0:nbytes]
                return (False, buflen)

        return self._delayed(source, channel_writable, buflen, write_func, channel, buf)

    def add_reader(self, fileobj, callback, *args):
        fd = self._fileobj_to_fd(fileobj)
        self._ensure_fd_no_transport(fd)

        self.remove_reader(fd)
        channel = self._channel_from_socket(fd)
        source = GLib.io_create_watch(
            channel, GLib.IO_IN | GLib.IO_HUP | GLib.IO_ERR | GLib.IO_NVAL
        )

        assert fd not in self._readers
        self._readers[fd] = GLibHandle(
            loop=self, source=source, repeat=True, callback=callback, args=args
        )

    def remove_reader(self, fileobj):
        fd = self._fileobj_to_fd(fileobj)
        self._ensure_fd_no_transport(fd)

        try:
            self._readers.pop(fd).cancel()
            return True
        except KeyError:
            return False

    def add_writer(self, fileobj, callback, *args):
        fd = self._fileobj_to_fd(fileobj)
        self._ensure_fd_no_transport(fd)

        self.remove_writer(fd)
        channel = self._channel_from_socket(fd)
        source = GLib.io_create_watch(channel, GLib.IO_OUT | GLib.IO_ERR | GLib.IO_NVAL)

        assert fd not in self._writers
        self._writers[fd] = GLibHandle(
            loop=self, source=source, repeat=True, callback=callback, args=args
        )

    def remove_writer(self, fileobj):
        fd = self._fileobj_to_fd(fileobj)
        self._ensure_fd_no_transport(fd)

        try:
            self._writers.pop(fd).cancel()
            return True
        except KeyError:
            return False


class GLibEventLoop(GLibBaseEventLoop):
    def __init__(self, *, context=None, application=None):
        self._application = application
        self._running = False
        self._argv = None

        super().__init__(context)
        if application is None:
            self._mainloop = GLib.MainLoop(self._context)

    def is_running(self):
        return self._running

    def run(self):
        recursive = self.is_running()
        if (
            not recursive
            and hasattr(events, "_get_running_loop")
            and events._get_running_loop()
        ):
            raise RuntimeError(
                "Cannot run the event loop while another loop is running"
            )

        if not recursive:
            self._running = True
            if hasattr(events, "_set_running_loop"):
                events._set_running_loop(self)

        try:
            if self._application is not None:
                self._application.run(self._argv)
            else:
                self._mainloop.run()
        finally:
            if not recursive:
                self._running = False
                if hasattr(events, "_set_running_loop"):
                    events._set_running_loop(None)

    def run_until_complete(self, future, **kw):
        """Run the event loop until a Future is done.

        Return the Future's result, or raise its exception.
        """

        def stop(f):
            self.stop()

        future = tasks.ensure_future(future, loop=self)
        future.add_done_callback(stop)
        try:
            self.run_forever(**kw)
        finally:
            future.remove_done_callback(stop)

        if not future.done():
            raise RuntimeError("Event loop stopped before Future completed.")

        return future.result()

    def run_forever(self, application=None, argv=None):
        """Run the event loop until stop() is called."""
        if application is not None:
            self.set_application(application)
        if argv is not None:
            self.set_argv(argv)

        if self.is_running():
            raise RuntimeError(
                "Recursively calling run_forever is forbidden. "
                "To recursively run the event loop, call run()."
            )

        if hasattr(self, "_mainloop") and hasattr(self._mainloop, "_quit_by_sigint"):
            del self._mainloop._quit_by_sigint

        try:
            self.run()
        finally:
            self.stop()

    # Methods scheduling callbacks.  All these return Handles.
    def call_soon(self, callback, *args, context=None):
        self._check_not_coroutine(callback, "call_soon")
        source = GLib.Idle()

        source.set_priority(GLib.PRIORITY_DEFAULT)

        return GLibHandle(
            loop=self,
            source=source,
            repeat=False,
            callback=callback,
            args=args,
            context=context,
        )

    call_soon_threadsafe = call_soon

    def call_later(self, delay, callback, *args, context=None):
        self._check_not_coroutine(callback, "call_later")

        return GLibHandle(
            loop=self,
            source=GLib.Timeout(delay * 1000) if delay > 0 else GLib.Idle(),
            repeat=False,
            callback=callback,
            args=args,
            context=context,
        )

    def call_at(self, when, callback, *args, context=None):
        self._check_not_coroutine(callback, "call_at")

        return self.call_later(when - self.time(), callback, *args, context=context)

    def time(self):
        return GLib.get_monotonic_time() / 1000000

    def stop(self):
        """Stop the inner-most invocation of the event loop.

        Typically, this will mean stopping the event loop completely.

        Note that due to the nature of GLib's main loop, stopping may not be
        immediate.
        """

        if self._application is not None:
            self._application.quit()
        else:
            self._mainloop.quit()

    def set_application(self, application):
        if not isinstance(application, Gio.Application):
            raise TypeError("application must be a Gio.Application object")
        if self._application is not None:
            raise ValueError("application is already set")
        if self.is_running():
            raise RuntimeError(
                "You can't add the application to a loop that's already running."
            )
        self._application = application
        self._policy._application = application
        del self._mainloop

    def set_argv(self, argv):
        """Sets argv to be passed to Gio.Application.run()"""
        self._argv = argv


class GLibEventLoopPolicy(events.AbstractEventLoopPolicy):
    """Default GLib event loop policy.

    In this policy, each thread has its own event loop.  However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.
    """

    EventLoopCls = GLibEventLoop

    # TODO add a parameter to synchronise with GLib's thread default contexts
    # (i.e., g_main_context_push_thread_default())
    def __init__(self, application=None):
        self._default_loop = None
        self._application = application
        self._watcher_lock = threading.Lock()

        self._watcher = None
        self._policy = asyncio.DefaultEventLoopPolicy()
        self._policy.new_event_loop = self.new_event_loop
        self.get_event_loop = self._policy.get_event_loop
        self.set_event_loop = self._policy.set_event_loop

    def get_child_watcher(self):
        if self._watcher is None:
            with self._watcher_lock:
                if self._watcher is None:
                    self._watcher = GLibChildWatcher()
        return self._watcher

    def set_child_watcher(self, watcher):
        """Set a child watcher.

        Must be an an instance of GLibChildWatcher, as it ties in with
        GLib appropriately.
        """

        if watcher is not None and not isinstance(watcher, GLibChildWatcher):
            raise TypeError("Only GLibChildWatcher is supported!")

        with self._watcher_lock:
            self._watcher = watcher

    def new_event_loop(self):
        """Create a new event loop and return it."""
        if (
            not self._default_loop
            and threading.main_thread().ident == threading.get_ident()
        ):
            loop = self.get_default_loop()
        else:
            loop = self.EventLoopCls()
        loop._policy = self

        return loop

    def get_default_loop(self):
        """Get the default event loop."""
        if not self._default_loop:
            self._default_loop = self._new_default_loop()
        return self._default_loop

    def _new_default_loop(self):
        loop = self.EventLoopCls(
            context=GLib.main_context_default(), application=self._application
        )
        loop._policy = self
        return loop
