Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wallbox interaction #116

Merged
merged 5 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ TCP_IP = '192.168.1.57'
USERNAME = '[email protected]'
PASS = 'MySecurePassword'
KEY = 'abc123'
CONFIG = {}
CONFIG = {}
# CONFIG = {"powermeters": [{"index": 6}]}

print("local connection")
Expand All @@ -96,7 +96,7 @@ from e3dc import E3DC
USERNAME = '[email protected]'
PASS = 'MySecurePassword'
SERIALNUMBER = 'S10-012345678910'
CONFIG = {}
CONFIG = {}

print("web connection")
e3dc_obj = E3DC(E3DC.CONNECT_WEB, username=USERNAME, password=PASS, serialNumber = SERIALNUMBER, isPasswordMd5=False, configuration = CONFIG)
Expand Down Expand Up @@ -148,9 +148,19 @@ Poll returns a dictionary like the following:
- `get_powermeter_data()`
- `get_powermeters_data()`
- `get_power_settings()`
- `get_wallbox_data()`
- `set_battery_to_car_mode()`
- `set_power_limits()`
- `set_powersave()`
- `set_wallbox_max_charge_current()`
- `set_wallbox_schuko()`
- `set_wallbox_sunmode()`
- `set_weather_regulated_charge()`
- `toggle_wallbox_charging()`
- `toggle_wallbox_phases()`

- `sendWallboxRequest()`
- `sendWallboxSetRequest()`

See the full documentation on [ReadTheDocs](https://python-e3dc.readthedocs.io/en/latest/)

Expand Down
290 changes: 290 additions & 0 deletions e3dc/_e3dc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import datetime
import hashlib
import struct
import time
import uuid
from calendar import monthrange
Expand Down Expand Up @@ -981,6 +982,295 @@ def get_system_status(self, keepAlive: bool = False):
outObj = {k: SystemStatusBools[v] for k, v in outObj.items()}
return outObj

def get_wallbox_data(self, wbIndex: int = 0, keepAlive: bool = False):
"""Polls the wallbox status via rscp protocol locally.

Args:
wbIndex (Optional[int]): Index of the wallbox to poll data for
keepAlive (Optional[bool]): True to keep connection alive

Returns:
dict: Dictionary containing the wallbox status structured as follows::

{
"appSoftware": <version of the app>,
"batteryToCar": <true if the wallbox may use the battery, otherwise false>,
"chargingActive": <true if charging is currently active, otherwise false>,
"chargingCanceled": <true if charging was manually canceled, otherwise false>,
"consumptionNet": <power currently consumed by the wallbox, provided by the grid in watts>,
"consumptionSun": <power currently consumed by the wallbox, provided by the solar panels in watts>,
"energyAll": <total consumed energy this month in watthours>,
"energyNet": <consumed net energy this month in watthours>,
"energySun": <consumed solar energy this month in watthours>,
"index": <index of the requested wallbox>,
"keyState": <state of the key switch at the wallbox>,
"maxChargeCurrent": <configured maximum charge current in A>,
"phases": <number of phases used for charging>,
"schukoOn": <true if the connected schuko of the wallbox is on, otherwise false>,
"soc": <state of charge>,
"sunModeOn": <true if sun-only-mode is active, false if mixed mode is active>
}
"""
req = self.sendRequest(
(
RscpTag.WB_REQ_DATA,
RscpType.Container,
[
(RscpTag.WB_INDEX, RscpType.UChar8, wbIndex),
(RscpTag.WB_REQ_EXTERN_DATA_ALG, RscpType.NoneType, None),
(RscpTag.WB_REQ_EXTERN_DATA_SUN, RscpType.NoneType, None),
(RscpTag.WB_REQ_EXTERN_DATA_NET, RscpType.NoneType, None),
(RscpTag.WB_REQ_APP_SOFTWARE, RscpType.NoneType, None),
(RscpTag.WB_REQ_KEY_STATE, RscpType.NoneType, None),
],
),
keepAlive=True,
)

outObj = {
"index": rscpFindTagIndex(req, RscpTag.WB_INDEX),
"appSoftware": rscpFindTagIndex(req, RscpTag.WB_APP_SOFTWARE),
}

extern_data_alg = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_ALG)
if extern_data_alg is not None:
extern_data = rscpFindTagIndex(extern_data_alg, RscpTag.WB_EXTERN_DATA)
status_byte = extern_data[2]
outObj["sunModeOn"] = (status_byte & 128) != 0
outObj["chargingCanceled"] = (status_byte & 64) != 0
outObj["chargingActive"] = (status_byte & 32) != 0
outObj["plugLocked"] = (status_byte & 16) != 0
outObj["plugged"] = (status_byte & 8) != 0
outObj["soc"] = extern_data[0]
outObj["phases"] = extern_data[1]
outObj["maxChargeCurrent"] = extern_data[3]
outObj["schukoOn"] = extern_data[5] != 0

