# SPDX-FileCopyrightText: All Contributors to the PyTango project
# SPDX-License-Identifier: LGPL-3.0-or-later

# Imports
import functools
import asyncio
import inspect
import collections
import os
import types
import concurrent.futures
import warnings
import threading

from asyncio import futures, coroutines
from asyncio.tasks import ensure_future
from typing import Callable

# Tango imports
from tango.green import AbstractExecutor
from tango.utils import _is_coroutine_function, PyTangoThreadPoolExecutor

__all__ = (
    "AsyncioExecutor",
    "get_global_executor",
    "set_global_executor",
    "_switch_global_executor_to_thread",
)

_ALREADY_WARNED_FUNCTIONS = []


# Function removed from Python 3.11
# Taken from https://github.com/python/cpython/blob/3.10/Lib/asyncio/coroutines.py
# (without the _DEBUG part)
# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
# 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022 Python Software Foundation;
# All Rights Reserved
def _coroutine(func):
    """Decorator to mark coroutines.
    If the coroutine is not yielded from before it is destroyed,
    an error message is logged.
    """
    if inspect.iscoroutinefunction(func):
        return func

    if inspect.isgeneratorfunction(func):
        coro = func
    else:

        @functools.wraps(func)
        def coro(*args, **kw):
            res = func(*args, **kw)
            if asyncio.isfuture(res) or inspect.isgenerator(res):
                res = yield from res
            else:
                # If 'res' is an awaitable, run it.
                try:
                    await_meth = res.__await__
                except AttributeError:
                    pass
                else:
                    if isinstance(res, collections.abc.Awaitable):
                        res = yield from await_meth()
            return res

    coro = types.coroutine(coro)
    wrapper = coro
    wrapper._is_coroutine = (
        asyncio.coroutines._is_coroutine
    )  # For iscoroutinefunction().
    return wrapper


# In Python 3.12 the legacy generator-based coroutines are not allowed for execution anymore
# Here we use modified run_coroutine_threadsafe method, which still can execute them
# Taken from https://github.com/python/cpython/blob/3.12/Lib/asyncio/tasks.py
# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
# 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023 Python Software Foundation;
# All Rights Reserved
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future to access the result.
    """
    if not coroutines.iscoroutine(coro) and not types.GeneratorType:
        raise TypeError("A coroutine object is required")

    future = concurrent.futures.Future()

    def callback():
        try:
            futures._chain_future(ensure_future(coro, loop=loop), future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future


_PYTANGOTHREADPOOLEXECUTOR = None


def get_thread_pool_executor():
    global _PYTANGOTHREADPOOLEXECUTOR
    if _PYTANGOTHREADPOOLEXECUTOR is None:
        _PYTANGOTHREADPOOLEXECUTOR = PyTangoThreadPoolExecutor(
            thread_name_prefix="_PyTangoThreadPoolExecutor"
        )
    return _PYTANGOTHREADPOOLEXECUTOR


# Global executor
_MAIN_EXECUTOR = None
_THREAD_EXECUTORS = {}


def _switch_global_executor_to_thread():
    """
    internal PyTango function, use only if you sure, what you are doing!
    Used for correct behavior of TestDeviceContext
    Checks, that global executor belongs to the caller thread, and,
    if not - creates a new one and saves it as a new global
    """
    global _MAIN_EXECUTOR
    if _MAIN_EXECUTOR is not None and not _MAIN_EXECUTOR.in_executor_context():
        # we save current executor in the known subthread executors to be used later
        _THREAD_EXECUTORS[_MAIN_EXECUTOR.get_ident()] = _MAIN_EXECUTOR
        _MAIN_EXECUTOR = AsyncioExecutor()


def get_global_executor():
    global _MAIN_EXECUTOR
    if _MAIN_EXECUTOR is None:
        _MAIN_EXECUTOR = AsyncioExecutor()
    # the following patch is used for correct behavior of TestDeviceContext,
    # which has two different executors for main and device threads
    if not _MAIN_EXECUTOR.in_executor_context():
        ident = threading.get_ident(), os.getpid()
        if ident in _THREAD_EXECUTORS:
            return _THREAD_EXECUTORS[ident]

    return _MAIN_EXECUTOR


def set_global_executor(executor):
    global _MAIN_EXECUTOR
    _MAIN_EXECUTOR = executor


def _get_function_name(fn: Callable) -> str:
    if hasattr(fn, "__qualname__"):
        return fn.__qualname__
    elif hasattr(fn, "__name__"):
        return fn.__name__

    return f"{fn}"


# Asyncio executor
class AsyncioExecutor(AbstractExecutor):
    """Asyncio tango executor"""

    asynchronous = True
    default_wait = False

    def __init__(self, loop=None, subexecutor=None):
        super().__init__()
        if loop is None:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
        self.loop = loop
        self.subexecutor = (
            subexecutor if subexecutor is not None else get_thread_pool_executor()
        )

    def delegate(self, fn, *args, **kwargs):
        """Return the given operation as an asyncio future."""
        callback = functools.partial(fn, *args, **kwargs)
        coro = self.loop.run_in_executor(self.subexecutor, callback)
        return asyncio.ensure_future(coro)

    def access(self, accessor, timeout=None):
        """Return a result from an asyncio future."""
        if self.loop.is_running():
            raise RuntimeError("Loop is already running")
        coro = asyncio.wait_for(accessor, timeout)
        return self.loop.run_until_complete(coro)

    def submit(self, fn, *args, **kwargs):
        """Submit an operation"""
        if _is_coroutine_function(fn):
            return run_coroutine_threadsafe(fn(*args, **kwargs), self.loop)
        else:
            # we leave this part of the code to support legacy servers
            name = _get_function_name(fn)
            if name not in _ALREADY_WARNED_FUNCTIONS:
                _ALREADY_WARNED_FUNCTIONS.append(name)
                warnings.warn(
                    f"Sync {name} function called: support of "
                    f"sync functions in PyTango's Asyncio mode is "
                    f"deprecated. Use 'async def' instead of 'def'.",
                    DeprecationWarning,
                )
            corofn = _coroutine(lambda: fn(*args, **kwargs))
            return run_coroutine_threadsafe(corofn(), self.loop)

    def execute(self, fn, *args, **kwargs):
        """Execute an operation and return the result."""
        if self.in_executor_context():
            if _is_coroutine_function(fn):
                return fn(*args, **kwargs)
            else:
                # we leave this part of the code to support legacy servers
                name = _get_function_name(fn)
                if name not in _ALREADY_WARNED_FUNCTIONS:
                    _ALREADY_WARNED_FUNCTIONS.append(name)
                    warnings.warn(
                        f"Sync {name} function called: support of "
                        f"sync functions in PyTango's Asyncio mode is "
                        f"deprecated. Use 'async def' instead of 'def'.",
                        DeprecationWarning,
                    )
                corofn = _coroutine(lambda: fn(*args, **kwargs))
                return corofn()
        future = self.submit(fn, *args, **kwargs)
        return future.result()
