# -*- coding: utf-8 -*-
import itertools
import pytest

from hypothesis import given
from hypothesis.strategies import text, binary, sets, one_of

from hpack import (
    Encoder,
    Decoder,
    HeaderTuple,
    NeverIndexedHeaderTuple,
    HPACKDecodingError,
    InvalidTableIndex,
    OversizedHeaderListError,
    InvalidTableSizeError,
)
from hpack.hpack import _dict_to_iterable, _to_bytes


class TestHPACKEncoder:
    # These tests are stolen entirely from the IETF specification examples.
    def test_literal_header_field_with_indexing(self):
        """
        The header field representation uses a literal name and a literal
        value.
        """
        e = Encoder()
        header_set = {'custom-key': 'custom-header'}
        result = b'\x40\x0acustom-key\x0dcustom-header'

        assert e.encode(header_set, huffman=False) == result
        assert list(e.header_table.dynamic_entries) == [
            (n.encode('utf-8'), v.encode('utf-8'))
            for n, v in header_set.items()
        ]

    def test_sensitive_headers(self):
        """
        Test encoding header values
        """
        e = Encoder()
        result = (b'\x82\x14\x88\x63\xa1\xa9' +
                  b'\x32\x08\x73\xd0\xc7\x10' +
                  b'\x87\x25\xa8\x49\xe9\xea' +
                  b'\x5f\x5f\x89\x41\x6a\x41' +
                  b'\x92\x6e\xe5\x35\x52\x9f')
        header_set = [
            (':method', 'GET', True),
            (':path', '/jimiscool/', True),
            ('customkey', 'sensitiveinfo', True),
        ]
        assert e.encode(header_set, huffman=True) == result

    def test_non_sensitive_headers_with_header_tuples(self):
        """
        A header field stored in a HeaderTuple emits a representation that
        allows indexing.
        """
        e = Encoder()
        result = (b'\x82\x44\x88\x63\xa1\xa9' +
                  b'\x32\x08\x73\xd0\xc7\x40' +
                  b'\x87\x25\xa8\x49\xe9\xea' +
                  b'\x5f\x5f\x89\x41\x6a\x41' +
                  b'\x92\x6e\xe5\x35\x52\x9f')
        header_set = [
            HeaderTuple(':method', 'GET'),
            HeaderTuple(':path', '/jimiscool/'),
            HeaderTuple('customkey', 'sensitiveinfo'),
        ]
        assert e.encode(header_set, huffman=True) == result

    def test_sensitive_headers_with_header_tuples(self):
        """
        A header field stored in a NeverIndexedHeaderTuple emits a
        representation that forbids indexing.
        """
        e = Encoder()
        result = (b'\x82\x14\x88\x63\xa1\xa9' +
                  b'\x32\x08\x73\xd0\xc7\x10' +
                  b'\x87\x25\xa8\x49\xe9\xea' +
                  b'\x5f\x5f\x89\x41\x6a\x41' +
                  b'\x92\x6e\xe5\x35\x52\x9f')
        header_set = [
            NeverIndexedHeaderTuple(':method', 'GET'),
            NeverIndexedHeaderTuple(':path', '/jimiscool/'),
            NeverIndexedHeaderTuple('customkey', 'sensitiveinfo'),
        ]
        assert e.encode(header_set, huffman=True) == result

    def test_header_table_size_getter(self):
        e = Encoder()
        assert e.header_table_size == 4096

    def test_indexed_literal_header_field_with_indexing(self):
        """
        The header field representation uses an indexed name and a literal
        value and performs incremental indexing.
        """
        e = Encoder()
        header_set = {':path': '/sample/path'}
        result = b'\x44\x0c/sample/path'

        assert e.encode(header_set, huffman=False) == result
        assert list(e.header_table.dynamic_entries) == [
            (n.encode('utf-8'), v.encode('utf-8'))
            for n, v in header_set.items()
        ]

    def test_indexed_header_field(self):
        """
        The header field representation uses an indexed header field, from
        the static table.
        """
        e = Encoder()
        header_set = {':method': 'GET'}
        result = b'\x82'

        assert e.encode(header_set, huffman=False) == result
        assert list(e.header_table.dynamic_entries) == []

    def test_indexed_header_field_from_static_table(self):
        e = Encoder()
        e.header_table_size = 0
        header_set = {':method': 'GET'}
        result = b'\x82'

        # Make sure we don't emit an encoding context update.
        e.header_table.resized = False

        assert e.encode(header_set, huffman=False) == result
        assert list(e.header_table.dynamic_entries) == []

    def test_request_examples_without_huffman(self):
        """
        This section shows several consecutive header sets, corresponding to
        HTTP requests, on the same connection.
        """
        e = Encoder()
        first_header_set = [
            (':method', 'GET',),
            (':scheme', 'http',),
            (':path', '/',),
            (':authority', 'www.example.com'),
        ]
        # We should have :authority in first_header_table since we index it
        first_header_table = [(':authority', 'www.example.com')]
        first_result = b'\x82\x86\x84\x41\x0fwww.example.com'

        assert e.encode(first_header_set, huffman=False) == first_result
        assert list(e.header_table.dynamic_entries) == [
            (n.encode('utf-8'), v.encode('utf-8'))
            for n, v in first_header_table
        ]

        second_header_set = [
            (':method', 'GET',),
            (':scheme', 'http',),
            (':path', '/',),
            (':authority', 'www.example.com',),
            ('cache-control', 'no-cache'),
        ]
        second_header_table = [
            ('cache-control', 'no-cache'),
            (':authority', 'www.example.com')
        ]
        second_result = b'\x82\x86\x84\xbeX\x08no-cache'

        assert e.encode(second_header_set, huffman=False) == second_result
        assert list(e.header_table.dynamic_entries) == [
            (n.encode('utf-8'), v.encode('utf-8'))
            for n, v in second_header_table
        ]

        third_header_set = [
            (':method', 'GET',),
            (':scheme', 'https',),
            (':path', '/index.html',),
            (':authority', 'www.example.com',),
            ('custom-key', 'custom-value'),
        ]
        third_result = (
            b'\x82\x87\x85\xbf@\ncustom-key\x0ccustom-value'
        )

        assert e.encode(third_header_set, huffman=False) == third_result
        # Don't check the header table here, it's just too complex to be
        # reliable. Check its length though.
        assert len(e.header_table.dynamic_entries) == 3

    def test_request_examples_with_huffman(self):
        """
        This section shows the same examples as the previous section, but
        using Huffman encoding for the literal values.
        """
        e = Encoder()
        first_header_set = [
            (':method', 'GET',),
            (':scheme', 'http',),
            (':path', '/',),
            (':authority', 'www.example.com'),
        ]
        first_header_table = [(':authority', 'www.example.com')]
        first_result = (
            b'\x82\x86\x84\x41\x8c\xf1\xe3\xc2\xe5\xf2:k\xa0\xab\x90\xf4\xff'
        )

        assert e.encode(first_header_set, huffman=True) == first_result
        assert list(e.header_table.dynamic_entries) == [
            (n.encode('utf-8'), v.encode('utf-8'))
            for n, v in first_header_table
        ]

        second_header_table = [
            ('cache-control', 'no-cache'),
            (':authority', 'www.example.com')
        ]
        second_header_set = [
            (':method', 'GET',),
            (':scheme', 'http',),
            (':path', '/',),
            (':authority', 'www.example.com',),
            ('cache-control', 'no-cache'),
        ]
        second_result = b'\x82\x86\x84\xbeX\x86\xa8\xeb\x10d\x9c\xbf'

        assert e.encode(second_header_set, huffman=True) == second_result
        assert list(e.header_table.dynamic_entries) == [
            (n.encode('utf-8'), v.encode('utf-8'))
            for n, v in second_header_table
        ]

        third_header_set = [
            (':method', 'GET',),
            (':scheme', 'https',),
            (':path', '/index.html',),
            (':authority', 'www.example.com',),
            ('custom-key', 'custom-value'),
        ]
        third_result = (
            b'\x82\x87\x85\xbf'
            b'@\x88%\xa8I\xe9[\xa9}\x7f\x89%\xa8I\xe9[\xb8\xe8\xb4\xbf'
        )

        assert e.encode(third_header_set, huffman=True) == third_result
        assert len(e.header_table.dynamic_entries) == 3

    # These tests are custom, for hyper.
    def test_resizing_header_table(self):
        # We need to encode a substantial number of headers, to populate the
        # header table.
        e = Encoder()
        header_set = [
            (':method', 'GET'),
            (':scheme', 'https'),
            (':path', '/some/path'),
            (':authority', 'www.example.com'),
            ('custom-key', 'custom-value'),
            (
                "user-agent",
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.8; rv:16.0) "
                "Gecko/20100101 Firefox/16.0",
            ),
            (
                "accept",
                "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;"
                "q=0.8",
            ),
            ('X-Lukasa-Test', '88989'),
        ]
        e.encode(header_set, huffman=True)

        # Resize the header table to a size so small that nothing can be in it.
        e.header_table_size = 40
        assert len(e.header_table.dynamic_entries) == 0

    def test_resizing_header_table_sends_multiple_updates(self):
        e = Encoder()

        e.header_table_size = 40
        e.header_table_size = 100
        e.header_table_size = 40

        header_set = [(':method', 'GET')]
        out = e.encode(header_set, huffman=True)
        assert out == b'\x3F\x09\x3F\x45\x3F\x09\x82'

    def test_resizing_header_table_to_same_size_ignored(self):
        e = Encoder()

        # These size changes should be ignored
        e.header_table_size = 4096
        e.header_table_size = 4096
        e.header_table_size = 4096

        # These size changes should be encoded
        e.header_table_size = 40
        e.header_table_size = 100
        e.header_table_size = 40

        header_set = [(':method', 'GET')]
        out = e.encode(header_set, huffman=True)
        assert out == b'\x3F\x09\x3F\x45\x3F\x09\x82'

    def test_resizing_header_table_sends_context_update(self):
        e = Encoder()

        # Resize the header table to a size so small that nothing can be in it.
        e.header_table_size = 40

        # Now, encode a header set. Just a small one, with a well-defined
        # output.
        header_set = [(':method', 'GET')]
        out = e.encode(header_set, huffman=True)

        assert out == b'?\t\x82'

    def test_setting_table_size_to_the_same_does_nothing(self):
        e = Encoder()

        # Set the header table size to the default.
        e.header_table_size = 4096

        # Now encode a header set. Just a small one, with a well-defined
        # output.
        header_set = [(':method', 'GET')]
        out = e.encode(header_set, huffman=True)

        assert out == b'\x82'

    def test_evicting_header_table_objects(self):
        e = Encoder()

        # Set the header table size large enough to include one header.
        e.header_table_size = 66
        header_set = [('a', 'b'), ('long-custom-header', 'longish value')]
        e.encode(header_set)

        assert len(e.header_table.dynamic_entries) == 1

    def test_headers_generator(self):
        e = Encoder()

        def headers_generator():
            return (("k" + str(i), "v" + str(i)) for i in range(3))

        header_set = headers_generator()
        out = e.encode(header_set)
        assert Decoder().decode(out) == list(headers_generator())


