"""
More complex tests that exercise stream resetting functionality to validate
that connection state is appropriately maintained.

Specifically, these tests validate that streams that have been reset accurately
keep track of connection-level state.
"""
from __future__ import annotations

import pytest

import h2.connection
import h2.errors
import h2.events


class TestStreamReset:
    """
    Tests for resetting streams.
    """

    example_request_headers = [
        (b":authority", b"example.com"),
        (b":path", b"/"),
        (b":scheme", b"https"),
        (b":method", b"GET"),
    ]
    example_response_headers = [
        (b":status", b"200"),
        (b"server", b"fake-serv/0.1.0"),
        (b"content-length", b"0"),
    ]

    def test_reset_stream_keeps_header_state_correct(self, frame_factory) -> None:
        """
        A stream that has been reset still affects the header decoder.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(stream_id=1, headers=self.example_request_headers)
        c.reset_stream(stream_id=1)
        c.send_headers(stream_id=3, headers=self.example_request_headers)
        c.clear_outbound_data_buffer()

        f = frame_factory.build_headers_frame(
            headers=self.example_response_headers, stream_id=1,
        )
        rst_frame = frame_factory.build_rst_stream_frame(
            1, h2.errors.ErrorCodes.STREAM_CLOSED,
        )
        events = c.receive_data(f.serialize())
        assert not events
        assert c.data_to_send() == rst_frame.serialize()

        # This works because the header state should be intact from the headers
        # frame that was send on stream 1, so they should decode cleanly.
        f = frame_factory.build_headers_frame(
            headers=self.example_response_headers, stream_id=3,
        )
        event = c.receive_data(f.serialize())[0]

        assert isinstance(event, h2.events.ResponseReceived)
        assert event.stream_id == 3
        assert event.headers == self.example_response_headers

    @pytest.mark.parametrize(("close_id", "other_id"), [(1, 3), (3, 1)])
    def test_reset_stream_keeps_flow_control_correct(self,
                                                     close_id,
                                                     other_id,
                                                     frame_factory) -> None:
        """
        A stream that has been reset does not affect the connection flow
        control window.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(stream_id=1, headers=self.example_request_headers)
        c.send_headers(stream_id=3, headers=self.example_request_headers)

        # Record the initial window size.
        initial_window = c.remote_flow_control_window(stream_id=other_id)

        f = frame_factory.build_headers_frame(
            headers=self.example_response_headers, stream_id=close_id,
        )
        c.receive_data(f.serialize())
        c.reset_stream(stream_id=close_id)
        c.clear_outbound_data_buffer()

        f = frame_factory.build_data_frame(
            data=b"some data",
            stream_id=close_id,
        )
        c.receive_data(f.serialize())

        expected = frame_factory.build_rst_stream_frame(
            stream_id=close_id,
            error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
        ).serialize()
        assert c.data_to_send() == expected

        new_window = c.remote_flow_control_window(stream_id=other_id)
        assert initial_window - len(b"some data") == new_window

    @pytest.mark.parametrize("clear_streams", [True, False])
    def test_reset_stream_automatically_resets_pushed_streams(self,
                                                              frame_factory,
                                                              clear_streams) -> None:
        """
        Resetting a stream causes RST_STREAM frames to be automatically emitted
        to close any streams pushed after the reset.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(stream_id=1, headers=self.example_request_headers)
        c.reset_stream(stream_id=1)
        c.clear_outbound_data_buffer()

        if clear_streams:
            # Call open_outbound_streams to force the connection to clean
            # closed streams.
            c.open_outbound_streams

        f = frame_factory.build_push_promise_frame(
            stream_id=1,
            promised_stream_id=2,
            headers=self.example_request_headers,
        )
        events = c.receive_data(f.serialize())
        assert not events

        f = frame_factory.build_rst_stream_frame(
            stream_id=2,
            error_code=h2.errors.ErrorCodes.REFUSED_STREAM,
        )
        assert c.data_to_send() == f.serialize()
