diff --git a/chromatopy/readers/chromeleon.py b/chromatopy/readers/chromeleon.py index 2341eb1..c148e4e 100644 --- a/chromatopy/readers/chromeleon.py +++ b/chromatopy/readers/chromeleon.py @@ -79,6 +79,8 @@ def _map_measurement( # reaction_time, unit = self._extract_reaction_time(file_name) + print(content["Sample Information"][2][1]) + return Measurement( id=content["Sample Information"][2][1], chromatograms=[chromatogram], @@ -147,7 +149,7 @@ def _get_file_paths(self): len(files) == len(self.reaction_times) ), f"Number of files ({len(files)}) does not match the number of reaction times ({len(self.reaction_times)})." - self.file_paths = files + self.file_paths = sorted(files) if __name__ == "__main__": @@ -161,4 +163,4 @@ def _get_file_paths(self): ) measurements = reader.read() - print(measurements) + # print(measurements) diff --git a/chromatopy/tools/analyzer.py b/chromatopy/tools/analyzer.py index 792ae21..f66d67c 100644 --- a/chromatopy/tools/analyzer.py +++ b/chromatopy/tools/analyzer.py @@ -1,14 +1,19 @@ +import time import warnings from collections import defaultdict +import numpy as np import plotly.colors as pc import plotly.graph_objects as go from calipytion.model import Standard from calipytion.tools import Calibrator from calipytion.tools.utility import pubchem_request_molecule_name +from joblib import Parallel, delayed +from lmfit.models import GaussianModel from loguru import logger from pydantic import BaseModel, Field from pyenzyme import EnzymeMLDocument, Protein +from rich.progress import Progress, TaskID from chromatopy.model import ( Chromatogram, @@ -17,10 +22,10 @@ SignalType, UnitDefinition, ) -from chromatopy.tools.fit_chromatogram import ChromFit from chromatopy.tools.molecule import Molecule +from chromatopy.tools.peak_utils import SpectrumProcessor from chromatopy.tools.utility import _resolve_chromatogram -from chromatopy.units import C, min +from chromatopy.units import C class ChromAnalyzer(BaseModel): @@ -390,18 +395,53 @@ def read_chromeleon( return cls(id=path, measurements=measurements) - def find_peaks(self, **fitting_kwargs): + def find_peaks(self): print( f"🔍 Processing signal and integrating peaks for {len(self.measurements)} chromatograms." ) - fitting_kwargs["verbose"] = False - for measurement in self.measurements: - fitter = ChromFit( - measurement.chromatograms[0].signals, - measurement.chromatograms[0].times, + + # get all chromatograms and create spectrum processors: + processors = [ + SpectrumProcessor( + time=chrom.times, + raw_data=chrom.signals, + silent=True, + ) + for meas in self.measurements + for chrom in meas.chromatograms + ] + + def process_task( + processor: SpectrumProcessor, progress: Progress, task_id: TaskID + ): + processor.run_pripeline() + progress.update(task_id, advance=1) + time.sleep(0.1) + return processor + + with Progress() as progress: + task = progress.add_task( + "Processing chromatograms...", total=len(self.measurements) ) - fitter.fit(**fitting_kwargs) - measurement.chromatograms[0].peaks = fitter.peaks + + results = Parallel(n_jobs=-1, backend="threading")( + delayed(process_task)(processor, progress, task) + for processor in processors + ) + + processor_idx = 0 + for meas in self.measurements: + for chrom in meas.chromatograms: + chrom.processed_signal = results[processor_idx].baseline_corrected_data + chrom.peaks = [] + for peak in results[processor_idx].peak_params: + chrom.add_to_peaks( + retention_time=peak.center, + area=peak.area, + width=peak.width, + height=peak.amplitude, + ) + processor_idx += 1 def _handel_detector(self, detector: SignalType): """ @@ -438,6 +478,112 @@ def _handel_detector(self, detector: SignalType): f" {list(set(detectors))}" ) + def plot_peaks_fitted_spectrum(self): + # make plotly figure for each chromatogram whereas ech chromatogram contains multiple traces and each comatogram is mapped to one slider + from plotly.express.colors import sample_colorscale + + from chromatopy.tools.utility import generate_visibility + + fig = go.Figure() + + for idx, meas in enumerate(self.measurements): + for chrom in meas.chromatograms: + fig.add_trace( + go.Scatter( + visible=False, + x=chrom.times, + y=chrom.signals, + mode="lines", + name="Unprocessed", + hovertext=f"{meas.id}", + line=dict( + color="black", # Line color + dash="dot", # Line style ('dash' for dashed line) + width=2, # Line width + ), + ) + ) + + fig.add_trace( + go.Scatter( + visible=False, + x=chrom.times, + y=chrom.processed_signal, + mode="lines", + name="Processed", + hovertext=f"{meas.id}", + line=dict( + color="red", # Line color + dash="dot", # Line style ('dash' for dashed line) + width=2, # Line width + ), + ) + ) + + # model peaks as gaussians + color_map = sample_colorscale("viridis", len(chrom.peaks)) + + for peak in chrom.peaks: + gaussian = GaussianModel() + x_start = peak.retention_time - 5 * peak.width + x_end = peak.retention_time + 5 * peak.width + x_arr = np.linspace(x_start, x_end, 100) + fig.add_trace( + go.Scatter( + visible=False, + x=x_arr, + y=gaussian.eval( + x=x_arr, + amplitude=peak.height, + center=peak.retention_time, + sigma=peak.width, + ), + mode="lines", + name=f"Peak {peak.retention_time}", + hovertext=f"{meas.id}", + line=dict( + color=color_map.pop(0), + width=2, + ), + ) + ) + + # get data for peak by solving the gaussian model from lmfit + + fig.data[0].visible = True + fig.data[1].visible = True + n_peaks_in_first_chrom = len(self.measurements[0].chromatograms[0].peaks) + for i in range(2, 2 + n_peaks_in_first_chrom): + fig.data[i].visible = True + + # define slider for each chromatogram + + steps = [] + for meas in self.measurements: + for chrom in meas.chromatograms: + step = { + "label": f"{meas.id}", + "method": "update", + "args": [ + { + "visible": generate_visibility(meas.id, fig), + } + ], + } + steps.append(step) + + sliders = [ + { + "active": 0, + "currentvalue": {"prefix": "Chromatogram: "}, + "steps": steps, + } + ] + + fig.update_layout(sliders=sliders) + + fig.show() + def _get_peaks_by_retention_time( self, retention_time: float, @@ -650,6 +796,7 @@ def visualize(self) -> go.Figure: fig = go.Figure() for meas in self.measurements: for chrom in meas.chromatograms: + print(meas.id) fig.add_trace( go.Scatter( x=chrom.times, @@ -687,6 +834,7 @@ def visualize(self) -> go.Figure: xaxis_title="Retention time / min", yaxis_title=f"Signal {wave_string}", height=600, + legend={"traceorder": "normal"}, ) fig.show() @@ -740,111 +888,130 @@ def apply_standards(self, tolerance: float = 1): if __name__ == "__main__": - from devtools import pprint - - pprint(1) - - from chromatopy.tools.molecule import Molecule - from chromatopy.units import M, min - - path = "example_data/liam/" - reaction_times = [ - 0, - 3, - 6.0, - 9, - 12, - 15, - 18, - 21, - 24, - 27, - 30, - 45, - 60, - 90, - 120, - 150, - 180, - ] - - ana = ChromAnalyzer.read_agilent_csv( - id="lr_205", - path=path, - reaction_times=reaction_times, - time_unit=min, - ph=7.0, - temperature=25.0, - ) + # from devtools import pprint + + # pprint(1) + + # from chromatopy.tools.molecule import Molecule + # from chromatopy.units import M, min + + # path = "example_data/liam/" + # reaction_times = [ + # 0, + # 3, + # 6.0, + # 9, + # 12, + # 15, + # 18, + # 21, + # 24, + # 27, + # 30, + # 45, + # 60, + # 90, + # 120, + # 150, + # 180, + # ] + + # ana = ChromAnalyzer.read_agilent_csv( + # id="lr_205", + # path=path, + # reaction_times=reaction_times, + # time_unit=min, + # ph=7.0, + # temperature=25.0, + # ) - ana.add_molecule( - id="mal", - name="maleimide", - pubchem_cid=10935, - init_conc=0.656, - conc_unit=M, - retention_time=6.05, - ) + # ana.add_molecule( + # id="mal", + # name="maleimide", + # pubchem_cid=10935, + # init_conc=0.656, + # conc_unit=M, + # retention_time=6.05, + # ) - ana.define_internal_standard( - id="std", - name="internal standard", - pubchem_cid=123, - init_conc=1.0, - conc_unit=M, - retention_time=6.3, - retention_tolerance=0.05, - ) + # ana.define_internal_standard( + # id="std", + # name="internal standard", + # pubchem_cid=123, + # init_conc=1.0, + # conc_unit=M, + # retention_time=6.3, + # retention_tolerance=0.05, + # ) - enzdoc = ana.create_enzymeml(name="test") + # enzdoc = ana.create_enzymeml(name="test") - # pprint(enzdoc) + # # pprint(enzdoc) - # pprint(ana) + # # pprint(ana) - # ana.visualize_peaks() + # # ana.visualize_peaks() - # # Molecule( - # # id="s0", - # # pubchem_cid=123, - # # name="Molecule 1", - # # init_conc=1.0, - # # conc_unit=mM, + # # # Molecule( + # # # id="s0", + # # # pubchem_cid=123, + # # # name="Molecule 1", + # # # init_conc=1.0, + # # # conc_unit=mM, + # # # ) + # # import matplotlib.pyplot as plt + + # # plt.scatter( + # # enzdoc.measurements[0].species_data[0].time, + # # enzdoc.measurements[0].species_data[0].data, # # ) - # import matplotlib.pyplot as plt + # # plt.show() + # # ana.plot_measurements() + + # from devtools import pprint - # plt.scatter( - # enzdoc.measurements[0].species_data[0].time, - # enzdoc.measurements[0].species_data[0].data, + # from chromatopy.units.predefined import min + + # dirpath = "/Users/max/Documents/GitHub/shimadzu-example/data/kinetic/substrate_10mM_co-substrate3.12mM" + + # reaction_time = [0, 1, 2, 3, 4, 5.0] + # ana = ChromAnalyzer.read_shimadzu( + # id="23234", + # path=dirpath, + # reaction_times=reaction_time, + # time_unit=min, + # ph=7.0, + # temperature=25.0, + # temperature_unit=C, + # ) + + # ana.add_molecule( + # id="mal", + # name="maleimide", + # pubchem_cid=10935, + # init_conc=0.656, + # conc_unit=M, + # retention_time=11.38, + # wavelength=254, # ) - # plt.show() - # ana.plot_measurements() - from devtools import pprint + # ana.visualize() - from chromatopy.units.predefined import min + from chromatopy.tools.analyzer import ChromAnalyzer + from chromatopy.units import min as min_ - dirpath = "/Users/max/Documents/GitHub/shimadzu-example/data/kinetic/substrate_10mM_co-substrate3.12mM" + path = "/Users/max/Documents/jan-niklas/MjNK" - reaction_time = [0, 1, 2, 3, 4, 5.0] - ana = ChromAnalyzer.read_shimadzu( - id="23234", - path=dirpath, - reaction_times=reaction_time, - time_unit=min, - ph=7.0, + reaction_times = [0.0] * 18 + + ana = ChromAnalyzer.read_chromeleon( + path=path, + reaction_times=reaction_times, + time_unit=min_, + ph=7.4, temperature=25.0, - temperature_unit=C, ) - ana.add_molecule( - id="mal", - name="maleimide", - pubchem_cid=10935, - init_conc=0.656, - conc_unit=M, - retention_time=11.38, - wavelength=254, - ) + ana.find_peaks() - ana.visualize() + print(ana.measurements[0].chromatograms[0].peaks[0]) diff --git a/chromatopy/tools/peak_utils.py b/chromatopy/tools/peak_utils.py new file mode 100644 index 0000000..b9d803e --- /dev/null +++ b/chromatopy/tools/peak_utils.py @@ -0,0 +1,218 @@ +from __future__ import annotations + +import numpy as np +from lmfit.models import GaussianModel +from matplotlib import pyplot as plt +from pybaselines import Baseline +from pydantic import BaseModel +from rich import print +from scipy.ndimage import uniform_filter1d +from scipy.signal import find_peaks + +# from chromatopy.model import UnitDefinition + + +class PeakFitResult(BaseModel): + peak_index: int + amplitude: float + center: float + width: float + mode: str + area: float | None = None # You can calculate and store the area if needed + + def calculate_area(self, model: str = "gaussian"): + """Calculates the area under the peak based on the model.""" + models = ("gaussian", "lorentzian") + + if model == "gaussian": + # For Gaussian: Area = Amplitude * Width * sqrt(2*pi) + self.area = self.amplitude * self.width * np.sqrt(2 * np.pi) + elif model == "lorentzian": + # For Lorentzian: Area = Amplitude * Width * pi + self.area = self.amplitude * self.width * np.pi + + else: + raise ValueError(f"Model {model} not supported. Choose from {models}.") + + +class SpectrumProcessor(BaseModel): + time: list[float] + raw_data: list[float] + smoothed_data: list[float] = [] + baseline: list[float] = [] + baseline_corrected_data: list[float] = [] + peak_indices: list[int] = [] + peak_params: list[PeakFitResult] = [] + silent: bool = False + # time_unit: UnitDefinition + + def smooth_data(self, window: int = 11) -> list[float]: + """Smooths the raw data using a uniform filter. + + Args: + window (int, optional): The size of the window. Defaults to 11. + """ + self._remove_nan() + + filtered = uniform_filter1d(self.raw_data, size=window) + if not self.silent: + print("Data smoothed.") + return filtered + + # private method that checks the data for nan values adn removes them, deletes the corresponding time values + def _remove_nan(self) -> None: + """Removes NaN values from the data and the corresponding time values.""" + self.raw_data = [d for d in self.raw_data if not np.isnan(d)] + self.time = [t for t, d in zip(self.time, self.raw_data) if not np.isnan(d)] + + def correct_baseline(self) -> None: + # smooth the data + self.smoothed_data = self.smooth_data(window=11) + + baseliner = Baseline(self.smoothed_data) + + self.baseline = baseliner.mor(self.smoothed_data)[0] + self.baseline_corrected_data = self.smoothed_data - self.baseline + + if not self.silent: + print("Baseline corrected.") + + def search_peaks(self): + peaks, _ = find_peaks(self.baseline_corrected_data, height=None, prominence=4) + self.peak_indices = peaks + + if not self.silent: + print(f"Found {len(peaks)} peaks.") + + def fit_multiple_gaussians(self): + """ + Fits multiple Gaussians to the baseline-corrected data using the peaks found. + + Returns: + dict: A dictionary containing the fit results and individual component fits. + """ + # Check if peak_indices is empty + if self.peak_indices is None or len(self.peak_indices) == 0: + raise ValueError("No peaks found. Run search_peaks() first.") + + # Create a combined model by summing Gaussian models for each peak + mod = None + pars = None + for i, peak in enumerate(self.peak_indices, start=1): + prefix = f"g{i}_" + gauss = GaussianModel(prefix=prefix) + center = self.time[peak] + amplitude = self.baseline_corrected_data[peak] + sigma = ( + self.time[min(len(self.time) - 1, peak + 10)] + - self.time[max(0, peak - 10)] + ) / 2.355 # FWHM estimate + + if mod is None: + mod = gauss + else: + mod += gauss + + # Update parameters with initial guesses + if pars is None: + pars = gauss.make_params( + center=center, sigma=sigma, amplitude=amplitude + ) + else: + pars.update( + gauss.make_params(center=center, sigma=sigma, amplitude=amplitude) + ) + + # Fit the combined model to the data + out = mod.fit(self.baseline_corrected_data, pars, x=np.array(self.time)) + + # Extract the peak parameters + self.peak_params = [ + PeakFitResult( + peak_index=i, + amplitude=out.params[f"g{i}_amplitude"].value, + center=out.params[f"g{i}_center"].value, + width=out.params[f"g{i}_sigma"].value, + mode="gaussian", + ) + for i in range(1, len(self.peak_indices) + 1) + ] + + if not self.silent: + print("Fitted Gaussians to peaks.") + + def run_pripeline(self): + self.correct_baseline() + self.search_peaks() + self.fit_multiple_gaussians() + self.calculate_peak_areas() + + def plot(self): + plt.plot(self.time, self.raw_data, label="Raw Data") + plt.plot(self.time, self.baseline, "--", label="Baseline") + plt.plot(self.time, self.baseline_corrected_data, label="Corrected") + # highlight peaks + plt.plot( + [self.time[i] for i in self.peak_indices], + [self.baseline_corrected_data[i] for i in self.peak_indices], + "x", + label="Peaks", + ) + # plot the fitted Gaussians + for peak in self.peak_params: + gaussian = GaussianModel() + peak_start = max(0, int(peak.center - 3 * peak.width)) + peak_end = min(len(self.time) - 1, int(peak.center + 3 * peak.width)) + x_arr = np.array(self.time[peak_start:peak_end]) + plt.plot( + x_arr, + gaussian.eval( + x=x_arr, + amplitude=peak.amplitude, + center=peak.center, + sigma=peak.width, + ), + ":", + label=f"Peak {peak.peak_index}", + ) + plt.legend() + plt.show() + + def calculate_peak_areas(self): + for peak in self.peak_params: + peak.calculate_area() + + if not self.silent: + print("Calculated peak areas.") + + +if __name__ == "__main__": + import matplotlib.pyplot as plt + + from chromatopy.tools.analyzer import ChromAnalyzer + from chromatopy.units import min as min_ + + path = "/Users/max/Documents/jan-niklas/MjNK" + + reaction_times = [0.0] * 18 + + ana = ChromAnalyzer.read_chromeleon( + path=path, + reaction_times=reaction_times, + time_unit=min_, + ph=7.4, + temperature=25.0, + ) + + chrom = ana.measurements[2].chromatograms[0] + # Check if chrom.times is not empty, contains only floats, and has the expected length + + fitter = SpectrumProcessor(time=chrom.times, raw_data=chrom.signals) + fitter.correct_baseline() + fitter.search_peaks() + fitter.fit_multiple_gaussians() + fitter.calculate_peak_areas() + print(fitter.peak_params[0]) + fitter.plot() + + print(fitter.baseline_corrected_data) diff --git a/chromatopy/tools/utility.py b/chromatopy/tools/utility.py index 6e12f3b..9d9671f 100644 --- a/chromatopy/tools/utility.py +++ b/chromatopy/tools/utility.py @@ -1,3 +1,5 @@ +import plotly.graph_objects as go + from chromatopy.model import Chromatogram @@ -20,3 +22,13 @@ def _resolve_chromatogram( return next(chrom for chrom in chromatograms if chrom.wavelength == wavelength) raise ValueError("No chromatogram found.") + + +def generate_visibility(hover_text: str, fig: go.Figure) -> list[bool]: + visibility = [] + for trace in fig.data: + if trace.hovertext == hover_text: + visibility.append(True) + else: + visibility.append(False) + return visibility diff --git a/pyproject.toml b/pyproject.toml index 5b71748..e3bbfeb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,10 +14,10 @@ calipytion = { git = "https://github.com/FAIRChemistry/CaliPytion.git", branch = pyenzyme = { git = "https://github.com/EnzymeML/PyEnzyme.git", branch = "v2-migration" } plotly = "^5.19.0" rainbow-api = "^1.0.6" -hplc-py = "0.2.5" joblib = "^1.4.2" rich = "^13.7.1" loguru = "^0.7.2" +pybaselines = "^1.1.0" [tool.poetry.group.dev.dependencies] mkdocs-material = "^9.5.12"