Skip to content

Commit

Permalink
a simple IEC-62056-21 impl
Browse files Browse the repository at this point in the history
  • Loading branch information
marq24 committed Jun 23, 2024
1 parent 325c59d commit 88ab36b
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 35 deletions.
80 changes: 46 additions & 34 deletions custom_components/tibber_local/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def should_poll(self) -> bool:
class IntBasedObisCode:
# This is for sure a VERY STUPID Python class - but I am a NOOB - would be cool, if someone could teach me
# how I could fast convert my number array to the required format...
def __init__(self, obis_src: list):
def __init__(self, obis_src: list, do_log_output: bool):
try:
_a = int(obis_src[1])
_b = int(obis_src[2])
Expand All @@ -214,8 +214,9 @@ def __init__(self, obis_src: list):
# self.obis_short = f'{_c}.{_d}.{_e}'
self.obis_hex = f'{self.get_as_two_digit_hex(_a)}{self.get_as_two_digit_hex(_b)}{self.get_as_two_digit_hex(_c)}{self.get_as_two_digit_hex(_d)}{self.get_as_two_digit_hex(_e)}{self.get_as_two_digit_hex(_f)}'
except Exception as e:
_LOGGER.warning(
f"could not parse a value as int from list {obis_src} - Please check the position of your Tibber Pulse reading head (you might need to rotate it few degrees anti clock wise) - Exception: {e}")
if (do_log_output):
_LOGGER.warning(
f"could not parse a value as int from list {obis_src} - Please check the position of your Tibber Pulse reading head (you might need to rotate it few degrees anti clock wise) - Exception: {e}")

@staticmethod
def get_as_two_digit_hex(input: int) -> str:
Expand Down Expand Up @@ -254,6 +255,10 @@ async def detect_com_mode(self):
await self._check_modes_internal(MODE_99_PLAINTEXT, MODE_3_SML_1_04)
elif self._com_mode == MODE_0_AutoScanMode:
await self._check_modes_internal(MODE_3_SML_1_04, MODE_99_PLAINTEXT)
elif self._com_mode == MODE_1_IEC_62056_21:
# https://github.com/marq24/ha-tibber-pulse-local/issues/29
# looks like we can parse 'IEC_62056_21' as plaintext?!
await self._check_modes_internal(MODE_99_PLAINTEXT, MODE_3_SML_1_04)

# finally raise value error if not implemented yet!
if self._com_mode not in ENUM_IMPLEMENTATIONS:
Expand Down Expand Up @@ -308,8 +313,6 @@ async def read_tibber_local(self, mode: int, retry: bool, log_payload: bool = Fa
if res.status == 200:
if mode == MODE_3_SML_1_04:
await self.read_sml(await res.read(), retry, log_payload)
#elif mode == MODE_1_IEC_62056_21:
# await self.read_ice62056(await res.read(), retry, log_payload)
elif mode == MODE_99_PLAINTEXT:
await self.read_plaintext(await res.text(), retry, log_payload)
else:
Expand All @@ -330,41 +333,50 @@ async def read_plaintext(self, plaintext: str, retry: bool, log_payload: bool):
plaintext = plaintext.replace(' ', '\r')

for a_line in plaintext.splitlines():
try:
# a patch for invalid reading?!
# a_line = a_line.replace('."55*', '.255*')

# obis pattern is 'a-b:c.d.e*f'
parts = re.split('(.*?)-(.*?):(.*?)\\.(.*?)\\.(.*?)\\*(.*?)\\((.*?)\\)', a_line)
if len(parts) == 9:
int_obc = IntBasedObisCode(parts, not self.ignore_parse_errors)
value = parts[7]
unit = None
if '*' in value:
val_with_unit = value.split("*")
if '.' in val_with_unit[0]:
value = float(val_with_unit[0])
else:
value = int(val_with_unit[0])

# a patch for invalid reading?!
# a_line = a_line.replace('."55*', '.255*')

# obis pattern is 'a-b:c.d.e*f'
parts = re.split('(.*?)-(.*?):(.*?)\\.(.*?)\\.(.*?)\\*(.*?)\\((.*?)\\)', a_line)
if len(parts) == 9:
int_obc = IntBasedObisCode(parts)
value = parts[7]
unit = None
if '*' in value:
val_with_unit = value.split("*")
if '.' in val_with_unit[0]:
value = float(val_with_unit[0])
# converting any "kilo" unit to base unit...
# so kWh will be converted to Wh - or kV will be V
if val_with_unit[1].lower()[0] == 'k':
value = value * 1000;
val_with_unit[1] = val_with_unit[1][1:]
unit = self.find_unit_int_from_string(val_with_unit[1])

# creating finally the "right" object from the parsed information
entry = SmlListEntry()
entry.obis = ObisCode(int_obc.obis_hex)
entry.value = value
entry.unit = unit
temp_obis_values.append(entry)
else:
if parts[0] == '!':
break;
elif parts[0][0] != '/':
if not self.ignore_parse_errors:
_LOGGER.debug(f'unknown entry: {parts[0]} (line: {a_line})')
# else:
# print('ignore '+ parts[0])

unit = self.find_unit_int_from_string(val_with_unit[1])

# creating finally the "right" object from the parsed information
if hasattr(int_obc, "obis_hex"):
entry = SmlListEntry()
entry.obis = ObisCode(int_obc.obis_hex)
entry.value = value
entry.unit = unit
temp_obis_values.append(entry)
else:
if parts[0] == '!':
break;
elif len(parts[0]) > 0 and parts[0][0] != '/':
if not self.ignore_parse_errors:
_LOGGER.debug(f'unknown entry: {parts[0]} (line: {a_line})')
#else:
# print('ignore '+ parts[0])

except Exception as e:
if not self.ignore_parse_errors:
_LOGGER.info(f"{e}")

if len(temp_obis_values) > 0:
self._obis_values = {}
Expand Down
2 changes: 1 addition & 1 deletion custom_components/tibber_local/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
"iot_class": "local_polling",
"issue_tracker": "https://github.com/marq24/ha-tibber-pulse-local/issues",
"requirements": ["smllib==1.4"],
"version": "2024.6.2"
"version": "2024.6.3"
}

0 comments on commit 88ab36b

Please sign in to comment.