Skip to content

Commit

Permalink
add physical constants and validation tests
Browse files Browse the repository at this point in the history
  • Loading branch information
KedoKudo committed Dec 30, 2024
1 parent 3a97728 commit 01492ea
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 135 deletions.
43 changes: 43 additions & 0 deletions src/pleiades/core/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/usr/bin/env python
"""Physical constants with validation."""

from pydantic import BaseModel, ConfigDict, Field, computed_field


class PhysicalConstants(BaseModel):
"""Physical constants used in nuclear calculations."""

model_config = ConfigDict(frozen=True)

# Particle masses (use floats instead of Mass objects)
neutron_mass_amu: float = Field(default=1.008664915, description="Neutron mass in amu")
proton_mass_amu: float = Field(default=1.007276466, description="Proton mass in amu")
electron_mass_amu: float = Field(default=0.000548579909, description="Electron mass in amu")

# Fundamental constants
speed_of_light: float = Field(default=299792458.0, description="Speed of light in m/s")
planck_constant: float = Field(default=4.135667696e-15, description="Planck constant in eV⋅s")
boltzmann_constant: float = Field(default=8.617333262e-5, description="Boltzmann constant in eV/K")
elementary_charge: float = Field(default=1.602176634e-19, description="Elementary charge in Coulombs")
avogadro_number: float = Field(default=6.02214076e23, description="Avogadro's number in mol^-1")

# Nuclear physics constants
barn_to_cm2: float = Field(default=1e-24, description="Conversion factor from barns to cm²")
amu_to_kg: float = Field(default=1.660539067e-27, description="Conversion factor from amu to kg")

# Derived constants
@computed_field
@property
def neutron_mass_kg(self) -> float:
"""Neutron mass in kilograms."""
return self.neutron_mass_amu * self.amu_to_kg

@computed_field
@property
def atomic_mass_unit_eV(self) -> float: # noqa: N802
"""One atomic mass unit in eV/c²."""
return 931.494061e6 # MeV/c² to eV/c²


# Create singleton instance
CONSTANTS = PhysicalConstants()
163 changes: 50 additions & 113 deletions src/pleiades/core/data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,27 @@
import re
from importlib import resources
from pathlib import Path
from typing import Dict, List, Optional, Set, NamedTuple
from enum import Enum, auto
from typing import Dict, List, Optional, Set

from pleiades.core.models import CrossSectionPoint, DataCategory, IsotopeIdentifier, IsotopeInfo, IsotopeMassData

logger = logging.getLogger(__name__)

class DataCategory(Enum):
"""Enumeration of valid data categories."""
ISOTOPES = auto()
RESONANCES = auto()
CROSS_SECTIONS = auto()

@classmethod
def to_path(cls, category: 'DataCategory') -> str:
"""Convert category enum to path string."""
return category.name.lower()

class IsotopeInfo(NamedTuple):
"""Container for isotope information."""
spin: float
abundance: float

class CrossSectionPoint(NamedTuple):
"""Container for cross-section data points."""
energy: float # eV
cross_section: float # barns

class IsotopeMassData(NamedTuple):
"""Container for isotope mass information."""
atomic_mass: float # atomic mass units
mass_uncertainty: float
binding_energy: float # MeV
beta_decay_energy: float # MeV

class NuclearDataManager:
"""
Manages access to nuclear data files packaged with PLEIADES.
This class provides a centralized interface for accessing nuclear data files
that are distributed with the PLEIADES package. It handles path resolution,
validates file existence, and caches results for improved performance.
"""

# Mapping of categories to their valid file extensions
_VALID_EXTENSIONS = {
DataCategory.ISOTOPES: {'.info', '.mas20', '.list'},
DataCategory.RESONANCES: {'.endf'},
DataCategory.CROSS_SECTIONS: {'.tot'}
DataCategory.ISOTOPES: {".info", ".mas20", ".list"},
DataCategory.RESONANCES: {".endf"},
DataCategory.CROSS_SECTIONS: {".tot"},
}

def __init__(self):
Expand All @@ -65,10 +39,9 @@ def _initialize_cache(self) -> None:
for category in DataCategory:
try:
category_path = self._get_category_path(category)
data_path = resources.files('pleiades.data').joinpath(category_path)
data_path = resources.files("pleiades.data").joinpath(category_path)
self._cached_files[category] = {
item.name for item in data_path.iterdir()
if item.suffix in self._VALID_EXTENSIONS[category]
item.name for item in data_path.iterdir() if item.suffix in self._VALID_EXTENSIONS[category]
}
except Exception as e:
logger.error(f"Failed to initialize cache for {category}: {str(e)}")
Expand Down Expand Up @@ -101,24 +74,16 @@ def get_file_path(self, category: DataCategory, filename: str) -> Path:
file_path = Path(filename)
if file_path.suffix not in self._VALID_EXTENSIONS[category]:
raise ValueError(
f"Invalid file extension for {category}. "
f"Allowed extensions: {self._VALID_EXTENSIONS[category]}"
f"Invalid file extension for {category}. " f"Allowed extensions: {self._VALID_EXTENSIONS[category]}"
)

