Skip to content

Commit

Permalink
ledtrig-based gps state indicator
Browse files Browse the repository at this point in the history
  • Loading branch information
ckuethe committed Mar 29, 2024
1 parent eb1e0a3 commit 621bb95
Showing 1 changed file with 55 additions and 25 deletions.
80 changes: 55 additions & 25 deletions src/gpsled.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@

"Drive an LED in response to GPS fix state"

import os
import socket
import subprocess
from argparse import ArgumentParser, Namespace
from json import JSONDecodeError
from json import dumps as jdumps
from json import loads as jloads
from re import match as re_match
from time import sleep
import socket
from json import loads as jloads, JSONDecodeError
import os

_example_systemd_unit = """
[Unit]
Expand All @@ -22,12 +25,25 @@
Type=simple
ExecStart=/path/to/gpsled.py -g gpsd://127.0.0.1:2947/ green:lan
Restart=on-failure
NoNewPrivileges=yes
[Install]
WantedBy=multi-user.target
"""


def configure_led(args: Namespace, enable: bool = True):
subprocess.run(["modprobe", "ledtrig-pattern"], check=True)
with open(args.led_path, "w") as ofd:
print("pattern" if enable else "none", file=ofd)

if enable is False:
with open(args.led_path.replace("/trigger", "/brightness"), "w") as ofd:
print("0", file=ofd)
else:
with open(args.led_path.replace("/trigger", "/pattern"), "w") as ofd:
print("0 495 1 5", file=ofd)


def get_args() -> Namespace:
def _gpsd(s):
m = re_match(r"^gpsd://(?P<host>[a-zA-Z0-9_.-]+)(:(?P<port>\d+))?(?P<dev>/.+)?", s)
Expand All @@ -37,21 +53,23 @@ def _gpsd(s):
return None

def _led(s):
"check for /trigger because it's available in all modes"
if isinstance(s, list):
s = s[0]
if s == "/dev/null":
return s
if s.startswith("/sys/class/leds/") and s.endswith("/brightness"):
if s.startswith("/sys/class/leds/") and s.endswith("/trigger"):
return s
else:
return _led(f"/sys/class/leds/{s}/brightness")
return _led(f"/sys/class/leds/{s}/trigger")

ap = ArgumentParser(description="Poll all connected RadiaCode PSRDs and produce spectrograms")
ap = ArgumentParser()
ap.add_argument(
"-g",
"--gpsd",
type=_gpsd,
metavar="URL",
default="gpsd://localhost",
help="Connect to specified device, eg. gpsd://localhost:2947/dev/ttyACM0",
)
ap.add_argument(
Expand All @@ -65,7 +83,7 @@ def _led(s):
dest="led_path",
metavar="PATH",
type=_led,
help="/sys/class/leds/<led>/brightness",
help="/sys/class/leds/<led>/trigger",
)

args = ap.parse_args()
Expand All @@ -83,15 +101,20 @@ def gps_worker(args: Namespace) -> None:
if not os.path.exists(args.led_path):
raise FileNotFoundError(args.led_path)

with open(args.led_path, "at") as ofd:
watch_args = {"enable": True, "json": True}
if args.gpsd["dev"]:
watch_args["device"] = args.gpsd["dev"]
watchstr = "?WATCH=" + jdumps(watch_args)

last_state = None
with open(args.led_path.replace("/trigger", "/pattern"), "wt") as ofd:
while True:
try:
with socket.create_connection(srv, 3) as s:
gpsfd = s.makefile("rw")

print('?WATCH={"enable":true,"json":true}', file=gpsfd, flush=True)
print(watchstr, file=gpsfd, flush=True)
dedup = None
led_state = False
while True:
line = gpsfd.readline().strip()
try:
Expand All @@ -101,33 +124,40 @@ def gps_worker(args: Namespace) -> None:
if x["time"] == dedup:
continue
tpv = {f: x.get(f, None) for f in ["time", "mode", "lat", "lon", "alt", "speed", "track"]}
if tpv["mode"] == 3:
led_state = True
elif tpv["mode"] == 2:
led_state = not led_state
else:
led_state = False

tpv["led_state"] = led_state

if tpv["mode"] != last_state:
if tpv["mode"] == 3:
print("1 100 1 100", flush=True, file=ofd)
tpv["led"] = "on"
elif tpv["mode"] == 2:
print("0 250 1 250", flush=True, file=ofd)
tpv["led"] = "blink"
else:
print("0 495 1 5", flush=True, file=ofd)
tpv["led"] = "blip"

if args.verbose:
print(tpv)
print("1" if led_state else "0", flush=True, file=ofd)
except (KeyError, JSONDecodeError): # skip bad messages, no fix, etc.
print("0", flush=True, file=ofd)
print("0 100", flush=True, file=ofd)
break
# End of read loop
except KeyboardInterrupt:
print("0", flush=True, file=ofd)
return
except (socket.error, TimeoutError) as e:
# any other exception will cause this to error out and exit
if args.verbose:
print(f"Reattempting connection after exception: {e}")
sleep(3)


def main() -> None:
args = get_args()
gps_worker(args)
try:
configure_led(args)
gps_worker(args)
except Exception:
pass
finally:
configure_led(args, False)


if __name__ == "__main__":
Expand Down

0 comments on commit 621bb95

Please sign in to comment.