1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
|
import argparse
import collections
import json
import logging
import os
import socket
import errno
from . import __doc__ as doc
from . import __title__ as title
from . import __version__ as version
from . import exceptions
from . import Remote
def _read_config():
config = collections.defaultdict(lambda: None, {
"name": "samsungctl",
"description": "PC",
"id": "",
"method": "legacy",
"timeout": 0,
})
file_loaded = False
directories = []
xdg_config = os.getenv("XDG_CONFIG_HOME")
if xdg_config:
directories.append(xdg_config)
directories.append(os.path.join(os.getenv("HOME"), ".config"))
directories.append("/etc")
for directory in directories:
path = os.path.join(directory, "samsungctl.conf")
try:
config_file = open(path)
except IOError as e:
if e.errno == errno.ENOENT:
continue
else:
raise
else:
file_loaded = True
break
if not file_loaded:
return config
with config_file:
try:
config_json = json.load(config_file)
except ValueError as e:
messsage = "Warning: Could not parse the configuration file.\n %s"
logging.warning(message, e)
return config
config.update(config_json)
return config
def main():
epilog = "E.g. %(prog)s --host 192.168.0.10 --name myremote KEY_VOLDOWN"
parser = argparse.ArgumentParser(prog=title, description=doc,
epilog=epilog)
parser.add_argument("--version", action="version",
version="%(prog)s {0}".format(version))
parser.add_argument("-v", "--verbose", action="count",
help="increase output verbosity")
parser.add_argument("-q", "--quiet", action="store_true",
help="suppress non-fatal output")
parser.add_argument("-i", "--interactive", action="store_true",
help="interactive control")
parser.add_argument("--host", help="TV hostname or IP address")
parser.add_argument("--port", type=int, help="TV port number (TCP)")
parser.add_argument("--method",
help="Connection method (legacy or websocket)")
parser.add_argument("--name", help="remote control name")
parser.add_argument("--description", metavar="DESC",
help="remote control description")
parser.add_argument("--id", help="remote control id")
parser.add_argument("--timeout", type=float,
help="socket timeout in seconds (0 = no timeout)")
parser.add_argument("key", nargs="*",
help="keys to be sent (e.g. KEY_VOLDOWN)")
args = parser.parse_args()
if args.quiet:
log_level = logging.ERROR
elif not args.verbose:
log_level = logging.WARNING
elif args.verbose == 1:
log_level = logging.INFO
else:
log_level = logging.DEBUG
logging.basicConfig(format="%(message)s", level=log_level)
config = _read_config()
config.update({k: v for k, v in vars(args).items() if v is not None})
if not config["host"]:
logging.error("Error: --host must be set")
return
try:
with Remote(config) as remote:
for key in args.key:
remote.control(key)
if args.interactive:
logging.getLogger().setLevel(logging.ERROR)
from . import interactive
interactive.run(remote)
elif len(args.key) == 0:
logging.warning("Warning: No keys specified.")
except exceptions.ConnectionClosed:
logging.error("Error: Connection closed!")
except exceptions.AccessDenied:
logging.error("Error: Access denied!")
except exceptions.UnknownMethod:
logging.error("Error: Unknown method '{}'".format(config["method"]))
except socket.timeout:
logging.error("Error: Timed out!")
except OSError as e:
logging.error("Error: %s", e.strerror)
if __name__ == "__main__":
main()
|