from functools import partial
import itertools
from pydevd import AbstractSingleNotificationBehavior
import time

import pytest

from _pydevd_bundle.pydevd_daemon_thread import run_as_pydevd_daemon_thread
from tests_python.debugger_unittest import CMD_THREAD_SUSPEND, CMD_STEP_OVER, CMD_SET_BREAK
from _pydev_bundle.pydev_override import overrides
import threading

STATE_RUN = 1
STATE_SUSPEND = 2


class _ThreadInfo(object):
    next_thread_id = partial(next, itertools.count())

    def __init__(self):
        self.state = STATE_RUN
        self.thread = threading.Thread()
        self.thread_id = self.next_thread_id()


class _CustomSingleNotificationBehavior(AbstractSingleNotificationBehavior):
    NOTIFY_OF_PAUSE_TIMEOUT = 0.01

    __slots__ = AbstractSingleNotificationBehavior.__slots__ + ["notification_queue"]

    def __init__(self, py_db):
        try:
            from queue import Queue
        except ImportError:
            from Queue import Queue
        super(_CustomSingleNotificationBehavior, self).__init__(py_db)
        self.notification_queue = Queue()

    @overrides(AbstractSingleNotificationBehavior.send_resume_notification)
    def send_resume_notification(self, *args, **kwargs):
        # print('put resume', threading.current_thread())
        self.notification_queue.put("resume")

    @overrides(AbstractSingleNotificationBehavior.send_suspend_notification)
    def send_suspend_notification(self, *args, **kwargs):
        # print('put suspend', threading.current_thread())
        self.notification_queue.put("suspend")

    def do_wait_suspend(self, thread_info, stop_reason):
        with self.notify_thread_suspended(thread_info.thread_id, thread_info.thread, stop_reason=stop_reason):
            while thread_info.state == STATE_SUSPEND:
                time.sleep(0.1)


@pytest.fixture
def _dummy_pydb():
    return _DummyPyDB()


@pytest.fixture(
    name="single_notification_behavior",
    #     params=range(50)  #  uncomment to run the tests many times.
)
def _single_notification_behavior(_dummy_pydb):
    single_notification_behavior = _CustomSingleNotificationBehavior(_dummy_pydb)
    return single_notification_behavior


@pytest.fixture(name="notification_queue")
def _notification_queue(single_notification_behavior):
    return single_notification_behavior.notification_queue


def wait_for_notification(notification_queue, msg):
    __tracebackhide__ = True
    try:
        from Queue import Empty
    except ImportError:
        from queue import Empty
    try:
        found = notification_queue.get(timeout=2)
        assert found == msg
    except Empty:
        raise AssertionError("Timed out while waiting for %s notification." % (msg,))


def join_thread(t):
    __tracebackhide__ = True
    t.join(2)
    assert not t.is_alive(), "Thread still alive after timeout.s"


class _DummyPyDB(object):
    def __init__(self):
        from _pydevd_bundle.pydevd_timeout import TimeoutTracker

        self.created_pydb_daemon_threads = {}
        self.timeout_tracker = TimeoutTracker(self)


def test_single_notification_1(single_notification_behavior, notification_queue):
    """
    1. Resume before pausing 2nd thread

    - user pauses all (2) threads
    - break first -> send notification
    - user presses continue all before second is paused
      - 2nd should not pause nor send notification
      - resume all notification should be sent
    """
    thread_info1 = _ThreadInfo()
    thread_info2 = _ThreadInfo()

    # pause all = set_suspend both
    single_notification_behavior.increment_suspend_time()
    single_notification_behavior.on_pause()
    thread_info1.state = STATE_SUSPEND
    thread_info2.state = STATE_SUSPEND

    dummy_py_db = _DummyPyDB()

    t = run_as_pydevd_daemon_thread(dummy_py_db, single_notification_behavior.do_wait_suspend, thread_info1, CMD_THREAD_SUSPEND)
    thread_info1.state = STATE_RUN
    # Set 2 to run before it starts (should not send additional message).
    thread_info2.state = STATE_RUN
    t.join()

    assert notification_queue.qsize() == 2
    assert notification_queue.get() == "suspend"
    assert notification_queue.get() == "resume"
    assert notification_queue.qsize() == 0

    # Run thread 2 only now (no additional notification).
    t = run_as_pydevd_daemon_thread(dummy_py_db, single_notification_behavior.do_wait_suspend, thread_info1, CMD_THREAD_SUSPEND)
    t.join()

    assert notification_queue.qsize() == 0


def test_single_notification_2(single_notification_behavior, notification_queue):
    """
    2. Pausing all then stepping

    - user pauses all (2) threads
    - break first -> send notification
    - break second (no notification)
    - user steps in second
    - suspend in second -> send resume/pause notification on step
    """
    thread_info1 = _ThreadInfo()
    thread_info2 = _ThreadInfo()

    dummy_py_db = _DummyPyDB()

    # pause all = set_suspend both
    single_notification_behavior.increment_suspend_time()
    single_notification_behavior.on_pause()
    thread_info1.state = STATE_SUSPEND
    thread_info2.state = STATE_SUSPEND

    # Leave both in break mode
    t1 = run_as_pydevd_daemon_thread(dummy_py_db, single_notification_behavior.do_wait_suspend, thread_info1, CMD_THREAD_SUSPEND)
    wait_for_notification(notification_queue, "suspend")

    t2 = run_as_pydevd_daemon_thread(dummy_py_db, single_notification_behavior.do_wait_suspend, thread_info2, CMD_THREAD_SUSPEND)

    # Step would actually be set state to STEP, which would result in resuming
    # and then stopping again as if it was a SUSPEND (which calls a set_supend again with
    # the step mode).
    thread_info2.state = STATE_RUN
    join_thread(t2)
    wait_for_notification(notification_queue, "resume")

    single_notification_behavior.increment_suspend_time()
    thread_info2.state = STATE_SUSPEND
    t2 = run_as_pydevd_daemon_thread(dummy_py_db, single_notification_behavior.do_wait_suspend, thread_info2, CMD_STEP_OVER)
    wait_for_notification(notification_queue, "suspend")

    thread_info1.state = STATE_RUN
    thread_info2.state = STATE_RUN
    # First does a resume notification, the other remains quiet.
    wait_for_notification(notification_queue, "resume")

    join_thread(t2)
    join_thread(t1)
    assert notification_queue.qsize() == 0


