-
Notifications
You must be signed in to change notification settings - Fork 0
/
climate.py
132 lines (99 loc) · 3.18 KB
/
climate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from lywsd03mmc import Lywsd03mmcClient
import time
import os
import json
import watchdog
from datetime import datetime, timedelta
SENSOR_ADDR = "A4:C1:38:..." # Your sensor's MAC address here (visible in the app)
READ_FREQUENCY = 0.5 * 60 # Frequency (in mins) to check the sensor
DIR = "climate/" # Where to store the readings
SENSOR = None
RETRIES = 0
_last = None
_last_read = None
def bounded(a, b, c):
return min(max(a, b), c)
def read_sensor():
global SENSOR, _last, _last_read, READ_FREQUENCY
if _last is None or _last_read is None or (datetime.now() - _last_read).total_seconds() >= READ_FREQUENCY:
try:
with watchdog.Watchdog(60):
if SENSOR is None:
SENSOR = Lywsd03mmcClient(SENSOR_ADDR)
data = SENSOR.data
if data is not None:
_last = data
_last_read = datetime.now()
return (_last, _last_read)
except:
pass
return None
def structure_reading(reading):
return {
"moment": reading[1].isoformat(),
"temperature": reading[0].temperature,
"humidity": reading[0].humidity,
"battery": reading[0].battery
}
def write_reading(path, reading):
if reading is None:
return
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
json.dump(structure_reading(reading), f)
def save_reading(reading):
global DIR
if reading is None:
return
path = os.path.join(
DIR,
reading[1].strftime("%Y-%m-%d"),
reading[1].strftime("%Y-%m-%d_%H-%M-%S") + ".json"
)
write_reading(path, reading)
def save_latest_reading(reading):
global DIR
if reading is None:
return
path = os.path.join(DIR, "latest.copy.json")
final_path = os.path.join(DIR, "latest.json")
write_reading(path, reading)
os.rename(path, final_path)
def get_reading():
print(datetime.now(), "Reading sensor...")
result = read_sensor()
if result is None:
print(" Unable to read sensor")
return False
print(" Result:", result)
print(" Saving reading")
save_reading(result)
print(" Saving as latest")
save_latest_reading(result)
return True
if __name__ == "__main__":
while True:
start = datetime.now()
result = False
try:
result = get_reading()
except Exception as e:
print("ERROR: ", e)
result = False
delta = 60
if result:
RETRIES = 0
print("Sleeping for 30")
delta = (start + timedelta(minutes=30)) - datetime.now()
else:
RETRIES = bounded(1, RETRIES + 1, 20)
if RETRIES % 2 == 0:
SENSOR = None
print("Resetting sensor connection")
print("Retrying in", RETRIES + 1, "minute(s)")
delta = (start + timedelta(minutes=RETRIES + 1)) - datetime.now()
if delta.total_seconds() > 0:
time.sleep(delta.total_seconds())
else:
print("Delta was negative, sleeping for 10, delta=", delta)
time.sleep(10)