1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
|
import asyncio
from prompt_toolkit import PromptSession
from prompt_toolkit.patch_stdout import patch_stdout
from prompt_toolkit.shortcuts import clear
from govee_local_api import GoveeController, GoveeDevice, GoveeLightFeatures
def update_device_callback(device: GoveeDevice) -> None:
    """Receive a device update notification and deliberately ignore it.

    This is registered as the per-device update hook; it is a placeholder
    where update tracing can be added while debugging.
    """
    return None
def discovered_callback(device: GoveeDevice, is_new: bool) -> bool:
    """Handle a discovery notification for ``device``.

    Devices seen for the first time get ``update_device_callback`` attached
    as their update hook; already-known devices are left untouched.

    Returns:
        Always ``True``.
        NOTE(review): presumably this tells the controller to keep the
        device - confirm against the govee_local_api documentation.
    """
    if not is_new:
        return True
    device.set_update_callback(update_device_callback)
    return True
async def create_controller(
    discovery_enabled: bool, manual_device_ip: str | None = None
) -> GoveeController:
    """Create and start a controller, then wait until it knows a device.

    Args:
        discovery_enabled: Passed through to ``GoveeController``; when false,
            ``manual_device_ip`` is queued for discovery instead.
        manual_device_ip: IP address to queue when discovery is disabled.

    Returns:
        The started controller, once ``controller.devices`` is non-empty.

    Raises:
        ValueError: If discovery is disabled and no manual IP was given.
    """
    # Validate first so an invalid call never starts a controller that
    # nobody would afterwards stop.
    if not discovery_enabled and not manual_device_ip:
        raise ValueError(
            "Manual device IP must be provided if discovery is disabled."
        )

    controller = GoveeController(
        # We are inside a coroutine, so the running loop is the one to use.
        loop=asyncio.get_running_loop(),
        listening_address="0.0.0.0",
        discovery_enabled=discovery_enabled,
        discovered_callback=discovered_callback,
        evicted_callback=lambda device: print(f"Evicted {device}"),
    )
    await controller.start()

    if discovery_enabled:
        waiting_message = "Waiting for devices... "
    else:
        print(f"Discovery not enabled. Adding {manual_device_ip} to discovery.")
        controller.add_device_to_discovery_queue(manual_device_ip)
        waiting_message = f"Waiting for device {manual_device_ip} to be discovered..."

    # Poll once per second; there is no timeout, so this waits indefinitely
    # until at least one device shows up.
    while not controller.devices:
        print(waiting_message)
        await asyncio.sleep(1)
    return controller
async def handle_turn_on(device: GoveeDevice) -> None:
    """Announce the action, then ask ``device`` to switch on."""
    print("Turning on the LED strip...")
    await device.turn_on()
async def handle_turn_off(device: GoveeDevice) -> None:
    """Announce the action, then ask ``device`` to switch off."""
    print("Turning off the LED strip...")
    await device.turn_off()
async def handle_set_brightness(device: GoveeDevice, session: PromptSession) -> None:
    """Ask for a brightness percentage and apply it to ``device``.

    The prompt repeats until the user enters an integer from 0 to 100.
    """
    while True:
        answer = await session.prompt_async("Enter brightness (0-100): ")
        try:
            level = int(answer)
            if not 0 <= level <= 100:
                print("Please enter a value between 0 and 100.")
                continue
            print(f"Setting brightness to {level}%")
            await device.set_brightness(level)
            return
        except ValueError:
            print("Invalid input, please enter an integer.")
async def handle_set_color(device: GoveeDevice, session: PromptSession) -> None:
    """Ask for an RGB triple and apply it to ``device``.

    The prompt repeats until the user enters exactly three whitespace-
    separated integers, each from 0 to 255.
    """
    while True:
        answer = await session.prompt_async("Enter color (R G B, values 0-255): ")
        try:
            # Unpacking fails with ValueError for a wrong number of values,
            # just as int() does for a non-integer token.
            red, green, blue = (int(part) for part in answer.split())
            if not all(0 <= channel <= 255 for channel in (red, green, blue)):
                print("RGB values must be between 0 and 255.")
                continue
            print(f"Setting color to RGB({red}, {green}, {blue})")
            await device.set_rgb_color(red, green, blue)
            return
        except ValueError:
            print("Invalid input, please enter three integers separated by spaces.")
