diff --git a/src/rcutils.py b/src/rcutils.py index 696a39d..df7356b 100644 --- a/src/rcutils.py +++ b/src/rcutils.py @@ -130,106 +130,6 @@ def probe_radiacode_devices() -> None: ) -def rcspg_make_header( - duration: int, - serial_number: str, - start_time: Number, - name: str = "", - comment: str = "", - flags: int = 1, - channels: int = 1024, -) -> str: - """ - Create the first (header) line of the spectrum file - - Not all flag values are known, but bit 0 being unset means that the - spectrogram recording was interrupted and resumed. - """ - - start_time = float(start_time) # accept reasonable inputs: 1700000000, 1.7e9, "1.7e9", ... - file_time = UnixTime2FileTime(start_time) - gt = gmtime(start_time) - tstr = strftime("%Y-%m-%d %H:%M:%S", gt) # This one is for the header - - if not name: - # and this version of time just looks like an int... for deduplication - name = f"rcmulti-{strftime('%Y%m%d%H%M%S', gt)}-{serial_number}" - - fields = [ - f"Spectrogram: {name.strip()}", - f"Time: {tstr}", - f"Timestamp: {file_time}", - f"Accumulation time: {int(duration)}", - f"Channels: {channels}", - f"Device serial: {serial_number.strip()}", - f"Flags: {flags}", - f"Comment: {comment}", - ] - return "\t".join(fields) - - -def rcspg_make_spectrum_line(x: Spectrum) -> str: - """ - The second line of the spectrogram is the spectrum of the accumulated exposure - since last data reset. - (duration:int, coeffs:float[3], counts:int[1024]) - """ - v = struct_pack(" str: - """ - Given the list of SpecData, convert them to whatever we need for the spectrogram - - data[0] = all-time accumulated spectrum - survives "reset accumulation" - data[1] = current accumulated spectrum at the start of measurement. needed to compute - data[2:] = the rest of the spectra - """ - lines = [] - prev_rec = data[1] - for rec in data[2:]: - ts = rec.time - line = [UnixTime2FileTime(ts), int(ts - prev_rec.time)] - line.extend([int(x[0] - x[1]) for x in zip(rec.spectrum.counts, prev_rec.spectrum.counts)]) - prev_rec = rec - line = "\t".join([str(x) for x in line]) - line = re_sub(r"(\s0)+$", "", line) - lines.append(line) - - return "\n".join(lines) - - -def rcspg_save_spectrogram_file( - data: List[SpecData], - serial_number: str, - prefix: str = "rcmulti_", -) -> None: - """ - Emit a spectrogram file into the current directory. - """ - duration = data[-1].time - data[2].time - start_time = data[0].time - time_string = strftime("%Y%m%d%H%M%S", gmtime(start_time)) - fn = f"{prefix}{serial_number}_{time_string}.rcspg" - - header = rcspg_make_header( - duration=duration, - serial_number=serial_number, - start_time=start_time, - ) - print(f"saving spectrogram in {fn}") - - tfd, tfn = mkstemp(dir=".") - os.close(tfd) - with open(tfn, "wt") as ofd: - print(header, file=ofd) - print(rcspg_make_spectrum_line(data[-1].spectrum), file=ofd) - print(rcspg_format_spectra(data), file=ofd) - os.rename(tfn, fn) - - def specdata_to_dict(data: SpecData) -> Dict[str, Any]: "Turn a SpecData into an easily jsonified dict" rec = {