File: dt.py

package info (click to toggle)
python-goodwe 0.4.8-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 724 kB
  • sloc: python: 6,081; makefile: 9; sh: 5
file content (293 lines) | stat: -rw-r--r-- 13,663 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
from __future__ import annotations

import logging
from typing import Tuple

from .exceptions import InverterError, RequestFailedException, RequestRejectedException
from .inverter import Inverter
from .inverter import OperationMode
from .inverter import SensorKind as Kind
from .modbus import ILLEGAL_DATA_ADDRESS
from .model import is_3_mppt, is_single_phase
from .protocol import ProtocolCommand
from .sensor import *

logger = logging.getLogger(__name__)


class DT(Inverter):
    """Class representing inverter of DT/MS/D-NS/XS or GE's GEP(PSB/PSC) families"""

    # Runtime data sensors, in the order they are reported.
    # Offsets are Modbus register addresses in the 30100 - 30172 range, i.e. the
    # 0x49 registers starting at 0x7594 that _READ_RUNNING_DATA requests in __init__.
    __all_sensors: Tuple[Sensor, ...] = (
        Timestamp("timestamp", 30100, "Timestamp"),
        Voltage("vpv1", 30103, "PV1 Voltage", Kind.PV),
        Current("ipv1", 30104, "PV1 Current", Kind.PV),
        # Per-string PV power is not read from a register of its own,
        # it is derived as (string voltage * string current) rounded to whole watts
        Calculated("ppv1",
                   lambda data: round(read_voltage(data, 30103) * read_current(data, 30104)),
                   "PV1 Power", "W", Kind.PV),
        Voltage("vpv2", 30105, "PV2 Voltage", Kind.PV),
        Current("ipv2", 30106, "PV2 Current", Kind.PV),
        Calculated("ppv2",
                   lambda data: round(read_voltage(data, 30105) * read_current(data, 30106)),
                   "PV2 Power", "W", Kind.PV),
        Voltage("vpv3", 30107, "PV3 Voltage", Kind.PV),
        Current("ipv3", 30108, "PV3 Current", Kind.PV),
        Calculated("ppv3",
                   lambda data: round(read_voltage(data, 30107) * read_current(data, 30108)),
                   "PV3 Power", "W", Kind.PV),
        # ppv1 + ppv2 + ppv3 (each term is rounded separately, same as the sensors above)
        Calculated("ppv",
                   lambda data: (round(read_voltage(data, 30103) * read_current(data, 30104))) + (round(
                       read_voltage(data, 30105) * read_current(data, 30106))) + (round(
                       read_voltage(data, 30107) * read_current(data, 30108))),
                   "PV Power", "W", Kind.PV),
        # Disabled placeholders for PV4 - PV6 strings.
        # NOTE(review): offsets 14/16 do not look like register addresses - confirm before enabling.
        # Voltage("vpv4", 14, "PV4 Voltage", Kind.PV),
        # Current("ipv4", 16, "PV4 Current", Kind.PV),
        # Voltage("vpv5", 14, "PV5 Voltage", Kind.PV),
        # Current("ipv5", 16, "PV5 Current", Kind.PV),
        # Voltage("vpv6", 14, "PV6 Voltage", Kind.PV),
        # Current("ipv6", 16, "PV6 Current", Kind.PV),
        # Sensors below whose id ends with 2 or 3 are filtered out for single phase
        # inverters (see _single_phase_only)
        Voltage("vline1", 30115, "On-grid L1-L2 Voltage", Kind.AC),
        Voltage("vline2", 30116, "On-grid L2-L3 Voltage", Kind.AC),
        Voltage("vline3", 30117, "On-grid L3-L1 Voltage", Kind.AC),
        Voltage("vgrid1", 30118, "On-grid L1 Voltage", Kind.AC),
        Voltage("vgrid2", 30119, "On-grid L2 Voltage", Kind.AC),
        Voltage("vgrid3", 30120, "On-grid L3 Voltage", Kind.AC),
        Current("igrid1", 30121, "On-grid L1 Current", Kind.AC),
        Current("igrid2", 30122, "On-grid L2 Current", Kind.AC),
        Current("igrid3", 30123, "On-grid L3 Current", Kind.AC),
        Frequency("fgrid1", 30124, "On-grid L1 Frequency", Kind.AC),
        Frequency("fgrid2", 30125, "On-grid L2 Frequency", Kind.AC),
        Frequency("fgrid3", 30126, "On-grid L3 Frequency", Kind.AC),
        # Per-phase grid power is derived as (phase voltage * phase current) rounded to whole watts
        Calculated("pgrid1",
                   lambda data: round(read_voltage(data, 30118) * read_current(data, 30121)),
                   "On-grid L1 Power", "W", Kind.AC),
        Calculated("pgrid2",
                   lambda data: round(read_voltage(data, 30119) * read_current(data, 30122)),
                   "On-grid L2 Power", "W", Kind.AC),
        Calculated("pgrid3",
                   lambda data: round(read_voltage(data, 30120) * read_current(data, 30123)),
                   "On-grid L3 Power", "W", Kind.AC),
        # 30127 reserved
        PowerS("total_inverter_power", 30128, "Total Power", Kind.AC),
        # The same register is exposed twice: once as the raw code, once as its label
        Integer("work_mode", 30129, "Work Mode code"),
        Enum2("work_mode_label", 30129, WORK_MODES, "Work Mode"),
        Long("error_codes", 30130, "Error Codes"),
        Integer("warning_code", 30132, "Warning code"),
        Apparent4("apparent_power", 30133, "Apparent Power", Kind.AC),
        Reactive4("reactive_power", 30135, "Reactive Power", Kind.AC),
        # 30137 reserved
        # 30138 reserved
        Decimal("power_factor", 30139, 1000, "Power Factor", "", Kind.GRID),
        # 30140 reserved
        Temp("temperature", 30141, "Inverter Temperature", Kind.AC),
        # 30142 reserved
        # 30143 reserved
        Energy("e_day", 30144, "Today's PV Generation", Kind.PV),
        Energy4("e_total", 30145, "Total PV Generation", Kind.PV),
        Long("h_total", 30147, "Hours Total", "h", Kind.PV),
        # Raw safety country code and its label share register 30149
        Integer("safety_country", 30149, "Safety Country code", "", Kind.AC),
        Enum2("safety_country_label", 30149, SAFETY_COUNTRIES, "Safety Country", Kind.AC),
        # 30150 reserved
        # 30151 reserved
        # 30152 reserved
        # 30153 reserved
        # 30154 reserved
        # 30155 reserved
        # 30156 reserved
        # 30157 reserved
        # 30158 reserved
        # 30159 reserved
        # 30160 reserved
        # 30161 reserved
        Integer("funbit", 30162, "FunBit", "", Kind.PV),
        Voltage("vbus", 30163, "Bus Voltage", Kind.PV),
        Voltage("vnbus", 30164, "NBus Voltage", Kind.PV),
        # Raw derating mode code and its decoded bitmap label share register 30165
        Long("derating_mode", 30165, "Derating Mode code"),
        EnumBitmap4("derating_mode_label", 30165, DERATING_MODE_CODES, "Derating Mode"),
        # 30167 reserved
        # 30168 reserved
        # 30169 reserved
        # 30170 reserved
        # 30171 reserved
        # 30172 reserved
    )

    # Inverter's meter data
    # Modbus registers from offset 0x75f4 (30196)
    # Only this single register is requested (see _READ_METER_DATA in __init__);
    # it is read separately from the running data in read_runtime_data.
    __all_sensors_meter: Tuple[Sensor, ...] = (
        PowerS("active_power", 30196, "Active Power", Kind.GRID),
    )

    # Modbus registers of inverter settings, offsets are modbus register addresses
    # __init__ turns this tuple into a dict keyed by sensor id; the "grid_export_limit"
    # entry is later replaced by the single/three phase variant in read_device_info.
    __all_settings: Tuple[Sensor, ...] = (
        Timestamp("time", 40313, "Inverter time"),

        Integer("shadow_scan", 40326, "Shadow Scan", "", Kind.PV),
        Integer("grid_export", 40327, "Grid Export Enabled", "", Kind.GRID),
        Integer("grid_export_limit", 40328, "Grid Export Limit", "%", Kind.GRID),
        Integer("start", 40330, "Start / Power On", "", Kind.GRID),
        Integer("stop", 40331, "Stop / Power Off", "", Kind.GRID),
        Integer("restart", 40332, "Restart", "", Kind.GRID),
        Integer("grid_export_hw", 40345, "Grid Export Enabled (HW)", "", Kind.GRID),
    )

    # Settings for single phase inverters
    # Merged over the common settings by id in read_device_info, so this entry
    # replaces the common "grid_export_limit" (same register 40328, but Long in "W").
    __settings_single_phase: Tuple[Sensor, ...] = (
        Long("grid_export_limit", 40328, "Grid Export Limit", "W", Kind.GRID),
    )

    # Settings for three phase inverters
    # Merged over the common settings by id in read_device_info, so this entry
    # replaces the common "grid_export_limit" (register 40336 instead of 40328, still "%").
    __settings_three_phase: Tuple[Sensor, ...] = (
        Integer("grid_export_limit", 40336, "Grid Export Limit", "%", Kind.GRID),
    )

    def __init__(self, host: str, port: int, comm_addr: int = 0, timeout: int = 1, retries: int = 3):
        """Set up the inverter connection parameters, read commands and default sensor/setting sets.

        A falsy comm_addr (the default 0) is replaced by 0x7f before it is handed to the base class.
        """
        super().__init__(host, port, comm_addr or 0x7f, timeout, retries)

        # Pre-built read commands: (first register, number of registers)
        self._READ_DEVICE_VERSION_INFO: ProtocolCommand = self._read_command(0x7531, 0x0028)
        self._READ_RUNNING_DATA: ProtocolCommand = self._read_command(0x7594, 0x0049)
        self._READ_METER_DATA: ProtocolCommand = self._read_command(0x75f4, 0x01)

        # Start with the complete sensor sets; read_device_info narrows them per model
        self._sensors = self.__all_sensors
        self._sensors_meter = self.__all_sensors_meter
        self._settings: dict[str, Sensor] = {setting.id_: setting for setting in self.__all_settings}

        # Assume a meter is present until a meter read fails
        self._has_meter: bool = True

    @staticmethod
    def _single_phase_only(s: Sensor) -> bool:
        """Filter predicate keeping only sensors valid on single phase inverters.

        A sensor is rejected when its id ends with '2' or '3', unless the id
        contains 'pv' (numbered PV string sensors are always kept).
        """
        sensor_id = s.id_
        if 'pv' in sensor_id:
            return True
        return not sensor_id.endswith(('2', '3'))

    @staticmethod
    def _pv1_pv2_only(s: Sensor) -> bool:
        """Filter predicate rejecting sensors of the third PV string (ids ending with 'pv3')."""
        belongs_to_third_string = s.id_.endswith('pv3')
        return not belongs_to_third_string

    async def read_device_info(self):
        """Read the device version info registers and configure this instance accordingly.

        Sets model name, serial number and firmware version attributes, then narrows
        the sensor and setting sets: L2/L3 sensors are dropped for single phase models
        and PV3 sensors are dropped for models without 3 MPPTs.

        A model name that cannot be decoded as ASCII is not fatal; the attribute is
        left as it was and the fact is logged.
        """
        response = await self._read_from_socket(self._READ_DEVICE_VERSION_INFO)
        response = response.response_data()
        # Modbus registers from 30001 - 30040 (2 bytes per register, byte 0 is register 30001)
        try:
            self.model_name = response[22:32].decode("ascii").rstrip()  # 30012 - 30016
        except UnicodeDecodeError:
            # Slicing bytes cannot fail, so decoding is the only step that can raise here
            logger.debug("No model name sent from the inverter.")
        self.serial_number = self._decode(response[6:22])  # 30004 - 30011
        self.dsp1_version = read_unsigned_int(response, 66)  # 30034
        self.dsp2_version = read_unsigned_int(response, 68)  # 30035
        self.arm_version = read_unsigned_int(response, 70)  # 30036
        self.dsp_svn_version = read_unsigned_int(response, 72)  # 30037
        self.arm_svn_version = read_unsigned_int(response, 74)  # 30038
        self.firmware = f"{self.dsp1_version}.{self.dsp2_version}.{self.arm_version:02x}"

        if is_single_phase(self):
            # this is single phase inverter, filter out all L2 and L3 sensors
            self._sensors = tuple(filter(self._single_phase_only, self.__all_sensors))
            self._settings.update({s.id_: s for s in self.__settings_single_phase})
        else:
            self._settings.update({s.id_: s for s in self.__settings_three_phase})

        if not is_3_mppt(self):
            # this is only 2 PV strings inverter, drop the PV3 sensors
            # (3 PV strings inverters keep all sensors)
            self._sensors = tuple(filter(self._pv1_pv2_only, self._sensors))

    async def read_runtime_data(self) -> Dict[str, Any]:
        """Read the running data registers and, while a meter is assumed present, the meter data.

        When the meter request is rejected or fails, meter reads are switched off
        for all later calls and only the running data is returned.
        """
        running = await self._read_from_socket(self._READ_RUNNING_DATA)
        data = self._map_response(running, self._sensors)
        if not self._has_meter:
            return data

        try:
            meter = await self._read_from_socket(self._READ_METER_DATA)
            data.update(self._map_response(meter, self._sensors_meter))
        except (RequestRejectedException, RequestFailedException):
            logger.info("Meter values not supported, disabling further attempts.")
            self._has_meter = False
        return data

    async def read_setting(self, setting_id: str) -> Any:
        """Read the value of a known setting, or of a raw register given as "modbus" + separator + address.

        Raises ValueError when setting_id is neither a known setting nor starts with "modbus".
        """
        known = self._settings.get(setting_id)
        if known:
            return await self._read_setting(known)
        if not setting_id.startswith("modbus"):
            raise ValueError(f'Unknown setting "{setting_id}"')

        # The register address follows the 7 character prefix; a single register is read as signed 16 bit
        command = self._read_command(int(setting_id[7:]), 1)
        response = await self._read_from_socket(command)
        return int.from_bytes(response.read(2), byteorder="big", signed=True)

    async def _read_setting(self, setting: Sensor) -> Any:
        """Read the registers backing the given setting and decode its value.

        Returns the value decoded by setting.read_value, or None when the inverter
        rejects the request for a reason other than an illegal data address.

        Raises ValueError when the inverter answers with ILLEGAL_DATA_ADDRESS; the
        setting is then also removed from the known settings of this instance.
        """
        # Round up to whole registers (assumes size_ is in bytes, 2 bytes per register - TODO confirm)
        count = (setting.size_ + 1) // 2
        try:
            response = await self._read_from_socket(self._read_command(setting.offset, count))
            return setting.read_value(response)
        except RequestRejectedException as ex:
            if ex.message == ILLEGAL_DATA_ADDRESS:
                logger.debug("Unsupported setting %s", setting.id_)
                self._settings.pop(setting.id_, None)
                raise ValueError(f'Unknown setting "{setting.id_}"') from ex
            # Other rejections stay non-fatal, but leave a trace instead of vanishing silently
            logger.debug("Reading of setting %s was rejected: %s", setting.id_, ex.message)
            return None

    async def write_setting(self, setting_id: str, value: Any):
        """Write a value to a known setting, or to a raw register given as "modbus" + separator + address.

        Raises ValueError when setting_id is neither a known setting nor starts with "modbus".
        """
        known = self._settings.get(setting_id)
        if known:
            await self._write_setting(known, value)
            return
        if not setting_id.startswith("modbus"):
            raise ValueError(f'Unknown setting "{setting_id}"')

        # The register address follows the 7 character prefix
        register = int(setting_id[7:])
        await self._read_from_socket(self._write_command(register, int(value)))

    async def _write_setting(self, setting: Sensor, value: Any):
        """Encode the value for the given setting and send it to the inverter.

        Encoded values of up to 2 bytes go out as a single register write,
        longer ones as a multi register write.
        """
        if setting.size_ != 1:
            payload = setting.encode_value(value)
        else:
            # modbus can address/store only 16 bit values, so read the current register
            # content first and let the sensor merge the other 8 bits into the encoded value
            current = await self._read_from_socket(self._read_command(setting.offset, 1))
            payload = setting.encode_value(value, current.response_data()[0:2])

        if len(payload) > 2:
            command = self._write_multi_command(setting.offset, payload)
        else:
            register_value = int.from_bytes(payload, byteorder="big", signed=True)
            command = self._write_command(setting.offset, register_value)
        await self._read_from_socket(command)

    async def read_settings_data(self) -> Dict[str, Any]:
        """Read the values of all currently known settings.

        Returns a dict of setting id to value. A setting that cannot be read
        (read_setting raises ValueError, e.g. because the inverter does not
        support it) is reported with value None instead of aborting the whole
        read, so the remaining settings are still returned.
        """
        data = {}
        # settings() returns a snapshot tuple, so settings dropped while reading do not disturb the loop
        for setting in self.settings():
            try:
                data[setting.id_] = await self.read_setting(setting.id_)
            except ValueError:
                logger.debug("Setting %s could not be read, reporting None", setting.id_)
                data[setting.id_] = None
        return data

    async def get_grid_export_limit(self) -> int:
        """Return the current value of the 'grid_export_limit' setting."""
        limit = await self.read_setting('grid_export_limit')
        return limit

    async def set_grid_export_limit(self, export_limit: int) -> None:
        """Write the 'grid_export_limit' setting; values that are not >= 0 are ignored without any write."""
        accepted = export_limit >= 0
        if not accepted:
            return None
        return await self.write_setting('grid_export_limit', export_limit)

    async def get_operation_modes(self, include_emulated: bool) -> Tuple[OperationMode, ...]:
        """Return the supported operation modes, which is always an empty tuple for this family."""
        return tuple()

    async def get_operation_mode(self) -> OperationMode:
        """Always raises InverterError, operation modes are not supported here."""
        raise InverterError("Operation not supported.")

    async def set_operation_mode(
            self,
            operation_mode: OperationMode,
            eco_mode_power: int = 100,
            eco_mode_soc: int = 100,
    ) -> None:
        """Always raises InverterError, operation modes are not supported here."""
        raise InverterError("Operation not supported.")

    async def get_ongrid_battery_dod(self) -> int:
        """Always raises InverterError, there is no battery to query."""
        raise InverterError("Operation not supported, inverter has no batteries.")

    async def set_ongrid_battery_dod(self, dod: int) -> None:
        """Always raises InverterError, there is no battery to configure."""
        raise InverterError("Operation not supported, inverter has no batteries.")

    def sensors(self) -> Tuple[Sensor, ...]:
        """Return the active runtime sensors, followed by the meter sensors while a meter is assumed present."""
        if not self._has_meter:
            return self._sensors
        return self._sensors + self._sensors_meter

    def settings(self) -> Tuple[Sensor, ...]:
        """Return a snapshot tuple of the currently known settings."""
        known_settings = self._settings.values()
        return tuple(known_settings)