diff --git a/.gitignore b/.gitignore index ea69657..051fdc8 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,8 @@ __pycache__ .*.sw? .vscode *.json +*.ndjson +*.rcspg junk dist .coverage diff --git a/src/rcmultispg.py b/src/rcmultispg.py index 1ecb23d..0e60602 100755 --- a/src/rcmultispg.py +++ b/src/rcmultispg.py @@ -9,7 +9,9 @@ from argparse import ArgumentParser, Namespace from radiacode import RadiaCode, Spectrum from radiacode.transports.usb import DeviceNotFound -from threading import Barrier, BrokenBarrierError, Thread, Lock +from threading import Barrier, BrokenBarrierError, Thread, Lock, active_count +from queue import Queue +from tempfile import mkstemp from rcutils import UnixTime2FileTime, find_radiacode_devices from time import sleep, time, strftime, gmtime from binascii import hexlify @@ -17,18 +19,23 @@ from struct import pack as struct_pack from re import sub as re_sub from typing import List, Union -from json import dump as jdump -from signal import signal, SIGUSR1 +from json import dumps as jdumps +from signal import signal, SIGUSR1, SIGINT import sys +import os Number = Union[int, float] -SpecData = namedtuple("SpecData", ["time", "spectrum"]) - -RUNNING: bool = True # Signal when it's time for the threads to exit -USR1FLAG: bool = False # Signal when SIGUSR1 is caught, to trigger a snapshot +SpecData = namedtuple("SpecData", ["time", "serial_number", "spectrum"]) +MIN_POLL_INTERVAL: float = 0.5 STDIO_LOCK: Lock = Lock() # Prevent stdio corruption THREAD_BARRIER: Barrier = Barrier(0) # intialized to 0 to shut up static type checking +# Queues and stuff +DATA_QUEUE: Queue = Queue() # Global so I can use them in signal handlers +CTRL_QUEUE: Queue = Queue() +SHUTDOWN_OBJECT = object() +SNAPSHOT_OBJECT = object() + def tbar(wait_time=None) -> None: """ @@ -42,9 +49,11 @@ def tbar(wait_time=None) -> None: def handle_sigusr1(_signum=None, _stackframe=None): - """Set a flag when SIGUSR1 is received, to trigger a state dump""" - global USR1FLAG - USR1FLAG = True + DATA_QUEUE.put(SNAPSHOT_OBJECT) + + +def handle_sigint(_signum=None, _stackframe=None): + CTRL_QUEUE.put(SHUTDOWN_OBJECT) def get_args() -> Namespace: @@ -103,18 +112,17 @@ def gte_zero(s): help="reset the currently displayed spectrum", ) ap.add_argument( - "--save-raw-data", - dest="pickle", + "-l", + "--raw-log", default=False, action="store_true", - help="Dump raw measurements to a json file", + help="record raw measurements to raw_{serial}_{timestamp}.ndjson", ) rv = ap.parse_args() - min_poll = 0.5 - if rv.interval < min_poll: - print(f"increasing poll interval to {min_poll}s") - rv.interval = min_poll + if rv.interval < MIN_POLL_INTERVAL: + print(f"increasing poll interval to {MIN_POLL_INTERVAL}s") + rv.interval = MIN_POLL_INTERVAL # post-processing stages. rv.devs = list(set(rv.devs)) @@ -191,60 +199,62 @@ def format_spectra(data: List[SpecData]) -> str: return "\n".join(lines) +def make_rec(data: SpecData): + "Turn a SpecData into an easily jsonified dict" + rec = { + "timestamp": data.time, + "serial_number": data.serial_number, + "duration": data.spectrum.duration.total_seconds(), + "calibration": [data.spectrum.a0, data.spectrum.a1, data.spectrum.a2], + "counts": data.spectrum.counts, + } + return rec + + def save_data( data: List[SpecData], serial_number: str, prefix: str = "rcmulti_", - use_pickle: bool = False, ): duration = data[-1].time - data[2].time start_time = data[0].time time_string = strftime("%Y%m%d%H%M%S", gmtime(start_time)) - fn = f"{prefix}{serial_number}_{time_string}" + fn = f"{prefix}{serial_number}_{time_string}.rcspg" header = make_spectrogram_header( duration=duration, serial_number=serial_number, start_time=start_time, ) + with STDIO_LOCK: + print(f"saving spectrogram in {fn}") - def make_rec(data: SpecData): - rec = { - "capture_timestamp": data.time, - "duration": data.spectrum.duration.total_seconds(), - "calibration": [data.spectrum.a0, data.spectrum.a1, data.spectrum.a2], - "counts": data.spectrum.counts, - } - return rec - - if use_pickle: - with open(f"{fn}.json", "w") as ofd: - jdump([make_rec(d) for d in data], ofd, indent=2) - - with open(f"{fn}.rcspg", "w") as ofd: + tfd, tfn = mkstemp(dir=".") + os.close(tfd) + with open(tfn, "w") as ofd: print(header, file=ofd) print(make_spectrum_line(data[-1].spectrum), file=ofd) print(format_spectra(data), file=ofd) + os.rename(tfn, fn) -def rc_worker(args: Namespace, serial_number: str, start_time: Number) -> None: +def rc_worker(args: Namespace, serial_number: str) -> None: """ Thread responsible for data acquisition. Connects to devices, polls spectra, accumulates data, and generates the output file """ - global RUNNING, USR1FLAG try: rc = RadiaCode(serial_number=serial_number) except (usb.core.USBTimeoutError, usb.core.USBError, DeviceNotFound) as e: with STDIO_LOCK: - print(f"{serial_number} failed to connect - cable error, device disconnect? {e}") - RUNNING = False + print(f"{serial_number} failed to connect - cable error, device disconnect, bt connected? {e}") + CTRL_QUEUE.put(SHUTDOWN_OBJECT) # if we can't start all threads, shut everything down return with STDIO_LOCK: print(f"{serial_number} Connected ") - data: List[SpecData] = [SpecData(time(), rc.spectrum_accum())] + DATA_QUEUE.put(SpecData(time(), serial_number, rc.spectrum_accum())) # wait for all threads to connect to their devices if THREAD_BARRIER.n_waiting >= 1: @@ -265,46 +275,76 @@ def rc_worker(args: Namespace, serial_number: str, start_time: Number) -> None: print(f"{serial_number} sampling") i = 0 - while RUNNING: # IO loop + while CTRL_QUEUE.qsize() == 0: # don't even need to read the item try: - data.append(SpecData(time(), rc.spectrum())) + DATA_QUEUE.put(SpecData(time(), serial_number, rc.spectrum())) with STDIO_LOCK: print(f"\rn:{i}", end="", flush=True) i += 1 sleep(args.interval) tbar() except (usb.core.USBError, BrokenBarrierError): + # once running though, don't take down all the other threads THREAD_BARRIER.abort() - RUNNING = False - - if USR1FLAG: # handle request to snapshot state - save_data( - data=data, - prefix=args.prefix, - serial_number=serial_number, - use_pickle=args.pickle, - ) - with STDIO_LOCK: - print(f"{serial_number} snapshot") - tbar() - USR1FLAG = False - # loop complete, print summary, save data with STDIO_LOCK: - nd = len(data) - 2 - print(f"{serial_number} data collection stop - {nd} records, {nd*args.interval:.1f}s") + print(f"{serial_number} data collection stop - {i} records, {i*args.interval:.1f}s") - save_data( - data=data, - prefix=args.prefix, - serial_number=serial_number, - use_pickle=args.pickle, - ) +def log_worker(args: Namespace) -> None: + "Handle realtime logging of measurements if you can't wait for the spectrogram file" + with STDIO_LOCK: + print(f"starting log_worker for: {' '.join(args.devs)}") -def main() -> None: - global RUNNING + log_fds = {d: None for d in args.devs} if args.raw_log else {} + measurements = {d: [] for d in args.devs} + start_time = time() + time_string = strftime("%Y%m%d%H%M%S", gmtime(start_time)) + + for sn in args.devs: + tmpfd, tmpfn = mkstemp(dir=".") + os.close(tmpfd) + fn = f"raw_{sn}_{time_string}.ndjson" + os.rename(tmpfn, fn) + log_fds[sn] = open(fn, "w") + + running = True + snapshot = False + while running: + while DATA_QUEUE.qsize(): + msg = DATA_QUEUE.get() + if msg is SHUTDOWN_OBJECT: + running = False # bail out once this batch of messages is done + continue + + elif msg is SNAPSHOT_OBJECT: + snapshot = True # save the current spectrogram immediately + continue + + elif isinstance(msg, SpecData): + measurements[msg.serial_number].append(msg) + if args.raw_log: + fd = log_fds[msg.serial_number] + print(jdumps(make_rec(msg)), file=fd, flush=True) + + else: + pass # who put this junk here? + if snapshot: + with STDIO_LOCK: + print() + for sn in measurements: + save_data(data=measurements[sn], serial_number=sn) + snapshot = False + sleep(MIN_POLL_INTERVAL / 2) + + for sn in args.devs: + save_data(data=measurements[sn], serial_number=sn) + if sn in log_fds: + log_fds[sn].close() + return + +def main() -> None: args = get_args() try: dev_names = find_radiacode_devices() @@ -329,46 +369,44 @@ def main() -> None: THREAD_BARRIER._parties = len(args.devs) # ick, but works - start_time = time() # create the threads and store in a list so they can be checked later threads: List[Thread] = [ Thread( target=rc_worker, name=serial_number, - args=(args, serial_number, start_time), + args=(args, serial_number), ) for serial_number in args.devs ] + threads.append(Thread(target=log_worker, args=(args,))) + signal(SIGUSR1, handle_sigusr1) + signal(SIGINT, handle_sigint) + print(f"`kill -USR1 {os.getpid()}` to snapshot the spectrogram") # Start the readers [t.start() for t in threads] + sleep(MIN_POLL_INTERVAL) + # Main process/thread slowly spins waiting for ^C. If interrupt is received, or one of the # workers exits, set a shutdown flag which will cause all other threads to gracefully shut try: - while True: - sleep(1) - if not all([t.is_alive() for t in threads]): - raise ChildProcessError - except ChildProcessError: - print("Some threads exited early") + while active_count() - 1 == len(threads): + sleep(0.25) except KeyboardInterrupt: - # print("Stopping threads") pass finally: - RUNNING = False + CTRL_QUEUE.put(SHUTDOWN_OBJECT) + DATA_QUEUE.put(SHUTDOWN_OBJECT) with STDIO_LOCK: - print() + print("Stopping threads") # Clean up - while True: + while active_count() > 1: # main process counts as a thread [t.join(0.1) for t in threads] - if any([t.is_alive() for t in threads]): - sleep(0.1) - else: - break + sleep(0.1) if __name__ == "__main__":