File: peer.py

package info (click to toggle)
python-snitun 0.45.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 664 kB
  • sloc: python: 6,681; sh: 5; makefile: 3
file content (129 lines) | stat: -rw-r--r-- 3,633 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""Represent a single Peer."""

from __future__ import annotations

import asyncio
from datetime import UTC, datetime
import hashlib
import logging
import os

from ..exceptions import MultiplexerTransportDecrypt, SniTunChallengeError
from ..multiplexer.core import Multiplexer
from ..multiplexer.crypto import CryptoTransport
from ..utils.asyncio import asyncio_timeout

# Module-level logger, named after this module via ``__name__``.
_LOGGER = logging.getLogger(__name__)


class Peer:
    """Representation of a Peer."""

    def __init__(
        self,
        hostname: str,
        valid: datetime,
        aes_key: bytes,
        aes_iv: bytes,
        protocol_version: int,
        throttling: int | None = None,
        alias: list[str] | None = None,
    ) -> None:
        """Initialize a Peer."""
        self._hostname = hostname
        self._valid = valid
        self._throttling = throttling
        self._alias = alias or []
        self._multiplexer: Multiplexer | None = None
        self._crypto = CryptoTransport(aes_key, aes_iv)
        self._protocol_version = protocol_version

    @property
    def hostname(self) -> str:
        """Return his hostname."""
        return self._hostname

    @property
    def alias(self) -> list[str]:
        """Return the alias."""
        return self._alias

    @property
    def all_hostnames(self) -> list[str]:
        """Return a list of the base hostname and any alias."""
        return [self._hostname, *self._alias]

    @property
    def is_connected(self) -> bool:
        """Return True if we are connected to peer."""
        if not self._multiplexer:
            return False
        return self._multiplexer.is_connected

    @property
    def is_valid(self) -> bool:
        """Return True if the peer is valid."""
        return self._valid > datetime.now(tz=UTC)

    @property
    def multiplexer(self) -> Multiplexer | None:
        """Return Multiplexer object."""
        return self._multiplexer

    @property
    def is_ready(self) -> bool:
        """Return true if the Peer is ready to process data."""
        if self.multiplexer is None:
            return False
        return self.multiplexer.is_connected

    @property
    def protocol_version(self) -> int:
        """Return the protocol version."""
        return self._protocol_version

    async def init_multiplexer_challenge(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
    ) -> None:
        """Initialize multiplexer."""
        try:
            token = hashlib.sha256(os.urandom(40)).digest()
            writer.write(self._crypto.encrypt(token))

            async with asyncio_timeout.timeout(60):
                await writer.drain()
                data = await reader.readexactly(32)

            # Check Token
            data = self._crypto.decrypt(data)
            assert hashlib.sha256(token).digest() == data

        except (
            TimeoutError,
            asyncio.IncompleteReadError,
            MultiplexerTransportDecrypt,
            AssertionError,
            OSError,
        ) as err:
            raise SniTunChallengeError("Wrong challenge from peer") from err

        # Start Multiplexer
        self._multiplexer = Multiplexer(
            self._crypto,
            reader,
            writer,
            self._protocol_version,
            throttling=self._throttling,
        )

    def wait_disconnect(self) -> asyncio.Future[None]:
        """Wait until peer is disconnected.

        Return a coroutine.
        """
        if not self._multiplexer:
            raise RuntimeError("No Transport initialize for peer")

        return self._multiplexer.wait()