1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
|
"""ledgercomm.interfaces.tcp_client module."""
import socket
from typing import Tuple
from ledgercomm.interfaces.comm import Comm
from ledgercomm.log import LOG
class TCPClient(Comm):
"""TCPClient class.
Mainly used to connect to the TCP server of the Speculos emulator.
Parameters
----------
server : str
IP address of the TCP server.
port : int
Port of the TCP server.
Attributes
----------
server : str
IP address of the TCP server.
port : int
Port of the TCP server.
socket : socket.socket
TCP socket to communicate with the server.
__opened : bool
Whether the TCP socket is opened or not.
"""
def __init__(self, server: str, port: int) -> None:
"""Init constructor of TCPClient."""
self.server: str = server
self.port: int = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__opened: bool = False
def open(self) -> None:
"""Open connection to TCP socket with `self.server` and `self.port`.
Returns
-------
None
"""
if not self.__opened:
self.socket.connect((self.server, self.port))
self.__opened = True
def send(self, data: bytes) -> int:
"""Send `data` through TCP socket `self.socket`.
Parameters
----------
data : bytes
Bytes of data to send.
Returns
-------
int
Total lenght of data sent through TCP socket.
"""
if not data:
raise ValueError("Can't send empty data!")
LOG.debug("=> %s", data.hex())
data_len: bytes = int.to_bytes(len(data), 4, byteorder="big")
return self.socket.send(data_len + data)
def recv(self) -> Tuple[int, bytes]:
"""Receive data through TCP socket `self.socket`.
Blocking IO.
Returns
-------
Tuple[int, bytes]
A pair (sw, rdata) containing the status word and response data.
"""
length: int = int.from_bytes(self.socket.recv(4), byteorder="big")
rdata: bytes = self.socket.recv(length)
sw: int = int.from_bytes(self.socket.recv(2), byteorder="big")
LOG.debug("<= %s %s", rdata.hex(), hex(sw)[2:])
return sw, rdata
def exchange(self, data: bytes) -> Tuple[int, bytes]:
"""Exchange (send + receive) with `self.socket`.
Parameters
----------
data : bytes
Bytes with `data` to send.
Returns
-------
Tuple[int, bytes]
A pair (sw, rdata) containing the status word and response data.
"""
self.send(data)
return self.recv() # blocking IO
def close(self) -> None:
"""Close connection to TCP socket `self.socket`.
Returns
-------
None
"""
if self.__opened:
self.socket.close()
self.__opened = False
|