try:
with resources.path(
f'pleiades.data.{self._get_category_path(category)}',
filename
) as path:
with resources.path(f"pleiades.data.{self._get_category_path(category)}", filename) as path:
if not path.exists():
raise FileNotFoundError(
f"File {filename} not found in {category}"
)
raise FileNotFoundError(f"File {filename} not found in {category}")
return path
except Exception as e:
raise FileNotFoundError(
f"Error accessing {filename} in {category}: {str(e)}"
)
raise FileNotFoundError(f"Error accessing {filename} in {category}: {str(e)}")

def list_files(self, category: Optional[DataCategory] = None) -> Dict[DataCategory, List[str]]:
"""
Expand All @@ -137,11 +102,8 @@ def list_files(self, category: Optional[DataCategory] = None) -> Dict[DataCatego
if not isinstance(category, DataCategory):
raise ValueError(f"Invalid category: {category}")
return {category: sorted(self._cached_files[category])}

return {
cat: sorted(self._cached_files[cat])
for cat in DataCategory
}

return {cat: sorted(self._cached_files[cat]) for cat in DataCategory}

def validate_file(self, category: DataCategory, filename: str) -> bool:
"""
Expand All @@ -156,63 +118,45 @@ def validate_file(self, category: DataCategory, filename: str) -> bool:
"""
try:
path = Path(filename)
return (
path.suffix in self._VALID_EXTENSIONS[category] and
path in self._cached_files[category]
)
return path.suffix in self._VALID_EXTENSIONS[category] and path in self._cached_files[category]
except Exception:
return False

def get_isotope_info(self, isotope: str) -> Optional[IsotopeInfo]:
def get_isotope_info(self, isotope: IsotopeIdentifier) -> Optional[IsotopeInfo]:
"""
Extract isotope information from the isotopes.info file.
Args:
isotope: String of the form "element-nucleonNumber" (e.g. "C-13")
isotope: IsotopeIdentifier instance
Returns:
IsotopeInfo containing spin and abundance if found, None otherwise
Raises:
ValueError: If isotope string format is invalid
"""
if not re.match(r"[A-Za-z]+-\d+", isotope):
raise ValueError(
f"Invalid isotope format: {isotope}. Expected format: 'element-nucleonNumber'"
)

try:
element, nucleon = isotope.split('-')
nucleon = int(nucleon)

with self.get_file_path(DataCategory.ISOTOPES, "isotopes.info").open() as f:
for line in f:
line = line.strip()
if line and line[0].isdigit():
data = line.split()
if (data[3] == element and int(data[1]) == nucleon):
return IsotopeInfo(
spin=float(data[5]),
abundance=float(data[7])
)
if data[3] == isotope.element and int(data[1]) == isotope.mass_number:
return IsotopeInfo(spin=float(data[5]), abundance=float(data[7]))
return None
except Exception as e:
logger.error(f"Error reading isotope info for {isotope}: {str(e)}")
raise

def get_mass_data(self, element: str, mass_number: int) -> Optional[IsotopeMassData]:
def get_mass_data(self, isotope: IsotopeIdentifier) -> Optional[IsotopeMassData]:
"""
Extract mass data for an isotope from the mass.mas20 file.
Args:
element: Element symbol (e.g. "U")
mass_number: Mass number of the isotope (e.g. 238)
isotope: IsotopeIdentifier instance
Returns:
IsotopeMassData containing atomic mass, mass uncertainty
Raises:
ValueError: If element or mass number is invalid
ValueError: If data cannot be parsed
"""
try:
with self.get_file_path(DataCategory.ISOTOPES, "mass.mas20").open() as f:
Expand All @@ -221,17 +165,12 @@ def get_mass_data(self, element: str, mass_number: int) -> Optional[IsotopeMassD
next(f)

for line in f:
if (element in line[:25]) and (str(mass_number) in line[:25]):
if (isotope.element in line[:25]) and (str(isotope.mass_number) in line[:25]):
# Parse the line according to mass.mas20 format
atomic_mass_coarse = line[106:109].replace("*", "nan").replace("#", ".0")
atomic_mass_fine = line[110:124].replace("*", "nan").replace("#", ".0")

if "nan" not in [atomic_mass_coarse, atomic_mass_fine]:
# use string concatenation to combine the two parts
# e.g. "238" + "050786.936"
# -> "238050786.936"
# -> 238050786.936/1e6
# = 238.050786936
atomic_mass = float(atomic_mass_coarse + atomic_mass_fine) / 1e6
else:
atomic_mass = float("nan")
Expand All @@ -240,20 +179,20 @@ def get_mass_data(self, element: str, mass_number: int) -> Optional[IsotopeMassD
atomic_mass=atomic_mass,
mass_uncertainty=float(line[124:136].replace("*", "nan").replace("#", ".0")),
binding_energy=float(line[54:66].replace("*", "nan").replace("#", ".0")),
beta_decay_energy=float(line[81:93].replace("*", "nan").replace("#", ".0"))
beta_decay_energy=float(line[81:93].replace("*", "nan").replace("#", ".0")),
)
return None
except Exception as e:
logger.error(f"Error reading mass data for {element}-{mass_number}: {str(e)}")
logger.error(f"Error reading mass data for {isotope}: {str(e)}")
raise

def read_cross_section_data(self, filename: str, isotope: str) -> List[CrossSectionPoint]:
def read_cross_section_data(self, filename: str, isotope: IsotopeIdentifier) -> List[CrossSectionPoint]:
"""
Read cross-section data from a .tot file for a specific isotope.
Args:
filename: Name of the cross-section file
isotope: String of the form "element-nucleonNumber" (e.g. "U-238")
isotope: IsotopeIdentifier instance
Returns:
List of CrossSectionPoint containing energy and cross-section values
Expand All @@ -265,72 +204,70 @@ def read_cross_section_data(self, filename: str, isotope: str) -> List[CrossSect
data_points = []
isotope_found = False
capture_data = False


# Convert to string format expected in file (e.g., "U-238")
isotope_str = str(isotope)

with self.get_file_path(DataCategory.CROSS_SECTIONS, filename).open() as f:
for line in f:
if isotope.upper() in line:
if isotope_str.upper() in line:
isotope_found = True
elif isotope_found and "#data..." in line:
capture_data = True
elif isotope_found and "//" in line:
break
elif capture_data and not line.startswith('#'):
elif capture_data and not line.startswith("#"):
try:
energy_MeV, xs = line.split()
data_points.append(CrossSectionPoint(
energy=float(energy_MeV) * 1e6, # Convert MeV to eV
cross_section=float(xs)
))
energy_MeV, xs = line.split() # noqa: N806
data_points.append(
CrossSectionPoint(
energy=float(energy_MeV) * 1e6, # Convert MeV to eV
cross_section=float(xs),
)
)
except ValueError:
logger.warning(f"Skipping malformed line in {filename}: {line.strip()}")
continue

if not isotope_found:
raise ValueError(f"No data found for isotope {isotope} in {filename}")
raise ValueError(f"No data found for isotope {isotope_str} in {filename}")

return data_points
except Exception as e:
logger.error(f"Error reading cross-section data from {filename}: {str(e)}")
raise

def get_mat_number(self, isotope: str) -> Optional[int]:
def get_mat_number(self, isotope: IsotopeIdentifier) -> Optional[int]:
"""
Get ENDF MAT number for an isotope.
Args:
isotope: String of the form "element-nucleonNumber" (e.g. "U-238")
isotope: IsotopeIdentifier instance
Returns:
ENDF MAT number if found, None otherwise
Raises:
ValueError: If isotope format is invalid
"""
if not re.match(r"[A-Za-z]+-\d+", isotope):
raise ValueError(
f"Invalid isotope format: {isotope}. Expected format: 'element-nucleonNumber'"
)

try:
element, nucleon = isotope.split('-')

with self.get_file_path(DataCategory.ISOTOPES, "neutrons.list").open() as f:
# Line matching breakdown:
# "496) 92-U -238 IAEA EVAL-DEC14 IAEA Consortium 9237"
# When line matches pattern
# We get groups:
# match.group(1) = "92"
# match.group(1) = "92"
# match.group(2) = "U"
# match.group(3) = "238"
# Check if constructed string matches input:
# match.expand(r"\2-\3") = "U-238"
# If match found, get MAT number:
# Take last 5 characters of line " 9237" -> 9237
pattern = r"\b\s*(\d+)\s*-\s*([A-Za-z]+)\s*-\s*(\d+)([A-Za-z]*)\b"

for line in f:
match = re.search(pattern, line)
if match and match.expand(r"\2-\3") == f"{element}-{nucleon}":
if match and match.expand(r"\2-\3") == str(isotope):
return int(line[-5:])
return None
except Exception as e:
Expand All @@ -340,4 +277,4 @@ def get_mat_number(self, isotope: str) -> Optional[int]:
def clear_cache(self) -> None:
"""Clear the file cache and force reinitialization."""
self._cached_files.clear()
self._initialize_cache()
self._initialize_cache()
Loading

0 comments on commit 01492ea

Please sign in to comment.