1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
|
"""Tests for peer listener & manager."""
import asyncio
from datetime import UTC, datetime, timedelta
import hashlib
import ipaddress
import os
import snitun
from snitun.multiplexer.channel import MultiplexerChannel
from snitun.multiplexer.core import Multiplexer
from snitun.multiplexer.crypto import CryptoTransport
from snitun.server.listener_peer import PeerListener
from snitun.server.listener_sni import SNIProxy
from snitun.server.peer_manager import PeerManager
from ..conftest import Client
from .const_fernet import create_peer_config
from .const_tls import TLS_1_2
IP_ADDR = ipaddress.ip_address("127.0.0.1")
async def test_server_full(
peer_manager: PeerManager,
peer_listener: PeerListener,
test_client_peer: Client,
sni_proxy: SNIProxy,
test_client_ssl: Client,
) -> None:
"""Run a full flow of with a peer after that disconnect."""
peer_messages = []
peer_address = []
valid = datetime.now(tz=UTC) + timedelta(days=1)
aes_key = os.urandom(32)
aes_iv = os.urandom(16)
hostname = "localhost"
fernet_token = create_peer_config(valid.timestamp(), hostname, aes_key, aes_iv)
crypto = CryptoTransport(aes_key, aes_iv)
test_client_peer.writer.write(fernet_token)
await test_client_peer.writer.drain()
token = await test_client_peer.reader.readexactly(32)
token = hashlib.sha256(crypto.decrypt(token)).digest()
test_client_peer.writer.write(crypto.encrypt(token))
await test_client_peer.writer.drain()
await asyncio.sleep(0.1)
assert peer_manager.peer_available(hostname)
async def mock_new_channel(
multiplexer: Multiplexer,
channel: MultiplexerChannel,
) -> None:
"""Mock new channel."""
while True:
message = await channel.read()
peer_messages.append(message)
peer_address.append(channel.ip_address)
multiplexer = Multiplexer(
crypto,
test_client_peer.reader,
test_client_peer.writer,
snitun.PROTOCOL_VERSION,
mock_new_channel,
)
test_client_ssl.writer.write(TLS_1_2)
await test_client_ssl.writer.drain()
await asyncio.sleep(0.1)
assert peer_messages
assert peer_messages[0] == TLS_1_2
assert peer_address
assert peer_address[0] == IP_ADDR
multiplexer.shutdown()
await multiplexer.wait()
await asyncio.sleep(0.1)
assert not peer_manager.peer_available(hostname)
|