Skip to content
Merged
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
2b1487f
Update bat.py
cr0i Mar 19, 2025
6b8f74d
Update bat.py
cr0i Mar 19, 2025
06011e4
Update bat.py
cr0i Mar 20, 2025
013148d
Update bat.py
cr0i Mar 20, 2025
ed46ac4
Update bat.py
cr0i Mar 20, 2025
f03e949
Update bat.py
cr0i Mar 20, 2025
fd82c83
Update bat.py
cr0i Mar 20, 2025
1592b92
Update bat.py
cr0i Mar 23, 2025
3dfc8cb
Update bat.py
cr0i Mar 23, 2025
fc4faa1
Update bat.py
cr0i Mar 23, 2025
a670195
Update bat.py
cr0i Apr 9, 2025
8036b6f
Update bat.py
cr0i Apr 9, 2025
6c6ca07
Update bat.py
cr0i Apr 18, 2025
7502503
Update bat.py
cr0i Apr 18, 2025
1ccf078
Update bat.py
cr0i Apr 18, 2025
0fbc9fb
Update bat.py
cr0i Apr 23, 2025
b214938
Update bat.py
cr0i Apr 24, 2025
8be8891
Update bat.py
cr0i Apr 24, 2025
f4afef9
Merge branch 'master' into solaredge-speichersteuerung-mit-battery_index
cr0i Apr 24, 2025
29e495f
Update bat.py
cr0i Apr 24, 2025
340fe10
Update bat.py
cr0i Jun 2, 2025
e08c6c1
Update bat.py
cr0i Jun 4, 2025
539e4e0
Update bat.py
cr0i Jun 4, 2025
104f2f6
Update bat.py
cr0i Jun 5, 2025
4341014
Update bat.py
cr0i Jun 5, 2025
8eff0a7
Update bat.py
cr0i Jun 5, 2025
962bd90
Update bat.py
cr0i Jun 5, 2025
3133f93
Update bat.py
cr0i Jun 5, 2025
c8920ca
Update bat.py
cr0i Jun 6, 2025
bc91903
Update bat.py
cr0i Jun 6, 2025
13e54c9
Merge branch 'master' into solaredge-speichersteuerung-mit-battery_index
cr0i Jun 6, 2025
125d340
Update bat.py
cr0i Jun 6, 2025
0abae5a
Update bat.py
cr0i Jun 9, 2025
2c7dbb7
Update bat.py
cr0i Jun 9, 2025
4ca3830
Update bat.py
cr0i Jun 9, 2025
f8e5b96
Update bat.py
cr0i Jun 9, 2025
175dbbc
Update power_limit sign
cr0i Jun 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 183 additions & 2 deletions packages/modules/devices/solaredge/solaredge/bat.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
#!/usr/bin/env python3
import logging
from typing import Any, Tuple, TypedDict

from typing import Any, TypedDict, Dict, Union, Optional, Tuple


from pymodbus.constants import Endian
import pymodbus


from control import data


from modules.common import modbus
from modules.common.abstract_device import AbstractBat
Expand All @@ -17,6 +24,11 @@
log = logging.getLogger(__name__)

FLOAT32_UNSUPPORTED = -0xffffff00000000000000000000000000
MAX_DISCHARGE_LIMIT = 5000
DEFAULT_CONTROL_MODE = 1 # Control Mode Max Eigenverbrauch
REMOTE_CONTROL_MODE = 4 # Control Mode Remotesteuerung
DEFAULT_COMMAND_MODE = 0 # Command Mode ohne Steuerung
ACTIVE_COMMAND_MODE = 7 # Command Mode Max Eigenverbrauch bei Steuerung


class KwargsDict(TypedDict):
Expand All @@ -25,6 +37,19 @@ class KwargsDict(TypedDict):


class SolaredgeBat(AbstractBat):
# Define all possible registers with their data types
REGISTERS = {
"Battery1StateOfEnergy": (0xe184, ModbusDataType.FLOAT_32,), # Mirror: 0xf584
"Battery1InstantaneousPower": (0xe174, ModbusDataType.FLOAT_32,), # Mirror: 0xf574
"Battery2StateOfEnergy": (0xe284, ModbusDataType.FLOAT_32,),
"Battery2InstantaneousPower": (0xe274, ModbusDataType.FLOAT_32,),
"StorageControlMode": (0xe004, ModbusDataType.UINT_16,),
"StorageBackupReserved": (0xe008, ModbusDataType.FLOAT_32,),
"StorageChargeDischargeDefaultMode": (0xe00a, ModbusDataType.UINT_16,),
"RemoteControlCommandMode": (0xe00d, ModbusDataType.UINT_16,),
"RemoteControlCommandDischargeLimit": (0xe010, ModbusDataType.FLOAT_32,),
}

