-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
101 lines (81 loc) · 2.83 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from datetime import datetime
import logging
import os
from typing import Optional, Tuple
import time
import pygame
from groundlight import Groundlight, Detector, ImageQuery
# Groundlight client shared by every function in this module.
gl = Groundlight()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Seconds between detector polls; 3 when TRIGGER_INTERVAL_S is not exported.
TRIGGER_INTERVAL_S = int(os.getenv("TRIGGER_INTERVAL_S", "3"))

# Value of the SA_DETECTOR environment variable (None when unset); it is
# handed to gl.get_detector() when the server loop starts.
DETECTOR = os.getenv("SA_DETECTOR")

# Initialise the audio mixer and load the alarm clip once at import time so
# trigger_sound() only has to start playback.
pygame.mixer.init()
# NOTE(review): pygame documents music.load() as returning None, so
# sound_mixer is presumably always None -- confirm before relying on it.
sound_mixer = pygame.mixer.music.load("media/dog_barking.mp3")
def env_variables_set():
    """Report whether both module-level settings hold a value.

    Returns:
        True when neither TRIGGER_INTERVAL_S nor DETECTOR is None,
        False otherwise.
    """
    required_settings = (TRIGGER_INTERVAL_S, DETECTOR)
    return all(setting is not None for setting in required_settings)
def trigger_sound() -> None:
    """Log that the alarm fired and start playback on the pygame music channel."""
    logger.info("Triggering sound")
    # NOTE(review): play() is issued on every call; whether that restarts a
    # clip that is already playing should be confirmed against pygame's docs.
    music_channel = pygame.mixer.music
    music_channel.play()
def get_most_recent_iq(
    detector: Detector,
) -> Optional[ImageQuery]:
    """Return the first listed image query that belongs to ``detector``.

    Only the results of a single ``gl.list_image_queries()`` call are
    scanned; nothing here requests further pages.

    Args:
        detector: Detector whose ``id`` is compared with each listed
            query's ``detector_id``.

    Returns:
        The first matching image query, or None when no listed query
        matches.
    """
    # TODO Find it or we don't
    # NOTE(review): "most recent" assumes the listing is ordered newest-first
    # and that the wanted query is within the returned results -- confirm
    # against the Groundlight SDK.
    listed_queries = gl.list_image_queries().results
    return next(
        (iq for iq in listed_queries if iq.detector_id == detector.id),
        None,
    )
def do_loop(
    detector: Detector, yes_start_time: Optional[float]
) -> Optional[float]:
    """Run one polling pass and update the YES timer.

    Looks up the most recent image query for ``detector``. A "YES" label
    starts the timer if it is not running, and triggers the sound once the
    timer has been running for at least 10 seconds. A "NO" label clears the
    timer so that a later YES has to persist for the full period again.
    Any other outcome (no query, no result, another label) leaves the timer
    unchanged.

    Args:
        detector: Detector whose latest image query is inspected.
        yes_start_time: ``time.time()`` timestamp at which the current run
            of YES answers started, or None when no timer is running.

    Returns:
        The updated timer start timestamp, or None when no timer is running.
    """
    alarm_after_s = 10  # We alarm if we don't get a no for 10 seconds

    result = get_most_recent_iq(detector)
    now = time.time()
    logger.info(f"Current time: {now}")
    logger.info(f"{result=}")

    # Guard against a query without an attached result so a missing answer
    # is treated as "no information" instead of raising AttributeError.
    answer = result.result if result else None
    label = answer.label if answer is not None else None

    if label == "YES":
        logger.info("Received YES result!")
        if yes_start_time is None:
            logger.info("Starting yes timer....")
            yes_start_time = now
        elif now - yes_start_time >= alarm_after_s:
            logger.info(
                "Received yes for more than 10 seconds. Triggering sound"
            )
            # NOTE: We don't time out the sound, it will play until the end
            trigger_sound()
    elif label == "NO" and yes_start_time is not None:
        # Without this reset a single old YES would keep the timer running
        # forever and the next YES would alarm immediately.
        logger.info("Received NO result, resetting yes timer")
        yes_start_time = None

    return yes_start_time
def start_server_loop() -> None:
    """Look up the configured detector, then poll it forever.

    The detector named by DETECTOR is fetched once up front; if that lookup
    fails the error is logged and the process exits with status 1. The loop
    then calls ``do_loop`` every TRIGGER_INTERVAL_S seconds, carrying the YES
    timer from one pass to the next, and sleeps at most one second between
    wake-ups.

    TRIGGER_INTERVAL_S and DETECTOR are read from the environment once at
    import time, so the outcome of ``env_variables_set()`` does not change
    while the loop is running.

    Raises:
        SystemExit: With code 1 when the detector cannot be retrieved.
    """
    try:
        detector = gl.get_detector(DETECTOR)
    except Exception as err:
        # Catch Exception rather than everything so Ctrl-C and SystemExit
        # still propagate; log the traceback because the failure may not
        # actually be a missing detector.
        logger.exception("Detector not found, terminating process")
        raise SystemExit(1) from err

    # initialize the next time we should run the loop as now
    next_run_time = time.time()
    yes_start_time = None

    while True:
        now = time.time()
        if env_variables_set():
            if now >= next_run_time:
                yes_start_time = do_loop(detector, yes_start_time)
                next_run_time = now + TRIGGER_INTERVAL_S
        else:
            logger.info("Environment variables not set. Sleeping....")
            next_run_time = now + 10

        # Wake at least once a second; clamp at zero because time.sleep()
        # rejects negative values (possible with a negative interval).
        time.sleep(max(0.0, min(1, next_run_time - now)))
# Script entry point: announce start-up, then hand control to the polling
# loop (which only returns by exiting the process).
if __name__ == "__main__":
    logger.info("Starting loop")
    start_server_loop()