-
Notifications
You must be signed in to change notification settings - Fork 2
/
mqtt.js
211 lines (186 loc) · 8.75 KB
/
mqtt.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
// Primitive MQTT Client for Commandfusion
// Version 0.2
// © 2015 Artyom Syomushkin
var message_queue = ''; // Buffer to store all incoming data
var constants = {
types: {
0: 'reserved',
1: 'connect',
2: 'connack',
3: 'publish',
4: 'puback',
5: 'pubrec',
6: 'pubrel',
7: 'pubcomp',
8: 'subscribe',
9: 'suback',
10: 'unsubscribe',
11: 'unsuback',
12: 'pingreq',
13: 'pingresp',
14: 'disconnect',
15: 'auth'
},
LENGTH_MASK: 0x7F,
LENGTH_FIN_MASK: 0x80
}
//CF.userMain = function() {
// iViewer initialization is now complete and the CF environment is fully available
// CF.watch(CF.FeedbackMatchedEvent, "Mqtt", "MQTT_Receive", MQTT_Process_Data);
//};
// Helper function that takes an array of values and generates a string
// If the value is a number, a byte will be added to the string
// If the value is a string, it will be copied verbatim to the resulting string
function binaryString(array) {
return array.map(function (item) {
return (typeof (item) == "number") ? String.fromCharCode(item) : item;
}).join("");
}
// Send CONNECT Message to MQTT Broker
// client_id: Name of the client
// keep_alive_time : Time interval in seconds for watchdog messages to send to broker
// system : system name, automatically generated by iViewer in command
// Usage example: MQTT_Connect("panel1", 60, system) will send following connect message:
// //\x10\x14\x00\x06MQIsdp\x03\x02\x00\x3c\x00\x06panel1
function MQTT_Connect(client_id, keep_alive_time, system) {
// Start watching for MQTT incoming packets
message_queue = "";
CF.watch(CF.FeedbackMatchedEvent, system, "MQTT_Receive", MQTT_Process_Data);
CF.log("Connecting to : " + system + " Client ID: " + client_id + " Keep Alive interval: " + keep_alive_time + " s");
// Calculate message length: 8 bytes for protocol name(MQISDP), 2 bytes for protocol level and flags,
// 2 bytes for Keep alive time, length of client ID
var message_length = 14 + client_id.length;
CF.log("Message length is : " + message_length);
// Send CONNECT message 0x10 by assembling: 0x10 + Message Length + 0x00 + 0x06(Length of "MQIsdp") + "MQIsdp" + 0x03(Protocol level) +
// + 0x02(Flags) + 0x00 + Keep_alive + Length of Client_ID + Client_ID
var message = binaryString([0x10, message_length, 0x00, 0x06, "MQIsdp", 0x03, 0x02, 0x00, keep_alive_time, 0x00, client_id.length, client_id]);
CF.send(system, message);
CF.log("Message sent " + message);
}
// Send PUBLISH Message to MQTT Broker
// topic : Publish Topic
// paramValue : Value to send
// system : system name, automatically generated by iViewer in command
// Example: MQTT_Publish("/myhome/command/Light_FF_Bath_Ceiling/state","ON", system) will send following message
// \x30\x2f\x00\x2b/myhome/command/Light_FF_Bath_Ceiling/stateON
function MQTT_Publish(topic, paramValue, system) {
CF.log("Publishing to : " + topic + " Value: " + paramValue);
// Calculate message length: 2 bytes for topic lenght byte + topic length + payload length
var message_length = 2 + topic.length + paramValue.length;
CF.log("Message length is : " + message_length);
// Send Publish message 0x31 by assembling: 0x31 + Message Length + 0x00 + Length of topic + Value
var message = binaryString([0x31, message_length, 0x00, topic.length, topic, paramValue]);
CF.send(system, message);
CF.log("Message sent " + message);
}
function MQTT_Publish_State(topic, paramstate, system) {
CF.log("Publishing to : " + topic + " Value: " + paramstate);
// Calculate message length: 2 bytes for topic lenght byte + topic length + payload length
if (paramstate == 0) {
paramValue = "OFF";
}
else {
paramValue = "ON";
}
var message_length = 2 + topic.length + paramValue.length;
CF.log("Message length is : " + message_length);
// Send Publish message by assembling: 0x31 + Message Length + 0x00 + Length of topic + Value
var message = binaryString([0x31, message_length, 0x00, topic.length, topic, paramValue]);
CF.send(system, message);
CF.log("Message sent " + message);
}
// Send SUBSCRIBE Message to MQTT Broker
// topic_filter: Name of the topic to subscribe
// system : system name, automatically generated by iViewer in command
// Usage example: MQTT_Subscribe("/#", system) will send following SUBSCRIBE message:
// \x82\x07\x00\x01\x00\x02/#\x00
function MQTT_Subscribe(topic_filter, system) {
CF.log("Sending Subscribe Message to topic: " + topic_filter + " To system: " + system);
// Calculate message length: Packet identifier (2 bytes) + Topic filter Length(2 bytes) + Topic Filter + QoS(1 bytes)
var message_length = 2 + 2 + topic_filter.length + 1;
CF.log("Message length is : " + message_length);
var message = binaryString([0x82, message_length, 0x00, 0x01, 0x00, topic_filter.length, topic_filter, 0x00]);
CF.send(system, message);
CF.log("Message sent " + message);
}
function parsePacketLength(packet, fullInfoFlag) {
var bytes = 0;
var mul = 1;
var length = 0;
var result = true;
var current;
var padding = 0;
var offset = 1; // First byte in packet is various flags, so jump to next byte
while (bytes < 5) {
current = packet.charCodeAt(offset + bytes++);
length += mul * (current & constants.LENGTH_MASK);
mul *= 0x80;
if ((current & constants.LENGTH_FIN_MASK) === 0)
break;
if (packet.length <= bytes) {
result = false;
break;
}
}
length += (1 + bytes); // Account for the fixed header byte
result = result
? fullInfoFlag ? {
bytes: bytes,
value: length
} : length
: false
return result;
}
// Function processes incoming messages from MQTT Broker, and transfers them to loopback system
// matchedString - incoming data
function MQTT_Process_Data(feedbackItem, matchedString) {
CF.log("Receieved MQTT packet: " + matchedString + " Started with: " + matchedString.charCodeAt(0) + " Length: " + matchedString.length);
// Append new message(s) to queue
message_queue = message_queue + matchedString;
CF.log("Message Queue :" + message_queue);
CF.log("Message Queue length is : " + message_queue.length);
var type = constants.types[message_queue.charCodeAt(0) >> 4];
var result = parsePacketLength(message_queue, true);
if (result === false) {
CF.log("parsePacketLength returned FALSE, which means error calculating length!");
CF.log("Maybe too short packet. Need to wait for next packet.");
return;
}
var message_length = result.value;
var start_byte = result.bytes + 1;
CF.logObject(result);
CF.log("Packet Length: " + result.value);
while (message_queue.length > 0) {
if (message_queue.length < message_length) {
CF.log("Too short packet. Need to wait for next packet");
return;
}
// Start parsing
switch (type) {
case 'publish': // PUBLISH Command
CF.log("Received PUBLISH Message.");
var message = message_queue.substr(0, message_length); // Extract message
CF.log("Extracted message with length: " + message_length + " Message: " + message);
// Extracting topic and value
var topic_length = (message_queue.charCodeAt(start_byte) << 8) | message_queue.charCodeAt(start_byte + 1);
CF.log("Topic length: " + topic_length);
var topic = message.substr(start_byte + 2, topic_length);
var value = message.slice(start_byte + 2 + topic_length);
CF.log("Extracted Topic: " + topic + " Value: " + value);
// Sending extracted topic and value to loopback system in format Topic=Value \x0D. \x=0D is used as EOM Symbol.
// In this way processing can be done by Loopback system Feedbacks
CF.send("Loopback", topic + '=' + value + ' \x0D');
// Remove processed message from queue
message_queue = message_queue.slice(message_length);
CF.log("Remaining queue length: " + message_queue.length);
break;
default:
// All noninteresting messages dropped here (CONACK, SUBACK, PING etc).
//Thanks to MQTT Message format we can remove them without knowing what is it
CF.log("Received unknown Message. " + type);
message_queue = message_queue.slice(message_length);
CF.log("Remaining queue length: " + message_queue.length);
break;
}
}
}