import pickle
import signal
import sys
import threading
from typing import Union

import pytest

from anyio import (
    CancelScope,
    CapacityLimiter,
    Condition,
    Event,
    Lock,
    Semaphore,
    TaskInfo,
    create_blocking_portal,
    create_memory_object_stream,
    create_task_group,
    current_default_worker_thread_limiter,
    current_effective_deadline,
    current_time,
    fail_after,
    get_current_task,
    get_running_tasks,
    maybe_async,
    maybe_async_cm,
    move_on_after,
    open_signal_receiver,
    run_async_from_thread,
    run_sync_from_thread,
    run_sync_in_worker_thread,
    to_thread,
)
from anyio._core._compat import (
    DeprecatedAwaitable,
    DeprecatedAwaitableFloat,
    DeprecatedAwaitableList,
)

pytestmark = pytest.mark.anyio


class TestMaybeAsync:
    async def test_cancel_scope(self) -> None:
        with CancelScope() as scope:
            await maybe_async(scope.cancel())

    async def test_current_time(self) -> None:
        value = await maybe_async(current_time())
        assert type(value) is float

    async def test_current_effective_deadline(self) -> None:
        value = await maybe_async(current_effective_deadline())
        assert type(value) is float

    async def test_get_running_tasks(self) -> None:
        tasks = await maybe_async(get_running_tasks())
        assert type(tasks) is list

    async def test_get_current_task(self) -> None:
        task = await maybe_async(get_current_task())
        assert type(task) is TaskInfo


async def test_maybe_async_cm() -> None:
    async with maybe_async_cm(CancelScope()):
        pass


class TestDeprecations:
    async def test_current_effective_deadlinee(self) -> None:
        with pytest.deprecated_call():
            deadline = await current_effective_deadline()

        assert isinstance(deadline, float)

    async def test_current_time(self) -> None:
        with pytest.deprecated_call():
            timestamp = await current_time()

        assert isinstance(timestamp, float)

    async def test_get_current_task(self) -> None:
        with pytest.deprecated_call():
            task = await get_current_task()

        assert isinstance(task, TaskInfo)

    async def test_running_tasks(self) -> None:
        with pytest.deprecated_call():
            tasks = await get_running_tasks()

        assert tasks
        assert all(isinstance(task, TaskInfo) for task in tasks)

    @pytest.mark.skipif(
        sys.platform == "win32", reason="Signal delivery cannot be tested on Windows"
    )
    async def test_open_signal_receiver(self) -> None:
        with pytest.deprecated_call():
            async with open_signal_receiver(signal.SIGINT):
                pass

    async def test_cancelscope_cancel(self) -> None:
        with CancelScope() as scope:
            with pytest.deprecated_call():
                await scope.cancel()

    async def test_taskgroup_cancel(self) -> None:
        async with create_task_group() as tg:
            with pytest.deprecated_call():
                await tg.cancel_scope.cancel()

    async def test_capacitylimiter_acquire_nowait(self) -> None:
        limiter = CapacityLimiter(1)
        with pytest.deprecated_call():
            await limiter.acquire_nowait()

    async def test_capacitylimiter_acquire_on_behalf_of_nowait(self) -> None:
        limiter = CapacityLimiter(1)
        with pytest.deprecated_call():
            await limiter.acquire_on_behalf_of_nowait(object())

    async def test_capacitylimiter_set_total_tokens(self) -> None:
        limiter = CapacityLimiter(1)
        with pytest.deprecated_call():
            await limiter.set_total_tokens(3)

        assert limiter.total_tokens == 3

    async def test_condition_release(self) -> None:
        condition = Condition()
        condition.acquire_nowait()
        with pytest.deprecated_call():
            await condition.release()

    async def test_event_set(self) -> None:
        event = Event()
        with pytest.deprecated_call():
            await event.set()

    async def test_lock_release(self) -> None:
        lock = Lock()
        lock.acquire_nowait()
        with pytest.deprecated_call():
            await lock.release()

    async def test_memory_object_stream_send_nowait(self) -> None:
        send, receive = create_memory_object_stream(1)
        with pytest.deprecated_call():
            await send.send_nowait(None)

    async def test_semaphore_release(self) -> None:
        semaphore = Semaphore(1)
        semaphore.acquire_nowait()
        with pytest.deprecated_call():
            await semaphore.release()

    async def test_move_on_after(self) -> None:
        with pytest.deprecated_call():
            async with move_on_after(0):
                pass

    async def test_fail_after(self) -> None:
        with pytest.raises(TimeoutError), pytest.deprecated_call():
            async with fail_after(0):
                pass

    async def test_run_sync_in_worker_thread(self) -> None:
        with pytest.deprecated_call():
            thread_id = await run_sync_in_worker_thread(threading.get_ident)
            assert thread_id != threading.get_ident()

    async def test_run_async_from_thread(self) -> None:
        async def get_ident() -> int:
            return threading.get_ident()

        def thread_func() -> int:
            with pytest.deprecated_call():
                return run_async_from_thread(get_ident)

        assert await to_thread.run_sync(thread_func) == threading.get_ident()

    async def test_run_sync_from_thread(self) -> None:
        def thread_func() -> int:
            with pytest.deprecated_call():
                return run_sync_from_thread(threading.get_ident)

        assert await to_thread.run_sync(thread_func) == threading.get_ident()

    async def test_current_default_worker_thread_limiter(self) -> None:
        with pytest.deprecated_call():
            default_limiter = to_thread.current_default_thread_limiter()
            assert current_default_worker_thread_limiter() is default_limiter

    async def test_create_blocking_portal(self) -> None:
        with pytest.deprecated_call():
            async with create_blocking_portal():
                pass


class TestPickle:
    def test_deprecated_awaitable_none(self) -> None:
        def fn() -> DeprecatedAwaitable:
            return DeprecatedAwaitable(fn)

        obj = fn()
        result = pickle.loads(pickle.dumps(obj))
        assert result is None

    def test_deprecated_awaitable_float(self) -> None:
        def fn() -> DeprecatedAwaitableFloat:
            return DeprecatedAwaitableFloat(2.3, fn)

        obj = fn()
        result = pickle.loads(pickle.dumps(obj))
        assert type(result) is float
        assert result == 2.3

    def test_deprecated_awaitable_list(self) -> None:
        def fn() -> DeprecatedAwaitableList[Union[str, int]]:
            return DeprecatedAwaitableList([1, "a"], func=fn)

        obj = fn()
        result = pickle.loads(pickle.dumps(obj))
        assert type(result) is list
        assert result == [1, "a"]
