File: test_compression_utils.py

package info (click to toggle)
python-aiohttp 3.11.16-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 16,156 kB
  • sloc: python: 51,898; ansic: 20,843; makefile: 395; javascript: 31; sh: 3
file content (23 lines) | stat: -rw-r--r-- 1,053 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
"""Tests for compression utils."""

from aiohttp.compression_utils import ZLibCompressor, ZLibDecompressor


async def test_compression_round_trip_in_executor() -> None:
    """Ensure that compression and decompression work correctly in the executor."""
    compressor = ZLibCompressor(max_sync_chunk_size=1)
    decompressor = ZLibDecompressor(max_sync_chunk_size=1)
    data = b"Hi" * 100
    compressed_data = await compressor.compress(data) + compressor.flush()
    decompressed_data = await decompressor.decompress(compressed_data)
    assert data == decompressed_data


async def test_compression_round_trip_in_event_loop() -> None:
    """Ensure that compression and decompression work correctly in the event loop."""
    compressor = ZLibCompressor(max_sync_chunk_size=10000)
    decompressor = ZLibDecompressor(max_sync_chunk_size=10000)
    data = b"Hi" * 100
    compressed_data = await compressor.compress(data) + compressor.flush()
    decompressed_data = await decompressor.decompress(compressed_data)
    assert data == decompressed_data