Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
move some things into a utility file, in particular time convertion
and dose computation. Of course, that means a huge amount of churn
and deduplication, but yay tests.
  • Loading branch information
ckuethe committed Dec 22, 2023
1 parent 4e8f98e commit 20212c9
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 99 deletions.
30 changes: 1 addition & 29 deletions src/n42convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import xmltodict
from argparse import ArgumentParser, Namespace
from radiacode.types import Spectrum
from rcutils import stringify
import os
from io import TextIOWrapper
from typing import List, Dict, Any
Expand Down Expand Up @@ -84,30 +85,6 @@ def get_rate_from_spectrum(filename: str) -> float:
return sum(s["foreground"]["spectrum"]) / s["foreground"]["duration"]


def get_dose_from_spectrum(s: Spectrum) -> float:
"Given a Spectrum, return an estimate of the total absorbed dose in uSv"
# According to the Health Physics Society:
# Radiation absorbed dose and effective dose in the international system
# of units (SI system) for radiation measurement uses "gray" (Gy) and
# "sievert" (Sv), respectively.
# [...]
# For practical purposes with gamma and x rays, these units of measure
# for exposure or dose are considered equal.
# via https://hps.org/publicinformation/ate/faqs/radiationdoses.html

kev2j = 1.60218e-16
mass = 4.51e-3 # kg, CsI:Tl density is 4.51g/cm^3, crystal is 1cm^3
a0, a1, a2 = s.a0, s.a1, s.a2

def _chan2kev(c):
return a0 + a1 * c + a2 * c**2

keVz = sum([_chan2kev(ch) * n for ch, n in enumerate(s.counts)])
gray = keVz * kev2j / mass
uSv = gray * 1e6
return uSv


def squareformat(a: List[Any], columns: int = 16):
"reformat the a long list of numbers (any elements, actually) as some nice lines"
if columns < 1:
Expand All @@ -119,11 +96,6 @@ def squareformat(a: List[Any], columns: int = 16):
return "\n".join(rv)


def stringify(a: List[Any], c: str = " ") -> str:
"Make a string out of a list of things, nicer than str(list(...))"
return c.join([f"{x}" for x in a])


def format_calibration(spectrum, fg=True):
"Format calibration factors from data file"
try:
Expand Down
3 changes: 1 addition & 2 deletions src/n42www.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 syn=python
# SPDX-License-Identifier: MIT

from flask import Flask, request, abort, Response, send_file
from flask import Flask, request, abort, send_file
from argparse import ArgumentParser, Namespace
from logging import Logger, getLogger, DEBUG, INFO, WARNING, basicConfig
from n42convert import process_single_fileobj
Expand Down Expand Up @@ -37,7 +37,6 @@ def handle_index():

@n42srv.route("/convert", methods=["POST"])
def handle_convert():

uploads = list(request.files.keys())
if uploads != [input_name]:
abort(400)
Expand Down
11 changes: 8 additions & 3 deletions src/radiacode_poll.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import n42convert
import radqr
from rcutils import get_dose_from_spectrum
import importlib.metadata
import warnings
from argparse import ArgumentParser, Namespace
Expand Down Expand Up @@ -236,7 +237,7 @@ def main() -> None:
except KeyboardInterrupt:
t.close()
elif args.accumulate_dose: # yep, until a set dose is reached
e0 = n42convert.get_dose_from_spectrum(measurement)
e0 = get_dose_from_spectrum(measurement.counts, measurement.a0, measurement.a1, measurement.a2)
with tqdm(
desc=f"Target Dose ({args.accumulate_dose:.3f}uSv)",
unit="uSv",
Expand All @@ -246,7 +247,8 @@ def main() -> None:
waiting = True
while waiting:
sleep(1)
recv_dose = n42convert.get_dose_from_spectrum(dev.spectrum()) - e0
s = dev.spectrum()
recv_dose = get_dose_from_spectrum(s.counts, s.a0, s.a1, s.a2) - e0
t.n = round(recv_dose, 3)
t.display()
if recv_dose >= args.accumulate_dose:
Expand Down Expand Up @@ -300,7 +302,10 @@ def main() -> None:
uuid=uuid4(),
)

print(f"Total dose: {n42convert.get_dose_from_spectrum(measurement):.2f}uSv", file=sys.stderr)
print(
f"Total dose: {get_dose_from_spectrum(measurement.counts, measurement.a0, measurement.a1, measurement.a2):.2f}uSv",
file=sys.stderr,
)
ofd = sys.stdout
if args.outfile:
tfd, tfn = mkstemp(dir=".")
Expand Down
26 changes: 1 addition & 25 deletions src/rcmultispg.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from radiacode import RadiaCode, Spectrum
from radiacode.transports.usb import DeviceNotFound
from threading import Barrier, BrokenBarrierError, Thread, Lock
from rcutils import UnixTime2FileTime
from time import sleep, time, strftime, gmtime
from binascii import hexlify
from collections import namedtuple
Expand Down Expand Up @@ -47,31 +48,6 @@ def handle_sigusr1(_signum=None, _stackframe=None):
USR1FLAG = True


