1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
|
"""
These tests run two entities, a client and a server, in parallel threads. These
two entities talk to each other, running what amounts to a number of carefully
controlled simulations of real flows.
This is to ensure that the stack as a whole behaves intelligently in both
client and server cases.
These tests are long, complex, and somewhat brittle, so they aren't in general
recommended for writing the majority of test cases. Their purposes is primarily
to validate that the top-level API of the library behaves as described.
We should also consider writing helper functions to reduce the complexity of
these tests, so that they can be written more easily, as they are remarkably
useful.
"""
from __future__ import annotations
import pytest
import h2.config
import h2.connection
import h2.events
import h2.settings
from . import coroutine_tests
class TestCommunication(coroutine_tests.CoroutineTestCase):
"""
Test that two communicating state machines can work together.
"""
server_config = h2.config.H2Configuration(client_side=False)
request_headers = [
(":method", "GET"),
(":path", "/"),
(":authority", "example.com"),
(":scheme", "https"),
("user-agent", "test-client/0.1.0"),
]
request_headers_bytes = [
(b":method", b"GET"),
(b":path", b"/"),
(b":authority", b"example.com"),
(b":scheme", b"https"),
(b"user-agent", b"test-client/0.1.0"),
]
response_headers = [
(b":status", b"204"),
(b"server", b"test-server/0.1.0"),
(b"content-length", b"0"),
]
@pytest.mark.parametrize("request_headers", [request_headers, request_headers_bytes])
def test_basic_request_response(self, request_headers) -> None:
"""
A request issued by hyper-h2 can be responded to by hyper-h2.
"""
def client():
c = h2.connection.H2Connection()
# Do the handshake. First send the preamble.
c.initiate_connection()
data = yield c.data_to_send()
# Next, handle the remote preamble.
events = c.receive_data(data)
assert len(events) == 2
assert isinstance(events[0], h2.events.SettingsAcknowledged)
assert isinstance(events[1], h2.events.RemoteSettingsChanged)
changed = events[1].changed_settings
assert (
changed[
h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS
].new_value == 100
)
# Send a request.
events = c.send_headers(1, request_headers, end_stream=True)
assert not events
data = yield c.data_to_send()
# Validate the response.
events = c.receive_data(data)
assert len(events) == 2
assert isinstance(events[0], h2.events.ResponseReceived)
assert events[0].stream_id == 1
assert events[0].headers == self.response_headers
assert isinstance(events[1], h2.events.StreamEnded)
assert events[1].stream_id == 1
@self.server
def server():
c = h2.connection.H2Connection(config=self.server_config)
# First, read for the preamble.
data = yield
events = c.receive_data(data)
assert len(events) == 1
assert isinstance(events[0], h2.events.RemoteSettingsChanged)
changed = events[0].changed_settings
assert (
changed[
h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS
].new_value == 100
)
# Send our preamble back.
c.initiate_connection()
data = yield c.data_to_send()
# Listen for the request.
events = c.receive_data(data)
assert len(events) == 3
assert isinstance(events[0], h2.events.SettingsAcknowledged)
assert isinstance(events[1], h2.events.RequestReceived)
assert events[1].stream_id == 1
assert events[1].headers == self.request_headers_bytes
assert isinstance(events[2], h2.events.StreamEnded)
assert events[2].stream_id == 1
# Send our response.
events = c.send_headers(1, self.response_headers, end_stream=True)
assert not events
yield c.data_to_send()
self.run_until_complete(client(), server())
|