File: dump-unknown.py

package info (click to toggle)
pyatem 0.13.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,776 kB
  • sloc: python: 15,193; xml: 435; ansic: 256; sh: 26; makefile: 20
file content (86 lines) | stat: -rw-r--r-- 1,913 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import argparse
import signal

import tabulate

from pyatem.protocol import AtemProtocol
from pyatem.hexdump import hexdump

# Active AtemProtocol connection; assigned in run() and read by on_connected().
switcher = None
# Maps each change key to the set of distinct raw ``bytes`` payloads that
# on_change() has seen for it; printed as a table by sigint_handler().
result = {}


def on_change(key, contents):
    """Record and echo one change event.

    Payloads that arrive as ``bytes`` are collected per key in the
    module-level ``result`` mapping. Every event except those keyed
    'multiviewer-input' or 'time' is then printed to stdout: the key on one
    line, followed by either an indented hexdump (bytes payloads) or the
    indented ``repr`` of the payload (anything else).
    """
    is_raw = isinstance(contents, bytes)
    if is_raw:
        # Keep every distinct raw payload seen for this key.
        result.setdefault(key, set()).add(contents)

    # These two keys are still collected above but are never echoed
    # (presumably because they fire too often to be readable -- confirm).
    if key in ('multiviewer-input', 'time'):
        return

    print(key)
    if is_raw:
        dump = hexdump(contents, 'return')
        # Indent continuation lines so the dump lines up under the key.
        print('    ' + dump.replace('\n', '\n    '))
    else:
        print('    ' + repr(contents))


def sigint_handler(signal, frame):
    """Print a table of all collected raw fields, then end the process.

    Installed as the SIGINT handler by ``on_connected``. Both arguments are
    supplied by the ``signal`` machinery and are unused here; the first one
    keeps its original name, which shadows the ``signal`` module inside this
    function only.

    Raises:
        SystemExit: always, with status 0, after the report is printed.
    """
    print('--- [ Report ] ---')
    rows = [
        (
            field,
            len(payloads),
            # One repr per line, each newline-terminated; join avoids
            # rebuilding the string on every iteration.
            ''.join(repr(payload) + '\n' for payload in payloads),
        )
        for field, payloads in result.items()
    ]

    print(tabulate.tabulate(rows, headers=['field', 'count', 'data']))
    # The bare ``exit`` builtin is injected by the ``site`` module for
    # interactive use and is missing under ``python -S``; raising SystemExit
    # directly is the equivalent that is always available.
    raise SystemExit(0)


def on_connected():
    """Handle the 'connected' event from the switcher.

    Installs ``sigint_handler`` for SIGINT, then prints a confirmation and
    the name of the detected hardware, read from the 'product-name' entry of
    the connection's ``mixerstate``.
    """
    # No ``global`` statements are needed: ``switcher`` is only read here,
    # and the other names previously declared global were never defined or
    # used anywhere in this script.
    signal.signal(signal.SIGINT, sigint_handler)

    print("Connection successful")
    model = switcher.mixerstate['product-name']
    print(f"Detected hardware: {model.name}")


def on_disconnected():
    """Handle the 'disconnected' event by printing a notice to stdout."""
    notice = "Hardware has disconnected"
    print(notice)


def run(device):
    """Connect to *device* and process switcher events until the process ends.

    The new connection is stored in the module-level ``switcher`` so the
    event callbacks can reach it. This function does not return: it calls
    ``switcher.loop()`` forever.
    """
    global switcher
    print(f"Connecting to {device}...")
    switcher = AtemProtocol(device)

    # Registration order is kept: connected, disconnected, change.
    handlers = (
        ('connected', on_connected),
        ('disconnected', on_disconnected),
        ('change', on_change),
    )
    for event_name, callback in handlers:
        switcher.on(event_name, callback)

    switcher.connect()
    while True:
        switcher.loop()


def main():
    """Parse the command line and start dumping changes from the device."""
    cli = argparse.ArgumentParser(
        description="Find non-implemented fields in the connected hardware"
    )
    cli.add_argument('device', help="Device ip address or 'usb'")
    options = cli.parse_args()
    run(options.device)


# Start the tool only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()