Skip to content

Commit

Permalink
optimized suboptimal code
Browse files Browse the repository at this point in the history
  • Loading branch information
haeussma committed Sep 9, 2024
1 parent c13e3f0 commit 997849c
Show file tree
Hide file tree
Showing 15 changed files with 21,564 additions and 485 deletions.
136 changes: 97 additions & 39 deletions chromatopy/ioutils/enzymeml.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from calipytion.tools.calibrator import Calibrator
from pyenzyme import (
DataTypes,
EnzymeMLDocument,
Expand All @@ -9,7 +10,7 @@
from pyenzyme import UnitDefinition as EnzymeMLUnitDefinition

from chromatopy.model import Measurement
from chromatopy.model import UnitDefinition as ChromatopyUnitDefinition
from chromatopy.model import UnitDefinition as UnitDefinition
from chromatopy.tools.internal_standard import InternalStandard
from chromatopy.tools.molecule import Molecule

Expand Down Expand Up @@ -37,9 +38,12 @@ def create_enzymeml(

doc = EnzymeMLDocument(name=name)

# check if concentration unit is defined for each molecule

measurement_data_instances = {}
# add EnzymeML SmallMolecules
for molecule in molecules:
_check_molecule_conc_unit_and_init_conc(molecule)
doc.small_molecules.append(
SmallMolecule(
id=molecule.id,
Expand All @@ -52,28 +56,26 @@ def create_enzymeml(
measurement_data_instances[molecule.id] = MeasurementData(
species_id=molecule.id,
initial=molecule.init_conc,
data_unit=EnzymeMLUnitDefinition(**molecule.conc_unit.model_dump()),
data_unit=EnzymeMLUnitDefinition(**molecule.conc_unit.model_dump()), # type: ignore
data_type=DataTypes.PEAK_AREA,
)

from devtools import pprint

# add Proteins
for protein in proteins:
protein_data = {
"id": protein.id,
"name": protein.name,
}
if protein.uniprot_id:
protein_data["ld_id"] = (
f"https://www.uniprot.org/uniprot/{protein.uniprot_id}"
)
protein_data = protein.model_dump()
pprint(protein_data)
protein_data.pop("conc_unit")
protein_data.pop("init_conc")

doc.proteins.append(Protein(**protein_data))

# create MeasurementData instances for each molecule
measurement_data_instances[molecule.id] = MeasurementData(
species_id=molecule.id,
initial=molecule.init_conc,
data_unit=EnzymeMLUnitDefinition(**molecule.conc_unit.model_dump()),
data_unit=EnzymeMLUnitDefinition(**protein.conc_unit.model_dump()),
data_type=DataTypes.PEAK_AREA,
)

Expand Down Expand Up @@ -129,7 +131,7 @@ def add_measurements_to_enzymeml(
mol.id: MeasurementData(
species_id=mol.id,
initial=mol.init_conc,
data_unit=EnzymeMLUnitDefinition(**mol.conc_unit.model_dump()),
data_unit=EnzymeMLUnitDefinition(**mol.conc_unit.model_dump()), # type: ignore
data_type=DataTypes.PEAK_AREA,
)
for mol in molecules
Expand Down Expand Up @@ -193,26 +195,47 @@ def measurements_to_measurmentdata(
"""
all_moecules = {molecule.id: molecule for molecule in molecules}
measured_once = measured_molecule_dict(list(all_moecules.keys()), measurements)
has_external_standard = any([molecule.standard for molecule in molecules])

# setup internal standard if it exists
if calculate_concentration and internal_standard:
internal_std_dict = {}
for molecule_id in measured_once:
if internal_standard.id == molecule_id:
continue
assert internal_standard.init_conc, f"""
The initial concentration of the internal standard molecule {internal_standard.name}
needs to be defined. Please specify the `init_conc` attribute of the molecule.
"""
assert internal_standard.conc_unit, f"""
The concentration unit of the internal standard molecule {internal_standard.name}
needs to be defined. Please specify the `conc_unit` attribute of the molecule.
"""

assert all_moecules[molecule_id].init_conc, f"""
The initial concentration of the molecule {all_moecules[molecule_id].name}
needs to be defined. Please specify the `init_conc` attribute of the molecule.
"""

internal_std_dict[molecule_id] = InternalStandard(
molecule_id=molecule_id,
standard_molecule_id=internal_standard.id,
molecule_init_conc=all_moecules[molecule_id].init_conc,
molecule_init_conc=all_moecules[molecule_id].init_conc, # type: ignore
standard_init_conc=internal_standard.init_conc,
molecule_conc_unit=internal_standard.conc_unit,
)

if calculate_concentration and has_external_standard:
calibrators: dict[str, Calibrator] = {}
for molecule in molecules:
if molecule.standard:
calibrators[molecule.id] = Calibrator.from_standard(molecule.standard)

for meas_idx, measurement in enumerate(measurements):
if calculate_concentration:
# set t0 signals for each internal standard if the measurement is at t=0
if internal_standard:
if measurement.reaction_time == 0:
if meas_idx == 0:
for internal_std in internal_std_dict.values():
internal_std._set_t0_signals(measurement)

Expand All @@ -237,42 +260,56 @@ def measurements_to_measurmentdata(
if peak:
break

if internal_standard:
for chrom in measurement.chromatograms:
internal_std_peak = next(
(
peak
for peak in chrom.peaks
if peak.molecule_id
== internal_std_dict[molecule_id].standard_molecule_id
),
None,
)
if internal_std_peak:
break
if internal_standard and peak:
internal_std_peak = next(
(
peak
for peak in chrom.peaks
if peak.molecule_id
== internal_std_dict[molecule_id].standard_molecule_id
),
None,
)
if internal_std_peak:
break

assert internal_std_peak is not None, f"""
No peak for the internal standard molecule {internal_std_dict[molecule_id]}
was assigned in measurement {meas_idx}.
"""

# use internal standard to calculate concentration
if (
peak
and calculate_concentration
and internal_standard
and internal_std_peak
):
calibrator = internal_std_dict[molecule_id]
internal_calibrator = internal_std_dict[molecule_id]
measurement_data_instances[molecule_id].data.append(
calibrator.calculate_conc(peak.area, internal_std_peak.area)
internal_calibrator.calculate_conc(
peak.area, internal_std_peak.area
)
)
measurement_data_instances[
molecule_id
].data_type = DataTypes.CONCENTRATION

# use peak area as data
elif peak:
elif has_external_standard and peak:
try:
print(f"calculating concentration for molecule {molecule_id}")
print(
f"is not extrapolate: {peak.area < calibrators[molecule_id].standard.result.calibration_range.signal_upper}"
)
print(calibrators[molecule_id].molecule_id)
measurement_data_instances[molecule_id].data.append(
calibrators[molecule_id].calculate_concentrations(
model=calibrators[molecule_id].standard.result,
signals=[peak.area],
)[0]
)
measurement_data_instances[
molecule_id
].data_type = DataTypes.CONCENTRATION

except KeyError:
print(f"no calibrator for molecule {molecule_id}")
continue

elif peak and not calculate_concentration:
measurement_data_instances[molecule_id].data.append(peak.area)
measurement_data_instances[molecule_id].data_type = DataTypes.PEAK_AREA

Expand All @@ -292,7 +329,7 @@ def measurements_to_measurmentdata(

def extract_measurement_conditions(
measurements: list[Measurement],
) -> tuple[float, float, ChromatopyUnitDefinition, ChromatopyUnitDefinition]:
) -> tuple[float, float, UnitDefinition, UnitDefinition]:
"""Asserts and extracts the measurement conditions from a list of Measurement instances.
Args:
Expand Down Expand Up @@ -368,3 +405,24 @@ def measured_molecule_dict(
measured_dict[peak.molecule_id] = True

return measured_dict


def _check_molecule_conc_unit_and_init_conc(molecule: Molecule):
if molecule.init_conc is None:
raise ValueError(f"""
No initial concentration is defined for molecule {molecule.name}.
Please specify the initial concentration of the molecule.
""")

elif molecule.conc_unit:
return

elif molecule.conc_unit is None and molecule.standard:
molecule.conc_unit = molecule.standard.samples[0].conc_unit
return

else:
raise ValueError(f"""
No concentration unit is defined for molecule {molecule.name}.
Please specify the concentration unit or define a standard for the molecule.
""")
31 changes: 18 additions & 13 deletions chromatopy/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ class Measurement(BaseModel):
id: str
reaction_time: float
time_unit: UnitDefinition
temperature: float
temperature_unit: UnitDefinition
ph: float
chromatograms: list[Chromatogram] = Field(default_factory=list)
temperature: Optional[float] = Field(default=None)
temperature_unit: Optional[UnitDefinition] = Field(default=None)
ph: Optional[float] = Field(default=None)
sample_name: Optional[str] = Field(default=None)
timestamp: Optional[str] = Field(default=None)
injection_volume: Optional[float] = Field(default=None)
Expand Down Expand Up @@ -325,27 +325,31 @@ def add_to_peaks(
area: float,
molecule_id: Optional[str] = None,
type: Optional[str] = None,
peak_start: Optional[float] = None,
peak_end: Optional[float] = None,
width: Optional[float] = None,
height: Optional[float] = None,
amplitude: Optional[float] = None,
max_signal: Optional[float] = None,
skew: Optional[float] = None,
percent_area: Optional[float] = None,
tailing_factor: Optional[float] = None,
separation_factor: Optional[float] = None,
peak_start: Optional[float] = None,
peak_end: Optional[float] = None,
**kwargs,
):
params = {
"retention_time": retention_time,
"area": area,
"molecule_id": molecule_id,
"type": type,
"peak_start": peak_start,
"peak_end": peak_end,
"width": width,
"height": height,
"amplitude": amplitude,
"max_signal": max_signal,
"skew": skew,
"percent_area": percent_area,
"tailing_factor": tailing_factor,
"separation_factor": separation_factor,
"peak_start": peak_start,
"peak_end": peak_end,
}

if "id" in kwargs:
Expand All @@ -365,13 +369,15 @@ class Peak(BaseModel):
area: float
molecule_id: Optional[str] = Field(default=None)
type: Optional[str] = Field(default=None)
peak_start: Optional[float] = Field(default=None)
peak_end: Optional[float] = Field(default=None)
width: Optional[float] = Field(default=None)
height: Optional[float] = Field(default=None)
amplitude: Optional[float] = Field(default=None)
max_signal: Optional[float] = Field(default=None)
skew: Optional[float] = Field(default=None)
percent_area: Optional[float] = Field(default=None)
tailing_factor: Optional[float] = Field(default=None)
separation_factor: Optional[float] = Field(default=None)
peak_start: Optional[float] = Field(default=None)
peak_end: Optional[float] = Field(default=None)

# JSON-LD fields
ld_id: str = Field(
Expand Down Expand Up @@ -460,7 +466,6 @@ def add_type_term(
class UnitDefinition(BaseModel):
model_config: ConfigDict = ConfigDict( # type: ignore
validate_assigment=True,
use_enum_values=True,
) # type: ignore

id: Optional[str] = Field(default=None)
Expand Down
Loading

0 comments on commit 997849c

Please sign in to comment.