1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
|
import base64
import logging
import socket
import time
from . import exceptions
class RemoteLegacy():
    """Remote control connection using the legacy TCP socket protocol.

    Constructing an instance opens the socket and performs the handshake;
    afterwards :meth:`control` sends key commands.  Instances are context
    managers and close the connection when the ``with`` block exits.
    """

    # Seconds to sleep after each control command has been acknowledged.
    _key_interval = 0.2

    # Port used when ``config["port"]`` is falsy (e.g. ``None`` or ``0``).
    _default_port = 55000

    def __init__(self, config):
        """Make a new connection and perform the handshake.

        Args:
            config: mapping with the keys ``"host"``, ``"port"``,
                ``"timeout"``, ``"description"``, ``"id"`` and ``"name"``.
                A falsy ``"port"`` selects the default port; a falsy
                ``"timeout"`` leaves the socket without a timeout.

        Raises:
            exceptions.AccessDenied: if the peer rejects the handshake.
            exceptions.ConnectionClosed: if the peer closes the connection.
            exceptions.UnhandledResponse: on an unrecognised reply.
            OSError: if the socket cannot be connected.
        """
        # Resolve the default locally instead of writing it back into the
        # caller's mapping.
        port = config["port"] or self._default_port

        self.connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            if config["timeout"]:
                self.connection.settimeout(config["timeout"])
            self.connection.connect((config["host"], port))

            payload = b"".join((
                b"\x64\x00",
                self._serialize_string(config["description"]),
                self._serialize_string(config["id"]),
                self._serialize_string(config["name"]),
            ))
            packet = b"\x00\x00\x00" + self._serialize_string(payload, True)

            logging.info("Sending handshake.")
            # sendall() keeps writing until the whole packet is transmitted;
            # send() may stop after a partial write.
            self.connection.sendall(packet)
            self._read_response(True)
        except BaseException:
            # Do not leak the socket when connecting or the handshake fails.
            self.close()
            raise

    def __enter__(self):
        """Return the connection itself for use in a ``with`` block."""
        return self

    def __exit__(self, type, value, traceback):
        """Close the connection when leaving the ``with`` block."""
        self.close()

    def close(self):
        """Close the connection.

        Safe to call more than once; later calls do nothing.
        """
        if self.connection:
            self.connection.close()
            self.connection = None
            logging.debug("Connection closed.")

    def control(self, key):
        """Send a control command.

        Args:
            key: key code to send, as ``str`` or ``bytes``.

        Raises:
            exceptions.ConnectionClosed: if the connection is already closed
                or the peer closes it while the reply is being read.
            exceptions.AccessDenied: if the peer denies access.
            exceptions.UnhandledResponse: on an unrecognised reply.
        """
        if not self.connection:
            raise exceptions.ConnectionClosed()

        payload = b"\x00\x00\x00" + self._serialize_string(key)
        packet = b"\x00\x00\x00" + self._serialize_string(payload, True)

        logging.info("Sending control command: %s", key)
        self.connection.sendall(packet)
        self._read_response()
        # Pause between key presses before the next command is sent.
        time.sleep(self._key_interval)

    def _recv_exact(self, size):
        """Read exactly *size* bytes from the connection.

        ``socket.recv`` may return fewer bytes than requested, so keep
        reading until the whole field has arrived.

        Raises:
            exceptions.ConnectionClosed: if the peer closes the connection
                before *size* bytes were received.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.connection.recv(remaining)
            if not chunk:
                # An empty read means the peer closed the connection.
                self.close()
                raise exceptions.ConnectionClosed()
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def _read_response(self, first_time=False):
        """Read replies until one settles the pending request.

        Each reply is a 3-byte header whose last two bytes hold the
        little-endian length of the peer name, the name itself, a 2-byte
        little-endian response length and the response body.

        Args:
            first_time: ``True`` while handling the handshake; enables the
                "connected" and "waiting for authorization" log messages.

        Raises:
            exceptions.ConnectionClosed: if the peer closes the connection
                or sends an empty response.
            exceptions.AccessDenied: if access is denied or authorization
                is cancelled.
            exceptions.UnhandledResponse: on an unrecognised response.
        """
        # Loop (rather than recurse) over "waiting" frames so that a long
        # wait for authorization cannot exhaust the recursion limit.
        while True:
            header = self._recv_exact(3)
            tv_name_len = int.from_bytes(header[1:3], byteorder="little")
            tv_name = self._recv_exact(tv_name_len)

            if first_time:
                # The name is only logged, so never fail on odd encodings.
                logging.debug("Connected to '%s'.",
                              tv_name.decode(errors="replace"))

            response_len = int.from_bytes(self._recv_exact(2),
                                          byteorder="little")
            response = self._recv_exact(response_len)

            if not response:
                self.close()
                raise exceptions.ConnectionClosed()

            if response == b"\x64\x00\x01\x00":
                logging.debug("Access granted.")
                return
            if response == b"\x64\x00\x00\x00":
                raise exceptions.AccessDenied()
            if response[0:1] == b"\x0a":
                if first_time:
                    logging.warning("Waiting for authorization...")
                # Subsequent frames are no longer the first reply.
                first_time = False
                continue
            if response[0:1] == b"\x65":
                logging.warning("Authorization cancelled.")
                raise exceptions.AccessDenied()
            if response == b"\x00\x00\x00\x00":
                logging.debug("Control accepted.")
                return

            raise exceptions.UnhandledResponse(response)

    @staticmethod
    def _serialize_string(string, raw=False):
        """Return *string* prefixed with its 16-bit little-endian length.

        Args:
            string: ``str`` (encoded as UTF-8) or ``bytes`` to serialize.
            raw: when false the data is base64-encoded before the length
                prefix is added; when true it is sent as is.

        Raises:
            ValueError: if the data does not fit the 2-byte length prefix.
        """
        if isinstance(string, str):
            string = string.encode()

        if not raw:
            string = base64.b64encode(string)

        length = len(string)
        if length > 0xFFFF:
            raise ValueError("string too long to serialize: %d bytes" % length)

        # Two-byte length: identical to the old ``bytes([n]) + b"\x00"`` for
        # n <= 255, and also correct for longer payloads.
        return length.to_bytes(2, byteorder="little") + string
|