-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmqtt.py
177 lines (141 loc) · 5.43 KB
/
mqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# Based on https://github.com/emontnemery/domoticz_mqtt_discovery
# Based on https://github.com/emontnemery/domoticz_mqtt_discovery
# version: 1.0.1
#
# Changelog
# 1.0.1: Aligned with PEP8 styleguide
import Domoticz
import time
import json
try:
import os
except:
Domoticz.Debug("Your Python environment is incomplete!")
class MqttClientSH2:
address = ""
port = ""
_connection = None
isConnected = False
on_mqtt_connected_cb = None
on_mqtt_disconnected_cb = None
on_mqtt_message_cb = None
def __init__(self, address, port, client_id, on_mqtt_connected_cb, on_mqtt_disconnected_cb, on_mqtt_message_cb, on_mqtt_subscribed_cb):
Domoticz.Debug("MqttClient::__init__")
self.address = address
self.port = port
self.client_id = client_id if client_id != "" else self._generate_mqtt_client_id()
self.on_mqtt_connected_cb = on_mqtt_connected_cb
self.on_mqtt_disconnected_cb = on_mqtt_disconnected_cb
self.on_mqtt_subscribed_cb = on_mqtt_subscribed_cb
self.on_mqtt_message_cb = on_mqtt_message_cb
self._open()
def __str__(self):
Domoticz.Debug("MqttClient::__str__")
if (self._connection != None):
return str(self._connection)
else:
return "None"
def _generate_mqtt_client_id(self):
retval = 'Domoticz_' + str(int(time.time()))+'_'
try:
rarray = list(os.urandom(4))
for i in range(len(rarray)):
retval += str(rarray[i])
except:
pass # there are nothing we can do
return retval
def _open(self):
Domoticz.Debug("MqttClient::open")
if (self._connection != None):
self.close()
self.isConnected = False
self._connection = Domoticz.Connection(
Name=self.address,
Transport="TCP/IP",
Protocol="MQTTS" if self.port == "8883" else "MQTT",
Address=self.address,
Port=self.port
)
self._connection.Connect()
def ping(self):
Domoticz.Debug("MqttClient::ping")
if (self._connection == None or not self.isConnected):
self._open()
else:
self._connection.Send({'Verb': 'PING'})
def publish(self, topic, payload, retain=0):
Domoticz.Debug("MqttClient::publish " + topic + " (" + payload + ")")
if (self._connection == None or not self.isConnected):
self._open()
else:
self._connection.Send({
'Verb': 'PUBLISH',
'Topic': topic,
'Payload': bytearray(payload, 'utf-8'),
'Retain': retain
})
def subscribe(self, topics):
Domoticz.Debug("MqttClient::subscribe")
subscriptionlist = []
for topic in topics:
subscriptionlist.append({'Topic': topic, 'QoS': 0})
if (self._connection == None or not self.isConnected):
self._open()
else:
self._connection.Send({'Verb': 'SUBSCRIBE', 'Topics': subscriptionlist})
def close(self):
Domoticz.Debug("MqttClient::close")
if self._connection != None and self._connection.Connected():
self._connection.Send({ 'Verb' : 'DISCONNECT' })
self._connection.Disconnect()
self._connection = None
self.isConnected = False
def onConnect(self, Connection, Status, Description):
if (self._connection != Connection):
return
if (Status == 0):
Domoticz.Log("Connected to MQTT Server: {}:{}".format(
Connection.Address, Connection.Port)
)
Domoticz.Debug("MQTT CLIENT ID: '" + self.client_id + "'")
self._connection.Send({'Verb': 'CONNECT', 'ID': self.client_id})
else:
Domoticz.Error("Failed to connect to: {}:{}, Description: {}".format(
Connection.Address, Connection.Port, Description)
)
def onDisconnect(self, Connection):
if (self._connection != Connection):
return
Domoticz.Debug("MqttClient::onDisonnect")
Domoticz.Error("Disconnected from MQTT Server: {}:{}".format(
Connection.Address, Connection.Port)
)
self.close()
if self.on_mqtt_disconnected_cb != None:
self.on_mqtt_disconnected_cb()
def onHeartbeat(self):
if self._connection is None or (not self._connection.Connecting() and not self._connection.Connected() or not self.isConnected):
Domoticz.Debug("MqttClient::Reconnecting")
self._open()
else:
self.ping()
def onMessage(self, Connection, Data):
if (self._connection != Connection):
return
topic = Data['Topic'] if 'Topic' in Data else ''
payload = Data['Payload'].decode('utf8') if 'Payload' in Data else ''
if Data['Verb'] == "CONNACK":
self.isConnected = True
if self.on_mqtt_connected_cb != None:
self.on_mqtt_connected_cb()
if Data['Verb'] == "SUBACK":
if self.on_mqtt_subscribed_cb != None:
self.on_mqtt_subscribed_cb()
if Data['Verb'] == "PUBLISH":
if self.on_mqtt_message_cb != None:
message = ""
try:
message = json.loads(payload)
except ValueError:
message = payload
self.on_mqtt_message_cb(topic, message)