def test_single_notification_3(single_notification_behavior, notification_queue, _dummy_pydb):
    """
    3. Deadlocked thread

    - user adds breakpoint in thread.join() -- just before threads becomes deadlocked
    - breakpoint hits -> send notification
      - pauses 2nd thread (no notification)
    - user steps over thead.join() -> never completes
    - user presses pause
      - second thread is already stopped
        - send notification regarding 2nd thread (still stopped).
    - leave both threads running: no suspend should be shown as there are no stopped threads
    - when thread is paused, show suspend notification
    """

    # i.e.: stopping at breakpoint
    thread_info1 = _ThreadInfo()
    single_notification_behavior.increment_suspend_time()
    thread_info1.state = STATE_SUSPEND
    t1 = run_as_pydevd_daemon_thread(_dummy_pydb, single_notification_behavior.do_wait_suspend, thread_info1, CMD_SET_BREAK)

    # i.e.: stop because of breakpoint
    thread_info2 = _ThreadInfo()
    thread_info2.state = STATE_SUSPEND
    t2 = run_as_pydevd_daemon_thread(_dummy_pydb, single_notification_behavior.do_wait_suspend, thread_info2, CMD_SET_BREAK)

    wait_for_notification(notification_queue, "suspend")

    # i.e.: step over (thread 2 is still suspended and this one never completes)
    thread_info1.state = STATE_RUN
    wait_for_notification(notification_queue, "resume")

    join_thread(t1)

    # On pause we should notify that the thread 2 is suspended (after timeout if no other thread suspends first).
    single_notification_behavior.increment_suspend_time()
    single_notification_behavior.on_pause()
    thread_info1.state = STATE_SUSPEND
    thread_info2.state = STATE_SUSPEND
    wait_for_notification(notification_queue, "suspend")

    thread_info2.state = STATE_RUN
    wait_for_notification(notification_queue, "resume")

    join_thread(t2)
    assert notification_queue.qsize() == 0
    assert not single_notification_behavior._suspended_thread_id_to_thread

    # Now, no threads are running and pause is pressed
    # (maybe we could do a thread dump in this case as this
    # means nothing is stopped after pause is requested and
    # the timeout elapses).
    single_notification_behavior.increment_suspend_time()
    single_notification_behavior.on_pause()
    thread_info1.state = STATE_SUSPEND
    thread_info2.state = STATE_SUSPEND

    time.sleep(single_notification_behavior.NOTIFY_OF_PAUSE_TIMEOUT * 2)
    assert notification_queue.qsize() == 0

    t1 = run_as_pydevd_daemon_thread(_dummy_pydb, single_notification_behavior.do_wait_suspend, thread_info1, CMD_THREAD_SUSPEND)
    wait_for_notification(notification_queue, "suspend")
    thread_info1.state = STATE_RUN
    wait_for_notification(notification_queue, "resume")
    join_thread(t1)
    assert notification_queue.qsize() == 0


def test_single_notification_4(single_notification_behavior, notification_queue, _dummy_pydb):
    """
    4. Delayed stop

    - user presses pause
    - stops first (2nd keeps running)
    - user steps on first
    - 2nd hits before first ends step (should not send any notification)
    - when step finishes send notification
    """
    thread_info1 = _ThreadInfo()
    thread_info2 = _ThreadInfo()

    single_notification_behavior.increment_suspend_time()
    single_notification_behavior.on_pause()
    thread_info1.state = STATE_SUSPEND
    thread_info2.state = STATE_SUSPEND

    t1 = run_as_pydevd_daemon_thread(_dummy_pydb, single_notification_behavior.do_wait_suspend, thread_info1, CMD_THREAD_SUSPEND)
    wait_for_notification(notification_queue, "suspend")
    thread_info1.state = STATE_RUN
    wait_for_notification(notification_queue, "resume")
    join_thread(t1)

    t2 = run_as_pydevd_daemon_thread(_dummy_pydb, single_notification_behavior.do_wait_suspend, thread_info1, CMD_THREAD_SUSPEND)
    time.sleep(0.1)
    assert notification_queue.qsize() == 0

    single_notification_behavior.increment_suspend_time()
    thread_info1.state = STATE_SUSPEND
    t1 = run_as_pydevd_daemon_thread(_dummy_pydb, single_notification_behavior.do_wait_suspend, thread_info1, CMD_STEP_OVER)
    wait_for_notification(notification_queue, "suspend")
    thread_info2.state = STATE_RUN
    thread_info1.state = STATE_RUN
    join_thread(t1)
    join_thread(t2)
    wait_for_notification(notification_queue, "resume")
    assert notification_queue.qsize() == 0