extern_data_sun = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_SUN)
if extern_data_sun is not None:
extern_data = rscpFindTagIndex(extern_data_sun, RscpTag.WB_EXTERN_DATA)
outObj["consumptionSun"] = struct.unpack("h", extern_data[0:2])[0]
outObj["energySun"] = struct.unpack("i", extern_data[2:6])[0]

extern_data_net = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_NET)
if extern_data_net is not None:
extern_data = rscpFindTagIndex(extern_data_net, RscpTag.WB_EXTERN_DATA)
outObj["consumptionNet"] = struct.unpack("h", extern_data[0:2])[0]
outObj["energyNet"] = struct.unpack("i", extern_data[2:6])[0]

if "energySun" in outObj and "energyNet" in outObj:
outObj["energyAll"] = outObj["energyNet"] + outObj["energySun"]

key_state = rscpFindTag(req, RscpTag.WB_KEY_STATE)
if key_state is not None:
outObj["keyState"] = rscpFindTagIndex(key_state, RscpTag.WB_KEY_STATE)

req = self.sendRequest(
(RscpTag.EMS_REQ_BATTERY_TO_CAR_MODE, RscpType.NoneType, None),
keepAlive=keepAlive,
)
battery_to_car = rscpFindTag(req, RscpTag.EMS_BATTERY_TO_CAR_MODE)
if battery_to_car is not None:
outObj["batteryToCar"] = rscpFindTagIndex(
battery_to_car, RscpTag.EMS_BATTERY_TO_CAR_MODE
)

outObj = {k: v for k, v in sorted(outObj.items())}
return outObj

def set_wallbox_sunmode(
self, enable: bool, wbIndex: int = 0, keepAlive: bool = False
) -> bool:
"""Sets the sun mode of the wallbox via rscp protocol locally.

Args:
enable (bool): True to enable sun mode, otherwise false,
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive

Returns:
True if success
False if error
"""
return self.sendWallboxSetRequest(
dataIndex=0, value=1 if enable else 2, wbIndex=wbIndex, keepAlive=keepAlive
)

def set_wallbox_schuko(
self, on: bool, wbIndex: int = 0, keepAlive: bool = False
) -> bool:
"""Sets the Schuko of the wallbox via rscp protocol locally.

Args:
on (bool): True to activate the Schuko, otherwise false
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive

Returns:
True if success (wallbox has understood the request, but might have ignored an unsupported value)
False if error
"""
return self.sendWallboxSetRequest(
dataIndex=5, value=1 if on else 0, wbIndex=wbIndex, keepAlive=keepAlive
)

def set_wallbox_max_charge_current(
self, max_charge_current: int, wbIndex: int = 0, keepAlive: bool = False
) -> bool:
"""Sets the maximum charge current of the wallbox via rscp protocol locally.

Args:
max_charge_current (int): maximum allowed charge current in A
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive

Returns:
True if success (wallbox has understood the request, but might have clipped the value)
False if error
"""
return self.sendWallboxSetRequest(
dataIndex=2,
value=max_charge_current,
request=RscpTag.WB_REQ_SET_PARAM_1,
wbIndex=wbIndex,
keepAlive=keepAlive,
)

