1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
|
"""
Tests for the various utility functions provided by hyper-h2.
"""
from __future__ import annotations
import pytest
import h2.config
import h2.connection
import h2.errors
import h2.events
import h2.exceptions
from h2.utilities import SizeLimitDict, extract_method_header
class TestGetNextAvailableStreamID:
"""
Tests for the ``H2Connection.get_next_available_stream_id`` method.
"""
example_request_headers = [
(":authority", "example.com"),
(":path", "/"),
(":scheme", "https"),
(":method", "GET"),
]
example_response_headers = [
(":status", "200"),
("server", "fake-serv/0.1.0"),
]
server_config = h2.config.H2Configuration(client_side=False)
def test_returns_correct_sequence_for_clients(self, frame_factory) -> None:
"""
For a client connection, the correct sequence of stream IDs is
returned.
"""
# Running the exhaustive version of this test (all 1 billion available
# stream IDs) is too painful. For that reason, we validate that the
# original sequence is right for the first few thousand, and then just
# check that it terminates properly.
#
# Make sure that the streams get cleaned up: 8k streams floating
# around would make this test memory-hard, and it's not supposed to be
# a test of how much RAM your machine has.
c = h2.connection.H2Connection()
c.initiate_connection()
initial_sequence = range(1, 2**13, 2)
for expected_stream_id in initial_sequence:
stream_id = c.get_next_available_stream_id()
assert stream_id == expected_stream_id
c.send_headers(
stream_id=stream_id,
headers=self.example_request_headers,
end_stream=True,
)
f = frame_factory.build_headers_frame(
headers=self.example_response_headers,
stream_id=stream_id,
flags=["END_STREAM"],
)
c.receive_data(f.serialize())
c.clear_outbound_data_buffer()
# Jump up to the last available stream ID. Don't clean up the stream
# here because who cares about one stream.
last_client_id = 2**31 - 1
c.send_headers(
stream_id=last_client_id,
headers=self.example_request_headers,
end_stream=True,
)
with pytest.raises(h2.exceptions.NoAvailableStreamIDError):
c.get_next_available_stream_id()
def test_returns_correct_sequence_for_servers(self, frame_factory) -> None:
"""
For a server connection, the correct sequence of stream IDs is
returned.
"""
# Running the exhaustive version of this test (all 1 billion available
# stream IDs) is too painful. For that reason, we validate that the
# original sequence is right for the first few thousand, and then just
# check that it terminates properly.
#
# Make sure that the streams get cleaned up: 8k streams floating
# around would make this test memory-hard, and it's not supposed to be
# a test of how much RAM your machine has.
c = h2.connection.H2Connection(config=self.server_config)
c.initiate_connection()
c.receive_data(frame_factory.preamble())
f = frame_factory.build_headers_frame(
headers=self.example_request_headers,
)
c.receive_data(f.serialize())
initial_sequence = range(2, 2**13, 2)
for expected_stream_id in initial_sequence:
stream_id = c.get_next_available_stream_id()
assert stream_id == expected_stream_id
c.push_stream(
stream_id=1,
promised_stream_id=stream_id,
request_headers=self.example_request_headers,
)
c.send_headers(
stream_id=stream_id,
headers=self.example_response_headers,
end_stream=True,
)
c.clear_outbound_data_buffer()
# Jump up to the last available stream ID. Don't clean up the stream
# here because who cares about one stream.
last_server_id = 2**31 - 2
c.push_stream(
stream_id=1,
promised_stream_id=last_server_id,
request_headers=self.example_request_headers,
)
with pytest.raises(h2.exceptions.NoAvailableStreamIDError):
c.get_next_available_stream_id()
def test_does_not_increment_without_stream_send(self) -> None:
"""
If a new stream isn't actually created, the next stream ID doesn't
change.
"""
c = h2.connection.H2Connection()
c.initiate_connection()
first_stream_id = c.get_next_available_stream_id()
second_stream_id = c.get_next_available_stream_id()
assert first_stream_id == second_stream_id
c.send_headers(
stream_id=first_stream_id,
headers=self.example_request_headers,
)
third_stream_id = c.get_next_available_stream_id()
assert third_stream_id == (first_stream_id + 2)
class TestExtractHeader:
example_headers_with_bytes = [
(b":authority", b"example.com"),
(b":path", b"/"),
(b":scheme", b"https"),
(b":method", b"GET"),
]
def test_extract_header_method(self) -> None:
assert extract_method_header(
self.example_headers_with_bytes,
) == b"GET"
def test_size_limit_dict_limit() -> None:
dct = SizeLimitDict(size_limit=2)
dct[1] = 1
dct[2] = 2
assert len(dct) == 2
assert dct[1] == 1
assert dct[2] == 2
dct[3] = 3
assert len(dct) == 2
assert dct[2] == 2
assert dct[3] == 3
assert 1 not in dct
def test_size_limit_dict_limit_init() -> None:
initial_dct = {
1: 1,
2: 2,
3: 3,
}
dct = SizeLimitDict(initial_dct, size_limit=2)
assert len(dct) == 2
def test_size_limit_dict_no_limit() -> None:
dct = SizeLimitDict(size_limit=None)
dct[1] = 1
dct[2] = 2
assert len(dct) == 2
assert dct[1] == 1
assert dct[2] == 2
dct[3] = 3
assert len(dct) == 3
assert dct[1] == 1
assert dct[2] == 2
assert dct[3] == 3
|