You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Mostly, aiomqtt help done the work asyncly and nicely.
However, recently when trying to subscribe a sensor-mqtt-topic for sending these messages to a unix domain socket,I got a blocking outcome, with lots "mqtt queue logger.warning".
Of cause,these messages were about 90 Hz in-coming, with QoS=0。
But, maybe a suggestion is there :
Can aiomqtt queue has a ring buffer option, for high frequency in-coming messages?
Next is:
relate aiomqq client.py code:
def_on_message(
self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage
) ->None:
# Convert the paho.mqtt message into our own Message typem=Message._from_paho_message(message) # noqa: SLF001# Put the message in the message queuetry:
self._queue.put_nowait(m)
exceptasyncio.QueueFull:
self._logger.warning("Message queue is full. Discarding message.")
try to have a ring buffer (Now not async....this is the problem we want to have your suggestion....thanks)
from collections import deque
...
import asyncio
import aiomqtt
BUFFER_SIZE = 10
mqtt_to_unix_buffer = deque(maxlen=BUFFER_SIZE)
...
async def forward_mqtt_to_unix(mqtt_client, unix_reader, unix_writer):
global mqtt_to_unix_count
async with mqtt_client.filtered_messages(COMMAND_TOPIC) as messages:
async for message in messages:
data = json.loads(message.payload)
mqtt_to_unix_buffer.append(data)
mqtt_to_unix_count += 1
try:
await unix_writer.write(json.dumps(data).encode())
await unix_writer.drain()
except Exception as e:
print(f"Error sending to Unix Socket: {e}")
break
The text was updated successfully, but these errors were encountered:
#302 contains code that supports separate queues and doesn't require any topic filtering if the server supports subscription IDs (most do).
If you then split the Unix writer into two tasks (one that reads messages from the queue and stores them in a ring buffer, with possible data loss, and another that reads from the buffer and generates your JSON) there should no longer be a problem.
NB I'd recommend to simply forward the JSON to the Unix socket instead of decoding and re-encoding the data …?
Mostly, aiomqtt help done the work asyncly and nicely.
However, recently when trying to subscribe a sensor-mqtt-topic for sending these messages to a unix domain socket,I got a blocking outcome, with lots "mqtt queue logger.warning".
Of cause,these messages were about 90 Hz in-coming, with QoS=0。
But, maybe a suggestion is there :
Can aiomqtt queue has a ring buffer option, for high frequency in-coming messages?
Next is:
The text was updated successfully, but these errors were encountered: