1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
|
import argparse
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qsl
import pyatem.command as commandmodule
from pyatem.protocol import AtemProtocol
# Active switcher connection; main() replaces this with an AtemProtocol
# instance, and ApiServer.do_GET sends commands through it.
switcher = None
# TCP port for the HTTP API server; set from the command line in main()
# and read by WebThread.run().
_port = None
# Maps an input's short name to its index; filled by input_changed() and
# used by ApiServer.do_GET to translate the 'source' query parameter.
_inputs = {}
class ApiServer(BaseHTTPRequestHandler):
    """HTTP handler that turns GET requests into switcher commands.

    ``GET /`` answers with a plain-text banner.  Any other path is treated
    as a command name: the path ``/some-name`` is title-cased, stripped of
    dashes and suffixed with ``Command`` (giving ``SomeNameCommand``), and
    that attribute is looked up in the imported command module.  The query
    string supplies the keyword arguments for the command's constructor.
    """

    def _send_text(self, status, body):
        """Send a complete ``text/plain`` response.

        :param status: HTTP status code to send.
        :param body: Response body as bytes.
        """
        self.send_response(status)
        self.send_header('Content-type', 'text/plain')
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Handle one GET request.

        Responds with 200 and a banner for ``/``, 404 when no matching
        command class exists, 400 (with the exception text as the body)
        when building or sending the command raises, and 200 ``OK``
        otherwise.
        """
        path = self.path
        print(path)

        if path == '/':
            self._send_text(200, b"OpenSwitcher API server\n")
            return

        parts = urlparse(path)
        # "/some-name" -> "SomeNameCommand"
        classname = parts.path[1:].title().replace('-', '') + "Command"
        print("Command: " + classname)

        command_class = getattr(commandmodule, classname, None)
        if command_class is None:
            self._send_text(404, b"Unknown command\n")
            return

        # Query values arrive as strings; values that parse as integers are
        # converted, everything else is passed through unchanged.
        arguments = {}
        for key, value in parse_qsl(parts.query):
            try:
                arguments[key] = int(value)
            except ValueError:
                arguments[key] = value

        # A 'source' given as a known input short name is replaced by the
        # index recorded for it in _inputs; other values are left unchanged.
        if 'source' in arguments and arguments['source'] in _inputs:
            arguments['source'] = _inputs[arguments['source']]

        try:
            cmd = command_class(**arguments)
            switcher.send_commands([cmd])
        except Exception as e:
            # Request boundary: report any failure (bad keyword arguments,
            # send errors) to the client instead of dropping the connection.
            self._send_text(400, str(e).encode())
            return

        self._send_text(200, b"OK\n")
class WebThread(threading.Thread):
    """Thread that hosts the HTTP API server."""

    def run(self):
        """Announce the port, bind an HTTPServer to it and serve requests.

        The server binds to the empty host string on the module-level
        ``_port`` and uses ``ApiServer`` as its request handler; this method
        then blocks in ``serve_forever()``.
        """
        banner = "Running API server on :" + str(_port)
        print(banner)

        bind_address = ("", _port)
        server = HTTPServer(bind_address, ApiServer)
        server.serve_forever()
def input_changed(inputproperties):
    """Record an input's index under its short name in ``_inputs``.

    :param inputproperties: Object exposing ``short_name`` and ``index``
        attributes; an existing entry for the same short name is overwritten.
    """
    # _inputs is only mutated in place, never rebound, so no ``global``
    # declaration is required.
    input_index = inputproperties.index
    _inputs[inputproperties.short_name] = input_index
def connection_ready(*args):
    """Start the HTTP API server thread; all positional arguments are ignored.

    Registered by main() as the handler for the switcher's 'connected' event.
    """
    # NOTE(review): each call creates a fresh WebThread; if 'connected' can
    # fire more than once, later threads would try to bind the same port --
    # confirm against the protocol library's behaviour.
    server_thread = WebThread()
    server_thread.start()
def main():
    """Parse the command line, connect to the switcher and run its loop.

    Takes two positional arguments: ``ip`` (an address, or the literal
    ``usb``) and ``port`` (integer port for the HTTP server).  Stores the
    port and the switcher connection in module globals, registers the event
    callbacks, connects, and then calls ``switcher.loop()`` repeatedly with
    no terminating condition.
    """
    global switcher, _port

    parser = argparse.ArgumentParser(description="Atem CLI")
    parser.add_argument('ip', help='Atem ip or "usb" for usb')
    parser.add_argument('port', help='Port to run the HTTP server on', type=int)
    options = parser.parse_args()

    _port = options.port

    target = options.ip
    if target != 'usb':
        print("Connecting to " + target)
        switcher = AtemProtocol(ip=target)
    else:
        print("Connecting to USB switcher")
        switcher = AtemProtocol(usb='auto')

    # Register callbacks in the same order as before: input bookkeeping
    # first, then the hook that starts the HTTP server once connected.
    handlers = (
        ('change:input-properties:*', input_changed),
        ('connected', connection_ready),
    )
    for event_name, callback in handlers:
        switcher.on(event_name, callback)

    switcher.connect()
    while True:
        switcher.loop()
# Run the bridge only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|