File: test_worker.py

package info (click to toggle)
python-snitun 0.45.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 632 kB
  • sloc: python: 6,646; sh: 5; makefile: 3
file content (136 lines) | stat: -rw-r--r-- 3,875 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""Tests for the server worker."""

import asyncio
from datetime import UTC, datetime, timedelta
import hashlib
import os
import socket
import time

from snitun.multiplexer.crypto import CryptoTransport
from snitun.server.worker import ServerWorker

from .const_fernet import FERNET_TOKENS, create_peer_config
from .const_tls import TLS_1_2


def test_worker_up_down(event_loop: asyncio.AbstractEventLoop) -> None:
    """Test that a worker starts and stops cleanly.

    A freshly started worker must report itself alive with no peers; after
    ``shutdown()`` it must have exited with code 0 and no longer be alive.
    """
    worker = ServerWorker(FERNET_TOKENS)

    worker.start()
    try:
        assert worker.is_alive()
        assert worker.peer_size == 0
    finally:
        # Always stop the worker, even when an assertion above fails, so a
        # failing test does not leave the started worker running.
        worker.shutdown()

    assert worker.exitcode == 0
    assert not worker.is_alive()


def test_peer_connection(
    test_server_sync: list[socket.socket],
    test_client_sync: socket.socket,
    event_loop: asyncio.AbstractEventLoop,
) -> None:
    """Run a full flow with a peer.

    Hands a server-side socket to the worker together with a peer token,
    answers the worker's 32-byte message from the client side, and checks
    that the worker then reports the peer until it is shut down.
    """
    worker = ServerWorker(FERNET_TOKENS)
    valid = datetime.now(tz=UTC) + timedelta(days=1)
    aes_key = os.urandom(32)
    aes_iv = os.urandom(16)
    hostname = "localhost"
    fernet_token = create_peer_config(valid.timestamp(), hostname, aes_key, aes_iv)

    worker.start()
    try:
        crypto = CryptoTransport(aes_key, aes_iv)

        worker.handover_connection(test_server_sync[-1], fernet_token, None)

        # Peer side of the handshake as exercised here: read 32 bytes, decrypt
        # them, and send back the encrypted SHA-256 digest of the plaintext.
        challenge = test_client_sync.recv(32)
        answer = hashlib.sha256(crypto.decrypt(challenge)).digest()
        test_client_sync.sendall(crypto.encrypt(answer))

        # Fixed wait so the worker can process the reply before we query it.
        time.sleep(1)
        assert worker.is_responsible_peer(hostname)
        assert worker.peer_size == 1
    finally:
        # Always stop the worker, even when a step above fails, so a failing
        # test does not leave the started worker running.
        worker.shutdown()

    assert worker.peer_size == 0


def test_peer_connection_disconnect(
    test_server_sync: list[socket.socket],
    test_client_sync: socket.socket,
    event_loop: asyncio.AbstractEventLoop,
) -> None:
    """Run a full flow with a peer, then disconnect it.

    After the peer is registered, the client socket is shut down and the
    worker is expected to drop the peer on its own, before ``shutdown()``.
    """
    worker = ServerWorker(FERNET_TOKENS)
    valid = datetime.now(tz=UTC) + timedelta(days=1)
    aes_key = os.urandom(32)
    aes_iv = os.urandom(16)
    hostname = "localhost"
    fernet_token = create_peer_config(valid.timestamp(), hostname, aes_key, aes_iv)

    worker.start()
    try:
        crypto = CryptoTransport(aes_key, aes_iv)

        worker.handover_connection(test_server_sync[-1], fernet_token, None)

        # Peer side of the handshake as exercised here: read 32 bytes, decrypt
        # them, and send back the encrypted SHA-256 digest of the plaintext.
        challenge = test_client_sync.recv(32)
        answer = hashlib.sha256(crypto.decrypt(challenge)).digest()
        test_client_sync.sendall(crypto.encrypt(answer))

        # Fixed wait so the worker can process the reply before we query it.
        time.sleep(1)
        assert worker.is_responsible_peer(hostname)
        assert worker.peer_size == 1

        # Close both directions of the client socket and give the worker time
        # to notice the disconnect.
        test_client_sync.shutdown(socket.SHUT_RDWR)
        time.sleep(1)
        assert not worker.is_responsible_peer(hostname)
        assert worker.peer_size == 0
    finally:
        # Always stop the worker, even when a step above fails, so a failing
        # test does not leave the started worker running.
        worker.shutdown()


def test_sni_connection(
    test_server_sync: list[socket.socket],
    test_client_sync: socket.socket,
    test_client_ssl_sync: socket.socket,
    event_loop: asyncio.AbstractEventLoop,
) -> None:
    """Run a full flow with a peer and an SNI connection for its hostname.

    Registers a peer with an alias, checks that the worker is responsible for
    both the hostname and the alias, then hands over a second connection
    carrying ``TLS_1_2`` data for the hostname and expects the peer's client
    socket to receive 32 bytes as a result.
    """
    worker = ServerWorker(FERNET_TOKENS)
    valid = datetime.now(tz=UTC) + timedelta(days=1)
    aes_key = os.urandom(32)
    aes_iv = os.urandom(16)
    hostname = "localhost"
    alias = ["localhost.custom"]
    fernet_token = create_peer_config(
        valid.timestamp(),
        hostname,
        aes_key,
        aes_iv,
        alias=alias,
    )

    worker.start()
    try:
        crypto = CryptoTransport(aes_key, aes_iv)

        worker.handover_connection(test_server_sync[0], fernet_token, None)

        # Peer side of the handshake as exercised here: read 32 bytes, decrypt
        # them, and send back the encrypted SHA-256 digest of the plaintext.
        challenge = test_client_sync.recv(32)
        answer = hashlib.sha256(crypto.decrypt(challenge)).digest()
        test_client_sync.sendall(crypto.encrypt(answer))

        # Fixed wait so the worker can process the reply before we query it.
        time.sleep(1)
        assert worker.is_responsible_peer(hostname)
        for entry in alias:
            assert worker.is_responsible_peer(entry)

        # Second connection: handed over with the hostname as SNI. The peer's
        # socket should then receive exactly 32 bytes.
        # NOTE(review): presumably the header of a newly opened multiplexer
        # channel - confirm against the multiplexer protocol.
        worker.handover_connection(test_server_sync[1], TLS_1_2, hostname)
        assert len(test_client_sync.recv(1048)) == 32

        assert worker.peer_size == 1
    finally:
        # Always stop the worker, even when a step above fails, so a failing
        # test does not leave the started worker running.
        worker.shutdown()

    assert worker.peer_size == 0