def __init__(self, component_config: SolaredgeBatSetup, **kwargs: Any) -> None:
self.component_config = component_config
self.kwargs: KwargsDict = kwargs
Expand All @@ -35,6 +60,9 @@ def initialize(self) -> None:
self.sim_counter = SimCounter(self.__device_id, self.component_config.id, prefix="speicher")
self.store = get_bat_value_store(self.component_config.id)
self.fault_state = FaultState(ComponentInfo.from_component_config(self.component_config))
self.min_soc = 8
self.StorageControlMode_Read = DEFAULT_CONTROL_MODE
self.last_mode = 'undefined'

def update(self) -> None:
self.store.set(self.read_state())
Expand Down Expand Up @@ -77,14 +105,167 @@ def get_values(self) -> Tuple[float, float]:
power_reg, ModbusDataType.FLOAT_32, wordorder=Endian.Little, unit=unit
)

# Handle unsupported FLOAT32 case
# Handle unsupported case
if power == FLOAT32_UNSUPPORTED:
power = 0
if soc == FLOAT32_UNSUPPORTED or not 0 <= soc <= 100:
log.warning(f"Invalid SoC Speicher{battery_index}: {soc}")
else:
self.min_soc = min(int(soc), int(self.min_soc))
log.debug(f"Min-SoC Speicher{battery_index}: {int(self.min_soc)}%.")

return power, soc

def get_imported_exported(self, power: float) -> Tuple[float, float]:
return self.sim_counter.sim_count(power)

def set_power_limit(self, power_limit: Optional[int]) -> None:
unit = self.component_config.configuration.modbus_id
# Use 1 as fallback if battery_index is not set
battery_index = getattr(self.component_config.configuration, "battery_index", 1)

try:
power_limit_mode = data.data.bat_all_data.data.config.power_limit_mode
except AttributeError:
log.warning("power_limit_mode not found, assuming 'no_limit'")
power_limit_mode = 'no_limit'

if power_limit_mode == 'no_limit' and self.last_mode != 'limited':
"""
Keine Speichersteuerung, andere Steuerungen zulassen (SolarEdge One, ioBroker, Node-Red etc.).
Falls andere Steuerungen vorhanden sind, sollten diese nicht beeinflusst werden,
daher erfolgt im Modus "Immer" der Speichersteuerung keine Steuerung.
"""
return

if power_limit is None:
# Keine Ladung mit Speichersteuerung.
if self.last_mode == 'limited':
# Steuerung deaktivieren.
log.debug(f"Speicher{battery_index}:Keine Steuerung gefordert, Steuerung deaktivieren.")
values_to_write = {
"RemoteControlCommandDischargeLimit": MAX_DISCHARGE_LIMIT,
"StorageChargeDischargeDefaultMode": DEFAULT_COMMAND_MODE,
"RemoteControlCommandMode": DEFAULT_COMMAND_MODE,
"StorageControlMode": self.StorageControlMode_Read,
}
self._write_registers(values_to_write, unit)
self.last_mode = None
else:
return

elif abs(power_limit) >= 0:
"""
Ladung mit Speichersteuerung.
SolarEdge entlaedt den Speicher immer nur bis zur SoC-Reserve.
Steuerung beenden, wenn der SoC vom Speicher die SoC-Reserve unterschreitet.
"""
registers_to_read = [
f"Battery{battery_index}StateOfEnergy",
"StorageControlMode",
"StorageBackupReserved",
"RemoteControlCommandDischargeLimit",
]
try:
values = self._read_registers(registers_to_read, unit)
except pymodbus.exceptions.ModbusException as e:
log.error(f"Failed to read registers: {e}")
self.fault_state.error(f"Modbus read error: {e}")
return
soc = values[f"Battery{battery_index}StateOfEnergy"]
if soc == FLOAT32_UNSUPPORTED or not 0 <= soc <= 100:
log.warning(f"Speicher{battery_index}: Invalid SoC: {soc}")
soc_reserve = max(int(self.min_soc + 2), int(values["StorageBackupReserved"]))
log.debug(f"SoC-Reserve Speicher{battery_index}: {int(soc_reserve)}%.")
discharge_limit = int(values["RemoteControlCommandDischargeLimit"])

if values["StorageControlMode"] == REMOTE_CONTROL_MODE: # Speichersteuerung ist aktiv.
if soc_reserve > soc:
# Speichersteuerung erst deaktivieren, wenn SoC-Reserve unterschritten wird.
# Darf wegen 2 Speichern nicht bereits bei SoC-Reserve deaktiviert werden!
log.debug(f"Speicher{battery_index}: Steuerung deaktivieren. SoC-Reserve unterschritten")
values_to_write = {
"RemoteControlCommandDischargeLimit": MAX_DISCHARGE_LIMIT,
"StorageChargeDischargeDefaultMode": DEFAULT_COMMAND_MODE,
"RemoteControlCommandMode": DEFAULT_COMMAND_MODE,
"StorageControlMode": self.StorageControlMode_Read,
}
self._write_registers(values_to_write, unit)
self.last_mode = None

