From dee9993f249f14d93157cf28815e12ba8afe97d9 Mon Sep 17 00:00:00 2001 From: Chris Kuethe Date: Fri, 14 Jun 2024 19:59:11 -0700 Subject: [PATCH] Use my nice rcfile abstraction, only put "business logic" in track_sanitize --- pyproject.toml | 2 +- src/rcfiles.py | 2 +- src/rctypes.py | 22 +++++ src/track_sanitize.py | 185 ++++++++++++++---------------------------- 4 files changed, 86 insertions(+), 125 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 450c309..804db8c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ classifiers = [ rccalibrate = "calibrate:main" rcdeadtime = "deadtime:main" rcmultispg = "rcmultispg:main" -rcsanitize = "sanitize_track:main" +rcsanitize = "track_sanitize:main" n42www = "n42www:main" n42convert = "n42convert:main" n42validate = "n42validate:main" diff --git a/src/rcfiles.py b/src/rcfiles.py index afe73bf..ed757eb 100644 --- a/src/rcfiles.py +++ b/src/rcfiles.py @@ -80,7 +80,7 @@ def _format_trackpoint(self, tp: TrackPoint): fz = [None] + list(tp._asdict().values()) fz[0] = DateTime2FileTime(fz[1]) fz[1] = _format_datetime(fz[1]) - return [str(x) for x in fz] + return "\t".join([str(x) for x in fz]) def write_file(self, filename: str) -> None: with open(filename, "wt") as ofd: diff --git a/src/rctypes.py b/src/rctypes.py index b44f331..fa75110 100644 --- a/src/rctypes.py +++ b/src/rctypes.py @@ -73,3 +73,25 @@ # storing datetime as the canonical timestamp. 
It'll be transcoded for output _trackpoint_fields = ["datetime", "latitude", "longitude", "accuracy", "doserate", "countrate", "comment"] TrackPoint = namedtuple("TrackPoint", _trackpoint_fields, defaults=[None] * len(_trackpoint_fields)) + + +class RangeFinder: + "A helper class to determine the range of a list" + + def __init__(self, name: str = "RangeFinder"): + self.name = name + self.min_val = None + self.max_val = None + + def update(self, x): + self.min_val = x if self.min_val is None else min(x, self.min_val) + self.max_val = x if self.max_val is None else max(x, self.max_val) + + def get(self): + return (self.min_val, self.max_val) + + def __str__(self): + return f"{self.name}(min={self.min_val}, max={self.max_val})" + + def __repr__(self): + return self.__str__() diff --git a/src/track_sanitize.py b/src/track_sanitize.py index 3148a5c..1c4d8f4 100755 --- a/src/track_sanitize.py +++ b/src/track_sanitize.py @@ -1,39 +1,12 @@ #!/usr/bin/env python3 -import datetime import os import re from argparse import ArgumentParser, Namespace -from collections import namedtuple +from datetime import datetime from hashlib import sha256 as hf -from tempfile import mkstemp -from typing import List -from rctypes import TrackPoint -from rcutils import DateTime2FileTime - -# file deepcode ignore PT: CLI utility, intended to walk files... 
- - -class RangeFinder: - "A helper class to determine the range of a list" - - def __init__(self, name: str = "RangeFinder"): - self.name = name - self.min_val = None - self.max_val = None - - def update(self, x): - self.min_val = x if self.min_val is None else min(x, self.min_val) - self.max_val = x if self.max_val is None else max(x, self.max_val) - - def get(self): - return (self.min_val, self.max_val) - - def __str__(self): - return f"{self.name}(min={self.min_val}, max={self.max_val})" - - def __repr__(self): - print(self.__str__()) +from rcfiles import RcTrack +from rctypes import RangeFinder def get_args() -> Namespace: @@ -43,7 +16,7 @@ def _timestamp(s: str) -> str: "helper to enforce time string format" m = re.match("^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})Z?$", s.strip()) if m: - return datetime.datetime(*[int(x) for x in m.groups()]) + return datetime(*[int(x) for x in m.groups()]) else: raise ValueError("Invalid time format") @@ -56,14 +29,14 @@ def _rcsn(s: str) -> str: ap = ArgumentParser( description="Sanitize a Radiacode track by rebasing it (to the notional setting of Hunt for Red October)" ) - ap.add_argument( + ap.add_argument( # -p prefix "-p", "--prefix", type=str, default="sanitized_", help="[%(default)s]", ) - ap.add_argument( + ap.add_argument( # -s serial-number "-s", "--serial-number", default="RC-100-314159", @@ -71,7 +44,7 @@ def _rcsn(s: str) -> str: type=_rcsn, help="[%(default)s]", ) - ap.add_argument( + ap.add_argument( # -t start-time "-t", "--start-time", default="1984-12-05T00:00:00", @@ -79,7 +52,7 @@ def _rcsn(s: str) -> str: type=_timestamp, help="[%(default)s]", ) - ap.add_argument( + ap.add_argument( # -x base-longitude "-x", "--base-longitude", default=-55.9269664, @@ -87,7 +60,7 @@ def _rcsn(s: str) -> str: type=float, help="[%(default)f]", ) - ap.add_argument( + ap.add_argument( # -y base-latitude "-y", "--base-latitude", default=43.5833323, @@ -95,134 +68,100 @@ def _rcsn(s: str) -> str: metavar="LAT", 
help="[%(default)f]", ) - ap.add_argument( + ap.add_argument( # -c comment "-c", "--comment", - default="And I ... was never here.", + default='"And I ... was never here."', metavar="STR", type=str, help="[%(default)s]", ) - ap.add_argument( + ap.add_argument( # -C allow-unsanitized-comment "-C", "--allow-unsanitized-comment", default=False, action="store_true", help="Preserve original comment [%(default)s]", ) - ap.add_argument( - "-I", - "--force-input", - default=False, - action="store_true", - help="Allow processing of files that begin with the chosen prefix [%(default)s]", - ) - ap.add_argument( + ap.add_argument( # -N allow-unsanitized-name "-N", "--allow-unsanitized-name", default=False, action="store_true", help="Preserve original track name [%(default)s]", ) - ap.add_argument( - "-O", + ap.add_argument( # -S allow-unsanitized-serial + "-S", + "--allow-unsanitized-serial", + default=False, + action="store_true", + help="Preserve original serial number [%(default)s]", + ) + ap.add_argument( # -f force-overwrite + "-f", "--force-overwrite", default=False, action="store_true", help="Overwrite existing files [%(default)s]", ) - ap.add_argument( - "-d", - "--dry-run", + ap.add_argument( # -o stdout + "-o", + "--stdout", default=False, action="store_true", - help="Do not emit any output files [%(default)s]", + help="print sanitized track to stdout rather than a file [%(default)s]", ) ap.add_argument(nargs="+", dest="files", metavar="FILE") return ap.parse_args() -def parse_row(args: Namespace, s: str) -> TrackPoint: - "Parse a single point into more specific points" - tmp = s.strip("\n").split("\t") - tmp[0] = int(tmp[0]) - tmp[1] = datetime.datetime.strptime(tmp[1], "%Y-%m-%d %H:%M:%S") - for i in range(2, 7): # latitude .. 
countrate - tmp[i] = float(tmp[i]) - return TrackPoint(*tmp) - - -def sanitize(args: Namespace, lines: List[str]) -> None: +def sanitize(args: Namespace, track: RcTrack) -> None: """ - Given a list of input lines, return a list of their sanitized versions. + Iterate over the points in an RcTrack, and mask the selected data """ - header = None - columns: List[str] = None - start_timestamp = None lat_range = RangeFinder("Lat") lon_range = RangeFinder("Lon") - header_hash = hf(b"", usedforsecurity=False) - for i, line in enumerate(lines): - header_hash.update(line.encode()) - if line.startswith("Track: "): - header = line.strip().split("\t") - header[1] = args.serial_number - if args.allow_unsanitized_comment is False: - header[2] = args.comment - elif line.startswith("Timestamp\t"): - columns = line.strip("\n") - else: - lines[i] = parse_row(args, line) - lat_range.update(lines[i].latitude) - lon_range.update(lines[i].longitude) + + header_text = f"{track.name} {track.serialnumber} {track.comment} {track.points[0].datetime}" + header_hash = hf(header_text.encode(), usedforsecurity=False) + + for point in track.points: + header_hash.update(str(point).encode()) + lat_range.update(point.latitude) + lon_range.update(point.longitude) + + if args.allow_unsanitized_comment is False: + track.comment = args.comment + + if args.allow_unsanitized_serial is False: + track.serialnumber = args.serial_number if args.allow_unsanitized_name is False: - header[0] = f"Track: {args.prefix}{header_hash.hexdigest()[:32]}" - rv: List[str] = [] - rv.append("\t".join(header)) - rv.append(columns) - start_timestamp = lines[2].time - for r in lines[2:]: - row_dict = r._asdict() - t = args.start_time + (row_dict["time"] - start_timestamp) - row_dict["time"] = t.strftime("%Y-%m-%d %H:%M:%S") - row_dict["timestamp"] = DateTime2FileTime(t) - row_dict["latitude"] = f'{row_dict["latitude"] - lat_range.min_val + args.base_latitude:0.7f}' - row_dict["longitude"] = f'{row_dict["longitude"] - 
lon_range.min_val + args.base_longitude:0.7f}' - r = TrackPoint(*row_dict.values()) - rs = "\t".join([str(v) for v in r._asdict().values()]) - rv.append(rs) - return rv - - -def save_file(args: Namespace, cur_fn: str, lines: List[str]) -> None: - file_name = os.path.basename(cur_fn) - dir_name = os.path.dirname(cur_fn) - dest_filename = os.path.join(dir_name, f"{args.prefix}{file_name}") - if args.dry_run: - print(f"Not writing to {dest_filename}") - print("\n".join(lines)) - return - if os.path.exists(dest_filename) and args.force_overwrite is False: - raise FileExistsError - - tmp_fd, tmp_fn = mkstemp(dir=dir_name) - os.close(tmp_fd) - with open(tmp_fn, "at") as ofd: - ofd.write("\n".join(lines)) - os.rename(tmp_fn, dest_filename) + track.name = f"{args.prefix}{header_hash.hexdigest()[:32]}" + + start_timestamp = track.points[0].datetime + for i, tp in enumerate(track.points): + track.points[i] = tp._replace( + datetime=tp.datetime - start_timestamp + args.start_time, + latitude=round(tp.latitude - lat_range.min_val + args.base_latitude, 7), + longitude=round(tp.longitude - lon_range.min_val + args.base_longitude, 7), + ) def main() -> None: args = get_args() + # file deepcode ignore PT: Shut up snyk, this is a CLI utility intended to walk files... for filename in args.files: - if os.path.basename(filename).startswith(args.prefix) and args.force_input is False: - continue - - with open(filename) as ifd: - lines = ifd.readlines() - lines = sanitize(args, lines) - save_file(args, filename, lines) + track = RcTrack() + track.load_file(filename) + sanitize(args, track) + if args.stdout: + track.write_file("/dev/stdout") + else: + if os.path.exists(filename) and args.force_overwrite is False: + print(f"Output file {filename} already exists") + continue + track.write_file(filename) if __name__ == "__main__":