"""
These tests validate the state machines directly. Writing meaningful tests for
this case can be tricky, so the majority of these tests use Hypothesis to try
to talk about general behaviours rather than specific cases.
"""
from __future__ import annotations

import pytest
from hypothesis import given
from hypothesis.strategies import sampled_from

import h2.connection
import h2.exceptions
import h2.stream


class TestConnectionStateMachine:
    """
    Tests that drive ``H2ConnectionStateMachine`` directly.
    """

    @given(state=sampled_from(h2.connection.ConnectionState),
           input_=sampled_from(h2.connection.ConnectionInputs))
    def test_state_transitions(self, state, input_) -> None:
        """
        For any starting state and any input, the machine either accepts the
        input and ends in a valid connection state, or raises ProtocolError
        and ends in the closed state.
        """
        machine = h2.connection.H2ConnectionStateMachine()
        machine.state = state

        # Record the outcome first, then check the resulting state.
        try:
            machine.process_input(input_)
        except h2.exceptions.ProtocolError:
            rejected = True
        else:
            rejected = False

        if rejected:
            assert machine.state == h2.connection.ConnectionState.CLOSED
        else:
            assert machine.state in h2.connection.ConnectionState

    def test_state_machine_only_allows_connection_states(self) -> None:
        """
        Feeding the connection state machine a plain integer instead of a
        ConnectionInputs member raises ValueError.
        """
        machine = h2.connection.H2ConnectionStateMachine()
        not_an_input = 1

        with pytest.raises(ValueError):
            machine.process_input(not_an_input)

    @pytest.mark.parametrize(
        "state",
        [
            member
            for member in h2.connection.ConnectionState
            if member != h2.connection.ConnectionState.CLOSED
        ],
    )
    @pytest.mark.parametrize(
        "input_",
        [
            h2.connection.ConnectionInputs.RECV_PRIORITY,
            h2.connection.ConnectionInputs.SEND_PRIORITY,
        ],
    )
    def test_priority_frames_allowed_in_all_states(self, state, input_) -> None:
        """
        Sending or receiving a PRIORITY frame is accepted in every connection
        state other than closed.
        """
        machine = h2.connection.H2ConnectionStateMachine()
        machine.state = state

        # Passing means no exception was raised; nothing else is checked.
        machine.process_input(input_)


class TestStreamStateMachine:
    """
    Tests of the stream state machine.
    """

    @given(state=sampled_from(h2.stream.StreamState),
           input_=sampled_from(h2.stream.StreamInputs))
    def test_state_transitions(self, state, input_) -> None:
        """
        For any starting state and any input, the machine either accepts the
        input and ends in a valid stream state, or raises StreamClosedError /
        ProtocolError and ends in the closed state.
        """
        s = h2.stream.H2StreamStateMachine(stream_id=1)
        s.state = state

        try:
            s.process_input(input_)
        except h2.exceptions.StreamClosedError:
            # Whatever triggered it, a StreamClosedError must leave the
            # stream closed. This is checked up front so that every branch
            # below is covered, including the HALF_CLOSED_REMOTE one.
            assert s.state == h2.stream.StreamState.CLOSED

            # This can only happen for streams that started in the closed
            # state OR where the input was RECV_DATA and the state was not
            # OPEN or HALF_CLOSED_LOCAL OR where the state was
            # HALF_CLOSED_REMOTE and a frame was received.
            if state != h2.stream.StreamState.CLOSED:
                if input_ == h2.stream.StreamInputs.RECV_DATA:
                    assert state not in (
                        h2.stream.StreamState.OPEN,
                        h2.stream.StreamState.HALF_CLOSED_LOCAL,
                    )
                elif state == h2.stream.StreamState.HALF_CLOSED_REMOTE:
                    assert input_ in (
                        h2.stream.StreamInputs.RECV_HEADERS,
                        h2.stream.StreamInputs.RECV_PUSH_PROMISE,
                        h2.stream.StreamInputs.RECV_DATA,
                        h2.stream.StreamInputs.RECV_CONTINUATION,
                    )
        except h2.exceptions.ProtocolError:
            assert s.state == h2.stream.StreamState.CLOSED
        else:
            assert s.state in h2.stream.StreamState

    def test_state_machine_only_allows_stream_states(self) -> None:
        """
        Feeding the stream state machine a plain integer instead of a
        StreamInputs member raises ValueError.
        """
        s = h2.stream.H2StreamStateMachine(stream_id=1)

        with pytest.raises(ValueError):
            s.process_input(1)

    def test_stream_state_machine_forbids_pushes_on_server_streams(self) -> None:
        """
        Streams where this peer is a server do not allow receiving pushed
        frames.
        """
        s = h2.stream.H2StreamStateMachine(stream_id=1)
        # Receiving headers first puts this peer in the server role.
        s.process_input(h2.stream.StreamInputs.RECV_HEADERS)

        with pytest.raises(h2.exceptions.ProtocolError):
            s.process_input(h2.stream.StreamInputs.RECV_PUSH_PROMISE)

    def test_stream_state_machine_forbids_sending_pushes_from_clients(self) -> None:
        """
        Streams where this peer is a client do not allow sending pushed frames.
        """
        s = h2.stream.H2StreamStateMachine(stream_id=1)
        # Sending headers first puts this peer in the client role.
        s.process_input(h2.stream.StreamInputs.SEND_HEADERS)

        with pytest.raises(h2.exceptions.ProtocolError):
            s.process_input(h2.stream.StreamInputs.SEND_PUSH_PROMISE)

    @pytest.mark.parametrize(
        "input_",
        [
            h2.stream.StreamInputs.SEND_HEADERS,
            h2.stream.StreamInputs.SEND_PUSH_PROMISE,
            h2.stream.StreamInputs.SEND_RST_STREAM,
            h2.stream.StreamInputs.SEND_DATA,
            h2.stream.StreamInputs.SEND_WINDOW_UPDATE,
            h2.stream.StreamInputs.SEND_END_STREAM,
        ],
    )
    def test_cannot_send_on_closed_streams(self, input_) -> None:
        """
        Sending anything but a PRIORITY frame is forbidden on closed streams.
        """
        s = h2.stream.H2StreamStateMachine(stream_id=1)
        s.state = h2.stream.StreamState.CLOSED

        # SEND_PUSH_PROMISE is expected to raise ProtocolError; every other
        # send input listed above is expected to raise StreamClosedError.
        expected_error = (
            h2.exceptions.ProtocolError
            if input_ == h2.stream.StreamInputs.SEND_PUSH_PROMISE
            else h2.exceptions.StreamClosedError
        )

        with pytest.raises(expected_error):
            s.process_input(input_)
