Skip to content

Commit

Permalink
Add rate limiter
Browse files Browse the repository at this point in the history
Add option MESSAGE_INTERVAL. If the user sets this to a positive value,
send out MQTT messages on a schedule. Make sure the average interval
between messages, measured from the start of the programme, never dips
beneath MESSAGE_INTERVAL.

When dealing with stuff like this I always miss goroutines and
channels...

Fix formatting so flake8 no longer complains.
  • Loading branch information
antonijn committed Jan 27, 2024
1 parent 4cf0566 commit 0d7d60c
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 32 deletions.
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,24 @@ cannot be overridden. Make sure to map the host device accordingly.
<tr>
<td>

`MESSAGE_INTERVAL`

</td>
<td>

Minimum average interval between messages in seconds. Set to positive
value to enable rate limiting.

</td>
<td>

0

</td>
</tr>
<tr>
<td>

`LOG_LEVEL`

</td>
Expand Down
31 changes: 19 additions & 12 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import re
import dsmr_parser.clients
from dsmr_parser import telegram_specifications
from dsmr_parser.clients.settings import serial

def get_env_opt(name, ty, required, default = None):

def get_env_opt(name, ty, required, default=None):
if name in os.environ:
value = os.environ[name]
else:
Expand All @@ -43,13 +43,18 @@ def get_env_opt(name, ty, required, default = None):

return ty(value)


class Config:
def __init__(self):
self.LOG_LEVEL = get_env_opt('LOG_LEVEL', str, False, 'INFO')
logging.getLogger().setLevel(self.LOG_LEVEL)

serial_settings_pfx = 'SERIAL_SETTINGS_'
serial_settings_options = [s[len(serial_settings_pfx):] for s in dir(dsmr_parser.clients) if s.startswith(serial_settings_pfx)]
serial_settings_options = [
s[len(serial_settings_pfx):]
for s in dir(dsmr_parser.clients)
if s.startswith(serial_settings_pfx)
]
logging.debug(f'Possible values for SERIAL_SETTINGS: {serial_settings_options}')
serial_settings = get_env_opt('SERIAL_SETTINGS', serial_settings_options, False, 'V4')

Expand All @@ -63,20 +68,22 @@ def __init__(self):
self.SERIAL_DEVICE = get_env_opt('SERIAL_DEVICE', str, True)
self.DSMR_VERSION = getattr(telegram_specifications, dsmr_version)

self.MQTT_HOST = get_env_opt('MQTT_HOST', str, True)
self.MQTT_PORT = get_env_opt('MQTT_PORT', int, False, None)
self.MQTT_TLS = get_env_opt('MQTT_TLS', bool, False, None)
self.MQTT_HOST = get_env_opt('MQTT_HOST', str, True)
self.MQTT_PORT = get_env_opt('MQTT_PORT', int, False, None)
self.MQTT_TLS = get_env_opt('MQTT_TLS', bool, False, None)
self.MQTT_TLS_INSECURE = get_env_opt('MQTT_TLS_INSECURE', bool, False, False)
self.MQTT_CA_CERTS = get_env_opt('MQTT_CA_CERTS', str, False)
self.MQTT_CERTFILE = get_env_opt('MQTT_CERTFILE', str, False)
self.MQTT_KEYFILE = get_env_opt('MQTT_KEYFILE', str, False)
self.MQTT_USERNAME = get_env_opt('MQTT_USERNAME', str, False)
self.MQTT_PASSWORD = get_env_opt('MQTT_PASSWORD', str, self.MQTT_USERNAME is not None)
self.MQTT_CA_CERTS = get_env_opt('MQTT_CA_CERTS', str, False)
self.MQTT_CERTFILE = get_env_opt('MQTT_CERTFILE', str, False)
self.MQTT_KEYFILE = get_env_opt('MQTT_KEYFILE', str, False)
self.MQTT_USERNAME = get_env_opt('MQTT_USERNAME', str, False)
self.MQTT_PASSWORD = get_env_opt('MQTT_PASSWORD', str, self.MQTT_USERNAME is not None)
self.MQTT_TOPIC_PREFIX = get_env_opt('MQTT_TOPIC_PREFIX', str, False, 'dsmr')

self.HA_DEVICE_ID = get_env_opt('HA_DEVICE_ID', str, False, 'dsmr')
self.HA_DEVICE_ID = get_env_opt('HA_DEVICE_ID', str, False, 'dsmr')
self.HA_DISCOVERY_PREFIX = get_env_opt('HA_DISCOVERY_PREFIX', str, False, 'homeassistant')

self.MESSAGE_INTERVAL = get_env_opt('MESSAGE_INTERVAL', int, False, 0)

# Make sure MQTT_PORT has a valid value
if self.MQTT_PORT is None:
if self.MQTT_TLS is not None and self.MQTT_TLS:
Expand Down
120 changes: 100 additions & 20 deletions mqtt4dsmr.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,16 @@

import json
import logging
import os
import re
import sys
import dsmr_parser.clients
from enum import Enum
from dsmr_parser import telegram_specifications
import time
from dsmr_parser.clients import SerialReader
from threading import Thread, Condition
import paho.mqtt.client as mqtt
from config import Config


class SensorKind:
def __init__(self, subtopic, device_class, unit, icon, state_class=None):
def __init__(self, subtopic, device_class, unit, icon, state_class):
self.subtopic = subtopic
self.device_class = device_class
self.unit = unit
Expand All @@ -47,6 +45,7 @@ def amend_sensor_dict(self, sensor):
if self.state_class is not None:
sensor['state_class'] = self.state_class


