1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
|
import asyncio
import ipaddress
import pytest
from pytest_codspeed import BenchmarkFixture
from snitun.multiplexer.channel import MultiplexerChannel
from snitun.multiplexer.core import Multiplexer
# Fixed IPv4 address passed to ``create_channel`` when the benchmark below
# opens its channel.
IP_ADDR = ipaddress.IPv4Address("8.8.8.8")
@pytest.mark.parametrize(
    ("size", "message_count"),
    [
        pytest.param(2048, 1000, id="1000@2KiB"),
        pytest.param(1024 * 1024, 100, id="100@1MiB"),
    ],
)
def test_multiplex_channel_message(
    benchmark: BenchmarkFixture,
    multiplexer_client: Multiplexer,
    multiplexer_server: Multiplexer,
    size: int,
    message_count: int,
) -> None:
    """Benchmark writing messages into a channel and reading them back.

    A channel is created on ``multiplexer_client`` and its counterpart is
    looked up on ``multiplexer_server`` by channel id. The benchmarked
    callable then performs ``message_count`` write/read pairs, each write
    carrying a payload of ``size`` bytes.

    Channel creation happens outside the benchmarked callable, so only the
    write/read loop is measured.
    """
    # Neither multiplexer may have channels registered before the run.
    assert not multiplexer_client._channels
    assert not multiplexer_server._channels

    event_loop = asyncio.get_event_loop()
    message = b"x" * size

    async def _open_channel_pair() -> tuple[MultiplexerChannel, MultiplexerChannel]:
        """Create the client channel and fetch the matching server channel."""
        client_side = await multiplexer_client.create_channel(
            IP_ADDR,
            lambda _: None,  # callback that ignores its argument
        )
        # NOTE(review): presumably gives the server side time to register the
        # new channel before it is looked up -- confirm.
        await asyncio.sleep(0.1)
        server_side = multiplexer_server._channels.get(client_side.id)
        assert client_side
        assert server_side
        return client_side, server_side

    async def _round_trip_messages(
        writer: MultiplexerChannel,
        reader: MultiplexerChannel,
    ) -> None:
        """Write ``message`` to ``writer`` and read from ``reader``, repeatedly."""
        for _ in range(message_count):
            await writer.write(message)
            await reader.read()

    client_channel, server_channel = event_loop.run_until_complete(
        _open_channel_pair(),
    )

    def read_write_channel() -> None:
        """Drive one full batch of write/read pairs on the event loop."""
        event_loop.run_until_complete(
            _round_trip_messages(client_channel, server_channel),
        )

    # Equivalent to decorating ``read_write_channel`` with ``@benchmark``.
    benchmark(read_write_channel)
|