def toggle_wallbox_charging(
self, wbIndex: int = 0, keepAlive: bool = False
) -> bool:
"""Toggles charging of the wallbox via rscp protocol locally.

Args:
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive

Returns:
True if success
False if error
"""
return self.sendWallboxSetRequest(
dataIndex=4, value=1, wbIndex=wbIndex, keepAlive=keepAlive
)

def toggle_wallbox_phases(self, wbIndex: int = 0, keepAlive: bool = False) -> bool:
"""Toggles the number of phases used for charging by the wallbox between 1 and 3 via rscp protocol locally.

Args:
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive

Returns:
True if success
False if error
"""
return self.sendWallboxSetRequest(
dataIndex=3, value=1, wbIndex=wbIndex, keepAlive=keepAlive
)

def sendWallboxRequest(
self,
dataIndex: int,
value: int,
request: RscpTag = RscpTag.WB_REQ_SET_EXTERN,
wbIndex: int = 0,
keepAlive: bool = False,
) -> Tuple[str | int | RscpTag, str | int | RscpType, Any]:
"""Sends a low-level request with WB_EXTERN_DATA to the wallbox via rscp protocol locally.

Args:
dataIndex (int): byte index in the WB_EXTERN_DATA array (values: 0-5)
value (int): byte value to be set in the WB_EXTERN_DATA array at the given index
request (Optional[RscpTag]): request identifier (WB_REQ_SET_EXTERN, WB_REQ_SET_PARAM_1 or WB_REQ_SET_PARAM_2),
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive

Returns:
An object with the received data
"""
dataArray = bytearray([0, 0, 0, 0, 0, 0])
dataArray[dataIndex] = value
result = self.sendRequest(
(
RscpTag.WB_REQ_DATA,
RscpType.Container,
[
(RscpTag.WB_INDEX, RscpType.UChar8, wbIndex),
(
request,
RscpType.Container,
[
(RscpTag.WB_EXTERN_DATA, RscpType.ByteArray, dataArray),
(
RscpTag.WB_EXTERN_DATA_LEN,
RscpType.UChar8,
len(dataArray),
),
],
),
],
),
keepAlive=keepAlive,
)
return result

def sendWallboxSetRequest(
self,
dataIndex: int,
value: int,
request: RscpTag = RscpTag.WB_REQ_SET_EXTERN,
wbIndex: int = 0,
keepAlive: bool = False,
) -> bool:
"""Sends a low-level set request with WB_EXTERN_DATA to the wallbox via rscp protocol locally and evaluates the response.

Args:
dataIndex (int): byte index in the WB_EXTERN_DATA array (values: 0-5)
value (int): byte value to be set in the WB_EXTERN_DATA array at the given index
request (Optional[RscpTag]): request identifier (WB_REQ_SET_EXTERN, WB_REQ_SET_PARAM_1 or WB_REQ_SET_PARAM_2),
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive

Returns:
True if success
False if error
"""
response = self.sendWallboxRequest(
dataIndex, value, request, wbIndex, keepAlive
)

if response[0] != RscpTag.WB_DATA.name:
return False
responseData = response[2][-1]
return (
responseData[0][2:] == request.name[6:]
and responseData[1] != RscpType.Error.name
)

def set_battery_to_car_mode(self, enabled: bool, keepAlive: bool = False):
"""Sets whether the wallbox may use the battery.

Args:
enabled (bool): True to enable charging the car using the battery
keepAlive (Optional[bool]): True to keep connection alive

Returns:
True if success
False if error
"""
enabledValue = 1 if enabled else 0

response = self.sendRequest(
(RscpTag.EMS_REQ_SET_BATTERY_TO_CAR_MODE, RscpType.UChar8, enabledValue),
keepAlive=keepAlive,
)

return response == (
RscpTag.EMS_SET_BATTERY_TO_CAR_MODE.name,
RscpType.UChar8.name,
enabledValue,
)

def get_batteries(self, keepAlive: bool = False):
"""Scans for installed batteries via rscp protocol.

Expand Down