1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
|
#!/usr/bin/env python3
# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import asyncio
import psutil
import time
import os
import argparse
import logging
from datetime import datetime
from kasa import SmartStrip
class KasaPlugController():
  """Provides control of a device's charger.

  The device's charger must be plugged into one of the 3 outlets of a Kasa Smart
  Plug Power Strip (KP303). The outlet name must match the device's host name
  (this is intended to prevent inadvertently controlling the wrong device's
  charger).

  The controller owns an asyncio event loop that is used to drive the
  coroutine-based kasa API synchronously; call close() when done with it.
  """

  # Seconds spent spinning the CPU between two battery checks in discharge_to.
  _BURN_INTERVAL_S = 10
  # Seconds slept between two battery checks in charge_to.
  _CHARGE_POLL_INTERVAL_S = 10
  # Seconds slept between two "still plugged in?" checks in turn_off.
  _UNPLUG_POLL_INTERVAL_S = 1

  def __init__(self, kasa_power_strip_ip: str):
    """Constructs a KasaPlugController to control the current device's charger.

    Args:
      kasa_power_strip_ip: IP of the Kasa Smart Plug Power Strip in which
        this device's charger is connected.
    """
    # Set first so that close() / __del__ are safe even if anything below
    # raises and leaves the instance partially constructed.
    self.closed = False

    # The outlet name must match the device's host name.
    self.kasa_outlet_name = os.uname()[1].split('.')[0]

    # Create the event loop
    self.loop = asyncio.new_event_loop()
    asyncio.set_event_loop(self.loop)

    try:
      # Create the strip controller and fetch its state (incl. its outlets).
      self.strip = SmartStrip(kasa_power_strip_ip)
      self.loop.run_until_complete(self.strip.update())
    except Exception:
      # Do not leak the event loop when the strip cannot be set up.
      self.close()
      raise

  def __del__(self):
    self.close()

  def _find_plug(self):
    """Returns the outlet whose alias is this device's host name, or None."""
    for plug in self.strip.children:
      if plug.alias == self.kasa_outlet_name:
        return plug
    return None

  @staticmethod
  def _read_battery():
    """Returns the battery status reported by psutil.

    Raises:
      RuntimeError: If psutil reports no battery (sensors_battery() is None).
    """
    battery = psutil.sensors_battery()
    if battery is None:
      raise RuntimeError("No battery detected on this device")
    return battery

  @staticmethod
  def _burn_cpu(duration_s: float):
    """Spins on floating point operations for roughly `duration_s` seconds."""
    f_value = 0.81
    # Monotonic clock: unaffected by wall-clock adjustments during the wait.
    deadline = time.monotonic() + duration_s
    while time.monotonic() < deadline:
      f_value = f_value * 1.7272882
      f_value = f_value / 1.7272882

  def turn_on(self):
    """Turns on this device's charger.

    Logs an error and does nothing else if no outlet of the strip is named
    after this device.
    """
    logging.info("Turning on the charger")
    plug = self._find_plug()
    if plug is None:
      logging.error("Cannot find device to turn on")
      return
    self.loop.run_until_complete(plug.turn_on())

  def turn_off(self):
    """Turns off this device's charger.

    Blocks until the battery is no longer reported as plugged in. Logs an
    error and does nothing else if no outlet of the strip is named after this
    device.

    Raises:
      RuntimeError: If no battery is detected on this device.
    """
    logging.info("Turning off the charger")
    plug = self._find_plug()
    if plug is None:
      logging.error("Cannot find device to turn off")
      return
    self.loop.run_until_complete(plug.turn_off())

    # The OS may take a moment to notice that external power went away.
    while self._read_battery().power_plugged:
      logging.info("Waiting for device to no longer be plugged in")
      time.sleep(self._UNPLUG_POLL_INTERVAL_S)

  def discharge_to(self, level: int):
    """Discharges the battery until it reaches a target level.

    Args:
      level: The target battery level.

    Raises:
      RuntimeError: If no battery is detected on this device.
    """
    self.turn_off()
    battery = self._read_battery()
    while battery.percent > level:
      logging.info("Waiting to discharge to %s%%. Currently at %s%%", level,
                   battery.percent)
      # Perform arbitrary operations as fast as possible to burn
      # CPU and discharge faster.
      self._burn_cpu(self._BURN_INTERVAL_S)
      battery = self._read_battery()
    logging.info("Discharge to %s%% complete", level)

  def charge_to(self, level: int):
    """Charges the battery until it reaches a target level.

    Args:
      level: The target battery level.

    Raises:
      RuntimeError: If no battery is detected on this device.
    """
    self.turn_on()
    battery = self._read_battery()
    while battery.percent < level:
      logging.info("Waiting to charge to %s%%. Currently at %s%%", level,
                   battery.percent)
      time.sleep(self._CHARGE_POLL_INTERVAL_S)
      battery = self._read_battery()
    logging.info("Charge to %s%% complete", level)

  def charge_or_discharge_to(self, level: int):
    """Charges or discharges the battery until it reaches a target level.

    Leaves the charger in an unplugged state.

    Args:
      level: The target battery level.

    Raises:
      RuntimeError: If no battery is detected on this device.
    """
    current = self._read_battery().percent
    if current < level:
      self.charge_to(level)
    elif current > level:
      self.discharge_to(level)
    else:
      logging.info("Battery is already at the target level %s%%", level)
    # Whatever path was taken, finish with the charger turned off.
    self.turn_off()

  def close(self):
    """Closes the message loop.

    Safe to call several times, and on an instance whose construction failed
    part-way (which is what __del__ may see).
    """
    if getattr(self, 'closed', False):
      return
    self.closed = True
    loop = getattr(self, 'loop', None)
    if loop is not None:
      loop.close()
def get_plug_controller(ip: str):
  """Builds the plug controller for the Kasa power strip at the given IP.

  Args:
    ip: IP address of the Kasa power strip, passed to KasaPlugController.

  Returns:
    A newly constructed KasaPlugController.
  """
  controller = KasaPlugController(ip)
  return controller
# Command-line entry point: charges this device's battery up to
# --charge_level using the Kasa power strip found at --kasa_power_strip_ip.
if __name__ == "__main__":
  parser = argparse.ArgumentParser(
      description='Controls kasa power switch connected to this device.')
  parser.add_argument("--kasa_power_strip_ip",
                      required=True,
                      help="IP address of the kasa power switch.")
  parser.add_argument("--charge_level",
                      type=int,
                      required=True,
                      help="Desired charge level.")
  args = parser.parse_args()

  # A battery percentage above 100 can never be reached, which would make
  # charge_to() wait forever; reject out-of-range values up front.
  if not 0 <= args.charge_level <= 100:
    parser.error("--charge_level must be between 0 and 100")

  # Progress is reported through logging.info(), which the default WARNING
  # level would silently drop.
  logging.basicConfig(level=logging.INFO)

  kasa_plug_controller = KasaPlugController(args.kasa_power_strip_ip)
  try:
    kasa_plug_controller.charge_to(args.charge_level)
  finally:
    # Release the event loop even if charging fails or is interrupted.
    kasa_plug_controller.close()
|