diff --git a/README.md b/README.md index 398ccf1..8e733a3 100644 --- a/README.md +++ b/README.md @@ -370,6 +370,24 @@ cannot be overridden. Make sure to map the host device accordingly. +`MESSAGE_INTERVAL` + + + + +Minimum average interval between messages in seconds. Set to positive +value to enable rate limiting. + + + + +0 + + + + + + `LOG_LEVEL` diff --git a/config.py b/config.py index e238187..0eca3c7 100644 --- a/config.py +++ b/config.py @@ -17,9 +17,9 @@ import re import dsmr_parser.clients from dsmr_parser import telegram_specifications -from dsmr_parser.clients.settings import serial -def get_env_opt(name, ty, required, default = None): + +def get_env_opt(name, ty, required, default=None): if name in os.environ: value = os.environ[name] else: @@ -43,13 +43,18 @@ def get_env_opt(name, ty, required, default = None): return ty(value) + class Config: def __init__(self): self.LOG_LEVEL = get_env_opt('LOG_LEVEL', str, False, 'INFO') logging.getLogger().setLevel(self.LOG_LEVEL) serial_settings_pfx = 'SERIAL_SETTINGS_' - serial_settings_options = [s[len(serial_settings_pfx):] for s in dir(dsmr_parser.clients) if s.startswith(serial_settings_pfx)] + serial_settings_options = [ + s[len(serial_settings_pfx):] + for s in dir(dsmr_parser.clients) + if s.startswith(serial_settings_pfx) + ] logging.debug(f'Possible values for SERIAL_SETTINGS: {serial_settings_options}') serial_settings = get_env_opt('SERIAL_SETTINGS', serial_settings_options, False, 'V4') @@ -63,20 +68,22 @@ def __init__(self): self.SERIAL_DEVICE = get_env_opt('SERIAL_DEVICE', str, True) self.DSMR_VERSION = getattr(telegram_specifications, dsmr_version) - self.MQTT_HOST = get_env_opt('MQTT_HOST', str, True) - self.MQTT_PORT = get_env_opt('MQTT_PORT', int, False, None) - self.MQTT_TLS = get_env_opt('MQTT_TLS', bool, False, None) + self.MQTT_HOST = get_env_opt('MQTT_HOST', str, True) + self.MQTT_PORT = get_env_opt('MQTT_PORT', int, False, None) + self.MQTT_TLS = get_env_opt('MQTT_TLS', bool, False, None) self.MQTT_TLS_INSECURE = get_env_opt('MQTT_TLS_INSECURE', bool, False, False) - self.MQTT_CA_CERTS = get_env_opt('MQTT_CA_CERTS', str, False) - self.MQTT_CERTFILE = get_env_opt('MQTT_CERTFILE', str, False) - self.MQTT_KEYFILE = get_env_opt('MQTT_KEYFILE', str, False) - self.MQTT_USERNAME = get_env_opt('MQTT_USERNAME', str, False) - self.MQTT_PASSWORD = get_env_opt('MQTT_PASSWORD', str, self.MQTT_USERNAME is not None) + self.MQTT_CA_CERTS = get_env_opt('MQTT_CA_CERTS', str, False) + self.MQTT_CERTFILE = get_env_opt('MQTT_CERTFILE', str, False) + self.MQTT_KEYFILE = get_env_opt('MQTT_KEYFILE', str, False) + self.MQTT_USERNAME = get_env_opt('MQTT_USERNAME', str, False) + self.MQTT_PASSWORD = get_env_opt('MQTT_PASSWORD', str, self.MQTT_USERNAME is not None) self.MQTT_TOPIC_PREFIX = get_env_opt('MQTT_TOPIC_PREFIX', str, False, 'dsmr') - self.HA_DEVICE_ID = get_env_opt('HA_DEVICE_ID', str, False, 'dsmr') + self.HA_DEVICE_ID = get_env_opt('HA_DEVICE_ID', str, False, 'dsmr') self.HA_DISCOVERY_PREFIX = get_env_opt('HA_DISCOVERY_PREFIX', str, False, 'homeassistant') + self.MESSAGE_INTERVAL = get_env_opt('MESSAGE_INTERVAL', int, False, 0) + # Make sure MQTT_PORT has a valid value if self.MQTT_PORT is None: if self.MQTT_TLS is not None and self.MQTT_TLS: diff --git a/mqtt4dsmr.py b/mqtt4dsmr.py index 5773eca..ad6f39b 100755 --- a/mqtt4dsmr.py +++ b/mqtt4dsmr.py @@ -16,18 +16,16 @@ import json import logging -import os -import re import sys -import dsmr_parser.clients -from enum import Enum -from dsmr_parser import telegram_specifications +import time from dsmr_parser.clients import SerialReader +from threading import Thread, Condition import paho.mqtt.client as mqtt from config import Config + class SensorKind: - def __init__(self, subtopic, device_class, unit, icon, state_class=None): + def __init__(self, subtopic, device_class, unit, icon, state_class): self.subtopic = subtopic self.device_class = device_class self.unit = unit @@ -47,6 +45,7 @@ def amend_sensor_dict(self, sensor): if self.state_class is not None: sensor['state_class'] = self.state_class + def attr_name(attr): name = attr[0] + attr[1:].lower().replace('_', ' ') @@ -57,18 +56,19 @@ def attr_name(attr): return name + class Schema: def __init__(self, telegram, topic): unit_info = { - 'W': SensorKind('elec', 'power', 'W', 'mdi:lightning-bolt', state_class='measurement'), - 'kW': SensorKind('elec', 'power', 'kW', 'mdi:lightning-bolt', state_class='measurement'), - 'Wh': SensorKind('elec', 'energy', 'Wh', 'mdi:lightning-bolt', state_class='total'), - 'kWh': SensorKind('elec', 'energy', 'kWh', 'mdi:lightning-bolt', state_class='total'), - 'V': SensorKind('elec', 'voltage', 'V', 'mdi:lightning-bolt', state_class='measurement'), - 'm3': SensorKind('gas', 'gas', 'm³', 'mdi:meter-gas', state_class='total') + 'W': SensorKind('elec', 'power', 'W', 'mdi:lightning-bolt', 'measurement'), + 'kW': SensorKind('elec', 'power', 'kW', 'mdi:lightning-bolt', 'measurement'), + 'Wh': SensorKind('elec', 'energy', 'Wh', 'mdi:lightning-bolt', 'total'), + 'kWh': SensorKind('elec', 'energy', 'kWh', 'mdi:lightning-bolt', 'total'), + 'V': SensorKind('elec', 'voltage', 'V', 'mdi:lightning-bolt', 'measurement'), + 'm3': SensorKind('gas', 'gas', 'm³', 'mdi:meter-gas', 'total') } - diag = SensorKind('diag', None, None, 'mdi:counter') + diag = SensorKind('diag', None, None, 'mdi:counter', None) self.attributes = {} for attr, obj in telegram: @@ -89,6 +89,7 @@ def __init__(self, telegram, topic): self.attributes[attr] = (kind.topic_name(topic, attr), kind) def publish(self, client, telegram): + logging.debug('Sending message according to schema') for key, (topic, _) in self.attributes.items(): val = getattr(telegram, key).value client.publish(topic, str(val)) @@ -120,6 +121,79 @@ def publish_ha_discovery(self, client, disc_pfx, dev_id, availability): # see https://www.home-assistant.io/integrations/mqtt/#sensors device = simple_device + +class DirectPublisher: + def __init__(self, schema, client): + self.schema = schema + self.client = client + + def publish(self, telegram): + self.schema.publish(self.client, telegram) + + +class RateLimitedPublisher: + def __init__(self, schema, client, interval): + self.schema = schema + self.client = client + self.interval_ns = interval * 1000000000 + self.telegram = None + self.msg = Condition() + self.tick = Condition() + self.total_msgs = 0 + self.epoch = time.monotonic_ns() + self.rate_ok = True + + logging.debug(f'Rate limiter epoch: {self.epoch} ns') + + Thread(target=self.ticker, daemon=True).start() + Thread(target=self.loop, daemon=True).start() + + def next_ts(self): + return self.epoch + self.interval_ns * self.total_msgs + + def ticker(self): + with self.tick: + while True: + self.tick.wait() + sleep_ns = self.next_ts() - time.monotonic_ns() + if sleep_ns > 0: + logging.debug(f'Rate limiter delay: {sleep_ns} ns') + time.sleep(sleep_ns / 1000000000) + else: + logging.debug('No rate limiter delay') + + with self.msg: + self.rate_ok = True + self.msg.notify() + + def loop(self): + with self.msg: + while True: + self.msg.wait() + if not self.rate_ok: + logging.debug('Got message, but not ready to publish yet') + continue + + if self.telegram is not None: + logging.debug('Ready to publish message') + self.schema.publish(self.client, self.telegram) + self.total_msgs += 1 + self.telegram = None + + with self.tick: + # Reset; if the rate is still okay after sending + # a new message, the ticker will tell us so + self.rate_ok = False + self.tick.notify() + else: + logging.debug('Ready to publish, but no message queued') + + def publish(self, telegram): + with self.msg: + self.telegram = telegram + self.msg.notify() + + def main(): logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s') @@ -136,7 +210,7 @@ def main(): ) client.tls_insecure_set(cfg.MQTT_TLS_INSECURE) else: - logging.warning('Not using MQTT over TLS; set MQTT_PORT=8883 or set MQTT_TLS=1 to enable TLS') + logging.warning('Not using MQTT over TLS; set MQTT_PORT=8883 or MQTT_TLS=1 to enable TLS') if cfg.MQTT_USERNAME: logging.info('Using MQTT username/password authentication') @@ -160,19 +234,25 @@ def main(): telegram_specification=cfg.DSMR_VERSION ) - schema = None + publisher = None for telegram in serial_reader.read(): - if schema is None: + logging.debug('Received serial message') + + if publisher is None: schema = Schema(telegram, cfg.MQTT_TOPIC_PREFIX) schema.publish_ha_discovery(client, cfg.HA_DISCOVERY_PREFIX, cfg.HA_DEVICE_ID, avail) - schema.publish(client, telegram) + if cfg.MESSAGE_INTERVAL > 0: + publisher = RateLimitedPublisher(schema, client, cfg.MESSAGE_INTERVAL) + else: + publisher = DirectPublisher(schema, client) + + publisher.publish(telegram) + if __name__ == '__main__': - rc = 0 try: main() except Exception as e: logging.exception(e) - rc = 1 - sys.exit(rc) + sys.exit(1)