-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpatched_paho.py
254 lines (215 loc) · 10.8 KB
/
patched_paho.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
import collections
from contextlib import suppress
import logging
import selectors
from typing import cast, Any, Literal
from greenlet import GreenletExit
import paho.mqtt.client as mqtt
from paho.mqtt.client import CallbackOnPublish_v1, CallbackOnPublish_v2, ReasonCode
from paho.mqtt.enums import (
CallbackAPIVersion, MQTTErrorCode, MQTTProtocolVersion, _ConnectionState
)
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.client import MQTTv311, MQTT_LOG_ERR, PUBLISH, DISCONNECT, time_func, Properties
# Module-level logger named after this module, so records can be filtered and
# configured per module instead of being attributed to the root logger.
logger = logging.getLogger(__name__)
class PatchedPahoClient(mqtt.Client):
    """Paho MQTT client with a ``selectors``-based loop and a stop switch.

    Two private methods of ``mqtt.Client`` are overridden:

    * ``_loop`` waits for socket readiness through ``selectors`` instead of
      ``select`` (inspired by
      https://github.com/eclipse-paho/paho.mqtt.python/issues/697).
    * ``_packet_write`` drops all buffered traffic once a stop was requested.

    A stop is requested through ``stop_event``: any object exposing
    ``is_set()`` (presumably a ``threading.Event`` owned by the parent
    thread - confirm against the caller). While ``stop_event`` is ``None``
    the client behaves as if no stop was requested.
    """

    def __init__(
        self,
        callback_api_version: CallbackAPIVersion = CallbackAPIVersion.VERSION1,
        client_id: str | None = "",
        clean_session: bool | None = None,
        userdata: Any = None,
        protocol: MQTTProtocolVersion = MQTTv311,
        transport: Literal["tcp", "websockets", "unix"] = "tcp",
        reconnect_on_failure: bool = True,
        manual_ack: bool = False
    ):
        """Forward every argument unchanged to ``mqtt.Client.__init__``."""
        super().__init__(
            callback_api_version, client_id, clean_session, userdata,
            protocol, transport, reconnect_on_failure, manual_ack
        )
        # NOTE(review): the three attributes below are not read anywhere in
        # this class; presumably they are consumed by reconnect logic that
        # lives elsewhere - confirm.
        self._current_attempt = None
        self._reconnect_delay_base = None
        self._reconnect_attempts_max = None
        # Allows to force Paho stop activity if the trigger in parent thread is set
        self.stop_event = None

    def _stop_requested(self) -> bool:
        """Return True when ``stop_event`` is assigned and currently set."""
        stop_event = self.stop_event
        return stop_event is not None and stop_event.is_set()

    def _loop(self, timeout: float = 1.0) -> MQTTErrorCode:
        """Run one iteration of the network loop.

        Waits up to ``timeout`` seconds for the MQTT socket (and the wake-up
        socket pair, when present) to become ready, then performs the pending
        read, write and housekeeping work.

        Args:
            timeout: Maximum time in seconds to wait for socket readiness.

        Returns:
            The result of ``loop_read()`` / ``loop_write()`` when one of them
            reports a non-zero code or the socket is gone, the result of
            ``use_circuit_breaker()`` when a stop was requested,
            ``MQTT_ERR_CONN_LOST`` / ``MQTT_ERR_UNKNOWN`` when waiting on the
            sockets failed, otherwise the result of ``loop_misc()``.

        Raises:
            ValueError: If ``timeout`` is negative.
        """
        if timeout < 0.0:
            raise ValueError("Invalid timeout.")

        # Ask for write readiness only when there is something to send. The
        # queue is inspected without being modified, so concurrent producers
        # never observe a temporarily empty queue.
        eventmask = selectors.EVENT_READ
        if self._out_packet:
            eventmask |= selectors.EVENT_WRITE

        pending_bytes = 0
        if hasattr(self._sock, "pending"):
            pending_bytes = self._sock.pending()
        # if bytes are pending do not wait in select
        if pending_bytes > 0:
            timeout = 0.0

        # Here select() is replaced with the selectors module to avoid the
        # max open descriptors limit. The selector is only needed for the
        # wait itself; the ``with`` block closes it on every exit path,
        # including the early returns below.
        with selectors.DefaultSelector() as sock_sel:
            try:
                sock_sel.register(self._sock, eventmask)
                if self._sockpairR is not None:
                    sock_sel.register(self._sockpairR, selectors.EVENT_READ)
                events = sock_sel.select(timeout)
            except (TypeError, ValueError):
                # Socket isn't correct type (or is closed / reports -1), in
                # likelihood connection is lost ... or we called disconnect().
                # In that case the socket will be closed but some loop (like
                # loop_forever) will continue to call _loop(). We still want
                # to break that loop by returning an rc != MQTT_ERR_SUCCESS
                # and we don't want state to change from
                # mqtt_cs_disconnecting.
                if self._state not in (
                    _ConnectionState.MQTT_CS_DISCONNECTING,
                    _ConnectionState.MQTT_CS_DISCONNECTED,
                ):
                    self._state = _ConnectionState.MQTT_CS_CONNECTION_LOST
                return MQTTErrorCode.MQTT_ERR_CONN_LOST
            except GreenletExit:
                # Greenlet finished, let's terminate the connection
                return MQTTErrorCode.MQTT_ERR_UNKNOWN
            except Exception:
                # Note that KeyboardInterrupt, etc. can still terminate since
                # they are not derived from Exception
                return MQTTErrorCode.MQTT_ERR_UNKNOWN

        # Sort file objects by what is actually ready. ``select()`` yields
        # (key, ready_mask) pairs; ``key.events`` is only the mask that was
        # registered, so readiness must be taken from ``ready_mask``.
        readable: list = []
        writable: list = []
        for key, ready_mask in events:
            if ready_mask & selectors.EVENT_READ:
                readable.append(key.fileobj)
            if ready_mask & selectors.EVENT_WRITE:
                writable.append(key.fileobj)

        if self._stop_requested():
            return self.use_circuit_breaker()

        if self._sock in readable or pending_bytes > 0:
            rc = self.loop_read()
            if rc or self._sock is None:
                return rc

        if self._stop_requested():
            return self.use_circuit_breaker()

        if self._sockpairR and self._sockpairR in readable:
            # Stimulate output write even though we didn't ask for it, because
            # at that point publish or other command wasn't present.
            writable.insert(0, self._sock)
            # Clear sockpairR - only ever a single byte written.
            with suppress(BlockingIOError):
                # Read many bytes at once - this allows up to 10000 calls to
                # publish() inbetween calls to loop().
                self._sockpairR.recv(10000)

        if self._stop_requested():
            return self.use_circuit_breaker()

        if self._sock in writable:
            rc = self.loop_write()
            if rc or self._sock is None:
                return rc

        if self._stop_requested():
            return self.use_circuit_breaker()

        return self.loop_misc()

    def use_circuit_breaker(self, socket_selector=None) -> MQTTErrorCode:
        """Mark the client as disconnected and disable reconnecting.

        Args:
            socket_selector: Optional selector to close before tripping the
                breaker. ``None`` (the default) skips the close.

        Returns:
            ``MQTTErrorCode.MQTT_ERR_NOMEM``.
        """
        if socket_selector is not None:
            socket_selector.close()
        self._state = _ConnectionState.MQTT_CS_DISCONNECTED
        self._reconnect_on_failure = False
        # NOTE(review): MQTT_ERR_NOMEM is presumably used only as a
        # non-success code that makes the calling loop give up - confirm.
        return MQTTErrorCode.MQTT_ERR_NOMEM

    def _discard_buffered_traffic(self) -> None:
        """Reset the outgoing and incoming message and packet buffers."""
        # empty in/out buffers
        with self._out_message_mutex:
            self._out_messages = {}
            self._out_packet = collections.deque()
        # set empty input buffers
        with self._in_message_mutex:
            self._in_messages = {}
            self._in_packet = {
                "command": 0,
                "have_remaining": 0,
                "remaining_count": [],
                "remaining_mult": 1,
                "remaining_length": 0,
                "packet": bytearray(b""),
                "to_process": 0,
                "pos": 0,
            }

    def _packet_write(self) -> MQTTErrorCode:
        """Send queued outgoing packets until the queue is empty or blocked.

        Once a stop was requested, acknowledgement handling is replaced by a
        no-op, every buffered message and packet is discarded and nothing
        more is written.

        Returns:
            ``MQTT_ERR_SUCCESS`` when the queue was drained, a stop was
            requested, the socket is unusable (``AttributeError`` /
            ``ValueError`` from sending) or a DISCONNECT packet was sent;
            ``MQTT_ERR_AGAIN`` when the socket would block;
            ``MQTT_ERR_CONN_LOST`` on any other ``OSError``.
        """
        while True:
            if self._stop_requested():
                # Replace the PUBACK/PUBCOMP handler on this instance with a
                # no-op that reports success.
                self._handle_pubackcomp = lambda *args: MQTTErrorCode.MQTT_ERR_SUCCESS
                self._discard_buffered_traffic()
                break

            try:
                packet = self._out_packet.popleft()
            except IndexError:
                return MQTTErrorCode.MQTT_ERR_SUCCESS

            try:
                write_length = self._sock_send(
                    packet['packet'][packet['pos']:])
            except (AttributeError, ValueError):
                self._out_packet.appendleft(packet)
                return MQTTErrorCode.MQTT_ERR_SUCCESS
            except BlockingIOError:
                self._out_packet.appendleft(packet)
                return MQTTErrorCode.MQTT_ERR_AGAIN
            except OSError as err:
                self._out_packet.appendleft(packet)
                self._easy_log(
                    MQTT_LOG_ERR, 'failed to receive on socket: %s', err)
                return MQTTErrorCode.MQTT_ERR_CONN_LOST

            if write_length > 0 and not self._stop_requested():
                packet['to_process'] -= write_length
                packet['pos'] += write_length

                if packet['to_process'] == 0:
                    if (packet['command'] & 0xF0) == PUBLISH and packet['qos'] == 0:
                        with self._callback_mutex:
                            on_publish = self.on_publish

                        if on_publish:
                            with self._in_callback_mutex:
                                try:
                                    if self._callback_api_version == CallbackAPIVersion.VERSION1:
                                        on_publish = cast(CallbackOnPublish_v1, on_publish)
                                        on_publish(self, self._userdata, packet["mid"])
                                    elif self._callback_api_version == CallbackAPIVersion.VERSION2:
                                        on_publish = cast(CallbackOnPublish_v2, on_publish)
                                        on_publish(
                                            self,
                                            self._userdata,
                                            packet["mid"],
                                            ReasonCode(PacketTypes.PUBACK),
                                            Properties(PacketTypes.PUBACK),
                                        )
                                    else:
                                        raise RuntimeError("Unsupported callback API version")
                                except Exception as err:
                                    self._easy_log(
                                        MQTT_LOG_ERR, 'Caught exception in on_publish: %s', err)
                                    if not self.suppress_exceptions:
                                        raise

                        # TODO: Something is odd here. I don't see why packet["info"] can't be None.
                        # A packet could be produced by _handle_connack with qos=0 and no info
                        # (around line 3645). Ignore the mypy check for now but I feel there is a bug
                        # somewhere.
                        packet['info']._set_as_published()  # type: ignore

                    if (packet['command'] & 0xF0) == DISCONNECT:
                        with self._msgtime_mutex:
                            self._last_msg_out = time_func()

                        self._do_on_disconnect(
                            packet_from_broker=False,
                            v1_rc=MQTTErrorCode.MQTT_ERR_SUCCESS,
                        )
                        self._sock_close()
                        # Only change to disconnected if the disconnection was wanted
                        # by the client (== state was disconnecting). If the broker
                        # disconnected us unilaterally, don't change the state and
                        # the client may reconnect.
                        if self._state == _ConnectionState.MQTT_CS_DISCONNECTING:
                            self._state = _ConnectionState.MQTT_CS_DISCONNECTED
                        return MQTTErrorCode.MQTT_ERR_SUCCESS
                else:
                    # We haven't finished with this packet
                    self._out_packet.appendleft(packet)
            else:
                # Nothing was written, or a stop was requested while sending.
                break

        with self._msgtime_mutex:
            self._last_msg_out = time_func()

        return MQTTErrorCode.MQTT_ERR_SUCCESS