def attr_name(attr):
name = attr[0] + attr[1:].lower().replace('_', ' ')

Expand All @@ -57,18 +56,19 @@ def attr_name(attr):

return name


class Schema:
def __init__(self, telegram, topic):
unit_info = {
'W': SensorKind('elec', 'power', 'W', 'mdi:lightning-bolt', state_class='measurement'),
'kW': SensorKind('elec', 'power', 'kW', 'mdi:lightning-bolt', state_class='measurement'),
'Wh': SensorKind('elec', 'energy', 'Wh', 'mdi:lightning-bolt', state_class='total'),
'kWh': SensorKind('elec', 'energy', 'kWh', 'mdi:lightning-bolt', state_class='total'),
'V': SensorKind('elec', 'voltage', 'V', 'mdi:lightning-bolt', state_class='measurement'),
'm3': SensorKind('gas', 'gas', 'm³', 'mdi:meter-gas', state_class='total')
'W': SensorKind('elec', 'power', 'W', 'mdi:lightning-bolt', 'measurement'),
'kW': SensorKind('elec', 'power', 'kW', 'mdi:lightning-bolt', 'measurement'),
'Wh': SensorKind('elec', 'energy', 'Wh', 'mdi:lightning-bolt', 'total'),
'kWh': SensorKind('elec', 'energy', 'kWh', 'mdi:lightning-bolt', 'total'),
'V': SensorKind('elec', 'voltage', 'V', 'mdi:lightning-bolt', 'measurement'),
'm3': SensorKind('gas', 'gas', 'm³', 'mdi:meter-gas', 'total')
}

diag = SensorKind('diag', None, None, 'mdi:counter')
diag = SensorKind('diag', None, None, 'mdi:counter', None)

self.attributes = {}
for attr, obj in telegram:
Expand All @@ -89,6 +89,7 @@ def __init__(self, telegram, topic):
self.attributes[attr] = (kind.topic_name(topic, attr), kind)

def publish(self, client, telegram):
logging.debug('Sending message according to schema')
for key, (topic, _) in self.attributes.items():
val = getattr(telegram, key).value
client.publish(topic, str(val))
Expand Down Expand Up @@ -120,6 +121,79 @@ def publish_ha_discovery(self, client, disc_pfx, dev_id, availability):
# see https://www.home-assistant.io/integrations/mqtt/#sensors
device = simple_device


class DirectPublisher:
def __init__(self, schema, client):
self.schema = schema
self.client = client

def publish(self, telegram):
self.schema.publish(self.client, telegram)


class RateLimitedPublisher:
def __init__(self, schema, client, interval):
self.schema = schema
self.client = client
self.interval_ns = interval * 1000000000
self.telegram = None
self.msg = Condition()
self.tick = Condition()
self.total_msgs = 0
self.epoch = time.monotonic_ns()
self.rate_ok = True

logging.debug(f'Rate limiter epoch: {self.epoch} ns')

Thread(target=self.ticker, daemon=True).start()
Thread(target=self.loop, daemon=True).start()

def next_ts(self):
return self.epoch + self.interval_ns * self.total_msgs

def ticker(self):
with self.tick:
while True:
self.tick.wait()
sleep_ns = self.next_ts() - time.monotonic_ns()
if sleep_ns > 0:
logging.debug(f'Rate limiter delay: {sleep_ns} ns')
time.sleep(sleep_ns / 1000000000)
else:
logging.debug('No rate limiter delay')

with self.msg:
self.rate_ok = True
self.msg.notify()

def loop(self):
with self.msg:
while True:
self.msg.wait()
if not self.rate_ok:
logging.debug('Got message, but not ready to publish yet')
continue

if self.telegram is not None:
logging.debug('Ready to publish message')
self.schema.publish(self.client, self.telegram)
self.total_msgs += 1
self.telegram = None

with self.tick:
# Reset; if the rate is still okay after sending
# a new message, the ticker will tell us so
self.rate_ok = False
self.tick.notify()
else:
logging.debug('Ready to publish, but no message queued')

def publish(self, telegram):
with self.msg:
self.telegram = telegram
self.msg.notify()


def main():
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s')

Expand All @@ -136,7 +210,7 @@ def main():
)
client.tls_insecure_set(cfg.MQTT_TLS_INSECURE)
else:
logging.warning('Not using MQTT over TLS; set MQTT_PORT=8883 or set MQTT_TLS=1 to enable TLS')
logging.warning('Not using MQTT over TLS; set MQTT_PORT=8883 or MQTT_TLS=1 to enable TLS')

if cfg.MQTT_USERNAME:
logging.info('Using MQTT username/password authentication')
Expand All @@ -160,19 +234,25 @@ def main():
telegram_specification=cfg.DSMR_VERSION
)

schema = None
publisher = None
for telegram in serial_reader.read():
if schema is None:
logging.debug('Received serial message')

if publisher is None:
schema = Schema(telegram, cfg.MQTT_TOPIC_PREFIX)
schema.publish_ha_discovery(client, cfg.HA_DISCOVERY_PREFIX, cfg.HA_DEVICE_ID, avail)

schema.publish(client, telegram)
if cfg.MESSAGE_INTERVAL > 0:
publisher = RateLimitedPublisher(schema, client, cfg.MESSAGE_INTERVAL)
else:
publisher = DirectPublisher(schema, client)

publisher.publish(telegram)


if __name__ == '__main__':
rc = 0
try:
main()
except Exception as e:
logging.exception(e)
rc = 1
sys.exit(rc)
sys.exit(1)

0 comments on commit 0d7d60c

Please sign in to comment.