async def handle_set_segment_color(device: GoveeDevice, session: PromptSession) -> None:
    """Ask for a segment number and an RGB triple, then color that segment.

    Returns immediately with a message when the device's feature flags do not
    include ``GoveeLightFeatures.SEGMENT_CONTROL``. Otherwise the whole
    dialogue (segment, then color) restarts after any invalid answer.
    """
    if (device.capabilities.features & GoveeLightFeatures.SEGMENT_CONTROL) == 0:
        print("This device does not support segment control.")
        return

    while True:
        segment_answer = await session.prompt_async(
            f"Enter segment number (1-{device.capabilities.segments_count}): "
        )
        try:
            segment = int(segment_answer)
            if not 1 <= segment <= device.capabilities.segments_count:
                print(
                    f"Segment number must be between 1 and {device.capabilities.segments_count}."
                )
                continue

            color_answer = await session.prompt_async(
                "Enter segment color (R G B, values 0-255): "
            )
            red, green, blue = (int(part) for part in color_answer.split())
            if not all(0 <= channel <= 255 for channel in (red, green, blue)):
                print("RGB values must be between 0 and 255.")
                continue

            print(f"Setting segment {segment} color to RGB({red}, {green}, {blue})")
            await device.set_segment_rgb_color(segment, red, green, blue)
            return
        except ValueError:
            print(
                "Invalid input. Please enter an integer for the segment and three integers for the color."
            )
async def handle_set_scene(device: GoveeDevice, session: PromptSession) -> None:
    """List the device's scenes, ask for one by index, and activate it.

    The scenes come from ``device.capabilities.available_scenes`` and are
    shown with their zero-based index. The prompt repeats until the user
    enters an integer within the listed range. Returns without prompting
    when the device reports no scenes.
    """
    scenes = list(device.capabilities.available_scenes)
    if not scenes:
        print("This device does not report any scenes.")
        return

    last_index = len(scenes) - 1
    while True:
        for idx, scene in enumerate(scenes):
            print(f"{idx}: {scene}")
        answer = await session.prompt_async(f"Enter scene number (0-{last_index}): ")
        try:
            index = int(answer)
        except ValueError:
            print("Invalid input, please enter an integer.")
            continue
        # Explicit range check: a negative index would otherwise silently
        # select a scene counted from the end of the list.
        if not 0 <= index <= last_index:
            print("Invalid scene. Please choose from the available scenes.")
            continue
        scene_name = scenes[index]
        print(f"Setting scene to {scene_name}")
        await device.set_scene(scene_name)
        return
async def handle_clear_screen(device):
    """Clear the terminal via prompt_toolkit's ``clear``.

    ``device`` is not used by this handler.
    """
    clear()
async def handle_exit(device):
    """Print a farewell and terminate the program.

    ``device`` is not used by this handler.

    Raises:
        SystemExit: Always.
    """
    print("Exiting...")
    raise SystemExit
async def devices_menu(session, devices: list[GoveeDevice]) -> GoveeDevice:
    """Show the known devices and let the user pick one by index.

    Devices are listed sorted by SKU. The prompt repeats until the user
    enters a decimal index that refers to a listed device.

    Args:
        session: Prompt session used to read the selection.
        devices: Devices to choose from.

    Returns:
        The selected device.

    Raises:
        ValueError: If ``devices`` is empty, since no selection could ever
            be valid.
    """
    if not devices:
        raise ValueError("No devices available to choose from.")

    ordered = sorted(devices, key=lambda d: d.sku)
    last_index = len(ordered) - 1
    while True:
        for idx, device in enumerate(ordered):
            print(f"{idx}: {device.ip} - {device.sku}")
        selection = await session.prompt_async(
            f"Choose an option (0-{last_index}): "
        )
        # isdecimal() only accepts digits that int() can parse; isnumeric()
        # also accepts characters such as "²" that make int() raise.
        if selection.isdecimal() and int(selection) <= last_index:
            break

    print(f"Selected device: {selection}")
    return ordered[int(selection)]
