From 00e6ef41097fa8c91adad8a6aa2e6b31cb8ad892 Mon Sep 17 00:00:00 2001 From: Eiko Wagenknecht Date: Mon, 2 Oct 2023 20:09:25 +0200 Subject: [PATCH] Update RSCP tags and move to enum Signed-off-by: Eiko Wagenknecht --- e3dc/_e3dc.py | 836 +++--- e3dc/_e3dc_rscp_local.py | 9 +- e3dc/_e3dc_rscp_web.py | 176 +- e3dc/_rscpLib.py | 117 +- e3dc/_rscpTags.py | 5751 ++++++++++++++++++++++++-------------- 5 files changed, 4320 insertions(+), 2569 deletions(-) diff --git a/e3dc/_e3dc.py b/e3dc/_e3dc.py index 786fb0a..154eead 100644 --- a/e3dc/_e3dc.py +++ b/e3dc/_e3dc.py @@ -22,7 +22,7 @@ ) from ._e3dc_rscp_web import E3DC_RSCP_web from ._rscpLib import rscpFindTag, rscpFindTagIndex -from ._rscpTags import getPowermeterType +from ._rscpTags import RscpTag, RscpType, getPowermeterType REMOTE_ADDRESS = "https://s10.e3dc.com/s10/phpcmd/cmd.php" REQUEST_INTERVAL_SEC = 10 # minimum interval between requests @@ -383,23 +383,23 @@ def poll_rscp(self, keepAlive=False): ): return self.lastRequest - ts = self.sendRequestTag("INFO_REQ_UTC_TIME", keepAlive=True) - soc = self.sendRequestTag("EMS_REQ_BAT_SOC", keepAlive=True) - solar = self.sendRequestTag("EMS_REQ_POWER_PV", keepAlive=True) - add = self.sendRequestTag("EMS_REQ_POWER_ADD", keepAlive=True) - bat = self.sendRequestTag("EMS_REQ_POWER_BAT", keepAlive=True) - home = self.sendRequestTag("EMS_REQ_POWER_HOME", keepAlive=True) - grid = self.sendRequestTag("EMS_REQ_POWER_GRID", keepAlive=True) - wb = self.sendRequestTag("EMS_REQ_POWER_WB_ALL", keepAlive=True) + ts = self.sendRequestTag(RscpTag.INFO_REQ_UTC_TIME, keepAlive=True) + soc = self.sendRequestTag(RscpTag.EMS_REQ_BAT_SOC, keepAlive=True) + solar = self.sendRequestTag(RscpTag.EMS_REQ_POWER_PV, keepAlive=True) + add = self.sendRequestTag(RscpTag.EMS_REQ_POWER_ADD, keepAlive=True) + bat = self.sendRequestTag(RscpTag.EMS_REQ_POWER_BAT, keepAlive=True) + home = self.sendRequestTag(RscpTag.EMS_REQ_POWER_HOME, keepAlive=True) + grid = self.sendRequestTag(RscpTag.EMS_REQ_POWER_GRID, keepAlive=True) + wb = self.sendRequestTag(RscpTag.EMS_REQ_POWER_WB_ALL, keepAlive=True) sc = round( - self.sendRequestTag("EMS_REQ_SELF_CONSUMPTION", keepAlive=True), + self.sendRequestTag(RscpTag.EMS_REQ_SELF_CONSUMPTION, keepAlive=True), 2, ) # last call, use keepAlive value autarky = round( - self.sendRequestTag("EMS_REQ_AUTARKY", keepAlive=keepAlive), + self.sendRequestTag(RscpTag.EMS_REQ_AUTARKY, keepAlive=keepAlive), 2, ) @@ -440,10 +440,11 @@ def poll_switches(self, keepAlive=False): self.rscp.connect() switchDesc = self.sendRequest( - ("HA_REQ_DATAPOINT_LIST", "None", None), keepAlive=True + (RscpTag.HA_REQ_DATAPOINT_LIST, RscpType.NoneType, None), keepAlive=True ) switchStatus = self.sendRequest( - ("HA_REQ_ACTUATOR_STATES", "None", None), keepAlive=keepAlive + (RscpTag.HA_REQ_ACTUATOR_STATES, RscpType.NoneType, None), + keepAlive=keepAlive, ) descList = switchDesc[2] # get the payload of the container @@ -452,10 +453,12 @@ def poll_switches(self, keepAlive=False): switchList = [] for switch in range(len(descList)): - switchID = rscpFindTagIndex(descList[switch], "HA_DATAPOINT_INDEX") - switchType = rscpFindTagIndex(descList[switch], "HA_DATAPOINT_TYPE") - switchName = rscpFindTagIndex(descList[switch], "HA_DATAPOINT_NAME") - switchStatus = rscpFindTagIndex(statusList[switch], "HA_DATAPOINT_STATE") + switchID = rscpFindTagIndex(descList[switch], RscpTag.HA_DATAPOINT_INDEX) + switchType = rscpFindTagIndex(descList[switch], RscpTag.HA_DATAPOINT_TYPE) + switchName = rscpFindTagIndex(descList[switch], RscpTag.HA_DATAPOINT_NAME) + switchStatus = rscpFindTagIndex( + statusList[switch], RscpTag.HA_DATAPOINT_STATE + ) switchList.append( { "id": switchID, @@ -483,17 +486,17 @@ def set_switch_onoff(self, switchID, value, keepAlive=False): result = self.sendRequest( ( - "HA_REQ_COMMAND_ACTUATOR", - "Container", + RscpTag.HA_REQ_COMMAND_ACTUATOR, + RscpType.Container, [ - ("HA_DATAPOINT_INDEX", "Uint16", switchID), - ("HA_REQ_COMMAND", "CString", cmd), + (RscpTag.HA_DATAPOINT_INDEX, RscpType.Uint16, switchID), + (RscpTag.HA_REQ_COMMAND, RscpType.CString, cmd), ], ), keepAlive=keepAlive, ) - if result[0] == "HA_COMMAND_ACTUATOR" and result[2]: + if result[0] == RscpTag.HA_COMMAND_ACTUATOR and result[2]: return True else: return False # operation did not succeed @@ -556,7 +559,7 @@ def sendRequestTag(self, tag, retries=3, keepAlive=False): e3dc.SendError: if retries are reached """ return self.sendRequest( - (tag, "None", None), retries=retries, keepAlive=keepAlive + (tag, RscpType.NoneType, None), retries=retries, keepAlive=keepAlive )[2] def get_idle_periods(self, keepAlive=False): @@ -606,24 +609,25 @@ def get_idle_periods(self, keepAlive=False): } """ idlePeriodsRaw = self.sendRequest( - ("EMS_REQ_GET_IDLE_PERIODS", "None", None), keepAlive=keepAlive + (RscpTag.EMS_REQ_GET_IDLE_PERIODS, RscpType.NoneType, None), + keepAlive=keepAlive, ) - if idlePeriodsRaw[0] != "EMS_GET_IDLE_PERIODS": + if idlePeriodsRaw[0] != RscpTag.EMS_GET_IDLE_PERIODS: return None idlePeriods = {"idleCharge": [None] * 7, "idleDischarge": [None] * 7} # initialize for period in idlePeriodsRaw[2]: - active = rscpFindTagIndex(period, "EMS_IDLE_PERIOD_ACTIVE") - typ = rscpFindTagIndex(period, "EMS_IDLE_PERIOD_TYPE") - day = rscpFindTagIndex(period, "EMS_IDLE_PERIOD_DAY") - start = rscpFindTag(period, "EMS_IDLE_PERIOD_START") - startHour = rscpFindTagIndex(start, "EMS_IDLE_PERIOD_HOUR") - startMin = rscpFindTagIndex(start, "EMS_IDLE_PERIOD_MINUTE") - end = rscpFindTag(period, "EMS_IDLE_PERIOD_END") - endHour = rscpFindTagIndex(end, "EMS_IDLE_PERIOD_HOUR") - endMin = rscpFindTagIndex(end, "EMS_IDLE_PERIOD_MINUTE") + active = rscpFindTagIndex(period, RscpTag.EMS_IDLE_PERIOD_ACTIVE) + typ = rscpFindTagIndex(period, RscpTag.EMS_IDLE_PERIOD_TYPE) + day = rscpFindTagIndex(period, RscpTag.EMS_IDLE_PERIOD_DAY) + start = rscpFindTag(period, RscpTag.EMS_IDLE_PERIOD_START) + startHour = rscpFindTagIndex(start, RscpTag.EMS_IDLE_PERIOD_HOUR) + startMin = rscpFindTagIndex(start, RscpTag.EMS_IDLE_PERIOD_MINUTE) + end = rscpFindTag(period, RscpTag.EMS_IDLE_PERIOD_END) + endHour = rscpFindTagIndex(end, RscpTag.EMS_IDLE_PERIOD_HOUR) + endMin = rscpFindTagIndex(end, RscpTag.EMS_IDLE_PERIOD_MINUTE) periodObj = { "day": day, "start": [startHour, startMin], @@ -755,52 +759,52 @@ def set_idle_periods(self, idlePeriods, keepAlive=False): ) < (idlePeriod["end"][0] * 60 + idlePeriod["end"][1]): periodList.append( ( - "EMS_IDLE_PERIOD", - "Container", + RscpTag.EMS_IDLE_PERIOD, + RscpType.Container, [ ( - "EMS_IDLE_PERIOD_TYPE", - "UChar8", + RscpTag.EMS_IDLE_PERIOD_TYPE, + RscpType.UChar8, self._IDLE_TYPE[idle_type], ), ( - "EMS_IDLE_PERIOD_DAY", - "UChar8", + RscpTag.EMS_IDLE_PERIOD_DAY, + RscpType.UChar8, idlePeriod["day"], ), ( - "EMS_IDLE_PERIOD_ACTIVE", - "Bool", + RscpTag.EMS_IDLE_PERIOD_ACTIVE, + RscpType.Bool, idlePeriod["active"], ), ( - "EMS_IDLE_PERIOD_START", - "Container", + RscpTag.EMS_IDLE_PERIOD_START, + RscpType.Container, [ ( - "EMS_IDLE_PERIOD_HOUR", - "UChar8", + RscpTag.EMS_IDLE_PERIOD_HOUR, + RscpType.UChar8, idlePeriod["start"][0], ), ( - "EMS_IDLE_PERIOD_MINUTE", - "UChar8", + RscpTag.EMS_IDLE_PERIOD_MINUTE, + RscpType.UChar8, idlePeriod["start"][1], ), ], ), ( - "EMS_IDLE_PERIOD_END", - "Container", + RscpTag.EMS_IDLE_PERIOD_END, + RscpType.Container, [ ( - "EMS_IDLE_PERIOD_HOUR", - "UChar8", + RscpTag.EMS_IDLE_PERIOD_HOUR, + RscpType.UChar8, idlePeriod["end"][0], ), ( - "EMS_IDLE_PERIOD_MINUTE", - "UChar8", + RscpTag.EMS_IDLE_PERIOD_MINUTE, + RscpType.UChar8, idlePeriod["end"][1], ), ], @@ -824,10 +828,11 @@ def set_idle_periods(self, idlePeriods, keepAlive=False): raise TypeError(idle_type + " is not a dict") result = self.sendRequest( - ("EMS_REQ_SET_IDLE_PERIODS", "Container", periodList), keepAlive=keepAlive + (RscpTag.EMS_REQ_SET_IDLE_PERIODS, RscpType.Container, periodList), + keepAlive=keepAlive, ) - if result[0] != "EMS_SET_IDLE_PERIODS" or result[2] != 1: + if result[0] != RscpTag.EMS_SET_IDLE_PERIODS or result[2] != 1: return False return True @@ -863,30 +868,46 @@ def get_db_data_timestamp( response = self.sendRequest( ( - "DB_REQ_HISTORY_DATA_DAY", - "Container", + RscpTag.DB_REQ_HISTORY_DATA_DAY, + RscpType.Container, [ - ("DB_REQ_HISTORY_TIME_START", "Uint64", startTimestamp), - ("DB_REQ_HISTORY_TIME_INTERVAL", "Uint64", timespanSeconds), - ("DB_REQ_HISTORY_TIME_SPAN", "Uint64", timespanSeconds), + ( + RscpTag.DB_REQ_HISTORY_TIME_START, + RscpType.Uint64, + startTimestamp, + ), + ( + RscpTag.DB_REQ_HISTORY_TIME_INTERVAL, + RscpType.Uint64, + timespanSeconds, + ), + ( + RscpTag.DB_REQ_HISTORY_TIME_SPAN, + RscpType.Uint64, + timespanSeconds, + ), ], ), keepAlive=keepAlive, ) outObj = { - "autarky": rscpFindTagIndex(response[2][0], "DB_AUTARKY"), - "bat_power_in": rscpFindTagIndex(response[2][0], "DB_BAT_POWER_IN"), - "bat_power_out": rscpFindTagIndex(response[2][0], "DB_BAT_POWER_OUT"), + "autarky": rscpFindTagIndex(response[2][0], RscpTag.DB_AUTARKY), + "bat_power_in": rscpFindTagIndex(response[2][0], RscpTag.DB_BAT_POWER_IN), + "bat_power_out": rscpFindTagIndex(response[2][0], RscpTag.DB_BAT_POWER_OUT), "consumed_production": rscpFindTagIndex( - response[2][0], "DB_CONSUMED_PRODUCTION" + response[2][0], RscpTag.DB_CONSUMED_PRODUCTION + ), + "consumption": rscpFindTagIndex(response[2][0], RscpTag.DB_CONSUMPTION), + "grid_power_in": rscpFindTagIndex(response[2][0], RscpTag.DB_GRID_POWER_IN), + "grid_power_out": rscpFindTagIndex( + response[2][0], RscpTag.DB_GRID_POWER_OUT ), - "consumption": rscpFindTagIndex(response[2][0], "DB_CONSUMPTION"), - "grid_power_in": rscpFindTagIndex(response[2][0], "DB_GRID_POWER_IN"), - "grid_power_out": rscpFindTagIndex(response[2][0], "DB_GRID_POWER_OUT"), "startTimestamp": startTimestamp, - "stateOfCharge": rscpFindTagIndex(response[2][0], "DB_BAT_CHARGE_LEVEL"), - "solarProduction": rscpFindTagIndex(response[2][0], "DB_DC_POWER"), + "stateOfCharge": rscpFindTagIndex( + response[2][0], RscpTag.DB_BAT_CHARGE_LEVEL + ), + "solarProduction": rscpFindTagIndex(response[2][0], RscpTag.DB_DC_POWER), "timespanSeconds": timespanSeconds, } @@ -955,47 +976,60 @@ def get_system_info_static(self, keepAlive=False): keepAlive (Optional[bool]): True to keep connection alive """ self.deratePercent = round( - self.sendRequestTag("EMS_REQ_DERATE_AT_PERCENT_VALUE", keepAlive=True) * 100 + self.sendRequestTag(RscpTag.EMS_REQ_DERATE_AT_PERCENT_VALUE, keepAlive=True) + * 100 ) self.deratePower = self.sendRequestTag( - "EMS_REQ_DERATE_AT_POWER_VALUE", keepAlive=True + RscpTag.EMS_REQ_DERATE_AT_POWER_VALUE, keepAlive=True ) self.installedPeakPower = self.sendRequestTag( - "EMS_REQ_INSTALLED_PEAK_POWER", keepAlive=True + RscpTag.EMS_REQ_INSTALLED_PEAK_POWER, keepAlive=True ) self.externalSourceAvailable = self.sendRequestTag( - "EMS_REQ_EXT_SRC_AVAILABLE", keepAlive=True + RscpTag.EMS_REQ_EXT_SRC_AVAILABLE, keepAlive=True + ) + self.macAddress = self.sendRequestTag( + RscpTag.INFO_REQ_MAC_ADDRESS, keepAlive=True ) - self.macAddress = self.sendRequestTag("INFO_REQ_MAC_ADDRESS", keepAlive=True) if ( not self.serialNumber ): # do not send this for a web connection because it screws up the handshake! self._set_serial( - self.sendRequestTag("INFO_REQ_SERIAL_NUMBER", keepAlive=True) + self.sendRequestTag(RscpTag.INFO_REQ_SERIAL_NUMBER, keepAlive=True) ) - sys_specs = self.sendRequestTag("EMS_REQ_GET_SYS_SPECS", keepAlive=keepAlive) + sys_specs = self.sendRequestTag( + RscpTag.EMS_REQ_GET_SYS_SPECS, keepAlive=keepAlive + ) for item in sys_specs: if ( - rscpFindTagIndex(item, "EMS_SYS_SPEC_NAME") + rscpFindTagIndex(item, RscpTag.EMS_SYS_SPEC_NAME) == "installedBatteryCapacity" ): self.installedBatteryCapacity = rscpFindTagIndex( - item, "EMS_SYS_SPEC_VALUE_INT" + item, RscpTag.EMS_SYS_SPEC_VALUE_INT ) - elif rscpFindTagIndex(item, "EMS_SYS_SPEC_NAME") == "maxAcPower": - self.maxAcPower = rscpFindTagIndex(item, "EMS_SYS_SPEC_VALUE_INT") - elif rscpFindTagIndex(item, "EMS_SYS_SPEC_NAME") == "maxBatChargePower": + elif rscpFindTagIndex(item, RscpTag.EMS_SYS_SPEC_NAME) == "maxAcPower": + self.maxAcPower = rscpFindTagIndex(item, RscpTag.EMS_SYS_SPEC_VALUE_INT) + elif ( + rscpFindTagIndex(item, RscpTag.EMS_SYS_SPEC_NAME) == "maxBatChargePower" + ): self.maxBatChargePower = rscpFindTagIndex( - item, "EMS_SYS_SPEC_VALUE_INT" + item, RscpTag.EMS_SYS_SPEC_VALUE_INT ) - elif rscpFindTagIndex(item, "EMS_SYS_SPEC_NAME") == "maxBatDischargPower": + elif ( + rscpFindTagIndex(item, RscpTag.EMS_SYS_SPEC_NAME) + == "maxBatDischargPower" + ): self.maxBatDischargePower = rscpFindTagIndex( - item, "EMS_SYS_SPEC_VALUE_INT" + item, RscpTag.EMS_SYS_SPEC_VALUE_INT ) - elif rscpFindTagIndex(item, "EMS_SYS_SPEC_NAME") == "startDischargeDefault": + elif ( + rscpFindTagIndex(item, RscpTag.EMS_SYS_SPEC_NAME) + == "startDischargeDefault" + ): self.startDischargeDefault = rscpFindTagIndex( - item, "EMS_SYS_SPEC_VALUE_INT" + item, RscpTag.EMS_SYS_SPEC_VALUE_INT ) # EMS_REQ_SPECIFICATION_VALUES @@ -1027,7 +1061,7 @@ def get_system_info(self, keepAlive=False): } """ # use keepAlive setting for last request - sw = self.sendRequestTag("INFO_REQ_SW_RELEASE", keepAlive=keepAlive) + sw = self.sendRequestTag(RscpTag.INFO_REQ_SW_RELEASE, keepAlive=keepAlive) # EMS_EMERGENCY_POWER_STATUS @@ -1080,7 +1114,7 @@ def get_system_status(self, keepAlive=False): } """ # use keepAlive setting for last request - sw = self.sendRequestTag("EMS_REQ_SYS_STATUS", keepAlive=keepAlive) + sw = self.sendRequestTag(RscpTag.EMS_REQ_SYS_STATUS, keepAlive=keepAlive) SystemStatusBools = [bool(int(i)) for i in reversed(list(f"{sw:022b}"))] outObj = { @@ -1197,101 +1231,123 @@ def get_battery_data(self, batIndex=None, dcbs=None, keepAlive=False): req = self.sendRequest( ( - "BAT_REQ_DATA", - "Container", + RscpTag.BAT_REQ_DATA, + RscpType.Container, [ - ("BAT_INDEX", "Uint16", batIndex), - ("BAT_REQ_ASOC", "None", None), - ("BAT_REQ_CHARGE_CYCLES", "None", None), - ("BAT_REQ_CURRENT", "None", None), - ("BAT_REQ_DCB_COUNT", "None", None), - ("BAT_REQ_DESIGN_CAPACITY", "None", None), - ("BAT_REQ_DEVICE_NAME", "None", None), - ("BAT_REQ_DEVICE_STATE", "None", None), - ("BAT_REQ_EOD_VOLTAGE", "None", None), - ("BAT_REQ_ERROR_CODE", "None", None), - ("BAT_REQ_FCC", "None", None), - ("BAT_REQ_MAX_BAT_VOLTAGE", "None", None), - ("BAT_REQ_MAX_CHARGE_CURRENT", "None", None), - ("BAT_REQ_MAX_DISCHARGE_CURRENT", "None", None), - ("BAT_REQ_MAX_DCB_CELL_TEMPERATURE", "None", None), - ("BAT_REQ_MIN_DCB_CELL_TEMPERATURE", "None", None), - ("BAT_REQ_INTERNALS", "None", None), - ("BAT_REQ_MODULE_VOLTAGE", "None", None), - ("BAT_REQ_RC", "None", None), - ("BAT_REQ_READY_FOR_SHUTDOWN", "None", None), - ("BAT_REQ_RSOC", "None", None), - ("BAT_REQ_RSOC_REAL", "None", None), - ("BAT_REQ_STATUS_CODE", "None", None), - ("BAT_REQ_TERMINAL_VOLTAGE", "None", None), - ("BAT_REQ_TOTAL_USE_TIME", "None", None), - ("BAT_REQ_TOTAL_DISCHARGE_TIME", "None", None), - ("BAT_REQ_TRAINING_MODE", "None", None), - ("BAT_REQ_USABLE_CAPACITY", "None", None), - ("BAT_REQ_USABLE_REMAINING_CAPACITY", "None", None), + (RscpTag.BAT_INDEX, RscpType.Uint16, batIndex), + (RscpTag.BAT_REQ_ASOC, RscpType.NoneType, None), + (RscpTag.BAT_REQ_CHARGE_CYCLES, RscpType.NoneType, None), + (RscpTag.BAT_REQ_CURRENT, RscpType.NoneType, None), + (RscpTag.BAT_REQ_DCB_COUNT, RscpType.NoneType, None), + (RscpTag.BAT_REQ_DESIGN_CAPACITY, RscpType.NoneType, None), + (RscpTag.BAT_REQ_DEVICE_NAME, RscpType.NoneType, None), + (RscpTag.BAT_REQ_DEVICE_STATE, RscpType.NoneType, None), + (RscpTag.BAT_REQ_EOD_VOLTAGE, RscpType.NoneType, None), + (RscpTag.BAT_REQ_ERROR_CODE, RscpType.NoneType, None), + (RscpTag.BAT_REQ_FCC, RscpType.NoneType, None), + (RscpTag.BAT_REQ_MAX_BAT_VOLTAGE, RscpType.NoneType, None), + (RscpTag.BAT_REQ_MAX_CHARGE_CURRENT, RscpType.NoneType, None), + ( + RscpTag.BAT_REQ_MAX_DISCHARGE_CURRENT, + RscpType.NoneType, + None, + ), + ( + RscpTag.BAT_REQ_MAX_DCB_CELL_TEMPERATURE, + RscpType.NoneType, + None, + ), + ( + RscpTag.BAT_REQ_MIN_DCB_CELL_TEMPERATURE, + RscpType.NoneType, + None, + ), + (RscpTag.BAT_REQ_INTERNALS, RscpType.NoneType, None), + (RscpTag.BAT_REQ_MODULE_VOLTAGE, RscpType.NoneType, None), + (RscpTag.BAT_REQ_RC, RscpType.NoneType, None), + (RscpTag.BAT_REQ_READY_FOR_SHUTDOWN, RscpType.NoneType, None), + (RscpTag.BAT_REQ_RSOC, RscpType.NoneType, None), + (RscpTag.BAT_REQ_RSOC_REAL, RscpType.NoneType, None), + (RscpTag.BAT_REQ_STATUS_CODE, RscpType.NoneType, None), + (RscpTag.BAT_REQ_TERMINAL_VOLTAGE, RscpType.NoneType, None), + (RscpTag.BAT_REQ_TOTAL_USE_TIME, RscpType.NoneType, None), + (RscpTag.BAT_REQ_TOTAL_DISCHARGE_TIME, RscpType.NoneType, None), + (RscpTag.BAT_REQ_TRAINING_MODE, RscpType.NoneType, None), + (RscpTag.BAT_REQ_USABLE_CAPACITY, RscpType.NoneType, None), + ( + RscpTag.BAT_REQ_USABLE_REMAINING_CAPACITY, + RscpType.NoneType, + None, + ), ], ), keepAlive=True, ) - dcbCount = rscpFindTagIndex(req, "BAT_DCB_COUNT") - deviceStateContainer = rscpFindTag(req, "BAT_DEVICE_STATE") + dcbCount = rscpFindTagIndex(req, RscpTag.BAT_DCB_COUNT) + deviceStateContainer = rscpFindTag(req, RscpTag.BAT_DEVICE_STATE) outObj = { - "asoc": rscpFindTagIndex(req, "BAT_ASOC"), - "chargeCycles": rscpFindTagIndex(req, "BAT_CHARGE_CYCLES"), - "current": round(rscpFindTagIndex(req, "BAT_CURRENT"), 2), + "asoc": rscpFindTagIndex(req, RscpTag.BAT_ASOC), + "chargeCycles": rscpFindTagIndex(req, RscpTag.BAT_CHARGE_CYCLES), + "current": round(rscpFindTagIndex(req, RscpTag.BAT_CURRENT), 2), "dcbCount": dcbCount, "dcbs": {}, - "designCapacity": round(rscpFindTagIndex(req, "BAT_DESIGN_CAPACITY"), 2), + "designCapacity": round( + rscpFindTagIndex(req, RscpTag.BAT_DESIGN_CAPACITY), 2 + ), "deviceConnected": rscpFindTagIndex( - deviceStateContainer, "BAT_DEVICE_CONNECTED" + deviceStateContainer, RscpTag.BAT_DEVICE_CONNECTED ), "deviceInService": rscpFindTagIndex( - deviceStateContainer, "BAT_DEVICE_IN_SERVICE" + deviceStateContainer, RscpTag.BAT_DEVICE_IN_SERVICE ), - "deviceName": rscpFindTagIndex(req, "BAT_DEVICE_NAME"), + "deviceName": rscpFindTagIndex(req, RscpTag.BAT_DEVICE_NAME), "deviceWorking": rscpFindTagIndex( - deviceStateContainer, "BAT_DEVICE_WORKING" + deviceStateContainer, RscpTag.BAT_DEVICE_WORKING ), - "eodVoltage": round(rscpFindTagIndex(req, "BAT_EOD_VOLTAGE"), 2), - "errorCode": rscpFindTagIndex(req, "BAT_ERROR_CODE"), - "fcc": rscpFindTagIndex(req, "BAT_FCC"), + "eodVoltage": round(rscpFindTagIndex(req, RscpTag.BAT_EOD_VOLTAGE), 2), + "errorCode": rscpFindTagIndex(req, RscpTag.BAT_ERROR_CODE), + "fcc": rscpFindTagIndex(req, RscpTag.BAT_FCC), "index": batIndex, - "maxBatVoltage": round(rscpFindTagIndex(req, "BAT_MAX_BAT_VOLTAGE"), 2), + "maxBatVoltage": round( + rscpFindTagIndex(req, RscpTag.BAT_MAX_BAT_VOLTAGE), 2 + ), "maxChargeCurrent": round( - rscpFindTagIndex(req, "BAT_MAX_CHARGE_CURRENT"), 2 + rscpFindTagIndex(req, RscpTag.BAT_MAX_CHARGE_CURRENT), 2 ), "maxDischargeCurrent": round( - rscpFindTagIndex(req, "BAT_MAX_DISCHARGE_CURRENT"), 2 + rscpFindTagIndex(req, RscpTag.BAT_MAX_DISCHARGE_CURRENT), 2 ), "maxDcbCellTemp": round( - rscpFindTagIndex(req, "BAT_MAX_DCB_CELL_TEMPERATURE"), 2 - ), - "measuredResistance": round( - rscpFindTagIndex(req, "BAT_MEASURED_RESISTANCE"), 4 - ), - "measuredResistanceRun": round( - rscpFindTagIndex(req, "BAT_RUN_MEASURED_RESISTANCE"), 4 + rscpFindTagIndex(req, RscpTag.BAT_MAX_DCB_CELL_TEMPERATURE), 2 ), "minDcbCellTemp": round( - rscpFindTagIndex(req, "BAT_MIN_DCB_CELL_TEMPERATURE"), 2 + rscpFindTagIndex(req, RscpTag.BAT_MIN_DCB_CELL_TEMPERATURE), 2 ), - "moduleVoltage": round(rscpFindTagIndex(req, "BAT_MODULE_VOLTAGE"), 2), - "rc": round(rscpFindTagIndex(req, "BAT_RC"), 2), + "moduleVoltage": round( + rscpFindTagIndex(req, RscpTag.BAT_MODULE_VOLTAGE), 2 + ), + "rc": round(rscpFindTagIndex(req, RscpTag.BAT_RC), 2), "readyForShutdown": round( - rscpFindTagIndex(req, "BAT_READY_FOR_SHUTDOWN"), 2 + rscpFindTagIndex(req, RscpTag.BAT_READY_FOR_SHUTDOWN), 2 + ), + "rsoc": round(rscpFindTagIndex(req, RscpTag.BAT_RSOC), 2), + "rsocReal": round(rscpFindTagIndex(req, RscpTag.BAT_RSOC_REAL), 2), + "statusCode": rscpFindTagIndex(req, RscpTag.BAT_STATUS_CODE), + "terminalVoltage": round( + rscpFindTagIndex(req, RscpTag.BAT_TERMINAL_VOLTAGE), 2 + ), + "totalUseTime": rscpFindTagIndex(req, RscpTag.BAT_TOTAL_USE_TIME), + "totalDischargeTime": rscpFindTagIndex( + req, RscpTag.BAT_TOTAL_DISCHARGE_TIME + ), + "trainingMode": rscpFindTagIndex(req, RscpTag.BAT_TRAINING_MODE), + "usuableCapacity": round( + rscpFindTagIndex(req, RscpTag.BAT_USABLE_CAPACITY), 2 ), - "rsoc": round(rscpFindTagIndex(req, "BAT_RSOC"), 2), - "rsocReal": round(rscpFindTagIndex(req, "BAT_RSOC_REAL"), 2), - "statusCode": rscpFindTagIndex(req, "BAT_STATUS_CODE"), - "terminalVoltage": round(rscpFindTagIndex(req, "BAT_TERMINAL_VOLTAGE"), 2), - "totalUseTime": rscpFindTagIndex(req, "BAT_TOTAL_USE_TIME"), - "totalDischargeTime": rscpFindTagIndex(req, "BAT_TOTAL_DISCHARGE_TIME"), - "trainingMode": rscpFindTagIndex(req, "BAT_TRAINING_MODE"), - "usuableCapacity": round(rscpFindTagIndex(req, "BAT_USABLE_CAPACITY"), 2), "usuableRemainingCapacity": round( - rscpFindTagIndex(req, "BAT_USABLE_REMAINING_CAPACITY"), 2 + rscpFindTagIndex(req, RscpTag.BAT_USABLE_REMAINING_CAPACITY), 2 ), } @@ -1301,13 +1357,21 @@ def get_battery_data(self, batIndex=None, dcbs=None, keepAlive=False): for dcb in dcbs: req = self.sendRequest( ( - "BAT_REQ_DATA", - "Container", + RscpTag.BAT_REQ_DATA, + RscpType.Container, [ - ("BAT_INDEX", "Uint16", batIndex), - ("BAT_REQ_DCB_ALL_CELL_TEMPERATURES", "Uint16", dcb), - ("BAT_REQ_DCB_ALL_CELL_VOLTAGES", "Uint16", dcb), - ("BAT_REQ_DCB_INFO", "Uint16", dcb), + (RscpTag.BAT_INDEX, RscpType.Uint16, batIndex), + ( + RscpTag.BAT_REQ_DCB_ALL_CELL_TEMPERATURES, + RscpType.Uint16, + dcb, + ), + ( + RscpTag.BAT_REQ_DCB_ALL_CELL_VOLTAGES, + RscpType.Uint16, + dcb, + ), + (RscpTag.BAT_REQ_DCB_INFO, RscpType.Uint16, dcb), ], ), keepAlive=True @@ -1315,72 +1379,89 @@ def get_battery_data(self, batIndex=None, dcbs=None, keepAlive=False): else keepAlive, # last request should honor keepAlive ) - info = rscpFindTag(req, "BAT_DCB_INFO") + info = rscpFindTag(req, RscpTag.BAT_DCB_INFO) temperatures_raw = rscpFindTagIndex( - rscpFindTag(req, "BAT_DCB_ALL_CELL_TEMPERATURES"), "BAT_DATA" + rscpFindTag(req, RscpTag.BAT_DCB_ALL_CELL_TEMPERATURES), + RscpTag.BAT_DATA, ) temperatures = [] - sensorCount = rscpFindTagIndex(info, "BAT_DCB_NR_SENSOR") + sensorCount = rscpFindTagIndex(info, RscpTag.BAT_DCB_NR_SENSOR) for sensor in range(0, sensorCount): temperatures.append(round(temperatures_raw[sensor][2], 2)) voltages_raw = rscpFindTagIndex( - rscpFindTag(req, "BAT_DCB_ALL_CELL_VOLTAGES"), "BAT_DATA" + rscpFindTag(req, RscpTag.BAT_DCB_ALL_CELL_VOLTAGES), RscpTag.BAT_DATA ) voltages = [] - seriesCellCount = rscpFindTagIndex(info, "BAT_DCB_NR_SERIES_CELL") + seriesCellCount = rscpFindTagIndex(info, RscpTag.BAT_DCB_NR_SERIES_CELL) for cell in range(0, seriesCellCount): voltages.append(round(voltages_raw[cell][2], 2)) dcbobj = { - "current": rscpFindTagIndex(info, "BAT_DCB_CURRENT"), - "currentAvg30s": rscpFindTagIndex(info, "BAT_DCB_CURRENT_AVG_30S"), - "cycleCount": rscpFindTagIndex(info, "BAT_DCB_CYCLE_COUNT"), - "designCapacity": rscpFindTagIndex(info, "BAT_DCB_DESIGN_CAPACITY"), - "designVoltage": rscpFindTagIndex(info, "BAT_DCB_DESIGN_VOLTAGE"), - "deviceName": rscpFindTagIndex(info, "BAT_DCB_DEVICE_NAME"), - "endOfDischarge": rscpFindTagIndex(info, "BAT_DCB_END_OF_DISCHARGE"), - "error": rscpFindTagIndex(info, "BAT_DCB_ERROR"), + "current": rscpFindTagIndex(info, RscpTag.BAT_DCB_CURRENT), + "currentAvg30s": rscpFindTagIndex( + info, RscpTag.BAT_DCB_CURRENT_AVG_30S + ), + "cycleCount": rscpFindTagIndex(info, RscpTag.BAT_DCB_CYCLE_COUNT), + "designCapacity": rscpFindTagIndex( + info, RscpTag.BAT_DCB_DESIGN_CAPACITY + ), + "designVoltage": rscpFindTagIndex(info, RscpTag.BAT_DCB_DESIGN_VOLTAGE), + "deviceName": rscpFindTagIndex(info, RscpTag.BAT_DCB_DEVICE_NAME), + "endOfDischarge": rscpFindTagIndex( + info, RscpTag.BAT_DCB_END_OF_DISCHARGE + ), + "error": rscpFindTagIndex(info, RscpTag.BAT_DCB_ERROR), "fullChargeCapacity": rscpFindTagIndex( - info, "BAT_DCB_FULL_CHARGE_CAPACITY" + info, RscpTag.BAT_DCB_FULL_CHARGE_CAPACITY + ), + "fwVersion": rscpFindTagIndex(info, RscpTag.BAT_DCB_FW_VERSION), + "manufactureDate": rscpFindTagIndex( + info, RscpTag.BAT_DCB_MANUFACTURE_DATE + ), + "manufactureName": rscpFindTagIndex( + info, RscpTag.BAT_DCB_MANUFACTURE_NAME ), - "fwVersion": rscpFindTagIndex(info, "BAT_DCB_FW_VERSION"), - "manufactureDate": rscpFindTagIndex(info, "BAT_DCB_MANUFACTURE_DATE"), - "manufactureName": rscpFindTagIndex(info, "BAT_DCB_MANUFACTURE_NAME"), "maxChargeCurrent": rscpFindTagIndex( - info, "BAT_DCB_MAX_CHARGE_CURRENT" + info, RscpTag.BAT_DCB_MAX_CHARGE_CURRENT ), "maxChargeTemperature": rscpFindTagIndex( - info, "BAT_DCB_CHARGE_HIGH_TEMPERATURE" + info, RscpTag.BAT_DCB_CHARGE_HIGH_TEMPERATURE ), "maxChargeVoltage": rscpFindTagIndex( - info, "BAT_DCB_MAX_CHARGE_VOLTAGE" + info, RscpTag.BAT_DCB_MAX_CHARGE_VOLTAGE ), "maxDischargeCurrent": rscpFindTagIndex( - info, "BAT_DCB_MAX_DISCHARGE_CURRENT" + info, RscpTag.BAT_DCB_MAX_DISCHARGE_CURRENT ), "minChargeTemperature": rscpFindTagIndex( - info, "BAT_DCB_CHARGE_LOW_TEMPERATURE" + info, RscpTag.BAT_DCB_CHARGE_LOW_TEMPERATURE + ), + "parallelCellCount": rscpFindTagIndex( + info, RscpTag.BAT_DCB_NR_PARALLEL_CELL ), - "parallelCellCount": rscpFindTagIndex(info, "BAT_DCB_NR_PARALLEL_CELL"), "sensorCount": sensorCount, "seriesCellCount": seriesCellCount, - "pcbVersion": rscpFindTagIndex(info, "BAT_DCB_PCB_VERSION"), - "protocolVersion": rscpFindTagIndex(info, "BAT_DCB_PROTOCOL_VERSION"), + "pcbVersion": rscpFindTagIndex(info, RscpTag.BAT_DCB_PCB_VERSION), + "protocolVersion": rscpFindTagIndex( + info, RscpTag.BAT_DCB_PROTOCOL_VERSION + ), "remainingCapacity": rscpFindTagIndex( - info, "BAT_DCB_REMAINING_CAPACITY" + info, RscpTag.BAT_DCB_REMAINING_CAPACITY ), - "serialCode": rscpFindTagIndex(info, "BAT_DCB_SERIALCODE"), - "serialNo": rscpFindTagIndex(info, "BAT_DCB_SERIALNO"), - "soc": rscpFindTagIndex(info, "BAT_DCB_SOC"), - "soh": rscpFindTagIndex(info, "BAT_DCB_SOH"), - "status": rscpFindTagIndex(info, "BAT_DCB_STATUS"), + "serialCode": rscpFindTagIndex(info, RscpTag.BAT_DCB_SERIALCODE), + "serialNo": rscpFindTagIndex(info, RscpTag.BAT_DCB_SERIALNO), + "soc": rscpFindTagIndex(info, RscpTag.BAT_DCB_SOC), + "soh": rscpFindTagIndex(info, RscpTag.BAT_DCB_SOH), + "status": rscpFindTagIndex(info, RscpTag.BAT_DCB_STATUS), "temperatures": temperatures, - "voltage": rscpFindTagIndex(info, "BAT_DCB_VOLTAGE"), - "voltageAvg30s": rscpFindTagIndex(info, "BAT_DCB_VOLTAGE_AVG_30S"), + "voltage": rscpFindTagIndex(info, RscpTag.BAT_DCB_VOLTAGE), + "voltageAvg30s": rscpFindTagIndex( + info, RscpTag.BAT_DCB_VOLTAGE_AVG_30S + ), "voltages": voltages, - "warning": rscpFindTagIndex(info, "BAT_DCB_WARNING"), + "warning": rscpFindTagIndex(info, RscpTag.BAT_DCB_WARNING), } outObj["dcbs"][dcb] = dcbobj return outObj @@ -1495,117 +1576,125 @@ def get_pvi_data(self, pviIndex=None, strings=None, phases=None, keepAlive=False req = self.sendRequest( ( - "PVI_REQ_DATA", - "Container", + RscpTag.PVI_REQ_DATA, + RscpType.Container, [ - ("PVI_INDEX", "Uint16", pviIndex), - ("PVI_REQ_AC_MAX_PHASE_COUNT", "None", None), - ("PVI_REQ_TEMPERATURE_COUNT", "None", None), - ("PVI_REQ_DC_MAX_STRING_COUNT", "None", None), - ("PVI_REQ_USED_STRING_COUNT", "None", None), - ("PVI_REQ_TYPE", "None", None), - ("PVI_REQ_SERIAL_NUMBER", "None", None), - ("PVI_REQ_VERSION", "None", None), - ("PVI_REQ_ON_GRID", "None", None), - ("PVI_REQ_STATE", "None", None), - ("PVI_REQ_LAST_ERROR", "None", None), - ("PVI_REQ_COS_PHI", "None", None), - ("PVI_REQ_VOLTAGE_MONITORING", "None", None), - ("PVI_REQ_POWER_MODE", "None", None), - ("PVI_REQ_SYSTEM_MODE", "None", None), - ("PVI_REQ_FREQUENCY_UNDER_OVER", "None", None), - ("PVI_REQ_MAX_TEMPERATURE", "None", None), - ("PVI_REQ_MIN_TEMPERATURE", "None", None), - ("PVI_REQ_AC_MAX_APPARENTPOWER", "None", None), - ("PVI_REQ_DEVICE_STATE", "None", None), + (RscpTag.PVI_INDEX, RscpType.Uint16, pviIndex), + (RscpTag.PVI_REQ_AC_MAX_PHASE_COUNT, RscpType.NoneType, None), + (RscpTag.PVI_REQ_TEMPERATURE_COUNT, RscpType.NoneType, None), + (RscpTag.PVI_REQ_DC_MAX_STRING_COUNT, RscpType.NoneType, None), + (RscpTag.PVI_REQ_USED_STRING_COUNT, RscpType.NoneType, None), + (RscpTag.PVI_REQ_TYPE, RscpType.NoneType, None), + (RscpTag.PVI_REQ_SERIAL_NUMBER, RscpType.NoneType, None), + (RscpTag.PVI_REQ_VERSION, RscpType.NoneType, None), + (RscpTag.PVI_REQ_ON_GRID, RscpType.NoneType, None), + (RscpTag.PVI_REQ_STATE, RscpType.NoneType, None), + (RscpTag.PVI_REQ_LAST_ERROR, RscpType.NoneType, None), + (RscpTag.PVI_REQ_COS_PHI, RscpType.NoneType, None), + (RscpTag.PVI_REQ_VOLTAGE_MONITORING, RscpType.NoneType, None), + (RscpTag.PVI_REQ_POWER_MODE, RscpType.NoneType, None), + (RscpTag.PVI_REQ_SYSTEM_MODE, RscpType.NoneType, None), + (RscpTag.PVI_REQ_FREQUENCY_UNDER_OVER, RscpType.NoneType, None), + (RscpTag.PVI_REQ_MAX_TEMPERATURE, RscpType.NoneType, None), + (RscpTag.PVI_REQ_MIN_TEMPERATURE, RscpType.NoneType, None), + (RscpTag.PVI_REQ_AC_MAX_APPARENTPOWER, RscpType.NoneType, None), + (RscpTag.PVI_REQ_DEVICE_STATE, RscpType.NoneType, None), ], ), keepAlive=True, ) - maxPhaseCount = int(rscpFindTagIndex(req, "PVI_AC_MAX_PHASE_COUNT")) - maxStringCount = int(rscpFindTagIndex(req, "PVI_DC_MAX_STRING_COUNT")) - usedStringCount = int(rscpFindTagIndex(req, "PVI_USED_STRING_COUNT")) + maxPhaseCount = int(rscpFindTagIndex(req, RscpTag.PVI_AC_MAX_PHASE_COUNT)) + maxStringCount = int(rscpFindTagIndex(req, RscpTag.PVI_DC_MAX_STRING_COUNT)) + usedStringCount = int(rscpFindTagIndex(req, RscpTag.PVI_USED_STRING_COUNT)) - voltageMonitoring = rscpFindTag(req, "PVI_VOLTAGE_MONITORING") - cosPhi = rscpFindTag(req, "PVI_COS_PHI") - frequency = rscpFindTag(req, "PVI_FREQUENCY_UNDER_OVER") - deviceState = rscpFindTag(req, "PVI_DEVICE_STATE") + voltageMonitoring = rscpFindTag(req, RscpTag.PVI_VOLTAGE_MONITORING) + cosPhi = rscpFindTag(req, RscpTag.PVI_COS_PHI) + frequency = rscpFindTag(req, RscpTag.PVI_FREQUENCY_UNDER_OVER) + deviceState = rscpFindTag(req, RscpTag.PVI_DEVICE_STATE) outObj = { "acMaxApparentPower": rscpFindTagIndex( - rscpFindTag(req, "PVI_AC_MAX_APPARENTPOWER"), "PVI_VALUE" + rscpFindTag(req, RscpTag.PVI_AC_MAX_APPARENTPOWER), RscpTag.PVI_VALUE ), "cosPhi": { - "active": rscpFindTagIndex(cosPhi, "PVI_COS_PHI_IS_AKTIV"), - "value": rscpFindTagIndex(cosPhi, "PVI_COS_PHI_VALUE"), - "excited": rscpFindTagIndex(cosPhi, "PVI_COS_PHI_EXCITED"), + "active": rscpFindTagIndex(cosPhi, RscpTag.PVI_COS_PHI_IS_AKTIV), + "value": rscpFindTagIndex(cosPhi, RscpTag.PVI_COS_PHI_VALUE), + "excited": rscpFindTagIndex(cosPhi, RscpTag.PVI_COS_PHI_EXCITED), }, "deviceState": { - "connected": rscpFindTagIndex(deviceState, "PVI_DEVICE_CONNECTED"), - "working": rscpFindTagIndex(deviceState, "PVI_DEVICE_WORKING"), - "inService": rscpFindTagIndex(deviceState, "PVI_DEVICE_IN_SERVICE"), + "connected": rscpFindTagIndex( + deviceState, RscpTag.PVI_DEVICE_CONNECTED + ), + "working": rscpFindTagIndex(deviceState, RscpTag.PVI_DEVICE_WORKING), + "inService": rscpFindTagIndex( + deviceState, RscpTag.PVI_DEVICE_IN_SERVICE + ), }, "frequency": { - "under": rscpFindTagIndex(frequency, "PVI_FREQUENCY_UNDER"), - "over": rscpFindTagIndex(frequency, "PVI_FREQUENCY_OVER"), + "under": rscpFindTagIndex(frequency, RscpTag.PVI_FREQUENCY_UNDER), + "over": rscpFindTagIndex(frequency, RscpTag.PVI_FREQUENCY_OVER), }, "index": pviIndex, - "lastError": rscpFindTagIndex(req, "PVI_LAST_ERROR"), + "lastError": rscpFindTagIndex(req, RscpTag.PVI_LAST_ERROR), "maxPhaseCount": maxPhaseCount, "maxStringCount": maxStringCount, - "onGrid": rscpFindTagIndex(req, "PVI_ON_GRID"), + "onGrid": rscpFindTagIndex(req, RscpTag.PVI_ON_GRID), "phases": {}, - "powerMode": rscpFindTagIndex(req, "PVI_POWER_MODE"), - "serialNumber": rscpFindTagIndex(req, "PVI_SERIAL_NUMBER"), - "state": rscpFindTagIndex(req, "PVI_STATE"), + "powerMode": rscpFindTagIndex(req, RscpTag.PVI_POWER_MODE), + "serialNumber": rscpFindTagIndex(req, RscpTag.PVI_SERIAL_NUMBER), + "state": rscpFindTagIndex(req, RscpTag.PVI_STATE), "strings": {}, - "systemMode": rscpFindTagIndex(req, "PVI_SYSTEM_MODE"), + "systemMode": rscpFindTagIndex(req, RscpTag.PVI_SYSTEM_MODE), "temperature": { "max": rscpFindTagIndex( - rscpFindTag(req, "PVI_MAX_TEMPERATURE"), "PVI_VALUE" + rscpFindTag(req, RscpTag.PVI_MAX_TEMPERATURE), RscpTag.PVI_VALUE ), "min": rscpFindTagIndex( - rscpFindTag(req, "PVI_MIN_TEMPERATURE"), "PVI_VALUE" + rscpFindTag(req, RscpTag.PVI_MIN_TEMPERATURE), RscpTag.PVI_VALUE ), "values": [], }, - "type": rscpFindTagIndex(req, "PVI_TYPE"), + "type": rscpFindTagIndex(req, RscpTag.PVI_TYPE), "version": rscpFindTagIndex( - rscpFindTag(req, "PVI_VERSION"), "PVI_VERSION_MAIN" + rscpFindTag(req, RscpTag.PVI_VERSION), RscpTag.PVI_VERSION_MAIN ), "voltageMonitoring": { "thresholdTop": rscpFindTagIndex( - voltageMonitoring, "PVI_VOLTAGE_MONITORING_THRESHOLD_TOP" + voltageMonitoring, RscpTag.PVI_VOLTAGE_MONITORING_THRESHOLD_TOP ), "thresholdBottom": rscpFindTagIndex( - voltageMonitoring, "PVI_VOLTAGE_MONITORING_THRESHOLD_BOTTOM" + voltageMonitoring, RscpTag.PVI_VOLTAGE_MONITORING_THRESHOLD_BOTTOM ), "slopeUp": rscpFindTagIndex( - voltageMonitoring, "PVI_VOLTAGE_MONITORING_SLOPE_UP" + voltageMonitoring, RscpTag.PVI_VOLTAGE_MONITORING_SLOPE_UP ), "slopeDown": rscpFindTagIndex( - voltageMonitoring, "PVI_VOLTAGE_MONITORING_SLOPE_DOWN" + voltageMonitoring, RscpTag.PVI_VOLTAGE_MONITORING_SLOPE_DOWN ), }, } - temperatures = range(0, int(rscpFindTagIndex(req, "PVI_TEMPERATURE_COUNT"))) + temperatures = range( + 0, int(rscpFindTagIndex(req, RscpTag.PVI_TEMPERATURE_COUNT)) + ) for temperature in temperatures: req = self.sendRequest( ( - "PVI_REQ_DATA", - "Container", + RscpTag.PVI_REQ_DATA, + RscpType.Container, [ - ("PVI_INDEX", "Uint16", pviIndex), - ("PVI_REQ_TEMPERATURE", "Uint16", temperature), + (RscpTag.PVI_INDEX, RscpType.Uint16, pviIndex), + (RscpTag.PVI_REQ_TEMPERATURE, RscpType.Uint16, temperature), ], ), keepAlive=True, ) outObj["temperature"]["values"].append( round( - rscpFindTagIndex(rscpFindTag(req, "PVI_TEMPERATURE"), "PVI_VALUE"), + rscpFindTagIndex( + rscpFindTag(req, RscpTag.PVI_TEMPERATURE), RscpTag.PVI_VALUE + ), 2, ) ) @@ -1616,52 +1705,68 @@ def get_pvi_data(self, pviIndex=None, strings=None, phases=None, keepAlive=False for phase in phases: req = self.sendRequest( ( - "PVI_REQ_DATA", - "Container", + RscpTag.PVI_REQ_DATA, + RscpType.Container, [ - ("PVI_INDEX", "Uint16", pviIndex), - ("PVI_REQ_AC_POWER", "Uint16", phase), - ("PVI_REQ_AC_VOLTAGE", "Uint16", phase), - ("PVI_REQ_AC_CURRENT", "Uint16", phase), - ("PVI_REQ_AC_APPARENTPOWER", "Uint16", phase), - ("PVI_REQ_AC_REACTIVEPOWER", "Uint16", phase), - ("PVI_REQ_AC_ENERGY_ALL", "Uint16", phase), - ("PVI_REQ_AC_ENERGY_GRID_CONSUMPTION", "Uint16", phase), + (RscpTag.PVI_INDEX, RscpType.Uint16, pviIndex), + (RscpTag.PVI_REQ_AC_POWER, RscpType.Uint16, phase), + (RscpTag.PVI_REQ_AC_VOLTAGE, RscpType.Uint16, phase), + (RscpTag.PVI_REQ_AC_CURRENT, RscpType.Uint16, phase), + (RscpTag.PVI_REQ_AC_APPARENTPOWER, RscpType.Uint16, phase), + (RscpTag.PVI_REQ_AC_REACTIVEPOWER, RscpType.Uint16, phase), + (RscpTag.PVI_REQ_AC_ENERGY_ALL, RscpType.Uint16, phase), + ( + RscpTag.PVI_REQ_AC_ENERGY_GRID_CONSUMPTION, + RscpType.Uint16, + phase, + ), ], ), keepAlive=True, ) phaseobj = { "power": round( - rscpFindTagIndex(rscpFindTag(req, "PVI_AC_POWER"), "PVI_VALUE"), 2 + rscpFindTagIndex( + rscpFindTag(req, RscpTag.PVI_AC_POWER), RscpTag.PVI_VALUE + ), + 2, ), "voltage": round( - rscpFindTagIndex(rscpFindTag(req, "PVI_AC_VOLTAGE"), "PVI_VALUE"), 2 + rscpFindTagIndex( + rscpFindTag(req, RscpTag.PVI_AC_VOLTAGE), RscpTag.PVI_VALUE + ), + 2, ), "current": round( - rscpFindTagIndex(rscpFindTag(req, "PVI_AC_CURRENT"), "PVI_VALUE"), 2 + rscpFindTagIndex( + rscpFindTag(req, RscpTag.PVI_AC_CURRENT), RscpTag.PVI_VALUE + ), + 2, ), "apparentPower": round( - rscpFindTag(rscpFindTag(req, "PVI_AC_APPARENTPOWER"), "PVI_VALUE")[ - 2 - ], + rscpFindTag( + rscpFindTag(req, RscpTag.PVI_AC_APPARENTPOWER), + RscpTag.PVI_VALUE, + )[2], 2, ), "reactivePower": round( rscpFindTagIndex( - rscpFindTag(req, "PVI_AC_REACTIVEPOWER"), "PVI_VALUE" + rscpFindTag(req, RscpTag.PVI_AC_REACTIVEPOWER), + RscpTag.PVI_VALUE, ), 2, ), "energyAll": round( rscpFindTagIndex( - rscpFindTag(req, "PVI_AC_ENERGY_ALL"), "PVI_VALUE" + rscpFindTag(req, RscpTag.PVI_AC_ENERGY_ALL), RscpTag.PVI_VALUE ), 2, ), "energyGridConsumption": round( rscpFindTagIndex( - rscpFindTag(req, "PVI_AC_ENERGY_GRID_CONSUMPTION"), "PVI_VALUE" + rscpFindTag(req, RscpTag.PVI_AC_ENERGY_GRID_CONSUMPTION), + RscpTag.PVI_VALUE, ), 2, ), @@ -1674,14 +1779,18 @@ def get_pvi_data(self, pviIndex=None, strings=None, phases=None, keepAlive=False for string in strings: req = self.sendRequest( ( - "PVI_REQ_DATA", - "Container", + RscpTag.PVI_REQ_DATA, + RscpType.Container, [ - ("PVI_INDEX", "Uint16", pviIndex), - ("PVI_REQ_DC_POWER", "Uint16", string), - ("PVI_REQ_DC_VOLTAGE", "Uint16", string), - ("PVI_REQ_DC_CURRENT", "Uint16", string), - ("PVI_REQ_DC_STRING_ENERGY_ALL", "Uint16", string), + (RscpTag.PVI_INDEX, RscpType.Uint16, pviIndex), + (RscpTag.PVI_REQ_DC_POWER, RscpType.Uint16, string), + (RscpTag.PVI_REQ_DC_VOLTAGE, RscpType.Uint16, string), + (RscpTag.PVI_REQ_DC_CURRENT, RscpType.Uint16, string), + ( + RscpTag.PVI_REQ_DC_STRING_ENERGY_ALL, + RscpType.Uint16, + string, + ), ], ), keepAlive=True @@ -1690,17 +1799,27 @@ def get_pvi_data(self, pviIndex=None, strings=None, phases=None, keepAlive=False ) stringobj = { "power": round( - rscpFindTagIndex(rscpFindTag(req, "PVI_DC_POWER"), "PVI_VALUE"), 2 + rscpFindTagIndex( + rscpFindTag(req, RscpTag.PVI_DC_POWER), RscpTag.PVI_VALUE + ), + 2, ), "voltage": round( - rscpFindTagIndex(rscpFindTag(req, "PVI_DC_VOLTAGE"), "PVI_VALUE"), 2 + rscpFindTagIndex( + rscpFindTag(req, RscpTag.PVI_DC_VOLTAGE), RscpTag.PVI_VALUE + ), + 2, ), "current": round( - rscpFindTagIndex(rscpFindTag(req, "PVI_DC_CURRENT"), "PVI_VALUE"), 2 + rscpFindTagIndex( + rscpFindTag(req, RscpTag.PVI_DC_CURRENT), RscpTag.PVI_VALUE + ), + 2, ), "energyAll": round( rscpFindTagIndex( - rscpFindTag(req, "PVI_DC_STRING_ENERGY_ALL"), "PVI_VALUE" + rscpFindTag(req, RscpTag.PVI_DC_STRING_ENERGY_ALL), + RscpTag.PVI_VALUE, ), 2, ), @@ -1768,17 +1887,17 @@ def get_powermeters(self, keepAlive=False): ): # max 8 powermeters according to E3DC spec res = self.sendRequest( ( - "PM_REQ_DATA", - "Container", + RscpTag.PM_REQ_DATA, + RscpType.Container, [ - ("PM_INDEX", "Uint16", pmIndex), - ("PM_REQ_TYPE", "None", None), + (RscpTag.PM_INDEX, RscpType.Uint16, pmIndex), + (RscpTag.PM_REQ_TYPE, RscpType.NoneType, None), ], ), keepAlive=True if pmIndex < (maxPowermeters - 1) else keepAlive, ) - pmType = rscpFindTagIndex(res, "PM_TYPE") + pmType = rscpFindTagIndex(res, RscpTag.PM_TYPE) if pmType is not None: outObj.append( @@ -1829,51 +1948,51 @@ def get_powermeter_data(self, pmIndex=None, keepAlive=False): res = self.sendRequest( ( - "PM_REQ_DATA", - "Container", + RscpTag.PM_REQ_DATA, + RscpType.Container, [ - ("PM_INDEX", "Uint16", pmIndex), - ("PM_REQ_POWER_L1", "None", None), - ("PM_REQ_POWER_L2", "None", None), - ("PM_REQ_POWER_L3", "None", None), - ("PM_REQ_VOLTAGE_L1", "None", None), - ("PM_REQ_VOLTAGE_L2", "None", None), - ("PM_REQ_VOLTAGE_L3", "None", None), - ("PM_REQ_ENERGY_L1", "None", None), - ("PM_REQ_ENERGY_L2", "None", None), - ("PM_REQ_ENERGY_L3", "None", None), - ("PM_REQ_MAX_PHASE_POWER", "None", None), - ("PM_REQ_ACTIVE_PHASES", "None", None), - ("PM_REQ_TYPE", "None", None), - ("PM_REQ_MODE", "None", None), + (RscpTag.PM_INDEX, RscpType.Uint16, pmIndex), + (RscpTag.PM_REQ_POWER_L1, RscpType.NoneType, None), + (RscpTag.PM_REQ_POWER_L2, RscpType.NoneType, None), + (RscpTag.PM_REQ_POWER_L3, RscpType.NoneType, None), + (RscpTag.PM_REQ_VOLTAGE_L1, RscpType.NoneType, None), + (RscpTag.PM_REQ_VOLTAGE_L2, RscpType.NoneType, None), + (RscpTag.PM_REQ_VOLTAGE_L3, RscpType.NoneType, None), + (RscpTag.PM_REQ_ENERGY_L1, RscpType.NoneType, None), + (RscpTag.PM_REQ_ENERGY_L2, RscpType.NoneType, None), + (RscpTag.PM_REQ_ENERGY_L3, RscpType.NoneType, None), + (RscpTag.PM_REQ_MAX_PHASE_POWER, RscpType.NoneType, None), + (RscpTag.PM_REQ_ACTIVE_PHASES, RscpType.NoneType, None), + (RscpTag.PM_REQ_TYPE, RscpType.NoneType, None), + (RscpTag.PM_REQ_MODE, RscpType.NoneType, None), ], ), keepAlive=keepAlive, ) - activePhasesChar = rscpFindTagIndex(res, "PM_ACTIVE_PHASES") + activePhasesChar = rscpFindTagIndex(res, RscpTag.PM_ACTIVE_PHASES) activePhases = f"{activePhasesChar:03b}" outObj = { "activePhases": activePhases, "energy": { - "L1": rscpFindTagIndex(res, "PM_ENERGY_L1"), - "L2": rscpFindTagIndex(res, "PM_ENERGY_L2"), - "L3": rscpFindTagIndex(res, "PM_ENERGY_L3"), + "L1": rscpFindTagIndex(res, RscpTag.PM_ENERGY_L1), + "L2": rscpFindTagIndex(res, RscpTag.PM_ENERGY_L2), + "L3": rscpFindTagIndex(res, RscpTag.PM_ENERGY_L3), }, "index": pmIndex, - "maxPhasePower": rscpFindTagIndex(res, "PM_MAX_PHASE_POWER"), - "mode": rscpFindTagIndex(res, "PM_MODE"), + "maxPhasePower": rscpFindTagIndex(res, RscpTag.PM_MAX_PHASE_POWER), + "mode": rscpFindTagIndex(res, RscpTag.PM_MODE), "power": { - "L1": rscpFindTagIndex(res, "PM_POWER_L1"), - "L2": rscpFindTagIndex(res, "PM_POWER_L2"), - "L3": rscpFindTagIndex(res, "PM_POWER_L3"), + "L1": rscpFindTagIndex(res, RscpTag.PM_POWER_L1), + "L2": rscpFindTagIndex(res, RscpTag.PM_POWER_L2), + "L3": rscpFindTagIndex(res, RscpTag.PM_POWER_L3), }, - "type": rscpFindTagIndex(res, "PM_TYPE"), + "type": rscpFindTagIndex(res, RscpTag.PM_TYPE), "voltage": { - "L1": round(rscpFindTagIndex(res, "PM_VOLTAGE_L1"), 4), - "L2": round(rscpFindTagIndex(res, "PM_VOLTAGE_L2"), 4), - "L3": round(rscpFindTagIndex(res, "PM_VOLTAGE_L3"), 4), + "L1": round(rscpFindTagIndex(res, RscpTag.PM_VOLTAGE_L1), 4), + "L2": round(rscpFindTagIndex(res, RscpTag.PM_VOLTAGE_L2), 4), + "L3": round(rscpFindTagIndex(res, RscpTag.PM_VOLTAGE_L3), 4), }, } return outObj @@ -1929,17 +2048,18 @@ def get_power_settings(self, keepAlive=False): } """ res = self.sendRequest( - ("EMS_REQ_GET_POWER_SETTINGS", "None", None), keepAlive=keepAlive + (RscpTag.EMS_REQ_GET_POWER_SETTINGS, RscpType.NoneType, None), + keepAlive=keepAlive, ) - dischargeStartPower = rscpFindTagIndex(res, "EMS_DISCHARGE_START_POWER") - maxChargePower = rscpFindTagIndex(res, "EMS_MAX_CHARGE_POWER") - maxDischargePower = rscpFindTagIndex(res, "EMS_MAX_DISCHARGE_POWER") - powerLimitsUsed = rscpFindTagIndex(res, "EMS_POWER_LIMITS_USED") - powerSaveEnabled = rscpFindTagIndex(res, "EMS_POWERSAVE_ENABLED") - weatherForecastMode = rscpFindTagIndex(res, "EMS_WEATHER_FORECAST_MODE") + dischargeStartPower = rscpFindTagIndex(res, RscpTag.EMS_DISCHARGE_START_POWER) + maxChargePower = rscpFindTagIndex(res, RscpTag.EMS_MAX_CHARGE_POWER) + maxDischargePower = rscpFindTagIndex(res, RscpTag.EMS_MAX_DISCHARGE_POWER) + powerLimitsUsed = rscpFindTagIndex(res, RscpTag.EMS_POWER_LIMITS_USED) + powerSaveEnabled = rscpFindTagIndex(res, RscpTag.EMS_POWERSAVE_ENABLED) + weatherForecastMode = rscpFindTagIndex(res, RscpTag.EMS_WEATHER_FORECAST_MODE) weatherRegulatedChargeEnabled = rscpFindTagIndex( - res, "EMS_WEATHER_REGULATED_CHARGE_ENABLED" + res, RscpTag.EMS_WEATHER_REGULATED_CHARGE_ENABLED ) outObj = { @@ -1987,13 +2107,21 @@ def set_power_limits( if enable: res = self.sendRequest( ( - "EMS_REQ_SET_POWER_SETTINGS", - "Container", + RscpTag.EMS_REQ_SET_POWER_SETTINGS, + RscpType.Container, [ - ("EMS_POWER_LIMITS_USED", "Bool", True), - ("EMS_MAX_DISCHARGE_POWER", "Uint32", max_discharge), - ("EMS_MAX_CHARGE_POWER", "Uint32", max_charge), - ("EMS_DISCHARGE_START_POWER", "Uint32", discharge_start), + (RscpTag.EMS_POWER_LIMITS_USED, RscpType.Bool, True), + ( + RscpTag.EMS_MAX_DISCHARGE_POWER, + RscpType.UInt32, + max_discharge, + ), + (RscpTag.EMS_MAX_CHARGE_POWER, RscpType.UInt32, max_charge), + ( + RscpTag.EMS_DISCHARGE_START_POWER, + RscpType.UInt32, + discharge_start, + ), ], ), keepAlive=keepAlive, @@ -2001,9 +2129,9 @@ def set_power_limits( else: res = self.sendRequest( ( - "EMS_REQ_SET_POWER_SETTINGS", - "Container", - [("EMS_POWER_LIMITS_USED", "Bool", False)], + RscpTag.EMS_REQ_SET_POWER_SETTINGS, + RscpType.Container, + [(RscpTag.EMS_POWER_LIMITS_USED, RscpType.Bool, False)], ), keepAlive=keepAlive, ) @@ -2031,23 +2159,23 @@ def set_powersave(self, enable, keepAlive=False): """ res = self.sendRequest( ( - "EMS_REQ_SET_POWER_SETTINGS", - "Container", - [("EMS_POWERSAVE_ENABLED", "UChar8", int(enable))], + RscpTag.EMS_REQ_SET_POWER_SETTINGS, + RscpType.Container, + [(RscpTag.EMS_POWERSAVE_ENABLED, RscpType.UChar8, int(enable))], ), keepAlive=keepAlive, ) # Returns value of EMS_REQ_SET_POWER_SETTINGS, we get a success flag here, # that we normalize and push outside. - # [ "EMS_SET_POWER_SETTINGS", - # "Container", + # [ RscpTag.EMS_SET_POWER_SETTINGS, + # RscpType.Container, # [ - # ["EMS_RES_POWERSAVE_ENABLED", "Char8", 0] + # [RscpTag.EMS_RES_POWERSAVE_ENABLED, "Char8", 0] # ] # ] - if rscpFindTagIndex(res, "EMS_RES_POWERSAVE_ENABLED") == 0: + if rscpFindTagIndex(res, RscpTag.EMS_RES_POWERSAVE_ENABLED) == 0: return 0 else: return -1 @@ -2066,18 +2194,30 @@ def set_weather_regulated_charge(self, enable, keepAlive=False): if enable: res = self.sendRequest( ( - "EMS_REQ_SET_POWER_SETTINGS", - "Container", - [("EMS_WEATHER_REGULATED_CHARGE_ENABLED", "UChar8", 1)], + RscpTag.EMS_REQ_SET_POWER_SETTINGS, + RscpType.Container, + [ + ( + RscpTag.EMS_WEATHER_REGULATED_CHARGE_ENABLED, + RscpType.UChar8, + 1, + ) + ], ), keepAlive=keepAlive, ) else: res = self.sendRequest( ( - "EMS_REQ_SET_POWER_SETTINGS", - "Container", - [("EMS_WEATHER_REGULATED_CHARGE_ENABLED", "UChar8", 0)], + RscpTag.EMS_REQ_SET_POWER_SETTINGS, + RscpType.Container, + [ + ( + RscpTag.EMS_WEATHER_REGULATED_CHARGE_ENABLED, + RscpType.UChar8, + 0, + ) + ], ), keepAlive=keepAlive, ) diff --git a/e3dc/_e3dc_rscp_local.py b/e3dc/_e3dc_rscp_local.py index 3dd0ff1..24dec47 100644 --- a/e3dc/_e3dc_rscp_local.py +++ b/e3dc/_e3dc_rscp_local.py @@ -8,6 +8,7 @@ from ._RSCPEncryptDecrypt import RSCPEncryptDecrypt from ._rscpLib import rscpDecode, rscpEncode, rscpFrame +from ._rscpTags import RscpTag, RscpType PORT = 5033 BUFFER_SIZE = 1024 * 32 @@ -122,11 +123,11 @@ def connect(self): self.sendRequest( ( - "RSCP_REQ_AUTHENTICATION", - "Container", + RscpTag.RSCP_REQ_AUTHENTICATION, + RscpType.Container, [ - ("RSCP_AUTHENTICATION_USER", "CString", self.username), - ("RSCP_AUTHENTICATION_PASSWORD", "CString", self.password), + (RscpTag.RSCP_AUTHENTICATION_USER, RscpType.CString, self.username), + (RscpTag.RSCP_AUTHENTICATION_PASSWORD, RscpType.CString, self.password), ], ) ) diff --git a/e3dc/_e3dc_rscp_web.py b/e3dc/_e3dc_rscp_web.py index 1274780..4e31010 100644 --- a/e3dc/_e3dc_rscp_web.py +++ b/e3dc/_e3dc_rscp_web.py @@ -13,6 +13,7 @@ import tzlocal import websocket +from ._rscpTags import RscpTag, RscpType from . import _rscpLib as rscpLib """ @@ -120,14 +121,18 @@ def buildVirtualConn(self): """Method to create Virtual Connection.""" virtualConn = rscpLib.rscpFrame( rscpLib.rscpEncode( - "SERVER_REQ_NEW_VIRTUAL_CONNECTION", - "Container", + RscpTag.SERVER_REQ_NEW_VIRTUAL_CONNECTION, + RscpType.Container, [ - ("SERVER_USER", "CString", self.username), - ("SERVER_PASSWD", "CString", self.password), - ("SERVER_IDENTIFIER", "CString", self.serialNumberWithPrefix), - ("SERVER_TYPE", "Int32", 4), - ("SERVER_HASH_CODE", "Int32", 1234567890), + (RscpTag.SERVER_USER, RscpType.CString, self.username), + (RscpTag.SERVER_PASSWD, RscpType.CString, self.password), + ( + RscpTag.SERVER_IDENTIFIER, + RscpType.CString, + self.serialNumberWithPrefix, + ), + (RscpTag.SERVER_TYPE, RscpType.Int32, 4), + (RscpTag.SERVER_HASH_CODE, RscpType.Int32, 1234567890), ], ) ) @@ -138,46 +143,73 @@ def buildVirtualConn(self): def respondToINFORequest(self, decoded): """Create Response to INFO request.""" TIMEZONE_STR, utcDiffS = calcTimeZone() - tag = decoded[0] - # print(tag) - if tag == "INFO_REQ_IP_ADDRESS": - return rscpLib.rscpEncode("INFO_IP_ADDRESS", "CString", "0.0.0.0") - elif tag == "INFO_REQ_SUBNET_MASK": - return rscpLib.rscpEncode("INFO_SUBNET_MASK", "CString", "0.0.0.0") - elif tag == "INFO_REQ_GATEWAY": - return rscpLib.rscpEncode("INFO_GATEWAY", "CString", "0.0.0.0") - elif tag == "INFO_REQ_DNS": - return rscpLib.rscpEncode("INFO_DNS", "CString", "0.0.0.0") - elif tag == "INFO_REQ_DHCP_STATUS": - return rscpLib.rscpEncode("INFO_DHCP_STATUS", "Bool", "false") - elif tag == "INFO_REQ_TIME": + + try: + tag = RscpTag[decoded[0]] + except KeyError: + # This is a tag unknown to this library + return None + + if tag == RscpTag.INFO_REQ_IP_ADDRESS: + return rscpLib.rscpEncode( + RscpTag.INFO_IP_ADDRESS, RscpType.CString, "0.0.0.0" + ) + elif tag == RscpTag.INFO_REQ_SUBNET_MASK: + return rscpLib.rscpEncode( + RscpTag.INFO_SUBNET_MASK, RscpType.CString, "0.0.0.0" + ) + elif tag == RscpTag.INFO_REQ_GATEWAY: + return rscpLib.rscpEncode( + RscpTag.INFO_GATEWAY, RscpType.CString, "0.0.0.0" + ) + elif tag == RscpTag.INFO_REQ_DNS: + return rscpLib.rscpEncode(RscpTag.INFO_DNS, RscpType.CString, "0.0.0.0") + elif tag == RscpTag.INFO_REQ_DHCP_STATUS: + return rscpLib.rscpEncode( + RscpTag.INFO_DHCP_STATUS, RscpType.Bool, "false" + ) + elif tag == RscpTag.INFO_REQ_TIME: return rscpLib.rscpEncode( - "INFO_TIME", "ByteArray", timestampEncode(time.time()) + RscpTag.INFO_TIME, RscpType.ByteArray, timestampEncode(time.time()) ) - elif tag == "INFO_REQ_TIME_ZONE": - return rscpLib.rscpEncode("INFO_TIME_ZONE", "CString", TIMEZONE_STR) - elif tag == "INFO_REQ_UTC_TIME": + elif tag == RscpTag.INFO_REQ_TIME_ZONE: return rscpLib.rscpEncode( - "INFO_UTC_TIME", "ByteArray", timestampEncode(time.time() - utcDiffS) + RscpTag.INFO_TIME_ZONE, RscpType.CString, TIMEZONE_STR ) - elif tag == "INFO_REQ_A35_SERIAL_NUMBER": - return rscpLib.rscpEncode("INFO_A35_SERIAL_NUMBER", "CString", "123456") - elif tag == "INFO_REQ_INFO": + elif tag == RscpTag.INFO_REQ_UTC_TIME: return rscpLib.rscpEncode( - "INFO_INFO", - "Container", + RscpTag.INFO_UTC_TIME, + RscpType.ByteArray, + timestampEncode(time.time() - utcDiffS), + ) + elif tag == RscpTag.INFO_REQ_A35_SERIAL_NUMBER: + return rscpLib.rscpEncode( + RscpTag.INFO_A35_SERIAL_NUMBER, RscpType.CString, "123456" + ) + elif tag == RscpTag.INFO_REQ_INFO: + return rscpLib.rscpEncode( + RscpTag.INFO_INFO, + RscpType.Container, [ ( - "INFO_SERIAL_NUMBER", - "CString", + RscpTag.INFO_SERIAL_NUMBER, + RscpType.CString, "WEB_" + hashlib.md5(self.username + bytes(self.conId)).hexdigest(), ), - ("INFO_PRODUCTION_DATE", "CString", "570412800000"), - ("INFO_MAC_ADDRESS", "CString", "00:00:00:00:00:00"), + ( + RscpTag.INFO_PRODUCTION_DATE, + RscpType.CString, + "570412800000", + ), + ( + RscpTag.INFO_MAC_ADDRESS, + RscpType.CString, + "00:00:00:00:00:00", + ), ], ) - elif tag == "INFO_SERIAL_NUMBER": + elif tag == RscpTag.INFO_SERIAL_NUMBER: self.webSerialno = decoded[2] self.buildVirtualConn() return "" @@ -186,19 +218,27 @@ def respondToINFORequest(self, decoded): def registerConnectionHandler(self, decodedMsg): """Registering Connection Handler.""" if self.conId is None: - self.conId = rscpLib.rscpFindTag(decodedMsg, "SERVER_CONNECTION_ID")[2] - self.authLevel = rscpLib.rscpFindTag(decodedMsg, "SERVER_AUTH_LEVEL")[2] + self.conId = rscpLib.rscpFindTag(decodedMsg, RscpTag.SERVER_CONNECTION_ID)[ + 2 + ] + self.authLevel = rscpLib.rscpFindTag(decodedMsg, RscpTag.SERVER_AUTH_LEVEL)[ + 2 + ] else: - self.virtConId = rscpLib.rscpFindTag(decodedMsg, "SERVER_CONNECTION_ID")[2] - self.virtAuthLevel = rscpLib.rscpFindTag(decodedMsg, "SERVER_AUTH_LEVEL")[2] - # reply = rscpLib.rscpFrame(rscpLib.rscpEncode("SERVER_CONNECTION_REGISTERED", "Container", [decodedMsg[2][0], decodedMsg[2][1]])); + self.virtConId = rscpLib.rscpFindTag( + decodedMsg, RscpTag.SERVER_CONNECTION_ID + )[2] + self.virtAuthLevel = rscpLib.rscpFindTag( + decodedMsg, RscpTag.SERVER_AUTH_LEVEL + )[2] + # reply = rscpLib.rscpFrame(rscpLib.rscpEncode(RscpTag.SERVER_CONNECTION_REGISTERED, RscpType.Container, [decodedMsg[2][0], decodedMsg[2][1]])); reply = rscpLib.rscpFrame( rscpLib.rscpEncode( - "SERVER_CONNECTION_REGISTERED", - "Container", + RscpTag.SERVER_CONNECTION_REGISTERED, + RscpType.Container, [ - rscpLib.rscpFindTag(decodedMsg, "SERVER_CONNECTION_ID"), - rscpLib.rscpFindTag(decodedMsg, "SERVER_AUTH_LEVEL"), + rscpLib.rscpFindTag(decodedMsg, RscpTag.SERVER_CONNECTION_ID), + rscpLib.rscpFindTag(decodedMsg, RscpTag.SERVER_AUTH_LEVEL), ], ) ) @@ -212,21 +252,27 @@ def on_message(self, message): decodedMsg = rscpLib.rscpDecode(message)[0] + try: + tag = RscpTag[decodedMsg[0]] + except KeyError: + # This is a tag unknown to this library + pass + # print "Decoded received message", decodedMsg - if decodedMsg[0] == "SERVER_REQ_PING": + if tag == RscpTag.SERVER_REQ_PING: pingFrame = rscpLib.rscpFrame( - rscpLib.rscpEncode("SERVER_PING", "None", None) + rscpLib.rscpEncode(RscpTag.SERVER_PING, RscpType.NoneType, None) ) self.ws.send(pingFrame, websocket.ABNF.OPCODE_BINARY) return - elif decodedMsg[0] == "SERVER_REGISTER_CONNECTION": + elif tag == RscpTag.SERVER_REGISTER_CONNECTION: self.registerConnectionHandler(decodedMsg) - elif decodedMsg[0] == "SERVER_UNREGISTER_CONNECTION": + elif tag == RscpTag.SERVER_UNREGISTER_CONNECTION: # this signifies some error self.disconnect() - elif decodedMsg[0] == "SERVER_REQ_RSCP_CMD": + elif tag == RscpTag.SERVER_REQ_RSCP_CMD: data = rscpLib.rscpFrameDecode( - rscpLib.rscpFindTag(decodedMsg, "SERVER_RSCP_DATA")[2] + rscpLib.rscpFindTag(decodedMsg, RscpTag.SERVER_RSCP_DATA)[2] )[0] response = b"" self.responseCallbackCalled = False @@ -244,7 +290,7 @@ def on_message(self, message): self.responseCallbackCalled = True responseChunk = b"" - if type(responseChunk) is str: + if isinstance(responseChunk, str): responseChunk = responseChunk.encode("utf-8") response += responseChunk if self.responseCallbackCalled: @@ -253,13 +299,13 @@ def on_message(self, message): return # do not send an empty response innerFrame = rscpLib.rscpFrame(response) responseContainer = rscpLib.rscpEncode( - "SERVER_REQ_RSCP_CMD", - "Container", + RscpTag.SERVER_REQ_RSCP_CMD, + RscpType.Container, [ - ("SERVER_CONNECTION_ID", "Int64", self.conId), - ("SERVER_AUTH_LEVEL", "UChar8", self.authLevel), - ("SERVER_RSCP_DATA_LEN", "Int32", len(innerFrame)), - ("SERVER_RSCP_DATA", "ByteArray", innerFrame), + (RscpTag.SERVER_CONNECTION_ID, RscpType.Int64, self.conId), + (RscpTag.SERVER_AUTH_LEVEL, RscpType.UChar8, self.authLevel), + (RscpTag.SERVER_RSCP_DATA_LEN, RscpType.Int32, len(innerFrame)), + (RscpTag.SERVER_RSCP_DATA, RscpType.ByteArray, innerFrame), ], ) @@ -310,13 +356,17 @@ def _sendRequest_internal(self, innerFrame, callback=None, synchronous=False): outerFrame = rscpLib.rscpFrame( rscpLib.rscpEncode( - "SERVER_REQ_RSCP_CMD", - "Container", + RscpTag.SERVER_REQ_RSCP_CMD, + RscpType.Container, [ - ("SERVER_CONNECTION_ID", "Int64", self.virtConId), - ("SERVER_AUTH_LEVEL", "UChar8", self.virtAuthLevel), - ("SERVER_RSCP_DATA_LEN", "Int32", len(innerFrame)), - ("SERVER_RSCP_DATA", "ByteArray", innerFrame), + (RscpTag.SERVER_CONNECTION_ID, RscpType.Int64, self.virtConId), + ( + RscpTag.SERVER_AUTH_LEVEL, + RscpType.UChar8, + self.virtAuthLevel, + ), + (RscpTag.SERVER_RSCP_DATA_LEN, RscpType.Int32, len(innerFrame)), + (RscpTag.SERVER_RSCP_DATA, RscpType.ByteArray, innerFrame), ], ) ) diff --git a/e3dc/_rscpLib.py b/e3dc/_rscpLib.py index c31fd53..b6e57fa 100644 --- a/e3dc/_rscpLib.py +++ b/e3dc/_rscpLib.py @@ -9,7 +9,15 @@ import time import zlib -from . import _rscpTags as rscpTags +from ._rscpTags import ( + RscpType, + getStrDatatype, + getStrTag, + getHexTag, + getHexDatatype, + getDatatype, + getErrorcode, +) DEBUG_DICT = {"print_rscp": False} @@ -27,25 +35,25 @@ def set_debug(debug): packFmtDict_FixedSize = { - "Bool": "?", - "Char8": "b", - "UChar8": "B", - "Int16": "h", - "Uint16": "H", - "Int32": "i", - "Uint32": "I", - "Int64": "q", - "Uint64": "Q", - "Float32": "f", - "Double64": "d", + RscpType.Bool: "?", + RscpType.Char8: "b", + RscpType.UChar8: "B", + RscpType.Int16: "h", + RscpType.Uint16: "H", + RscpType.Int32: "i", + RscpType.Uint32: "I", + RscpType.Int64: "q", + RscpType.Uint64: "Q", + RscpType.Float32: "f", + RscpType.Double64: "d", } packFmtDict_VarSize = { - "Bitfield": "s", - "CString": "s", - "Container": "s", - "ByteArray": "s", - "Error": "s", + RscpType.Bitfield: "s", + RscpType.CString: "s", + RscpType.Container: "s", + RscpType.ByteArray: "s", + RscpType.Error: "s", } @@ -54,14 +62,20 @@ def rscpFindTag(decodedMsg, tag): Args: decodedMsg (list): the decoded message - tag (str): the RSCP Tag string to search for + tag (RscpTag): the RSCP Tag to search for Returns: list: the found tag """ + try: + tagStr = getStrTag(tag) + except KeyError: + # Tag is unknown to this library + return None + if decodedMsg is None: return None - if decodedMsg[0] == tag: + if decodedMsg[0] == tagStr: return decodedMsg if isinstance(decodedMsg[2], list): for msg in decodedMsg[2]: @@ -76,15 +90,15 @@ def rscpFindTagIndex(decodedMsg, tag, index=2): Args: decodedMsg (list): the decoded message - tag (str): the RSCP Tag string to search for + tag (RscpTag): the RSCP Tag to search for index (Optional[int]): the index of the found tag to return. Default is 2, the value of the Tag. Returns: the content of the configured index for the tag. """ - tag = rscpFindTag(decodedMsg, tag) - if tag is not None: - return tag[index] + res = rscpFindTag(decodedMsg, tag) + if res is not None: + return res[index] return None @@ -105,17 +119,17 @@ def rscpEncode(tagStr, typeStr=None, data=None): typeStr = tagStr[1] data = tagStr[2] tagStr = tagStr[0] - else: - if typeStr is None: - raise TypeError("Second argument must not be none if first is not a tuple") - - tagHex = rscpTags.getHexTag(tagStr) - typeHex = rscpTags.getHexDatatype(typeStr) + elif typeStr is None: + raise TypeError("Second argument must not be none if first is not a tuple") + + tagHex = getHexTag(tagStr) + typeHex = getHexDatatype(typeStr) + type_ = getDatatype(typeStr) if DEBUG_DICT["print_rscp"]: print(">", tagStr, typeStr, data) - if type(data) is str: + if isinstance(data, str): data = data.encode("utf-8") packFmt = ( @@ -123,11 +137,9 @@ def rscpEncode(tagStr, typeStr=None, data=None): ) headerLen = struct.calcsize(packFmt) - if typeStr == "None": # special case: no content + if type_ == RscpType.NoneType: # special case: no content return struct.pack(packFmt, tagHex, typeHex, 0) - elif ( - typeStr == "Timestamp" - ): # timestamp has a special format, divided into 32 bit integers + elif type_ == RscpType.Timestamp: # timestamp has a special format, divided into 32 bit integers ts = int(data / 1000) # this is int64 ms = (data - ts * 1000) * 1e6 # ms are multiplied by 10^6 @@ -138,7 +150,7 @@ def rscpEncode(tagStr, typeStr=None, data=None): length = struct.calcsize(packFmt) - headerLen return struct.pack(packFmt, tagHex, typeHex, length, hiword, loword, ms) - elif typeStr == "Container": + elif type_ == RscpType.Container: if isinstance(data, list): newData = b"" for dataChunk in data: @@ -146,11 +158,11 @@ def rscpEncode(tagStr, typeStr=None, data=None): dataChunk[0], dataChunk[1], dataChunk[2] ) # transform each dataChunk into byte array data = newData - packFmt += str(len(data)) + packFmtDict_VarSize[typeStr] - elif typeStr in packFmtDict_FixedSize: - packFmt += packFmtDict_FixedSize[typeStr] - elif typeStr in packFmtDict_VarSize: - packFmt += str(len(data)) + packFmtDict_VarSize[typeStr] + packFmt += str(len(data)) + packFmtDict_VarSize[type_] + elif type_ in packFmtDict_FixedSize: + packFmt += packFmtDict_FixedSize[type_] + elif type_ in packFmtDict_VarSize: + packFmt += str(len(data)) + packFmtDict_VarSize[type_] length = struct.calcsize(packFmt) - headerLen return struct.pack(packFmt, tagHex, typeHex, length, data) @@ -231,10 +243,11 @@ def rscpDecode(data): headerFmt, data[: struct.calcsize(headerFmt)] ) # print (hex(hexTag), hex(hexType), length, data[struct.calcsize(headerFmt):]) - strTag = rscpTags.getTag(hexTag) - strType = rscpTags.getDatatype(hexType) + strTag = getStrTag(hexTag) + strType = getStrDatatype(hexType) + type_ = getDatatype(hexType) - if strType == "Container": + if type_ == RscpType.Container: # this is a container: parse the inside dataList = [] curByte = headerSize @@ -243,7 +256,7 @@ def rscpDecode(data): curByte += usedLength dataList.append(innerData) return (strTag, strType, dataList), curByte - elif strType == "Timestamp": + elif type_ == RscpType.Timestamp: fmt = " RscpTag: + """Get tag as RscpTag.""" + if isinstance(tag, int): + tag = RscpTag(tag) + elif isinstance(tag, str): + tag = RscpTag[tag] - Attributes: - type_str (str): type as string + return tag - Returns: - int: hex representation of the given type_str - """ - for k, v in rscpDataTypes.items(): - if v == type_str: - return k - raise KeyError(type_str) +def getHexTag(tag: int | str | RscpTag) -> int: + """Get tag as hex.""" + if isinstance(tag, int): + tag = RscpTag(tag) + elif isinstance(tag, str): + tag = RscpTag[tag] -def getDatatype(type_hex): - """Get data type as string. + return tag.value - Attributes: - type_hex (int): type as hex - Returns: - str: String representation of the given type_hex - """ - return rscpDataTypes[type_hex] +def getStrTag(tag: int | str | RscpTag) -> str: + """Get tag as string.""" + if isinstance(tag, int): + tag = RscpTag(tag) + elif isinstance(tag, str): + tag = RscpTag[tag] + + return tag.name + + +def getDatatype(type_: int | str | RscpType) -> RscpType: + """Get datatype as RscpType.""" + if isinstance(type_, int): + type_ = RscpType(type_) + elif isinstance(type_, str): + if type_ == "None": + type_ = RscpType.NoneType + else: + type_ = RscpType[type_] + + return type_ + + +def getHexDatatype(type_: int | str | RscpType) -> int: + """Get datatype as hex.""" + if isinstance(type_, int): + type_ = RscpType(type_) + elif isinstance(type_, str): + if type_ == "None": + type_ = RscpType.NoneType + else: + type_ = RscpType[type_] + + return type_.value + + +def getStrDatatype(type_: int | str | RscpType) -> str: + """Get datatype as string.""" + if isinstance(type_, int): + type_ = RscpType(type_) + elif isinstance(type_, str): + if type_ == "None": + type_ = RscpType.NoneType + else: + type_ = RscpType[type_] + + if RscpType == RscpType.NoneType: + return "None" + return type_.name def getErrorcode(error_hex):