File: tcp_client.py

package info (click to toggle)
python-ledgercomm 1.2.1-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 164 kB
  • sloc: python: 297; makefile: 2
file content (123 lines) | stat: -rwxr-xr-x 3,028 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""ledgercomm.interfaces.tcp_client module."""

import socket
from typing import Tuple

from ledgercomm.interfaces.comm import Comm
from ledgercomm.log import LOG


class TCPClient(Comm):
    """TCPClient class.

    Mainly used to connect to the TCP server of the Speculos emulator.

    Frames sent to the server are ``<4-byte big-endian length><data>``.
    Frames read from the server are
    ``<4-byte big-endian length><data><2-byte big-endian status word>``,
    where the length covers `data` only.

    Parameters
    ----------
    server : str
        IP address of the TCP server.
    port : int
        Port of the TCP server.

    Attributes
    ----------
    server : str
        IP address of the TCP server.
    port : int
        Port of the TCP server.
    socket : socket.socket
        TCP socket to communicate with the server.
    __opened : bool
        Whether the TCP socket is opened or not.

    """

    def __init__(self, server: str, port: int) -> None:
        """Init constructor of TCPClient."""
        self.server: str = server
        self.port: int = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.__opened: bool = False

    def open(self) -> None:
        """Open connection to TCP socket with `self.server` and `self.port`.

        Does nothing if the connection is already opened. If the socket was
        closed by a previous call to `close()`, a fresh socket is created
        first, because a closed socket object cannot be connected again.

        Returns
        -------
        None

        Raises
        ------
        OSError
            If the connection to the server cannot be established.

        """
        if self.__opened:
            return

        # A closed socket reports -1 as its file descriptor.
        if self.socket.fileno() == -1:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        self.socket.connect((self.server, self.port))
        self.__opened = True

    def send(self, data: bytes) -> int:
        """Send `data` through TCP socket `self.socket`.

        The data is prefixed with its length encoded on 4 bytes (big-endian).

        Parameters
        ----------
        data : bytes
            Bytes of data to send.

        Returns
        -------
        int
            Total length of data sent through TCP socket, including the
            4-byte length prefix.

        Raises
        ------
        ValueError
            If `data` is empty.
        OSError
            If the underlying socket fails while sending.

        """
        if not data:
            raise ValueError("Can't send empty data!")

        LOG.debug("=> %s", data.hex())
        data_len: bytes = int.to_bytes(len(data), 4, byteorder="big")
        frame: bytes = data_len + data

        # `socket.send` may transmit only part of the buffer; `sendall`
        # keeps sending until the whole frame is out (or raises).
        self.socket.sendall(frame)

        return len(frame)

    def _recv_exact(self, size: int) -> bytes:
        """Read exactly `size` bytes from `self.socket`.

        `socket.recv` may return fewer bytes than requested, so keep reading
        until the requested amount has been collected.

        Parameters
        ----------
        size : int
            Number of bytes to read.

        Returns
        -------
        bytes
            Exactly `size` bytes read from the socket.

        Raises
        ------
        ConnectionError
            If the peer closes the connection before `size` bytes arrive.

        """
        chunks = []
        remaining: int = size

        while remaining > 0:
            chunk: bytes = self.socket.recv(remaining)
            if not chunk:
                # An empty read on a stream socket means the peer closed it.
                raise ConnectionError(
                    "Connection closed by peer: expected "
                    f"{size} bytes, got {size - remaining}"
                )
            chunks.append(chunk)
            remaining -= len(chunk)

        return b"".join(chunks)

    def recv(self) -> Tuple[int, bytes]:
        """Receive data through TCP socket `self.socket`.

        Blocking IO.

        Returns
        -------
        Tuple[int, bytes]
            A pair (sw, rdata) containing the status word and response data.

        Raises
        ------
        ConnectionError
            If the connection is closed before a full response is received.

        """
        length: int = int.from_bytes(self._recv_exact(4), byteorder="big")
        rdata: bytes = self._recv_exact(length)
        sw: int = int.from_bytes(self._recv_exact(2), byteorder="big")

        LOG.debug("<= %s %s", rdata.hex(), hex(sw)[2:])

        return sw, rdata

    def exchange(self, data: bytes) -> Tuple[int, bytes]:
        """Exchange (send + receive) with `self.socket`.

        Parameters
        ----------
        data : bytes
            Bytes with `data` to send.

        Returns
        -------
        Tuple[int, bytes]
            A pair (sw, rdata) containing the status word and response data.

        Raises
        ------
        ValueError
            If `data` is empty.
        ConnectionError
            If the connection is closed before a full response is received.

        """
        self.send(data)

        return self.recv()  # blocking IO

    def close(self) -> None:
        """Close connection to TCP socket `self.socket`.

        Does nothing if the connection is not opened.

        Returns
        -------
        None

        """
        if self.__opened:
            self.socket.close()
            self.__opened = False