"""Simple test script to check inverter UDP protocol communication"""
import asyncio
import logging
import sys
from importlib.metadata import version
# Force the local files, not pip installed lib.
# Both candidate locations of the local checkout are prepended in one step;
# '../../goodwe' ends up ahead of '../goodwe' in the search order.
sys.path[:0] = ['../../goodwe', '../goodwe']
import goodwe
# Send DEBUG-and-above log records to stderr, prefixed with the timestamp,
# emitting function name and line number.
logging.basicConfig(
    level=logging.DEBUG,
    stream=sys.stderr,
    format="%(asctime)-15s %(funcName)s(%(lineno)d) - %(levelname)s: %(message)s",
)

# On Windows, select the selector-based event loop policy before any loop is created.
# NOTE(review): presumably required by the UDP transport used by goodwe - confirm.
if sys.platform.startswith('win'):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# Look up the version of a pip-installed 'goodwe' distribution, if there is one.
try:
    module_ver = version('goodwe')
except ModuleNotFoundError:
    # No installed distribution found; nothing to warn about.
    module_ver = None

# Warn loudly when an installed distribution exists, since it may be the code under test
# instead of the local files.
if module_ver:
    print(
        "WARNING !!!",
        "==============================",
        f"You are executing code with installed pip version goodwe:{module_ver}",
        "You are not testing the local files, if that was what you meant !!!",
        "==============================",
        sep="\n",
    )
# Connection parameters - set the appropriate IP address (and the rest) for the inverter under test.
IP_ADDRESS = "192.168.2.14"
PORT = 8899
FAMILY = "ET"  # One of ET, EH, ES, EM, DT, NS, XS or None to detect family automatically
COMM_ADDR = 0xf7  # Usually 0xf7 for ET/EH/EM/ES or 0x7f for DT/D-NS/XS, or None for default value
TIMEOUT = 1
RETRIES = 3

# Connect to the inverter; the returned object is reused by every example below.
inverter = asyncio.run(
    goodwe.connect(IP_ADDRESS, PORT, FAMILY, COMM_ADDR, TIMEOUT, RETRIES)
)

# Assemble the whole identification report first, then emit it with a single print call.
print("".join((
    f"Identified inverter\n",
    f"- Model: {inverter.model_name}\n",
    f"- SerialNr: {inverter.serial_number}\n",
    f"- Rated power: {inverter.rated_power}\n",
    f"- A/C output type: {inverter.ac_output_type}\n",
    f"- Firmware: {inverter.firmware}\n",
    f"- ARM firmware: {inverter.arm_firmware}\n",
    f"- Modbus version: {inverter.modbus_version}\n",
    f"- DSP1 version: {inverter.dsp1_version}\n",
    f"- DSP2 version: {inverter.dsp2_version}\n",
    f"- DSP svn version: {inverter.dsp_svn_version}\n",
    f"- Arm version: {inverter.arm_version}\n",
    f"- ARM svn version: {inverter.arm_svn_version}\n",
)))
# -----------------
# Read runtime data
# -----------------
# Fetch the runtime data once, then list every sensor of the inverter
# whose id is present in the response.
response = asyncio.run(inverter.read_runtime_data())
for sensor in inverter.sensors():
    if sensor.id_ not in response:
        continue  # sensor not present in this response
    print(f"{sensor.id_}: \t\t {sensor.name} = {response[sensor.id_]} {sensor.unit}")
# -------------
# Read settings
# -------------
# response = asyncio.run(inverter.read_settings_data())
# for setting in inverter.settings():
#     if setting.id_ in response:
#         print(f"{setting.id_}: \t\t {setting.name} = {response[setting.id_]} {setting.unit}")
# -----------------
# Set inverter time
# -----------------
# print(asyncio.run(inverter.read_setting('time')))
# NOTE: the next line needs `import datetime` added to the imports at the top of this script.
# asyncio.run(inverter.write_setting('time', datetime.datetime.now()))
# print(asyncio.run(inverter.read_setting('time')))
# ------------------------------
# Set inverter grid export limit
# ------------------------------
# print(asyncio.run(inverter.get_grid_export_limit()))
# asyncio.run(inverter.set_grid_export_limit(4000))
# print(asyncio.run(inverter.get_grid_export_limit()))
# ---------------------------
# Set inverter operation mode
# ---------------------------
# print(asyncio.run(inverter.get_operation_mode()))
# asyncio.run(inverter.set_operation_mode(goodwe.OperationMode.BACKUP))
# print(asyncio.run(inverter.get_operation_mode()))
# --------------------
# Set inverter setting
# --------------------
# print(asyncio.run(inverter.read_setting('grid_export_limit')))
# asyncio.run(inverter.write_setting('grid_export_limit', 4000))
# print(asyncio.run(inverter.read_setting('grid_export_limit')))
# --------------------
# Get inverter modbus setting
# --------------------
# print(asyncio.run(inverter.read_setting('modbus-47000')))
# -------------------------------
# Execute modbus RTU protocol command
# -------------------------------
# response = asyncio.run(goodwe.protocol.ModbusRtuReadCommand(COMM_ADDR, 0x88b8, 0x21).execute(
#     goodwe.protocol.UdpInverterProtocol(IP_ADDRESS, PORT, TIMEOUT, RETRIES)))
# print(response)
# -------------------------------
# Execute modbus TCP protocol command
# -------------------------------
# response = asyncio.run(goodwe.protocol.ModbusTcpReadCommand(180, 301, 3).execute(
#     goodwe.protocol.TcpInverterProtocol('192.168.1.13', 502, TIMEOUT, RETRIES)))
# print(response.response_data().hex())
# -------------------------------
# Execute AA55 protocol command
# -------------------------------
# response = asyncio.run(goodwe.protocol.Aa55ProtocolCommand("010200", "0182").execute(
#     goodwe.protocol.UdpInverterProtocol(IP_ADDRESS, PORT, TIMEOUT, RETRIES)))
# print(response)
# -----------------
# Test parallel requests
#
# async def run_in_parallel(inverter):
#     a, b, c, = await asyncio.gather(inverter.get_grid_export_limit(), inverter.get_ongrid_battery_dod(),
#                                     inverter.read_runtime_data())
#     print(a)
#     print(b)
#     print(c)
#
# asyncio.run(run_in_parallel(inverter))