# The spectrogram format uses FileTime, the number of 100ns intervals since the
# beginning of 1600 CE. On a linux/unix/bsd host, we get the number of (fractional)
# seconds since the beginning of 1970 CE. Here are some conversions, which, If I use
# them one more time are getting moved into a utility file...

jiffy = 1e-7
epoch_offset_jiffies = 116444736000000000


def FileTime2UnixTime(x: Number) -> float:
return (float(x) - epoch_offset_jiffies) * jiffy


def FileTime2DateTime(x: Number) -> datetime:
return datetime.fromtimestamp(FileTime2UnixTime(x))


def UnixTime2FileTime(x: Number) -> int:
return int(float(x) / jiffy + epoch_offset_jiffies)


def DateTime2FileTime(dt: datetime) -> int:
return UnixTime2FileTime(dt.timestamp())


def find_radiacode_devices() -> List[str]:
"List all the radiacode devices detected"
return [ # No error handling. Caller can deal with any errors.
Expand Down
73 changes: 73 additions & 0 deletions src/rcutils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#!/usr/bin/env python3

"""
Some stuff found to be useful in various scripts, and should thus be hoisted into a
utility library, rather than being imported from and between scripts.
"""

from datetime import datetime
from typing import Union, List, Any

Number = Union[int, float]

# The spectrogram format uses FileTime, the number of 100ns intervals since the
# beginning of 1600 CE. On a linux/unix/bsd host, we get the number of (fractional)
# seconds since the beginning of 1970 CE. Here are some conversions, which, If I use
# them one more time are getting moved into a utility file...

_filetime_quantum = 1e-7
_filetime_epoch_offset = 116444736000000000


def FileTime2UnixTime(x: Number) -> float:
"Convert a FileTime to Unix timestamp"
return (float(x) - _filetime_epoch_offset) * _filetime_quantum


def FileTime2DateTime(x: Number) -> datetime:
"Convert a FileTime to Python DateTime"
return datetime.fromtimestamp(FileTime2UnixTime(x))


def UnixTime2FileTime(x: Number) -> int:
"Convert a Unix timestamp to FileTime"
return int(float(x) / _filetime_quantum + _filetime_epoch_offset)


def DateTime2FileTime(dt: datetime) -> int:
"Convert a Python DateTime to FileTime"
return UnixTime2FileTime(dt.timestamp())


def stringify(a: List[Any], c: str = " ") -> str:
"Make a string out of a list of things, nicer than str(list(...))"
return c.join([f"{x}" for x in a])


def get_dose_from_spectrum(
counts: List[int],
a0: float = 0,
a1: float = 3000 / 1024,
a2: float = 0,
d: float = 4.51,
v: float = 1.0,
) -> float:
"""
A somewhat generic function to estimate the energy represented by a spectrum.
counts: a list of counts per channel
a0-a2 are the calibration coefficients for the instrument
d: density in g/cm^3 of the scintillator crystal, approximately 4.51 for CsI:Tl
v: volume of the scintillator crystal, radiacode is 1cm^3
"""

joules_per_keV = 1.60218e-16
mass = d * v * 1e-3 # kg

def _chan2kev(c):
return a0 + a1 * c + a2 * c**2

total_keV = sum([_chan2kev(ch) * n for ch, n in enumerate(counts)])
gray = total_keV * joules_per_keV / mass
uSv = gray * 1e6
return uSv
17 changes: 0 additions & 17 deletions tests/test_multispg.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,12 @@
# SPDX-License-Identifier: MIT

import datetime

from collections import namedtuple
from argparse import Namespace
from os.path import dirname, join as pathjoin
import rcmultispg
from rcmultispg import SpecData, Spectrum
import unittest
from unittest.mock import patch, mock_open
from io import StringIO

test_epoch = datetime.datetime(2023, 12, 20, 12, 34, 56, 987654)
# gonna need to fake time of day for sleep and stuff.
fake_clock: float = test_epoch.timestamp()
testdir = dirname(__file__)


def fake_sleep(seconds: float):
global fake_clock
fake_clock += seconds


def fake_time_time():
return fake_clock


class TestRadiaCodePoll(unittest.TestCase):
Expand Down
14 changes: 7 additions & 7 deletions tests/test_n42convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ def test_load_spectrum_filename(self):
spec = data["foreground"].pop("spectrum")
self.assertDictEqual(data, self.am241_expected)

def test_load_spectrum_no_serial_number(self):
data = n42convert.load_radiacode_spectrum(filename=pathjoin(testdir, "data_am241.xml"))

data["foreground"].pop("device_serial_number", None)
fmt_instr = n42convert.make_instrument_info(data)
self.assertNotIn("<RadInstrumentIdentifier>", fmt_instr)

def test_load_spectrum_fail_bad_spectrum_length(self):
with self.assertRaises(ValueError) as cm:
n42convert.load_radiacode_spectrum(filename=pathjoin(testdir, "data_invalid_spectrum.xml"))
Expand Down Expand Up @@ -114,13 +121,6 @@ def test_squareformat_fail0(self):
with self.assertRaises(ValueError):
n42convert.squareformat([0], 0)

def test_stringify(self):
testcases = [([], ""), ([0], "0"), ([0, 1, 2, 3], "0 1 2 3")]

for t in testcases:
self.assertEqual(n42convert.stringify(t[0]), t[1])
self.assertEqual(n42convert.stringify(t[0], ","), t[1].replace(" ", ","))

def test_argparse_no_uuid(self):
with patch("sys.argv", [__file__, "-i", self.i, "-b", self.b, "-o", self.o]):
parsed_args = n42convert.get_args()
Expand Down
9 changes: 6 additions & 3 deletions tests/test_n42www.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 syn=python
# SPDX-License-Identifier: MIT

import sys
import n42www
from n42www import n42srv
import unittest
from unittest.mock import patch, MagicMock
from io import StringIO
from unittest.mock import patch
import os

testdir = os.path.dirname(__file__)
Expand Down Expand Up @@ -70,3 +68,8 @@ def test_convert_bad_fcontent(self):
with open("/dev/null", "rb") as am241:
response = self.client.post("/convert", data={"file-input": (am241, fn)})
self.assertEqual(response.status_code, 400)

def test_main(self):
with patch("sys.argv", [__file__, "-vvv", "-b", "255.255.255.255"]):
with self.assertRaises(SystemExit):
n42www.main()
5 changes: 3 additions & 2 deletions tests/test_radiacode_poll.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import sys
import datetime
from radiacode.types import Spectrum
from n42convert import load_radiacode_spectrum, get_dose_from_spectrum
from n42convert import load_radiacode_spectrum
from rcutils import get_dose_from_spectrum
from os.path import dirname, join as pathjoin
import radiacode_poll
import unittest
Expand Down Expand Up @@ -115,7 +116,7 @@ def test_accumulated_dose(self):
a2=dev.a2,
counts=dev.th232,
)
self.assertAlmostEqual(303.20, get_dose_from_spectrum(s), delta=0.01)
self.assertAlmostEqual(303.20, get_dose_from_spectrum(s.counts, s.a0, s.a1, s.a2), delta=0.01)