class TestHPACKDecoder:
    # These tests are stolen entirely from the IETF specification examples.
    def test_literal_header_field_with_indexing(self):
        """
        The header field representation uses a literal name and a literal
        value.
        """
        d = Decoder()
        header_set = [('custom-key', 'custom-header')]
        data = b'\x40\x0acustom-key\x0dcustom-header'

        assert d.decode(data) == header_set
        assert list(d.header_table.dynamic_entries) == [
            (n.encode('utf-8'), v.encode('utf-8')) for n, v in header_set
        ]

    def test_raw_decoding(self):
        """
        The header field representation is decoded as a raw byte string instead
        of UTF-8
        """
        d = Decoder()
        header_set = [
            (b'\x00\x01\x99\x30\x11\x22\x55\x21\x89\x14', b'custom-header')
        ]
        data = (
            b'\x40\x0a\x00\x01\x99\x30\x11\x22\x55\x21\x89\x14\x0d'
            b'custom-header'
        )

        assert d.decode(data, raw=True) == header_set

    def test_literal_header_field_without_indexing(self):
        """
        The header field representation uses an indexed name and a literal
        value.
        """
        d = Decoder()
        header_set = [(':path', '/sample/path')]
        data = b'\x04\x0c/sample/path'

        assert d.decode(data) == header_set
        assert list(d.header_table.dynamic_entries) == []

    def test_header_table_size_getter(self):
        d = Decoder()
        assert d.header_table_size

    def test_indexed_header_field(self):
        """
        The header field representation uses an indexed header field, from
        the static table.
        """
        d = Decoder()
        header_set = [(':method', 'GET')]
        data = b'\x82'

        assert d.decode(data) == header_set
        assert list(d.header_table.dynamic_entries) == []

    def test_request_examples_without_huffman(self):
        """
        This section shows several consecutive header sets, corresponding to
        HTTP requests, on the same connection.
        """
        d = Decoder()
        first_header_set = [
            (':method', 'GET',),
            (':scheme', 'http',),
            (':path', '/',),
            (':authority', 'www.example.com'),
        ]
        # The first_header_table doesn't contain 'authority'
        first_data = b'\x82\x86\x84\x01\x0fwww.example.com'

        assert d.decode(first_data) == first_header_set
        assert list(d.header_table.dynamic_entries) == []

        # This request takes advantage of the differential encoding of header
        # sets.
        second_header_set = [
            (':method', 'GET',),
            (':scheme', 'http',),
            (':path', '/',),
            (':authority', 'www.example.com',),
            ('cache-control', 'no-cache'),
        ]
        second_data = (
            b'\x82\x86\x84\x01\x0fwww.example.com\x0f\t\x08no-cache'
        )

        assert d.decode(second_data) == second_header_set
        assert list(d.header_table.dynamic_entries) == []

        third_header_set = [
            (':method', 'GET',),
            (':scheme', 'https',),
            (':path', '/index.html',),
            (':authority', 'www.example.com',),
            ('custom-key', 'custom-value'),
        ]
        third_data = (
            b'\x82\x87\x85\x01\x0fwww.example.com@\ncustom-key\x0ccustom-value'
        )

        assert d.decode(third_data) == third_header_set
        # Don't check the header table here, it's just too complex to be
        # reliable. Check its length though.
        assert len(d.header_table.dynamic_entries) == 1

    def test_request_examples_with_huffman(self):
        """
        This section shows the same examples as the previous section, but
        using Huffman encoding for the literal values.
        """
        d = Decoder()

        first_header_set = [
            (':method', 'GET',),
            (':scheme', 'http',),
            (':path', '/',),
            (':authority', 'www.example.com'),
        ]
        first_data = (
            b'\x82\x86\x84\x01\x8c\xf1\xe3\xc2\xe5\xf2:k\xa0\xab\x90\xf4\xff'
        )

        assert d.decode(first_data) == first_header_set
        assert list(d.header_table.dynamic_entries) == []

        second_header_set = [
            (':method', 'GET',),
            (':scheme', 'http',),
            (':path', '/',),
            (':authority', 'www.example.com',),
            ('cache-control', 'no-cache'),
        ]
        second_data = (
            b'\x82\x86\x84\x01\x8c\xf1\xe3\xc2\xe5\xf2:k\xa0\xab\x90\xf4\xff'
            b'\x0f\t\x86\xa8\xeb\x10d\x9c\xbf'
        )

        assert d.decode(second_data) == second_header_set
        assert list(d.header_table.dynamic_entries) == []

        third_header_set = [
            (':method', 'GET',),
            (':scheme', 'https',),
            (':path', '/index.html',),
            (':authority', 'www.example.com',),
            ('custom-key', 'custom-value'),
        ]
        third_data = (
            b'\x82\x87\x85\x01\x8c\xf1\xe3\xc2\xe5\xf2:k\xa0\xab\x90\xf4\xff@'
            b'\x88%\xa8I\xe9[\xa9}\x7f\x89%\xa8I\xe9[\xb8\xe8\xb4\xbf'
        )

        assert d.decode(third_data) == third_header_set
        assert len(d.header_table.dynamic_entries) == 1

    # These tests are custom, for hyper.
    def test_resizing_header_table(self):
        # We need to decode a substantial number of headers, to populate the
        # header table. This string isn't magic: it's the output from the
        # equivalent test for the Encoder.
        d = Decoder()
        data = (
            b'\x82\x87D\x87a\x07\xa4\xacV4\xcfA\x8c\xf1\xe3\xc2\xe5\xf2:k\xa0'
            b'\xab\x90\xf4\xff@\x88%\xa8I\xe9[\xa9}\x7f\x89%\xa8I\xe9[\xb8\xe8'
            b'\xb4\xbfz\xbc\xd0\x7ff\xa2\x81\xb0\xda\xe0S\xfa\xd02\x1a\xa4\x9d'
            b'\x13\xfd\xa9\x92\xa4\x96\x854\x0c\x8aj\xdc\xa7\xe2\x81\x02\xef}'
            b'\xa9g{\x81qp\x7fjb):\x9d\x81\x00 \x00@\x150\x9a\xc2\xca\x7f,\x05'
            b'\xc5\xc1S\xb0I|\xa5\x89\xd3M\x1fC\xae\xba\x0cA\xa4\xc7\xa9\x8f3'
            b'\xa6\x9a?\xdf\x9ah\xfa\x1du\xd0b\r&=Ly\xa6\x8f\xbe\xd0\x01w\xfe'
            b'\xbeX\xf9\xfb\xed\x00\x17{@\x8a\xfc[=\xbdF\x81\xad\xbc\xa8O\x84y'
            b'\xe7\xde\x7f'
        )
        d.decode(data)

        # Resize the header table to a size so small that nothing can be in it.
        d.header_table_size = 40
        assert len(d.header_table.dynamic_entries) == 0

    def test_apache_trafficserver(self):
        # This test reproduces the bug in #110, using exactly the same header
        # data.
        d = Decoder()
        data = (
            b'\x10\x07:status\x03200@\x06server\tATS/6.0.0'
            b'@\x04date\x1dTue, 31 Mar 2015 08:09:51 GMT'
            b'@\x0ccontent-type\ttext/html@\x0econtent-length\x0542468'
            b'@\rlast-modified\x1dTue, 31 Mar 2015 01:55:51 GMT'
            b'@\x04vary\x0fAccept-Encoding@\x04etag\x0f"5519fea7-a5e4"'
            b'@\x08x-served\x05Nginx@\x14x-subdomain-tryfiles\x04True'
            b'@\x07x-deity\thydra-lts@\raccept-ranges\x05bytes@\x03age\x010'
            b'@\x19strict-transport-security\rmax-age=86400'
            b'@\x03via2https/1.1 ATS (ApacheTrafficServer/6.0.0 [cSsNfU])'
        )
        expect = [
            (':status', '200'),
            ('server', 'ATS/6.0.0'),
            ('date', 'Tue, 31 Mar 2015 08:09:51 GMT'),
            ('content-type', 'text/html'),
            ('content-length', '42468'),
            ('last-modified', 'Tue, 31 Mar 2015 01:55:51 GMT'),
            ('vary', 'Accept-Encoding'),
            ('etag', '"5519fea7-a5e4"'),
            ('x-served', 'Nginx'),
            ('x-subdomain-tryfiles', 'True'),
            ('x-deity', 'hydra-lts'),
            ('accept-ranges', 'bytes'),
            ('age', '0'),
            ('strict-transport-security', 'max-age=86400'),
            ('via', 'https/1.1 ATS (ApacheTrafficServer/6.0.0 [cSsNfU])'),
        ]

        result = d.decode(data)

        assert result == expect
        # The status header shouldn't be indexed.
        assert len(d.header_table.dynamic_entries) == len(expect) - 1

    def test_utf8_errors_raise_hpack_decoding_error(self):
        d = Decoder()

        # Invalid UTF-8 data.
        data = b'\x82\x86\x84\x01\x10www.\x07\xaa\xd7\x95\xd7\xa8\xd7\x94.com'

        with pytest.raises(HPACKDecodingError):
            d.decode(data)

    def test_invalid_indexed_literal(self):
        d = Decoder()

        # Refer to an index that is too large.
        data = b'\x82\x86\x84\x7f\x0a\x0fwww.example.com'
        with pytest.raises(InvalidTableIndex):
            d.decode(data)

    def test_invalid_indexed_header(self):
        d = Decoder()

        # Refer to an indexed header that is too large.
        data = b'\xBE\x86\x84\x01\x0fwww.example.com'
        with pytest.raises(InvalidTableIndex):
            d.decode(data)

    def test_literal_header_field_with_indexing_emits_headertuple(self):
        """
        A header field with indexing emits a HeaderTuple.
        """
        d = Decoder()
        data = b'\x00\x0acustom-key\x0dcustom-header'

        headers = d.decode(data)
        assert len(headers) == 1

        header = headers[0]
        assert isinstance(header, HeaderTuple)
        assert not isinstance(header, NeverIndexedHeaderTuple)

    def test_literal_never_indexed_emits_neverindexedheadertuple(self):
        """
        A literal header field that must never be indexed emits a
        NeverIndexedHeaderTuple.
        """
        d = Decoder()
        data = b'\x10\x0acustom-key\x0dcustom-header'

        headers = d.decode(data)
        assert len(headers) == 1

        header = headers[0]
        assert isinstance(header, NeverIndexedHeaderTuple)

    def test_indexed_never_indexed_emits_neverindexedheadertuple(self):
        """
        A header field with an indexed name that must never be indexed emits a
        NeverIndexedHeaderTuple.
        """
        d = Decoder()
        data = b'\x14\x0c/sample/path'

        headers = d.decode(data)
        assert len(headers) == 1

        header = headers[0]
        assert isinstance(header, NeverIndexedHeaderTuple)

    def test_max_header_list_size(self):
        """
        If the header block is larger than the max_header_list_size, the HPACK
        decoder throws an OversizedHeaderListError.
        """
        d = Decoder(max_header_list_size=44)
        data = b'\x14\x0c/sample/path'

        with pytest.raises(OversizedHeaderListError):
            d.decode(data)

    def test_can_decode_multiple_header_table_size_changes(self):
        """
        If multiple header table size changes are sent in at once, they are
        successfully decoded.
        """
        d = Decoder()
        data = b'?a?\xe1\x1f\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99'
        expect = [
            (':method', 'GET'),
            (':scheme', 'https'),
            (':path', '/'),
            (':authority', '127.0.0.1:8443')
        ]

        assert d.decode(data) == expect

    def test_header_table_size_change_above_maximum(self):
        """
        If a header table size change is received that exceeds the maximum
        allowed table size, it is rejected.
        """
        d = Decoder()
        d.max_allowed_table_size = 127
        data = b'?a\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99'

        with pytest.raises(InvalidTableSizeError):
            d.decode(data)

    def test_table_size_not_adjusting(self):
        """
        If the header table size is shrunk, and then the remote peer doesn't
        join in the shrinking, then an error is raised.
        """
        d = Decoder()
        d.max_allowed_table_size = 128
        data = b'\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99'

        with pytest.raises(InvalidTableSizeError):
            d.decode(data)

    def test_table_size_last_rejected(self):
        """
        If a header table size change comes last in the header block, it is
        forbidden.
        """
        d = Decoder()
        data = b'\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99?a'

        with pytest.raises(HPACKDecodingError):
            d.decode(data)

    def test_table_size_middle_rejected(self):
        """
        If a header table size change comes anywhere but first in the header
        block, it is forbidden.
        """
        d = Decoder()
        data = b'\x82?a\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99'

        with pytest.raises(HPACKDecodingError):
            d.decode(data)

    def test_truncated_header_name(self):
        """
        If a header name is truncated an error is raised.
        """
        d = Decoder()
        # This is a simple header block that has a bad ending. The interesting
        # part begins on the second line. This indicates a string that has
        # literal name and value. The name is a 5 character huffman-encoded
        # string that is only three bytes long.
        data = (
            b'\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99'
            b'\x00\x85\xf2\xb2J'
        )

        with pytest.raises(HPACKDecodingError):
            d.decode(data)

    def test_truncated_header_value(self):
        """
        If a header value is truncated an error is raised.
        """
        d = Decoder()
        # This is a simple header block that has a bad ending. The interesting
        # part begins on the second line. This indicates a string that has
        # literal name and value. The name is a 5 character huffman-encoded
        # string, but the entire EOS character has been written over the end.
        # This causes hpack to see the header value as being supposed to be
        # 622462 bytes long, which it clearly is not, and so this must fail.
        data = (
            b'\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99'
            b'\x00\x85\xf2\xb2J\x87\xff\xff\xff\xfd%B\x7f'
        )

        with pytest.raises(HPACKDecodingError):
            d.decode(data)


