1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
|
#!/usr/bin/python3
import argparse
import atexit
import cmd
import os
import shutil
import sys
import tempfile
import time
PYLIB = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'pylib')
sys.path.insert(1, PYLIB)
import openrazer._fake_driver as fake_driver
class FakeDevicePrompt(cmd.Cmd):
def __init__(self, device_map, *args, **kwargs):
super(FakeDevicePrompt, self).__init__(*args, **kwargs)
self._device_map = device_map
self._current_device = None
self._ep = {}
self._read = []
self._write = []
# If only 1 device, auto use that
if len(self._device_map) == 1:
self._change_device(list(self._device_map.keys())[0])
else:
self._change_device(None)
def _change_device(self, device_name=None):
if device_name is not None:
self._current_device = device_name
self.prompt = self._current_device + "> "
for endpoint, details in self._device_map[self._current_device].endpoints.items():
self._ep[endpoint] = details[2]
self._read = [endpoint for endpoint, perm in self._ep.items()]
self._write = [endpoint for endpoint, perm in self._ep.items() if perm in ('w', 'rw')]
else:
self._current_device = None
self.prompt = "> "
def do_dev(self, arg):
"""
Change current device
"""
if arg in self._device_map:
if arg is None or len(arg) == 0:
print('Need to specify a device name. One of: {0}'.format(','.join(self._device_map.keys())))
else:
self._change_device(arg)
else:
print('Invalid device name: {0}'.format(arg))
def complete_dev(self, text, line, begidx, endidx):
if not text:
completions = list(self._device_map.keys())
else:
completions = [item for item in list(self._device_map.keys()) if item.startswith(text)]
return completions
def do_list(self, arg):
"""List available device files"""
if self._current_device is not None:
print('Device files')
print('------------')
for endpoint, permission in self._ep.items():
if permission in ('r', 'rw'):
print(" {0:-<2}- {1}".format(permission, endpoint))
else:
print(" {0:->2}- {1}".format(permission, endpoint))
print()
print('Event files')
print('-----------')
for event_id, event_value in sorted(self._device_map[self._current_device].events.items(), key=lambda x: x[0]):
print(" {0: >2} {1}".format(event_id, event_value[0]))
else:
print('Devices')
print('-------')
for device in list(self._device_map.keys()):
print(' {0}'.format(device))
def do_ls(self, arg):
"""List available device files"""
self.do_list(arg)
def do_read(self, arg, binary=False):
"""Read ASCII from given device file"""
if self._current_device is not None:
if arg in self._ep:
result = self._device_map[self._current_device].get(arg, binary=binary)
print(result)
elif arg in self._ep:
print('Device endpoint not readable')
else:
print("Device endpoint not found")
def do_binary_read(self, arg):
"""Read binary from given device file"""
self.do_read(arg, binary=True)
def complete_read(self, text, line, begidx, endidx):
if not text:
completions = self._read
else:
completions = [item for item in self._read if item.startswith(text)]
return completions
complete_binary_read = complete_read
def do_write(self, arg):
"""Write ASCII to device file. DEVICE_FILE DATA"""
if self._current_device is not None:
try:
device_file, data = arg.split(' ', 1)
if device_file in self._ep:
if len(data) > 0:
self._device_map[self._current_device].set(device_file, data)
print("{0}: {1}".format(device_file, self._device_map[self._current_device].get(device_file)))
else:
print("Device endpoint not found")
except ValueError:
print("Must specify a device endpoint then a space then data to write")
def complete_write(self, text, line, begidx, endidx):
if not text:
completions = self._write
else:
completions = [item for item in self._write if item.startswith(text)]
return completions
def do_event(self, arg):
"""Emit an event, format: EVENT_ID KEY_ID STATE
Where state in 'up' 'down' and 'repeat'
"""
if self._current_device is not None:
try:
event_file, key_id, value = arg.split(' ')
except ValueError:
print("Usage: event event_file key_id value")
return
if event_file not in self._device_map[self._current_device].events:
print("Event ID {0} is invalid".format(event_file))
else:
try:
bytes_written = self._device_map[self._current_device].emit_kb_event(event_file, int(key_id), value)
print("Wrote {0} bytes to {1}".format(bytes_written, self._device_map[self._current_device].events[event_file][0]))
except ValueError as err:
print("Caught exception: {0}".format(err))
def do_exit(self, arg):
"""Exit"""
if self._current_device is not None:
self._change_device(None)
return False
else:
return True
def do_EOF(self, arg):
"""Press Ctrl+D to exit"""
self.do_exit(arg)
def create_envionment(device_name, destination):
os.makedirs(destination, exist_ok=True)
try:
fake_device = fake_driver.FakeDevice(device_name, tmp_dir=destination)
return fake_device
except ValueError:
print('Device {0}.cfg not found'.format(device_name))
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('device', metavar='DEVICE', nargs='*', help='Device config name')
parser.add_argument('--dest', metavar='DESTDIR', required=False, default=None, help='Directory to create driver files in. If omitted then a tmp directory is used')
parser.add_argument('--all', action='store_true', help='Create all possible fake devices')
parser.add_argument('--non-interactive', dest='interactive', action='store_false', help='Dont display prompt, just hang until killed')
parser.add_argument('--clear-dest', action='store_true', help='Clear the destination folder if it exists before starting')
return parser.parse_args()
def run():
args = parse_args()
if args.dest is None:
destination = tempfile.mkdtemp(prefix='tmp_', suffix='_fakerazer')
else:
destination = args.dest
if args.clear_dest and os.path.exists(destination):
shutil.rmtree(destination, ignore_errors=True)
if args.all:
devices = fake_driver.SPECS
else:
devices = args.device
device_map = {}
for device in devices:
# Device name: FakeDriver
fake_device = create_envionment(device, destination)
if fake_device is not None:
device_map[device] = fake_device
if len(device_map) == 0:
print("ERROR: No valid devices passed to script, you either need to pass devices as arguments or use '--all'")
sys.exit(1)
# Register cleanup
if args.dest is None:
atexit.register(lambda: shutil.rmtree(destination, ignore_errors=True))
else:
for device in device_map.values():
# device = FakeDriver
atexit.register(device.close)
print("Device test directory: {0}".format(destination))
try:
if not args.interactive:
print("Sleeping forever, use Ctrl-C to exit...")
while True:
time.sleep(99999999)
else:
FakeDevicePrompt(device_map).cmdloop()
except KeyboardInterrupt:
pass
if __name__ == '__main__':
run()
|