def test_main(self):
with patch("sys.stdout", new_callable=StringIO):
Expand Down
10 changes: 0 additions & 10 deletions tests/test_radqr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 syn=python
# SPDX-License-Identifier: MIT

# ick
import sys
from os.path import dirname

sys.path.insert(0, dirname(dirname(__file__)))

import radqr
import unittest
import datetime
Expand Down Expand Up @@ -291,7 +285,3 @@ def test_radqr_decode_invalid_field(self):
with self.assertRaises(ValueError) as cm:
radqr.parse_payload_fields(b"T:0,0 Z:INVALID S:0")
self.assertEqual(cm.exception.args[0], "Unknown field: Z")


if __name__ == "__main__":
unittest.main()
32 changes: 32 additions & 0 deletions tests/test_rcutils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env python3
# coding: utf-8
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 syn=python
# SPDX-License-Identifier: MIT

import datetime
import unittest
import rcutils


class TestRadiaCodeUtils(unittest.TestCase):
unix_time = datetime.datetime(2023, 12, 1, 0, 16, 11)
file_time = 133458921710000000

def test_datetime_to_filetime(self):
self.assertEqual(rcutils.DateTime2FileTime(self.unix_time), self.file_time)

def test_filetime_to_datetime(self):
self.assertEqual(rcutils.FileTime2DateTime(self.file_time), self.unix_time)

def test_spectrum_dose(self):
a = [-10, 3.0, 0]
c = [100] * 1024
usv = rcutils.get_dose_from_spectrum(c, *a)
self.assertAlmostEqual(usv, 5.5458, delta=1e-3)

def test_stringify(self):
testcases = [([], ""), ([0], "0"), ([0, 1, 2, 3], "0 1 2 3")]

for t in testcases:
self.assertEqual(rcutils.stringify(t[0]), t[1])
self.assertEqual(rcutils.stringify(t[0], ","), t[1].replace(" ", ","))
1 change: 0 additions & 1 deletion tests/test_spectrogram_energy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import unittest
from unittest.mock import patch
from io import StringIO
from argparse import Namespace
from os.path import dirname, join as pathjoin

test_dir = dirname(__file__)
Expand Down

0 comments on commit 20212c9

Please sign in to comment.