1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
|
"""Rabbit Air TCP-based client."""
import asyncio
import socket
import struct
from typing import Any, Dict
from .client import Client
from .exceptions import NetworkError
class TcpClient(Client):
"""TCP-based client."""
timeout: float = 5.0
@classmethod
def _create_socket(cls) -> socket.socket:
return socket.socket(type=socket.SOCK_STREAM)
@staticmethod
async def _recvall(sock: socket.socket, size: int) -> bytes:
data = b""
loop = asyncio.get_running_loop()
while len(data) < size:
chunk = await loop.sock_recv(sock, size - len(data))
if not chunk:
break
data += chunk
if len(data) != size:
raise NetworkError("Connection was unexpectedly closed")
return data
async def _recvmsg(self) -> bytes:
assert self._sock is not None
header = await self._recvall(self._sock, 2)
size = struct.unpack("<H", header)[0]
return await self._recvall(self._sock, size)
async def _sendmsg(self, data: bytes) -> None:
assert self._sock is not None
loop = asyncio.get_running_loop()
await loop.sock_sendall(self._sock, struct.pack("<H", len(data)) + data)
async def _exchange(self, request_id: int, data: bytes) -> Dict[str, Any]:
return await asyncio.wait_for(super()._exchange(request_id, data), self.timeout)
|