
rework rcmultispg
The DAQ Threads are now device readers which package up their measurements
and drop them into a Queue.  A separate writer Thread reads this Queue and
is responsible for disk I/O. The main function supervises these Threads and
coordinates startup/shutdown.

This new structure makes it easy to save raw data to dedicated files in real
time if so desired, and to catch SIGUSR1 to trigger an immediate dump of the
spectrogram.
ckuethe committed Feb 25, 2024
1 parent 9a8d206 commit f36182b
Showing 2 changed files with 120 additions and 80 deletions.
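
For orientation before reading the diff: the commit message describes a producer/consumer layout in which each reader thread enqueues measurements and a single writer thread owns all disk I/O, with SIGUSR1 translated into a queue message instead of being handled inline. The sketch below is only a minimal, self-contained illustration of that pattern; the names (Measurement, reader, writer, on_usr1) are illustrative and are not the identifiers used in rcmultispg.py.

import signal
import threading
import time
from collections import namedtuple
from queue import Queue

Measurement = namedtuple("Measurement", ["time", "serial_number", "value"])

DATA_QUEUE: Queue = Queue()  # readers produce, the single writer consumes
SHUTDOWN = object()          # sentinel: tell the writer to stop
SNAPSHOT = object()          # sentinel: dump current state now


def reader(serial_number: str, stop: threading.Event) -> None:
    "Device reader: poll a (fake) device and enqueue measurements; no disk I/O here."
    while not stop.is_set():
        DATA_QUEUE.put(Measurement(time.time(), serial_number, 42))
        time.sleep(0.5)


def writer() -> None:
    "Single writer: the only thread that would touch the disk."
    collected = []
    while True:
        msg = DATA_QUEUE.get()
        if msg is SHUTDOWN:
            break
        if msg is SNAPSHOT:
            print(f"snapshot requested: {len(collected)} records so far")
            continue
        collected.append(msg)


def on_usr1(_signum, _frame) -> None:
    "Signal handler only enqueues a sentinel; the writer does the actual work."
    DATA_QUEUE.put(SNAPSHOT)


if __name__ == "__main__":
    signal.signal(signal.SIGUSR1, on_usr1)  # POSIX only
    stop = threading.Event()
    readers = [threading.Thread(target=reader, args=(sn, stop)) for sn in ("A", "B")]
    w = threading.Thread(target=writer)
    [t.start() for t in readers + [w]]
    time.sleep(2)
    stop.set()
    [t.join() for t in readers]
    DATA_QUEUE.put(SHUTDOWN)
    w.join()

The actual script uses sentinel objects the same way (SHUTDOWN_OBJECT, SNAPSHOT_OBJECT) but adds a separate control queue and a Barrier for startup coordination, as shown in the diff below.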
.gitignore: 2 additions & 0 deletions
@@ -3,6 +3,8 @@ __pycache__
.*.sw?
.vscode
*.json
*.ndjson
*.rcspg
junk
dist
.coverage
src/rcmultispg.py: 118 additions & 80 deletions
@@ -9,26 +9,33 @@
from argparse import ArgumentParser, Namespace
from radiacode import RadiaCode, Spectrum
from radiacode.transports.usb import DeviceNotFound
from threading import Barrier, BrokenBarrierError, Thread, Lock
from threading import Barrier, BrokenBarrierError, Thread, Lock, active_count
from queue import Queue
from tempfile import mkstemp
from rcutils import UnixTime2FileTime, find_radiacode_devices
from time import sleep, time, strftime, gmtime
from binascii import hexlify
from collections import namedtuple
from struct import pack as struct_pack
from re import sub as re_sub
from typing import List, Union
from json import dump as jdump
from signal import signal, SIGUSR1
from json import dumps as jdumps
from signal import signal, SIGUSR1, SIGINT
import sys
import os

Number = Union[int, float]
SpecData = namedtuple("SpecData", ["time", "spectrum"])

RUNNING: bool = True # Signal when it's time for the threads to exit
USR1FLAG: bool = False # Signal when SIGUSR1 is caught, to trigger a snapshot
SpecData = namedtuple("SpecData", ["time", "serial_number", "spectrum"])
MIN_POLL_INTERVAL: float = 0.5
STDIO_LOCK: Lock = Lock() # Prevent stdio corruption
THREAD_BARRIER: Barrier = Barrier(0) # initialized to 0 to shut up static type checking

# Queues and stuff
DATA_QUEUE: Queue = Queue() # Global so I can use them in signal handlers
CTRL_QUEUE: Queue = Queue()
SHUTDOWN_OBJECT = object()
SNAPSHOT_OBJECT = object()


def tbar(wait_time=None) -> None:
"""
@@ -42,9 +49,11 @@ def tbar(wait_time=None) -> None:


def handle_sigusr1(_signum=None, _stackframe=None):
"""Set a flag when SIGUSR1 is received, to trigger a state dump"""
global USR1FLAG
USR1FLAG = True
DATA_QUEUE.put(SNAPSHOT_OBJECT)


def handle_sigint(_signum=None, _stackframe=None):
CTRL_QUEUE.put(SHUTDOWN_OBJECT)


def get_args() -> Namespace:
@@ -103,18 +112,17 @@ def gte_zero(s):
help="reset the currently displayed spectrum",
)
ap.add_argument(
"--save-raw-data",
dest="pickle",
"-l",
"--raw-log",
default=False,
action="store_true",
help="Dump raw measurements to a json file",
help="record raw measurements to raw_{serial}_{timestamp}.ndjson",
)

rv = ap.parse_args()
min_poll = 0.5
if rv.interval < min_poll:
print(f"increasing poll interval to {min_poll}s")
rv.interval = min_poll
if rv.interval < MIN_POLL_INTERVAL:
print(f"increasing poll interval to {MIN_POLL_INTERVAL}s")
rv.interval = MIN_POLL_INTERVAL

# post-processing stages.
rv.devs = list(set(rv.devs))
@@ -191,60 +199,62 @@ def format_spectra(data: List[SpecData]) -> str:
return "\n".join(lines)


def make_rec(data: SpecData):
"Turn a SpecData into an easily jsonified dict"
rec = {
"timestamp": data.time,
"serial_number": data.serial_number,
"duration": data.spectrum.duration.total_seconds(),
"calibration": [data.spectrum.a0, data.spectrum.a1, data.spectrum.a2],
"counts": data.spectrum.counts,
}
return rec


def save_data(
data: List[SpecData],
serial_number: str,
prefix: str = "rcmulti_",
use_pickle: bool = False,
):
duration = data[-1].time - data[2].time
start_time = data[0].time
time_string = strftime("%Y%m%d%H%M%S", gmtime(start_time))
fn = f"{prefix}{serial_number}_{time_string}"
fn = f"{prefix}{serial_number}_{time_string}.rcspg"

header = make_spectrogram_header(
duration=duration,
serial_number=serial_number,
start_time=start_time,
)
with STDIO_LOCK:
print(f"saving spectrogram in {fn}")

def make_rec(data: SpecData):
rec = {
"capture_timestamp": data.time,
"duration": data.spectrum.duration.total_seconds(),
"calibration": [data.spectrum.a0, data.spectrum.a1, data.spectrum.a2],
"counts": data.spectrum.counts,
}
return rec

if use_pickle:
with open(f"{fn}.json", "w") as ofd:
jdump([make_rec(d) for d in data], ofd, indent=2)

with open(f"{fn}.rcspg", "w") as ofd:
tfd, tfn = mkstemp(dir=".")
os.close(tfd)
with open(tfn, "w") as ofd:
print(header, file=ofd)
print(make_spectrum_line(data[-1].spectrum), file=ofd)
print(format_spectra(data), file=ofd)
os.rename(tfn, fn)
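
The mkstemp-then-rename sequence in save_data above is the usual atomic-write pattern: the file only appears under its final name once it has been fully written, and os.rename is atomic on POSIX when the source and destination are on the same filesystem. A standalone sketch of just that pattern (the helper name and filename are illustrative):

import os
from tempfile import mkstemp


def atomic_write(final_name: str, text: str) -> None:
    "Write to a temporary file in the target directory, then rename it into place."
    tfd, tfn = mkstemp(dir=os.path.dirname(final_name) or ".")
    os.close(tfd)               # reopen by name, as save_data does
    with open(tfn, "w") as ofd:
        ofd.write(text)
    os.rename(tfn, final_name)  # atomic replace on the same filesystem


atomic_write("example.rcspg", "spectrogram header\n")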


def rc_worker(args: Namespace, serial_number: str, start_time: Number) -> None:
def rc_worker(args: Namespace, serial_number: str) -> None:
"""
Thread responsible for data acquisition. Connects to a device, polls spectra,
and enqueues the measurements for the writer thread
"""

global RUNNING, USR1FLAG
try:
rc = RadiaCode(serial_number=serial_number)
except (usb.core.USBTimeoutError, usb.core.USBError, DeviceNotFound) as e:
with STDIO_LOCK:
print(f"{serial_number} failed to connect - cable error, device disconnect? {e}")
RUNNING = False
print(f"{serial_number} failed to connect - cable error, device disconnect, bt connected? {e}")
CTRL_QUEUE.put(SHUTDOWN_OBJECT) # if we can't start all threads, shut everything down
return

with STDIO_LOCK:
print(f"{serial_number} Connected ")
data: List[SpecData] = [SpecData(time(), rc.spectrum_accum())]
DATA_QUEUE.put(SpecData(time(), serial_number, rc.spectrum_accum()))

# wait for all threads to connect to their devices
if THREAD_BARRIER.n_waiting >= 1:
@@ -265,46 +275,76 @@ def rc_worker(args: Namespace, serial_number: str, start_time: Number) -> None:
print(f"{serial_number} sampling")

i = 0
while RUNNING: # IO loop
while CTRL_QUEUE.qsize() == 0: # don't even need to read the item
try:
data.append(SpecData(time(), rc.spectrum()))
DATA_QUEUE.put(SpecData(time(), serial_number, rc.spectrum()))
with STDIO_LOCK:
print(f"\rn:{i}", end="", flush=True)
i += 1
sleep(args.interval)
tbar()
except (usb.core.USBError, BrokenBarrierError):
# once running though, don't take down all the other threads
THREAD_BARRIER.abort()
RUNNING = False

if USR1FLAG: # handle request to snapshot state
save_data(
data=data,
prefix=args.prefix,
serial_number=serial_number,
use_pickle=args.pickle,
)
with STDIO_LOCK:
print(f"{serial_number} snapshot")
tbar()
USR1FLAG = False

# loop complete, print summary, save data
with STDIO_LOCK:
nd = len(data) - 2
print(f"{serial_number} data collection stop - {nd} records, {nd*args.interval:.1f}s")
print(f"{serial_number} data collection stop - {i} records, {i*args.interval:.1f}s")

save_data(
data=data,
prefix=args.prefix,
serial_number=serial_number,
use_pickle=args.pickle,
)

def log_worker(args: Namespace) -> None:
"Handle realtime logging of measurements if you can't wait for the spectrogram file"
with STDIO_LOCK:
print(f"starting log_worker for: {' '.join(args.devs)}")

def main() -> None:
global RUNNING
log_fds = {d: None for d in args.devs} if args.raw_log else {}
measurements = {d: [] for d in args.devs}
start_time = time()
time_string = strftime("%Y%m%d%H%M%S", gmtime(start_time))

for sn in args.devs:
tmpfd, tmpfn = mkstemp(dir=".")
os.close(tmpfd)
fn = f"raw_{sn}_{time_string}.ndjson"
os.rename(tmpfn, fn)
log_fds[sn] = open(fn, "w")

running = True
snapshot = False
while running:
while DATA_QUEUE.qsize():
msg = DATA_QUEUE.get()
if msg is SHUTDOWN_OBJECT:
running = False # bail out once this batch of messages is done
continue

elif msg is SNAPSHOT_OBJECT:
snapshot = True # save the current spectrogram immediately
continue

elif isinstance(msg, SpecData):
measurements[msg.serial_number].append(msg)
if args.raw_log:
fd = log_fds[msg.serial_number]
print(jdumps(make_rec(msg)), file=fd, flush=True)

else:
pass # who put this junk here?
if snapshot:
with STDIO_LOCK:
print()
for sn in measurements:
save_data(data=measurements[sn], serial_number=sn)
snapshot = False
sleep(MIN_POLL_INTERVAL / 2)

for sn in args.devs:
save_data(data=measurements[sn], serial_number=sn)
if sn in log_fds:
log_fds[sn].close()
return
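
With --raw-log enabled, log_worker writes one JSON object per measurement into raw_{serial}_{timestamp}.ndjson, one record per line. The snippet below only illustrates the record shape produced by make_rec; all values, including the serial number, are made up:

from json import dumps as jdumps

record = {
    "timestamp": 1708819200.0,         # illustrative values only
    "serial_number": "RC-000-000000",  # hypothetical serial number
    "duration": 1.0,
    "calibration": [0.0, 3.0, 0.0],
    "counts": [0, 1, 2],               # real spectra have many more channels
}
print(jdumps(record))  # one such line per measurement in the .ndjson file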


def main() -> None:
args = get_args()
try:
dev_names = find_radiacode_devices()
@@ -329,46 +369,44 @@ def main() -> None:

THREAD_BARRIER._parties = len(args.devs) # ick, but works

start_time = time()
# create the threads and store in a list so they can be checked later
threads: List[Thread] = [
Thread(
target=rc_worker,
name=serial_number,
args=(args, serial_number, start_time),
args=(args, serial_number),
)
for serial_number in args.devs
]

threads.append(Thread(target=log_worker, args=(args,)))

signal(SIGUSR1, handle_sigusr1)
signal(SIGINT, handle_sigint)
print(f"`kill -USR1 {os.getpid()}` to snapshot the spectrogram")

# Start the readers
[t.start() for t in threads]

sleep(MIN_POLL_INTERVAL)

# Main process/thread slowly spins waiting for ^C. If interrupt is received, or one of the
# workers exits, set a shutdown flag which will cause all other threads to gracefully shut
try:
while True:
sleep(1)
if not all([t.is_alive() for t in threads]):
raise ChildProcessError
except ChildProcessError:
print("Some threads exited early")
while active_count() - 1 == len(threads):
sleep(0.25)
except KeyboardInterrupt:
# print("Stopping threads")
pass
finally:
RUNNING = False
CTRL_QUEUE.put(SHUTDOWN_OBJECT)
DATA_QUEUE.put(SHUTDOWN_OBJECT)
with STDIO_LOCK:
print()
print("Stopping threads")

# Clean up
while True:
while active_count() > 1: # main process counts as a thread
[t.join(0.1) for t in threads]
if any([t.is_alive() for t in threads]):
sleep(0.1)
else:
break
sleep(0.1)


if __name__ == "__main__":
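
Most of the Barrier handling (THREAD_BARRIER, tbar) is collapsed in this view; it is what lines the readers up so sampling starts only after every device has connected, and aborting the barrier is how a failing reader releases the others. A minimal sketch of that coordination, with illustrative names and a hypothetical two-device setup:

import threading

N_DEVICES = 2
barrier = threading.Barrier(N_DEVICES)


def device_thread(name: str) -> None:
    print(f"{name} connected")
    try:
        barrier.wait(timeout=5)  # block until every reader reaches this point
    except threading.BrokenBarrierError:
        print(f"{name} giving up: another reader failed or timed out")
        return
    print(f"{name} sampling")


threads = [threading.Thread(target=device_thread, args=(f"dev{i}",)) for i in range(N_DEVICES)]
[t.start() for t in threads]
[t.join() for t in threads]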