elif discharge_limit not in range(int(abs(power_limit)) - 10, int(abs(power_limit)) + 10):
# Limit nur bei Abweichung von mehr als 10W, um Konflikte bei 2 Speichern zu verhindern.
log.debug(f"Discharge-Limit Speicher{battery_index}: {int(abs(power_limit))}W.")
values_to_write = {
"RemoteControlCommandDischargeLimit": int(min(abs(power_limit), MAX_DISCHARGE_LIMIT))
}
self._write_registers(values_to_write, unit)
self.last_mode = 'limited'

else: # Speichersteuerung ist inaktiv.
if soc_reserve < soc:
# Speichersteuerung nur aktivieren, wenn SoC ueber SoC-Reserve.
log.debug(f"Discharge-Limit aktivieren, Speicher{battery_index}: {int(abs(power_limit))}W.")
self.StorageControlMode_Read = values["StorageControlMode"]
values_to_write = {
"StorageControlMode": REMOTE_CONTROL_MODE,
"StorageChargeDischargeDefaultMode": ACTIVE_COMMAND_MODE,
"RemoteControlCommandMode": ACTIVE_COMMAND_MODE,
"RemoteControlCommandDischargeLimit": int(min(abs(power_limit), MAX_DISCHARGE_LIMIT))
}
self._write_registers(values_to_write, unit)
self.last_mode = 'limited'

def _read_registers(self, register_names: list, unit: int) -> Dict[str, Union[int, float]]:
values = {}
for key in register_names:
address, data_type = self.REGISTERS[key]
try:
values[key] = self.__tcp_client.read_holding_registers(
address, data_type, wordorder=Endian.Little, unit=unit
)
except pymodbus.exceptions.ModbusException as e:
log.error(f"Failed to read register {key} at address {address}: {e}")
self.fault_state.error(f"Modbus read error: {e}")
values[key] = 0 # Fallback value
log.debug(f"Bat raw values {self.__tcp_client.address}: {values}")
return values
# TODO: Optimize to read multiple contiguous registers in a single request if supported by ModbusTcpClient_

def _write_registers(self, values_to_write: Dict[str, Union[int, float]], unit: int) -> None:
for key, value in values_to_write.items():
address, data_type = self.REGISTERS[key]
encoded_value = self._encode_value(value, data_type)
try:
self.__tcp_client.write_registers(address, encoded_value, unit=unit)
log.debug(f"Neuer Wert {encoded_value} in Register {address} geschrieben.")
except pymodbus.exceptions.ModbusException as e:
log.error(f"Failed to write register {key} at address {address}: {e}")
self.fault_state.error(f"Modbus write error: {e}")

def _encode_value(self, value: Union[int, float], data_type: ModbusDataType) -> list:
builder = pymodbus.payload.BinaryPayloadBuilder(
byteorder=pymodbus.constants.Endian.Big,
wordorder=pymodbus.constants.Endian.Little
)
encode_methods = {
ModbusDataType.UINT_32: builder.add_32bit_uint,
ModbusDataType.INT_32: builder.add_32bit_int,
ModbusDataType.UINT_16: builder.add_16bit_uint,
ModbusDataType.INT_16: builder.add_16bit_int,
ModbusDataType.FLOAT_32: builder.add_32bit_float,
}
if data_type in encode_methods:
if data_type == ModbusDataType.FLOAT_32:
encode_methods[data_type](float(value))
else:
encode_methods[data_type](int(value))
else:
raise ValueError(f"Unsupported data type: {data_type}")
return builder.to_registers()
Comment on lines +246 to +265
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hast Du probiert, ob die Werte auch ohne vorheriges Encodieren geschrieben werden? Eigentlich macht pymodbus das.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hast Du probiert, ob die Werte auch ohne vorheriges Encodieren geschrieben werden? Eigentlich macht pymodbus das.

Ich habe das aus der SMA Speichersteuerung abgekupfert, ich brauche bei SolarEdge aber wordorder Endian.Little, das konnte ich nur an der Stelle einbauen.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, das stimmt. Kannst Du den Code bitte in die common/mdobus.py verschieben? Der ist ja nicht SolarEdge-spezifisch und kann auch von anderen Modulen genutzt werden.

Copy link
Contributor Author

@cr0i cr0i Jun 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ich habe ja die wordorder Little fest hinterlegt, die müsste man dann vermutlich besser per übergebenem Parameter einrichten oder nur für die Kombination eine eigene def (byteorder Big, wordorder Little) anlegen?
Das übersteigt meine Copy&Paste Fähigkeiten, ich könnte nochmal im Forum fragen, ob jemand helfen kann.


def power_limit_controllable(self) -> bool:
return True


component_descriptor = ComponentDescriptor(configuration_factory=SolaredgeBatSetup)