async def handle_send_hex(device: GoveeDevice, session: PromptSession) -> None:
    """Prompt for a raw command and forward it to ``device`` unchanged.

    The entered text is passed to ``device.send_raw_command`` as typed; no
    validation is performed here.
    """
    raw_command = await session.prompt_async("Enter hex data: ")
    await device.send_raw_command(raw_command)
async def handle_manual_device(
    device: GoveeDevice, controller: GoveeController, session: PromptSession
) -> None:
    """Prompt for an IP address and queue it for discovery on ``controller``.

    ``device`` is accepted but not used by this handler.
    """
    address = await session.prompt_async("Enter device IP: ")
    controller.add_device_to_discovery_queue(address)
async def menu(device: GoveeDevice) -> None:
    """Print the currently selected device followed by the option list."""
    # Two arguments on purpose: print's default separator adds a space after
    # the one already in the label.
    print("\nDevice: ", device)
    lines = (
        "Select an option:",
        "0. Exit",
        "1. Turn on",
        "2. Turn off",
        "3. Set brightness (0-100)",
        "4. Set color (R G B)",
        "5. Set segment color (Segment, R G B)",
        "6. Set scene",
        "7. Send raw hex",
        "8. Clear screen",
        "9. Clear Device",
        "10. Add device",
    )
    print(*lines, sep="\n")
async def repl() -> None:
    """Run the interactive control loop.

    Asks whether discovery should be enabled (and for a device IP when it is
    not), creates the controller, then repeatedly shows the menu for the
    selected device and dispatches the chosen option. The loop only ends when
    a handler or a prompt raises.
    """
    session = PromptSession()
    print("Welcome to the LED Control REPL.")

    discovery_answer = await session.prompt_async("Enable device discovery? (y/n): ")
    discovery_enabled: bool = discovery_answer.lower() == "y"

    manual_device_ip = None
    if not discovery_enabled:
        manual_device_ip = await session.prompt_async("Enter device IP: ")

    controller: GoveeController = await create_controller(
        discovery_enabled, manual_device_ip
    )

    # Menu option -> coroutine function taking the selected device.
    # Option "9" is absent on purpose: it is handled inline in the loop.
    command_handlers = {
        "0": handle_exit,
        "1": handle_turn_on,
        "2": handle_turn_off,
        "3": lambda dev: handle_set_brightness(dev, session),
        "4": lambda dev: handle_set_color(dev, session),
        "5": lambda dev: handle_set_segment_color(dev, session),
        "6": lambda dev: handle_set_scene(dev, session),
        "7": lambda dev: handle_send_hex(dev, session),
        "8": handle_clear_screen,
        "10": lambda dev: handle_manual_device(dev, controller, session),
    }
    option_prompt = f"Choose an option (0-{len(command_handlers)}): "

    selected_device: GoveeDevice | None = None
    while True:
        if not selected_device:
            selected_device = await devices_menu(session, controller.devices)

        await menu(selected_device)

        # patch_stdout keeps anything printed while the prompt is active from
        # garbling the input line.
        with patch_stdout():
            user_choice = await session.prompt_async(option_prompt)

        if user_choice == "9":
            # "Clear Device": forget the selection so the device menu is
            # shown again on the next pass.
            selected_device = None
            continue

        handler = command_handlers.get(user_choice)
        if handler is None:
            print("Invalid option, please try again.")
            continue
        await handler(selected_device)
async def main() -> None:
    """Entry coroutine for the script: run the REPL until it ends."""
    await repl()
if __name__ == "__main__":
    # An interrupt or end-of-input (e.g. Ctrl-C / Ctrl-D at a prompt) ends
    # the session with a short message instead of a traceback.
    try:
        asyncio.run(main())
    except (KeyboardInterrupt, EOFError):
        print("REPL exited.")
|