class TestDictToIterable:
    """
    The dict_to_iterable function has some subtle requirements: validates that
    everything behaves as expected.

    As much as possible this tries to be exhaustive.
    """
    keys = one_of(
        text().filter(lambda k: k and not k.startswith(':')),
        binary().filter(lambda k: k and not k.startswith(b':'))
    )

    @given(
        special_keys=sets(keys),
        boring_keys=sets(keys),
    )
    def test_ordering(self, special_keys, boring_keys):
        """
        _dict_to_iterable produces an iterable where all the keys beginning
        with a colon are emitted first.
        """
        def _prepend_colon(k):
            if isinstance(k, str):
                return ':' + k
            else:
                return b':' + k

        special_keys = set(map(_prepend_colon, special_keys))
        input_dict = {
            k: b'testval' for k in itertools.chain(
                special_keys,
                boring_keys
            )
        }
        filtered = _dict_to_iterable(input_dict)

        received_special = set()
        received_boring = set()

        for _ in special_keys:
            k, _ = next(filtered)
            received_special.add(k)
        for _ in boring_keys:
            k, _ = next(filtered)
            received_boring.add(k)

        assert special_keys == received_special
        assert boring_keys == received_boring

    @given(
        special_keys=sets(keys),
        boring_keys=sets(keys),
    )
    def test_ordering_applies_to_encoding(self, special_keys, boring_keys):
        """
        When encoding a dictionary the special keys all appear first.
        """
        def _prepend_colon(k):
            if isinstance(k, str):
                return ':' + k
            else:
                return b':' + k

        special_keys = set(map(_prepend_colon, special_keys))
        input_dict = {
            k: b'testval' for k in itertools.chain(
                special_keys,
                boring_keys
            )
        }
        e = Encoder()
        d = Decoder()
        encoded = e.encode(input_dict)
        decoded = iter(d.decode(encoded, raw=True))

        received_special = set()
        received_boring = set()
        expected_special = set(map(_to_bytes, special_keys))
        expected_boring = set(map(_to_bytes, boring_keys))

        for _ in special_keys:
            k, _ = next(decoded)
            received_special.add(k)
        for _ in boring_keys:
            k, _ = next(decoded)
            received_boring.add(k)

        assert expected_special == received_special
        assert expected_boring == received_boring
