-
Notifications
You must be signed in to change notification settings - Fork 2
/
ac_control.py
executable file
·108 lines (80 loc) · 2.97 KB
/
ac_control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#!/usr/bin/python3
import paho.mqtt.client as mqtt
import json
from proto import parse_packet, init_set_cmd, set, make_packet
MQTTHOST = "192.168.0.1"
last_cmd = init_set_cmd()
def to_intlist(hex_list):
return [
int(hex_list[i:i + 2], 16) for i in range(len(hex_list)) if i % 2 == 0
]
def to_hexlist(int_list):
hex_list = ""
for i in int_list:
hex_list += "{:02X}".format(i)
return hex_list
def set_cmd(arg, val):
global last_cmd
set(last_cmd, arg, val)
pkt = to_hexlist(make_packet(last_cmd))
client.publish("cmnd/AC/SerialSend5", pkt)
def setup_mqtt(rc):
def result_fn(_a, _b, msg):
print(msg.payload.decode("utf-8"))
jsn = json.loads(msg.payload.decode("utf-8"))
if "SerialReceived" not in jsn:
return
state = jsn["SerialReceived"]
state_int = [
int(state[i:i + 2], 16) for i in range(len(state)) if i % 2 == 0
]
try:
resp = parse_packet(state_int)['Packet']
except Exception as e:
print(e)
return
if "Get" in resp:
state_jsn = resp["Get"]
elif "Set" in resp:
state_jsn = resp["Set"]
else:
return
print(state_jsn)
client.publish("ac_control/state", json.dumps(state_jsn))
client.message_callback_add("tele/AC/RESULT", result_fn)
client.subscribe("tele/AC/RESULT")
def set_fn(_a, _b, msg):
args = msg.payload.decode("utf-8").strip().split(' ', 1)
set_cmd(args[0], args[1])
client.message_callback_add("ac_control/set", set_fn)
client.subscribe("ac_control/set")
def set_pwr_fn(_a, _b, msg):
set_cmd("pwr", msg.payload.decode("utf-8"))
client.message_callback_add("ac_control/set_pwr", set_pwr_fn)
client.subscribe("ac_control/set_pwr")
def get_fn(_a, _b, msg):
pkt = to_hexlist(make_packet([4, 2, 1, 0]))
client.publish("cmnd/AC/SerialSend5", pkt)
client.message_callback_add("ac_control/get", get_fn)
client.subscribe("ac_control/get")
def custom_fn(_a, _b, msg):
pl = msg.payload.decode("utf-8")
if len(pl) % 2 != 0: # todo check hex
return
pkt = to_hexlist(make_packet(to_intlist(pl)))
client.publish("cmnd/AC/SerialSend5", pkt)
client.message_callback_add("ac_control/custom", custom_fn)
client.subscribe("ac_control/custom")
def set_temp_fn(_a, _b, msg):
set_cmd("temp", msg.payload.decode("utf-8"))
client.message_callback_add("ac_control/set_temp", set_temp_fn)
client.subscribe("ac_control/set_temp")
def set_fan_fn(_a, _b, msg):
set_cmd("fan", msg.payload.decode("utf-8"))
client.message_callback_add("ac_control/set_fan", set_fan_fn)
client.subscribe("ac_control/set_fan")
if __name__ == "__main__":
client = mqtt.Client()
client.on_connect = lambda client, userdata, flags, rc: setup_mqtt(rc)
client.connect(MQTTHOST, 1883, 60)
client.loop_forever()