diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9a2a556d..73e7622c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -35,11 +35,11 @@ jobs: - name: Build and install ms2pip run: | - pip install .[test] + pip install .[dev] - # - name: Test with pytest - # run: | - # pytest + - name: Test with pytest + run: | + pytest - name: Test installation run: | diff --git a/docs/source/api/ms2pip.search-space.rst b/docs/source/api/ms2pip.search-space.rst new file mode 100644 index 00000000..456f2bef --- /dev/null +++ b/docs/source/api/ms2pip.search-space.rst @@ -0,0 +1,6 @@ +******************* +ms2pip.search_space +******************* + +.. automodule:: ms2pip.search_space + :members: diff --git a/docs/source/usage.rst b/docs/source/usage.rst index 13cd8963..563eae26 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -33,12 +33,22 @@ results in: ``predict-batch`` ----------------- -[todo] +Provide a list of peptidoforms (see :ref:`Peptides / PSMs`) to predict multiple spectra at once. +For instance, + +.. code-block:: sh + + ms2pip predict-batch peptides.tsv --model TMT + +results in a file ``test_predictions.csv`` with the predicted spectra. + ``predict-library`` ------------------- -[todo] +Predict spectra for a full peptide search space generated from a protein FASTA file. Various +peptide search space parameters can be configured to control the peptidoforms that are generated. +See :ref:`ms2pip.search_space` for more information. This mode was first developed in collaboration with the ProGenTomics group for the `MS²PIP for DIA `_ project. diff --git a/fasta2speclib/__init__.py b/fasta2speclib/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/fasta2speclib/fasta2speclib.py b/fasta2speclib/fasta2speclib.py deleted file mode 100644 index a91977f0..00000000 --- a/fasta2speclib/fasta2speclib.py +++ /dev/null @@ -1,715 +0,0 @@ -""" -Create a spectral library starting from a proteome in fasta format. - -The script runs through the following steps: -- In silico cleavage of proteins from the fasta file -- Remove peptide redundancy -- Add all variations of variable modifications (max 7 PTMs/peptide) -- Add variations on charge state -- Predict spectra with MS2PIP -- Write to various output file formats - - -Unspecific cleavage (e.g. for immunopeptidomics) is supported by setting -``cleavage_rule`` to ``unspecific``. - - -Decoys added by reversing sequences, keeping the N-terminal residue inplace. - - -Modifications: -- Peptides can carry only one modification per site (side chain or terminus). -- Protein terminal modifications take precedence over peptide terminal modifications. -- Terminal modifications can have site specificity (e.g. N-term K or N-term P). - -""" - -from __future__ import annotations - -__author__ = "Ralf Gabriels" -__copyright__ = "CompOmics" -__credits__ = ["Ralf Gabriels", "Sven Degroeve", "Lennart Martens"] -__license__ = "Apache License, Version 2.0" -__email__ = "Ralf.Gabriels@ugent.be" - -import argparse -import json -import logging -import multiprocessing -import multiprocessing.dummy -from collections import defaultdict -from functools import cmp_to_key, partial -from itertools import chain, product -from pathlib import Path -from typing import Dict, List, Optional, Union - -import pandas as pd -from pydantic import BaseModel, field_validator, model_validator -from pyteomics.fasta import FASTA, Protein, decoy_db -from pyteomics.parser import icleave -from rich.logging import RichHandler -from rich.progress import track - -from ms2pip._utils.retention_time import RetentionTime -from ms2pip.core import MODELS, _Parallelized -from ms2pip.peptides import Modifications as MS2PIPModifications -from ms2pip.utils import spectrum_output - -logger = logging.getLogger(__name__) - -raise NotImplementedError("This module is not yet implemented for MS²PIP v4.") - - -class Peptide(BaseModel): - """Peptide representation within the fasta2speclib search space.""" - - sequence: str - proteins: List[str] - is_n_term: Optional[bool] = None - is_c_term: Optional[bool] = None - modification_options: List[str] = None - charge_options: List[int] = None - - -class ModificationConfig(BaseModel): - """Configuration for a single modification in the search space.""" - - name: str - mass_shift: float - unimod_accession: Optional[int] = None - amino_acid: Optional[str] = None - peptide_n_term: Optional[bool] = False - protein_n_term: Optional[bool] = False - peptide_c_term: Optional[bool] = False - protein_c_term: Optional[bool] = False - fixed: Optional[bool] = False - - @model_validator(mode="after") - def modification_must_have_target(self): - target_fields = [ - "amino_acid", - "peptide_n_term", - "protein_n_term", - "peptide_c_term", - "protein_c_term", - ] - if not any(getattr(self, t) for t in target_fields): - raise ValueError("Modifications must have a target (amino acid or N/C-term).") - return self - - -DEFAULT_MODIFICATIONS = [ - ModificationConfig( - name="Oxidation", - unimod_accession=35, - mass_shift=15.994915, - amino_acid="M", - ), - ModificationConfig( - name="Carbamidomethyl", - mass_shift=57.021464, - unimod_accession=4, - amino_acid="C", - fixed=True, - ), -] - - -class Configuration(BaseModel): - """Configuration for fasta2speclib.""" - - fasta_filename: Union[str, Path] - output_filename: Optional[str] = None - output_filetype: Optional[List[str]] = None - charges: List[int] = [2, 3] - min_length: int = 8 - max_length: int = 30 - cleavage_rule: str = "trypsin" - missed_cleavages: int = 2 - semi_specific: bool = False - modifications: List[ModificationConfig] = DEFAULT_MODIFICATIONS - max_variable_modifications: int = 3 - min_precursor_mz: Optional[float] = None - max_precursor_mz: Optional[float] = None - ms2pip_model: str = "HCD" - add_decoys: float = False - add_retention_time: float = True - deeplc: dict = dict() - batch_size: int = 10000 - num_cpu: Optional[int] = None - - @field_validator("output_filetype") - @classmethod - def _validate_output_filetypes(cls, v): - allowed_types = ["msp", "mgf", "bibliospec", "spectronaut", "dlib"] # , "hdf"] - v = [filetype.lower() for filetype in v] - for filetype in v: - if filetype not in allowed_types: - raise ValueError( - f"File type `{filetype}` not recognized. Should be one of " - f"`{allowed_types}`." - ) - return v - - @field_validator("modifications") - @classmethod - def _validate_modifications(cls, v): - if all(isinstance(m, ModificationConfig) for m in v): - return v - elif all(isinstance(m, dict) for m in v): - return [ModificationConfig(**modification) for modification in v] - else: - raise ValueError( - "Modifications should be a list of dicts or ModificationConfig objects." - ) - - @field_validator("ms2pip_model") - @classmethod - def _validate_ms2pip_model(cls, v): - if v not in MODELS.keys(): - raise ValueError( - f"MS²PIP model `{v}` not recognized. Should be one of " f"`{MODELS.keys()}`." - ) - return v - - @field_validator("num_cpu") - @classmethod - def _validate_num_cpu(cls, v): - available_cpus = multiprocessing.cpu_count() - if not v or not 0 < v < available_cpus: - return available_cpus - else: - return v - - def get_output_filename(self): - if self.output_filename: - return self.output_filename - else: - return str(Path(self.fasta_filename).with_suffix("")) - - -class Fasta2SpecLib: - """Generate an MS²PIP- and DeepLC-predicted spectral library from a FASTA file.""" - - def __init__( - self, - fasta_filename: Union[str, Path], - output_filename: Optional[Union[str, Path]] = None, - config: Optional[Union[Configuration, dict]] = None, - ): - """ - Generate an MS²PIP- and DeepLC-predicted spectral library. - - fasta_filename: str, Path - Path to input FASTA file. - output_filename: str, Path - Stem for output filenames. For instance, ``./output`` would result in - ``./output.msp``. If ``None``, the output filename will be based on the - input FASTA filename. - config: Configuration, dict, optional - Configuration of fasta2speclib. See documentation for more info - - """ - # Parse configuration - if config: - if isinstance(config, dict): - config["fasta_filename"] = fasta_filename - config["output_filename"] = output_filename - config = Configuration.model_validate(config) - elif isinstance(config, Configuration): - config.fasta_filename = fasta_filename - config.output_filename = output_filename - else: - raise TypeError(f"Invalid type for configuration: `{type(config)}`.") - else: - config = Configuration(fasta_filename=fasta_filename, output_filename=output_filename) - - # `unspecific` is not an option in pyteomics.parser.icleave, so we configure - # the settings for unspecific cleavage here. - if config.cleavage_rule == "unspecific": - config.missed_cleavages = config.max_length - config.cleavage_rule = r"(?<=[A-Z])" - - # Setup multiprocessing, using a dummy pool if num_cpu is 1 - if config.num_cpu != 1: - self.Pool = multiprocessing.Pool - else: - self.Pool = multiprocessing.dummy.Pool - - self.config = config - self.rt_predictor = self._get_rt_predictor(config) - self.ms2pip_params = self._prepare_ms2pip_params(config) - - def run(self): - """Run the library generation pipeline.""" - peptides = self.prepare_search_space() - batches = self.peptides_to_batches(peptides, self.config.batch_size) - - # Run in batches to avoid memory issues - for batch_id, batch_peptides in enumerate(batches): - logger.info(f"Processing batch {batch_id + 1}/{len(batches)}...") - self.process_batch(batch_id, batch_peptides) - - def prepare_search_space(self) -> List[Peptide]: - """Prepare peptide search space from FASTA file.""" - logger.info("Preparing search space...") - - # Setup database, with decoy configuration if required - n_proteins = count_fasta_entries(self.config.fasta_filename) - if self.config.add_decoys: - fasta_db = decoy_db( - self.config.fasta_filename, - mode="reverse", - decoy_only=False, - keep_nterm=True, - ) - else: - fasta_db = FASTA(self.config.fasta_filename) - n_proteins *= 2 - - # Read proteins and digest to peptides - with self.Pool(self.config.num_cpu) as pool: - partial_digest_protein = partial( - self._digest_protein, - min_length=self.config.min_length, - max_length=self.config.max_length, - cleavage_rule=self.config.cleavage_rule, - missed_cleavages=self.config.missed_cleavages, - semi_specific=self.config.semi_specific, - ) - results = track( - pool.imap(partial_digest_protein, fasta_db), - total=n_proteins, - description="Digesting proteins...", - transient=True, - ) - peptides = list(chain.from_iterable(results)) - - # Remove redundancy in peptides and combine protein lists - peptide_dict = dict() - for peptide in track( - peptides, - description="Removing peptide redundancy...", - transient=True, - ): - if peptide.sequence in peptide_dict: - peptide_dict[peptide.sequence].proteins.extend(peptide.proteins) - else: - peptide_dict[peptide.sequence] = peptide - peptides = list(peptide_dict.values()) - - # Add modification and charge permutations - modifications_by_target = self._get_modifications_by_target(self.config.modifications) - modification_options = [] - with self.Pool(self.config.num_cpu) as pool: - partial_get_modification_versions = partial( - self._get_modification_versions, - modifications=self.config.modifications, - modifications_by_target=modifications_by_target, - max_variable_modifications=self.config.max_variable_modifications, - ) - modification_options = pool.imap(partial_get_modification_versions, peptides) - for pep, mod_opt in track( - zip(peptides, modification_options), - description="Adding modifications...", - total=len(peptides), - transient=True, - ): - pep.modification_options = mod_opt - pep.charge_options = self.config.charges - - logger.info(f"Search space contains {len(peptides)} peptides.") - return peptides - - @staticmethod - def peptides_to_batches(peptides: List[Peptide], batch_size: int) -> List[List[Peptide]]: - """Divide peptides into batches for batch-based processing.""" - return [peptides[i : i + batch_size] for i in range(0, len(peptides), batch_size)] - - def process_batch(self, batch_id: int, batch_peptides: List[Peptide]): - """Predict and write library for a batch of peptides.""" - # Generate MS²PIP input - logger.info("Generating MS²PIP input...") - peprec = self._peptides_to_peprec(batch_peptides) - logger.info(f"Chunk contains {len(peprec)} peptidoforms.") - - # Filter on precursor m/z - if self.config.min_precursor_mz and self.config.max_precursor_mz: - mods = MS2PIPModifications() - mods.add_from_ms2pip_modstrings(self.ms2pip_params["ms2pip"]["ptm"]) - precursor_mz = peprec.apply( - lambda x: mods.calc_precursor_mz(x["peptide"], x["modifications"], x["charge"])[1], - axis=1, - ) - before = len(peprec) - peprec = ( - peprec[ - (self.config.min_precursor_mz <= precursor_mz) - & (precursor_mz <= self.config.max_precursor_mz) - ] - .reset_index(drop=True) - .copy() - ) - after = len(peprec) - logger.info(f"Filtered batch on precursor m/z: {before} -> {after}") - - # Predict retention time - if self.config.add_retention_time: - logger.info("Predicting retention times with DeepLC...") - self.rt_predictor.add_rt_predictions(peprec) - - # Predict spectra - logger.info("Predicting spectra with MS²PIP...") - ms2pip = _Parallelized( - peprec, - num_cpu=self.config.num_cpu, - params=self.ms2pip_params, - return_results=True, - add_retention_time=False, - ) - predictions = ms2pip.run() - - # Write output - logger.info("Writing output...") - self._write_predictions( - predictions, - peprec, - self.config.output_filetype, - self.config.get_output_filename(), - self.ms2pip_params, - append=batch_id != 0, - ) - - @staticmethod - def _get_rt_predictor(config: Configuration) -> RetentionTime: - """Initialize and return MS²PIP wrapper for DeepLC predictor.""" - if config.add_retention_time: - logger.debug("Initializing DeepLC predictor") - if not config.deeplc: - config.deeplc = {"calibration_file": None} - if "n_jobs" not in config.deeplc: - config.deeplc["n_jobs"] = config.num_cpu - rt_predictor = RetentionTime(config=config.dict()) - else: - rt_predictor = None - return rt_predictor - - @staticmethod - def _prepare_ms2pip_params(config: Configuration) -> dict: - """Prepare MS²PIP parameters from fasta2speclib configuration.""" - ms2pip_params = { - "ms2pip": { - "model": config.ms2pip_model, - "frag_error": 0.02, - "ptm": [ - ( - "{},{},opt,N-term".format(mod.name, mod.mass_shift) - if mod.peptide_n_term or mod.protein_n_term - else ( - "{},{},opt,C-term".format(mod.name, mod.mass_shift) - if mod.peptide_c_term or mod.protein_c_term - else "{},{},opt,{}".format(mod.name, mod.mass_shift, mod.amino_acid) - ) - ) - for mod in config.modifications - ], - "sptm": [], - "gptm": [], - } - } - return ms2pip_params - - @staticmethod - def _digest_protein( - protein: Protein, - min_length: int = 8, - max_length: int = 30, - cleavage_rule: str = "trypsin", - missed_cleavages: int = 2, - semi_specific: bool = False, - ) -> List[Peptide]: - """Digest protein sequence and return a list of validated peptides.""" - - def valid_residues(sequence: str) -> bool: - return not any(aa in sequence for aa in ["B", "J", "O", "U", "X", "Z"]) - - def parse_peptide( - start_position: int, - sequence: str, - protein: Protein, - ) -> Peptide: - """Parse result from parser.icleave into Peptide.""" - return Peptide( - sequence=sequence, - # Assumes protein ID is description until first space - proteins=[protein.description.split(" ")[0]], - is_n_term=start_position == 0, - is_c_term=start_position + len(sequence) == len(protein.sequence), - ) - - peptides = [ - parse_peptide(start, seq, protein) - for start, seq in icleave( - protein.sequence, - cleavage_rule, - missed_cleavages=missed_cleavages, - min_length=min_length, - max_length=max_length, - semi=semi_specific, - ) - if valid_residues(seq) - ] - - return peptides - - @staticmethod - def _get_modifications_by_target( - modifications, - ) -> Dict[str, Dict[str, List[ModificationConfig]]]: - """Restructure variable modifications to options per side chain or terminus.""" - modifications_by_target = { - "sidechain": defaultdict(lambda: [None]), - "peptide_n_term": defaultdict(lambda: [None]), - "peptide_c_term": defaultdict(lambda: [None]), - "protein_n_term": defaultdict(lambda: [None]), - "protein_c_term": defaultdict(lambda: [None]), - } - - def add_mod(mod, target, amino_acid): - if amino_acid: - modifications_by_target[target][amino_acid].append(mod) - else: - modifications_by_target[target]["any"].append(mod) - - for mod in modifications: - if mod.fixed: - continue - if mod.peptide_n_term: - add_mod(mod, "peptide_n_term", mod.amino_acid) - elif mod.peptide_c_term: - add_mod(mod, "peptide_c_term", mod.amino_acid) - elif mod.protein_n_term: - add_mod(mod, "protein_n_term", mod.amino_acid) - elif mod.protein_c_term: - add_mod(mod, "protein_c_term", mod.amino_acid) - else: - add_mod(mod, "sidechain", mod.amino_acid) - - return {k: dict(v) for k, v in modifications_by_target.items()} - - # TODO: Make adding modifications more efficient - @staticmethod - def _get_modification_versions( - peptide: Peptide, - modifications: List[ModificationConfig], - modifications_by_target: Dict[str, Dict[str, List[ModificationConfig]]], - max_variable_modifications: int = 3, - ) -> List[str]: - """Get MS²PIP modification strings for all potential versions.""" - possibilities_by_site = defaultdict(list) - - # Generate dictionary of positions per amino acid - pos_dict = defaultdict(list) - for pos, aa in enumerate(peptide.sequence): - pos_dict[aa].append(pos + 1) - # Map modifications to positions - for aa in set(pos_dict).intersection(set(modifications_by_target["sidechain"])): - possibilities_by_site.update( - {pos: modifications_by_target["sidechain"][aa] for pos in pos_dict[aa]} - ) - - # Assign possible modifications per terminus - for terminus, position, specificity in [ - ("peptide_n_term", 0, None), - ("peptide_c_term", -1, None), - ("protein_n_term", 0, "is_n_term"), - ("protein_c_term", -1, "is_c_term"), - ]: - if specificity is None or getattr(peptide, specificity): - for site, mods in modifications_by_target[terminus].items(): - if site == "any" or peptide.sequence[position] == site: - possibilities_by_site[position].extend(mods) - - # Override with fixed modifications - for mod in modifications: - aa = mod.amino_acid - # Skip variable modifications - if not mod.fixed: - continue - # Assign if specific aa matches or if no aa is specified for each terminus - for terminus, position, specificity in [ - ("peptide_n_term", 0, None), - ("peptide_c_term", -1, None), - ("protein_n_term", 0, "is_n_term"), - ("protein_c_term", -1, "is_c_term"), - ]: - if getattr(mod, terminus): # Mod has this terminus - if specificity is None or getattr(peptide, specificity): # Specificity matches - if not aa or (aa and peptide.sequence[position] == aa): # Aa matches - possibilities_by_site[position] = [mod] # Override with fixed mod - break # Allow `else: if amino_acid` if no terminus matches - # Assign if fixed modification is not terminal and specific aa matches - else: - if aa: - for pos in pos_dict[aa]: - possibilities_by_site[pos] = [mod] - - # Get all possible combinations of modifications for all sites - mod_permutations = product(*possibilities_by_site.values()) - mod_positions = possibilities_by_site.keys() - - # Filter by max modified sites (avoiding combinatorial explosion) - mod_permutations = filter( - lambda mods: sum([1 for m in mods if m is not None and not m.fixed]) - <= max_variable_modifications, - mod_permutations, - ) - - def _compare_minus_one_larger(a, b): - """Custom comparison function where `-1` is always larger.""" - if a[0] == -1: - return 1 - elif b[0] == -1: - return -1 - else: - return a[0] - b[0] - - # Get MS²PIP modifications strings for each combination - mod_strings = [] - for p in mod_permutations: - if p == [""]: - mod_strings.append("-") - else: - mods = sorted(zip(mod_positions, p), key=cmp_to_key(_compare_minus_one_larger)) - mod_strings.append("|".join(f"{p}|{m.name}" for p, m in mods if m)) - - return mod_strings - - @staticmethod - def _peptides_to_peprec(peptides: List[Peptide]) -> pd.DataFrame: - """Convert a list of peptides to a PeptideRecord DataFrame.""" - peprec = pd.DataFrame( - [ - { - "peptide": peptide.sequence, - "modifications": modifications, - "charge": charge, - "protein_list": peptide.proteins, - } - for peptide in peptides - for charge in peptide.charge_options - for modifications in peptide.modification_options - ], - columns=["spec_id", "peptide", "modifications", "charge", "protein_list"], - ) - peprec["spec_id"] = peprec.index - return peprec - - @staticmethod - def _write_predictions( - predictions: pd.DataFrame, - peprec: pd.DataFrame, - filetypes: List[str], - filename: str, - ms2pip_params: Dict, - append: bool = False, - ): - """Write predictions (for batch) to requested output file formats.""" - write_mode = "a" if append else "w" - # Remove to avoid PyTables dependency? - # if "hdf" in filetypes: - # logger.info(f"Writing results to {filename}_predictions.hdf") - # predictions.astype(str).to_hdf( - # f"{filename}_predictions.hdf", - # key="table", - # format="table", - # complevel=3, - # complib="zlib", - # mode=write_mode, - # append=append, - # min_itemsize=50, - # ) - spec_out = spectrum_output.SpectrumOutput( - predictions, - peprec, - ms2pip_params["ms2pip"], - output_filename=filename, - write_mode=write_mode, - ) - if "msp" in filetypes: - spec_out.write_msp() - if "mgf" in filetypes: - spec_out.write_mgf() - if "bibliospec" in filetypes: - spec_out.write_bibliospec() - if "spectronaut" in filetypes: - spec_out.write_spectronaut() - if "dlib" in filetypes: - spec_out.write_dlib() - - -def count_fasta_entries(filename: Union[str, Path]) -> int: - """Count the number of entries in a FASTA file.""" - with open(filename, "rt") as f: - count = 0 - for line in f: - if line[0] == ">": - count += 1 - return count - - -def _argument_parser(): - parser = argparse.ArgumentParser( - description=( - "Create an MS2PIP- and DeepLC-predicted spectral library, starting from a " - "FASTA file." - ) - ) - parser.add_argument( - "fasta_filename", - action="store", - help="Path to the FASTA file containing protein sequences", - ) - parser.add_argument( - "-o", - dest="output_filename", - action="store", - help="Name for output file(s) (if not given, derived from FASTA file)", - ) - parser.add_argument( - "-c", - dest="config_filename", - action="store", - help="Name of configuration json file (default: fasta2speclib_config.json)", - ) - - args = parser.parse_args() - return args - - -def main(): - """Command line entrypoint for fasta2speclib.""" - # Configure logging - logging.basicConfig( - format="%(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - level=logging.INFO, - handlers=[RichHandler(rich_tracebacks=True, show_level=True, show_path=False)], - ) - logging.getLogger("ms2pip").setLevel(logging.WARNING) - logging.getLogger("deeplc").setLevel(logging.WARNING) - - # Get configuration from CLI and config file - args = _argument_parser() - with open(args.config_filename, "rt") as config_file: - config_dict = json.load(config_file) - - # Run fasta2speclib - logger.info("Starting library generation pipeline...") - f2sl = Fasta2SpecLib(args.fasta_filename, args.output_filename, config_dict) - f2sl.run() - logger.info("Done!") - - -if __name__ == "__main__": - main() diff --git a/fasta2speclib/fasta2speclib_config.json b/fasta2speclib/fasta2speclib_config.json deleted file mode 100644 index 2accdaab..00000000 --- a/fasta2speclib/fasta2speclib_config.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "output_filetype":["msp", "mgf", "bibliospec", "dlib"], - "charges":[2, 3], - "min_peplen":8, - "max_peplen":30, - "cleavage_rule":"trypsin", - "missed_cleavages":2, - "semi_specific":false, - "modifications":[ - {"name":"Acetyl", "unimod_accession":1, "mass_shift":42.010565, "amino_acid":null, "protein_n_term":true}, - {"name":"Oxidation", "unimod_accession":35, "mass_shift":15.994915, "amino_acid":"M"}, - {"name":"Carbamidomethyl", "unimod_accession":4, "mass_shift":57.021464, "amino_acid":"C", "fixed":true} - ], - "ms2pip_model":"HCD", - "decoy":false, - "add_retention_time":true, - "deeplc": {}, - "batch_size":10000, - "num_cpu":null -} diff --git a/fasta2speclib/fasta2speclib_config.md b/fasta2speclib/fasta2speclib_config.md deleted file mode 100644 index fba38a11..00000000 --- a/fasta2speclib/fasta2speclib_config.md +++ /dev/null @@ -1,2 +0,0 @@ -# fasta2speclib configuration info -Moved to [compomics.github.io](http://compomics.github.io/projects/ms2pip_c/wiki/fasta2speclib.html) diff --git a/ms2pip/__main__.py b/ms2pip/__main__.py index 818c514f..3ea6d1da 100644 --- a/ms2pip/__main__.py +++ b/ms2pip/__main__.py @@ -105,8 +105,27 @@ def predict_batch(*args, **kwargs): @cli.command(help=ms2pip.core.predict_library.__doc__) +@click.argument("fasta-file", required=False, type=click.Path(exists=True, dir_okay=False)) +@click.option("--config", "-c", type=click.Path(exists=True, dir_okay=False)) +@click.option("--output-name", "-o", type=str) +@click.option("--output-format", "-f", type=click.Choice(SUPPORTED_FORMATS), default="msp") +@click.option("--add-retention-time", "-r", is_flag=True) +@click.option("--model", type=click.Choice(MODELS), default="HCD") +@click.option("--model-dir") +@click.option("--batch-size", type=int, default=100000) +@click.option("--processes", "-n", type=int) def predict_library(*args, **kwargs): - ms2pip.core.predict_library(*args, **kwargs) + # Parse arguments + if not kwargs["fasta_file"] and not kwargs["config"]: + raise click.UsageError("Either `fasta_file` or `config` must be provided.") + output_format = kwargs.pop("output_format") + output_name = _infer_output_name( + kwargs["fasta_file"] or kwargs["config"], kwargs.pop("output_name") + ) + + # Run and write output for each batch + for i, result_batch in enumerate(ms2pip.core.predict_library(*args, **kwargs)): + write_spectra(output_name, result_batch, output_format, write_mode="w" if i == 0 else "a") @cli.command(help=ms2pip.core.correlate.__doc__) diff --git a/ms2pip/core.py b/ms2pip/core.py index 31feb3a9..898f6948 100644 --- a/ms2pip/core.py +++ b/ms2pip/core.py @@ -9,11 +9,12 @@ from collections import defaultdict from math import ceil from pathlib import Path -from typing import Callable, List, Optional, Tuple, Union +from typing import Any, Callable, Generator, Iterable, List, Optional, Tuple, Union import numpy as np import pandas as pd from psm_utils import PSM, Peptidoform, PSMList +from rich.progress import track import ms2pip.exceptions as exceptions from ms2pip import spectrum_output @@ -25,6 +26,7 @@ from ms2pip._utils.xgb_models import get_predictions_xgb, validate_requested_xgb_model from ms2pip.constants import MODELS from ms2pip.result import ProcessingResult, calculate_correlations +from ms2pip.search_space import ProteomeSearchSpace from ms2pip.spectrum_input import read_spectrum_file from ms2pip.spectrum_output import SUPPORTED_FORMATS @@ -102,6 +104,8 @@ def predict_batch( Predicted spectra with theoretical m/z and predicted intensity values. """ + if isinstance(psms, list): + psms = PSMList(psm_list=psms) psm_list = read_psms(psms, filetype=psm_filetype) if add_retention_time: @@ -122,9 +126,68 @@ def predict_batch( return results -def predict_library(): - """Predict spectral library from protein FASTA file.""" - raise NotImplementedError +def predict_library( + fasta_file: Optional[Union[str, Path]] = None, + config: Optional[Union[ProteomeSearchSpace, dict, str, Path]] = None, + add_retention_time: bool = False, + model: Optional[str] = "HCD", + model_dir: Optional[Union[str, Path]] = None, + batch_size: int = 100000, + processes: Optional[int] = None, +) -> Generator[ProcessingResult, None, None]: + """ + Predict spectral library from protein FASTA file.\f + + Parameters + ---------- + fasta_file + Path to FASTA file with protein sequences. Required if `search-space-config` is not + provided. + config + ProteomeSearchSpace, or a dictionary or path to JSON file with proteome search space + parameters. Required if `fasta_file` is not provided. + add_retention_time + Add retention time predictions with DeepLC (Requires optional DeepLC dependency). + model + Model to use for prediction. Default: "HCD". + model_dir + Directory where XGBoost model files are stored. Default: `~/.ms2pip`. + batch_size + Number of peptides to process in each batch. + processes + Number of parallel processes for multiprocessing steps. By default, all available. + + """ + if fasta_file and config: + # Use provided proteome, but overwrite fasta_file + config = ProteomeSearchSpace.from_any(config) + config.fasta_file = fasta_file + elif fasta_file and not config: + # Default proteome search space with provided fasta_file + config = ProteomeSearchSpace(fasta_file=fasta_file) + elif not fasta_file and config: + # Use provided proteome + config = ProteomeSearchSpace.from_any(config) + else: + raise ValueError("Either `fasta_file` or `config` must be provided.") + + search_space = ProteomeSearchSpace.from_any(config) + search_space.build() + + for batch in track( + _into_batches(search_space, batch_size=batch_size), + description="Predicting spectra...", + total = ceil(len(search_space) / batch_size), + ): + logging.disable(logging.CRITICAL) + yield predict_batch( + search_space.filter_psms_by_mz(PSMList(psm_list=list(batch))), + add_retention_time=add_retention_time, + model=model, + model_dir=model_dir, + processes=processes, + ) + logging.disable(logging.NOTSET) def correlate( @@ -907,3 +970,15 @@ def _assemble_training_data(results: List[ProcessingResult], model: str) -> pd.D ] return training_data + + +def _into_batches(iterable: Iterable[Any], batch_size: int) -> Generator[List[Any], None, None]: + """Accumulate iterator elements into batches of a given size.""" + batch = [] + for item in iterable: + batch.append(item) + if len(batch) == batch_size: + yield batch + batch = [] + if batch: + yield batch diff --git a/ms2pip/result.py b/ms2pip/result.py index bb5ef014..3cf7eb73 100644 --- a/ms2pip/result.py +++ b/ms2pip/result.py @@ -4,6 +4,7 @@ import csv from typing import Any, Dict, List, Optional, Tuple +from logging import getLogger import numpy as np from psm_utils import PSM @@ -18,6 +19,7 @@ from ms2pip.spectrum import ObservedSpectrum, PredictedSpectrum +logger = getLogger(__name__) class ProcessingResult(BaseModel): """Result of processing a single PSM.""" diff --git a/ms2pip/search_space.py b/ms2pip/search_space.py new file mode 100644 index 00000000..17c99a26 --- /dev/null +++ b/ms2pip/search_space.py @@ -0,0 +1,637 @@ +""" +Define and build the search space for in silico spectral library generation. + +This module defines the search space for in silico spectral library generation as a +:py:class:`~ProteomeSearchSpace` object. Variable and fixed modifications can be configured +as :py:class:`~ModificationConfig` objects. + +The peptide search space can be built from a protein FASTA file and a set of parameters, which can +then be converted to a py:class:`psm_utils.PSMList` object for use in :py:mod:`ms2pip`. + +For an unspecific protein digestion, the cleavage rule can be set to ``unspecific``. This will +result in a cleavage rule that allows cleavage after any amino acid with an unlimited number of +allowed missed cleavages. + + +Examples +-------- +>>> from ms2pip.search_space import ProteomeSearchSpace, ModificationConfig +>>> search_space = ProteomeSearchSpace( +... fasta_file="tests/data/test_proteins.fasta", +... min_length=8, +... max_length=30, +... cleavage_rule="trypsin", +... missed_cleavages=2, +... semi_specific=False, +... modifications=[ +... ModificationConfig(label="UNIMOD:Oxidation", amino_acid="M"), +... ModificationConfig(label="UNIMOD:Carbamidomethyl", amino_acid="C", fixed=True), +... ], +... charges=[2, 3], +... ) +>>> psm_list = search_space.into_psm_list() + +>>> from ms2pip.search_space import ProteomeSearchSpace +>>> search_space = ProteomeSearchSpace.from_any("tests/data/test_search_space.json") +>>> psm_list = search_space.into_psm_list() + +""" + +from __future__ import annotations + +import multiprocessing +import multiprocessing.dummy +from collections import defaultdict +from functools import partial +from itertools import chain, combinations, product +from logging import getLogger +from pathlib import Path +from typing import Any, Dict, Generator, List, Optional, Union + +import numpy as np +import pyteomics.fasta +from psm_utils import PSM, PSMList +from pydantic import BaseModel, field_validator, model_validator +from pyteomics.parser import icleave +from rich.progress import track + +logger = getLogger(__name__) + + +class ModificationConfig(BaseModel): + """Configuration for a single modification in the search space.""" + + label: str + amino_acid: Optional[str] = None + peptide_n_term: Optional[bool] = False + protein_n_term: Optional[bool] = False + peptide_c_term: Optional[bool] = False + protein_c_term: Optional[bool] = False + fixed: Optional[bool] = False + + def __init__(self, **data: Any): + """ + Configuration for a single modification in the search space. + + Parameters + ---------- + label + Label of the modification. This can be any valid ProForma 2.0 label. + amino_acid + Amino acid target of the modification. :py:obj:`None` if the modification is not + specific to an amino acid. Default is None. + peptide_n_term + Whether the modification occurs only on the peptide N-terminus. Default is False. + protein_n_term + Whether the modification occurs only on the protein N-terminus. Default is False. + peptide_c_term + Whether the modification occurs only on the peptide C-terminus. Default is False. + protein_c_term + Whether the modification occurs only on the protein C-terminus. Default is False. + fixed + Whether the modification is fixed. Default is False. + + """ + super().__init__(**data) + + @model_validator(mode="after") + def _modification_must_have_target(self): + target_fields = [ + "amino_acid", + "peptide_n_term", + "protein_n_term", + "peptide_c_term", + "protein_c_term", + ] + if not any(getattr(self, t) for t in target_fields): + raise ValueError("Modifications must have a target (amino acid or N/C-term).") + return self + + +DEFAULT_MODIFICATIONS = [ + ModificationConfig( + label="UNIMOD:Oxidation", + amino_acid="M", + ), + ModificationConfig( + label="UNIMOD:Carbamidomethyl", + amino_acid="C", + fixed=True, + ), +] + + +class ProteomeSearchSpace(BaseModel): + """Search space for in silico spectral library generation.""" + + fasta_file: Path + min_length: int = 8 + max_length: int = 30 + min_precursor_mz: Optional[float] = 0 + max_precursor_mz: Optional[float] = np.Inf + cleavage_rule: str = "trypsin" + missed_cleavages: int = 2 + semi_specific: bool = False + add_decoys: bool = False + modifications: List[ModificationConfig] = DEFAULT_MODIFICATIONS + max_variable_modifications: int = 3 + charges: List[int] = [2, 3] + + def __init__(self, **data: Any): + """ + Search space for in silico spectral library generation. + + Parameters + ---------- + fasta_file + Path to FASTA file with protein sequences. + min_length + Minimum peptide length. Default is 8. + max_length + Maximum peptide length. Default is 30. + min_precursor_mz + Minimum precursor m/z for peptides. Default is 0. + max_precursor_mz + Maximum precursor m/z for peptides. Default is np.Inf. + cleavage_rule + Cleavage rule for peptide digestion. Default is "trypsin". + missed_cleavages + Maximum number of missed cleavages. Default is 2. + semi_specific + Allow semi-specific cleavage. Default is False. + add_decoys + Add decoy sequences to search space. Default is False. + modifications + List of modifications to consider. Default is oxidation of M and carbamidomethylation + of C. + max_variable_modifications + Maximum number of variable modifications per peptide. Default is 3. + charges + List of charges to consider. Default is [2, 3]. + + """ + + super().__init__(**data) + self._peptidoform_spaces: List[_PeptidoformSearchSpace] = [] + + @field_validator("modifications") + @classmethod + def _validate_modifications(cls, v): + if all(isinstance(m, ModificationConfig) for m in v): + return v + elif all(isinstance(m, dict) for m in v): + return [ModificationConfig(**modification) for modification in v] + else: + raise ValueError( + "Modifications should be a list of dicts or ModificationConfig objects." + ) + + @model_validator(mode="after") + def _validate_unspecific_cleavage(self): + """Validate and configure unspecific cleavage settings.""" + # `unspecific` is not an option in pyteomics.parser.icleave, so we configure + # the settings for unspecific cleavage manually. + if self.cleavage_rule == "unspecific": + self.missed_cleavages = self.max_length + self.cleavage_rule = r"(?<=[A-Z])" + return self + + def __len__(self): + if not self._peptidoform_spaces: + raise ValueError("Search space must be built before length can be determined.") + return sum(len(pep_space) for pep_space in self._peptidoform_spaces) + + @classmethod + def from_any(cls, _input: Union[dict, str, Path, ProteomeSearchSpace]) -> ProteomeSearchSpace: + """ + Create ProteomeSearchSpace from various input types. + + Parameters + ---------- + _input + Search space parameters as a dictionary, a path to a JSON file, an existing + :py:class:`ProteomeSearchSpace` object. + + """ + if isinstance(_input, ProteomeSearchSpace): + return _input + elif isinstance(_input, (str, Path)): + with open(_input, "rt") as f: + return cls.model_validate_json(f.read()) + elif isinstance(_input, dict): + return cls.model_validate(_input) + else: + raise ValueError("Search space must be a dict, str, Path, or ProteomeSearchSpace.") + + def build(self, processes: int = 1): + """ + Build peptide search space from FASTA file. + + Parameters + ---------- + processes : int + Number of processes to use for parallelization. + + """ + processes = processes if processes else multiprocessing.cpu_count() + self._digest_fasta(processes) + self._remove_redundancy() + self._add_modifications(processes) + self._add_charges() + + def __iter__(self) -> Generator[PSM, None, None]: + """ + Generate PSMs from search space. + + If :py:meth:`build` has not been called, the search space will first be built with the + given parameters. + + Parameters + ---------- + processes : int + Number of processes to use for parallelization. + + """ + # Build search space if not already built + if not self._peptidoform_spaces: + raise ValueError("Search space must be built before PSMs can be generated.") + + spectrum_id = 0 + for pep_space in self._peptidoform_spaces: + for pep in pep_space: + yield PSM( + peptidoform=pep, + spectrum_id=spectrum_id, + protein_list=pep_space.proteins, + ) + spectrum_id += 1 + + def filter_psms_by_mz(self, psms: PSMList) -> PSMList: + """Filter PSMs by precursor m/z range.""" + return PSMList( + psm_list=[ + psm + for psm in psms + if self.min_precursor_mz <= psm.peptidoform.theoretical_mz <= self.max_precursor_mz + ] + ) + + def _digest_fasta(self, processes: int = 1): + """Digest FASTA file to peptides and populate search space.""" + # Convert to string to avoid issues with Path objects + self.fasta_file = str(self.fasta_file) + n_proteins = _count_fasta_entries(self.fasta_file) + if self.add_decoys: + fasta_db = pyteomics.fasta.decoy_db( + self.fasta_file, + mode="reverse", + decoy_only=False, + keep_nterm=True, + ) + n_proteins *= 2 + else: + fasta_db = pyteomics.fasta.FASTA(self.fasta_file) + + # Read proteins and digest to peptides + with _get_pool(processes) as pool: + partial_digest_protein = partial( + _digest_single_protein, + min_length=self.min_length, + max_length=self.max_length, + cleavage_rule=self.cleavage_rule, + missed_cleavages=self.missed_cleavages, + semi_specific=self.semi_specific, + ) + results = track( + pool.imap(partial_digest_protein, fasta_db), + total=n_proteins, + description="Digesting proteins...", + transient=True, + ) + self._peptidoform_spaces = list(chain.from_iterable(results)) + + def _remove_redundancy(self): + """Remove redundancy in peptides and combine protein lists.""" + peptide_dict = dict() + for peptide in track( + self._peptidoform_spaces, + description="Removing peptide redundancy...", + transient=True, + ): + if peptide.sequence in peptide_dict: + peptide_dict[peptide.sequence].proteins.extend(peptide.proteins) + else: + peptide_dict[peptide.sequence] = peptide + + # Overwrite with non-redundant peptides + self._peptidoform_spaces = list(peptide_dict.values()) + + def _add_modifications(self, processes: int = 1): + """Add modifications to peptides in search space.""" + modifications_by_target = _restructure_modifications_by_target(self.modifications) + modification_options = [] + with _get_pool(processes) as pool: + partial_get_modification_versions = partial( + _get_peptidoform_modification_versions, + modifications=self.modifications, + modifications_by_target=modifications_by_target, + max_variable_modifications=self.max_variable_modifications, + ) + modification_options = pool.imap( + partial_get_modification_versions, self._peptidoform_spaces + ) + for pep, mod_opt in track( + zip(self._peptidoform_spaces, modification_options), + description="Adding modifications...", + total=len(self._peptidoform_spaces), + transient=True, + ): + pep.modification_options = mod_opt + + def _add_charges(self): + """Add charge permutations to peptides in search space.""" + for peptide in track( + self._peptidoform_spaces, + description="Adding charge permutations...", + transient=True, + ): + peptide.charge_options = self.charges + + +class _PeptidoformSearchSpace(BaseModel): + """Search space for a given amino acid sequence.""" + + sequence: str + proteins: List[str] + is_n_term: Optional[bool] = None + is_c_term: Optional[bool] = None + modification_options: List[Dict[int, ModificationConfig]] = [] + charge_options: List[int] = [] + + def __init__(self, **data: Any): + """ + Search space for a given amino acid sequence. + + Parameters + ---------- + sequence + Amino acid sequence of the peptidoform. + proteins + List of protein IDs containing the peptidoform. + is_n_term + Whether the peptidoform is an N-terminal peptide. Default is None. + is_c_term + Whether the peptidoform is a C-terminal peptide. Default is None. + modification_options + List of dictionaries with modification positions and configurations. Default is []. + charge_options + List of charge states to consider. Default is []. + + """ + super().__init__(**data) + + def __len__(self): + return len(self.modification_options) * len(self.charge_options) + + def __iter__(self) -> Generator[str, None, None]: + """Yield peptidoform strings with given charges and modifications.""" + if not self.charge_options: + raise ValueError("Peptide charge options not defined.") + if not self.modification_options: + raise ValueError("Peptide modification options not defined.") + + for modifications, charge in product(self.modification_options, self.charge_options): + yield self._construct_peptidoform_string(self.sequence, modifications, charge) + + @staticmethod + def _construct_peptidoform_string( + sequence: str, modifications: Dict[int, ModificationConfig], charge: int + ) -> str: + if not modifications: + return f"{sequence}/{charge}" + + modded_sequence = list(sequence) + for position, mod in modifications.items(): + if isinstance(position, int): + aa = modded_sequence[position] + if aa != mod.amino_acid: + raise ValueError( + f"Modification {mod.label} at position {position} does not match amino " + f"acid {aa}." + ) + modded_sequence[position] = f"{aa}[{mod.label}]" + elif position == "N": + modded_sequence.insert(0, f"[{mod.label}]-") + elif position == "C": + modded_sequence.append(f"-[{mod.label}]") + else: + raise ValueError(f"Invalid position {position} for modification {mod.label}.") + + return f"{''.join(modded_sequence)}/{charge}" + + +def _digest_single_protein( + protein: pyteomics.fasta.Protein, + min_length: int = 8, + max_length: int = 30, + cleavage_rule: str = "trypsin", + missed_cleavages: int = 2, + semi_specific: bool = False, +) -> List[_PeptidoformSearchSpace]: + """Digest protein sequence and return a list of validated peptides.""" + + def valid_residues(sequence: str) -> bool: + return not any(aa in sequence for aa in ["B", "J", "O", "U", "X", "Z"]) + + def parse_peptide( + start_position: int, + sequence: str, + protein: pyteomics.fasta.Protein, + ) -> _PeptidoformSearchSpace: + """Parse result from parser.icleave into Peptide.""" + return _PeptidoformSearchSpace( + sequence=sequence, + # Assumes protein ID is description until first space + proteins=[protein.description.split(" ")[0]], + is_n_term=start_position == 0, + is_c_term=start_position + len(sequence) == len(protein.sequence), + ) + + peptides = [ + parse_peptide(start, seq, protein) + for start, seq in icleave( + protein.sequence, + cleavage_rule, + missed_cleavages=missed_cleavages, + min_length=min_length, + max_length=max_length, + semi=semi_specific, + ) + if valid_residues(seq) + ] + + return peptides + + +def _count_fasta_entries(filename: Path) -> int: + """Count the number of entries in a FASTA file.""" + with open(filename, "rt") as f: + count = 0 + for line in f: + if line[0] == ">": + count += 1 + return count + + +def _restructure_modifications_by_target( + modifications: List[ModificationConfig], +) -> Dict[str, Dict[str, List[ModificationConfig]]]: + """Restructure variable modifications to options per side chain or terminus.""" + modifications_by_target = { + "sidechain": defaultdict(lambda: []), + "peptide_n_term": defaultdict(lambda: []), + "peptide_c_term": defaultdict(lambda: []), + "protein_n_term": defaultdict(lambda: []), + "protein_c_term": defaultdict(lambda: []), + } + + def add_mod(mod, target, amino_acid): + if amino_acid: + modifications_by_target[target][amino_acid].append(mod) + else: + modifications_by_target[target]["any"].append(mod) + + for mod in modifications: + if mod.fixed: + continue + if mod.peptide_n_term: + add_mod(mod, "peptide_n_term", mod.amino_acid) + elif mod.peptide_c_term: + add_mod(mod, "peptide_c_term", mod.amino_acid) + elif mod.protein_n_term: + add_mod(mod, "protein_n_term", mod.amino_acid) + elif mod.protein_c_term: + add_mod(mod, "protein_c_term", mod.amino_acid) + else: + add_mod(mod, "sidechain", mod.amino_acid) + + return {k: dict(v) for k, v in modifications_by_target.items()} + + +def _get_modification_possibilities_by_site( + peptide: _PeptidoformSearchSpace, + modifications_by_target: Dict[str, Dict[str, List[ModificationConfig]]], + modifications: List[ModificationConfig], +) -> Dict[Union[str, int], List[ModificationConfig]]: + """Get all possible modifications for each site in a peptide sequence.""" + possibilities_by_site = defaultdict(list) + + # Generate dictionary of positions per amino acid + position_dict = defaultdict(list) + for pos, aa in enumerate(peptide.sequence): + position_dict[aa].append(pos) + # Map modifications to positions + for aa in set(position_dict).intersection(set(modifications_by_target["sidechain"])): + possibilities_by_site.update( + {pos: modifications_by_target["sidechain"][aa] for pos in position_dict[aa]} + ) + + # Assign possible modifications per terminus + for terminus, position, site_name, specificity in [ + ("peptide_n_term", 0, "N", None), + ("peptide_c_term", -1, "C", None), + ("protein_n_term", 0, "N", "is_n_term"), + ("protein_c_term", -1, "C", "is_c_term"), + ]: + if specificity is None or getattr(peptide, specificity): + for site, mods in modifications_by_target[terminus].items(): + if site == "any" or peptide.sequence[position] == site: + possibilities_by_site[site_name].extend(mods) + + # Override with fixed modifications + for mod in modifications: + aa = mod.amino_acid + # Skip variable modifications + if not mod.fixed: + continue + # Assign if specific aa matches or if no aa is specified for each terminus + for terminus, position, site_name, specificity in [ + ("peptide_n_term", 0, "N", None), + ("peptide_c_term", -1, "C", None), + ("protein_n_term", 0, "N", "is_n_term"), + ("protein_c_term", -1, "C", "is_c_term"), + ]: + if getattr(mod, terminus): # Mod has this terminus + if specificity is None or getattr(peptide, specificity): # Specificity matches + if not aa or (aa and peptide.sequence[position] == aa): # AA matches + possibilities_by_site[site_name] = [mod] # Override with fixed mod + break # Allow `else: if amino_acid` if no terminus matches + # Assign if fixed modification is not terminal and specific aa matches + else: + if aa: + for pos in position_dict[aa]: + possibilities_by_site[pos] = [mod] + + return possibilities_by_site + + +def _get_peptidoform_modification_versions( + peptide: _PeptidoformSearchSpace, + modifications: List[ModificationConfig], + modifications_by_target: Dict[str, Dict[str, List[ModificationConfig]]], + max_variable_modifications: int = 3, +) -> List[Dict[Union[str, int], List[ModificationConfig]]]: + """ + Get all potential combinations of modifications for a peptide sequence. + + Examples + -------- + >>> peptide = PeptidoformSpace(sequence="PEPTIDE", proteins=["PROTEIN"]) + >>> phospho = ModificationConfig(label="Phospho", amino_acid="T", fixed=False) + >>> acetyl = ModificationConfig(label="Acetyl", peptide_n_term=True, fixed=False) + >>> modifications = [phospho, acetyl] + >>> modifications_by_target = { + ... "sidechain": {"S": [modifications[0]]}, + ... "peptide_n_term": {"any": [modifications[1]]}, + ... "peptide_c_term": {"any": []}, + ... "protein_n_term": {"any": []}, + ... "protein_c_term": {"any": []}, + ... } + >>> _get_modification_versions(peptide, modifications, modifications_by_target) + [{}, {3: phospho}, {0: acetyl}, {0: acetyl, 3: phospho}] + + """ + # Get all possible modifications for each site in the peptide sequence + possibilities_by_site = _get_modification_possibilities_by_site( + peptide, modifications_by_target, modifications + ) + + # Separate fixed and variable modification sites + fixed_modifications = {} + variable_sites = [] + for site, mods in possibilities_by_site.items(): + for mod in mods: + if mod.fixed: + fixed_modifications[site] = mod + else: + variable_sites.append((site, mod)) + + # Generate all combinations of variable modifications up to the maximum allowed + modification_versions = [] + for i in range(max_variable_modifications + 1): + for comb in combinations(variable_sites, i): + combination_dict = fixed_modifications.copy() + for site, mod in comb: + combination_dict[site] = mod + modification_versions.append(combination_dict) + + return modification_versions + + +def _get_pool(processes: int) -> Union[multiprocessing.Pool, multiprocessing.dummy.Pool]: + """Get a multiprocessing pool with the given number of processes.""" + # TODO: fix None default value for processes + if processes > 1: + return multiprocessing.Pool(processes) + else: + return multiprocessing.dummy.Pool(processes) diff --git a/tests/test_data/search_space_config.json b/tests/test_data/search_space_config.json new file mode 100644 index 00000000..65970077 --- /dev/null +++ b/tests/test_data/search_space_config.json @@ -0,0 +1,14 @@ +{ + "fasta_file": "tests/test_data/test.fasta", + "min_length": 7, + "max_length": 30, + "min_precursor_mz": 400, + "max_precursor_mz": 2000, + "cleavage_rule": "trypsin", + "missed_cleavages": 2, + "semi_specific": false, + "add_decoys": false, + "modifications": [], + "max_variable_modifications": 3, + "charges": [2, 3] +} diff --git a/tests/test_data/test.fasta b/tests/test_data/test.fasta new file mode 100644 index 00000000..8683d577 --- /dev/null +++ b/tests/test_data/test.fasta @@ -0,0 +1,2 @@ +>P12345 +MYSSCSLLQRLVWFPFLALVATQLLFIRNVSSLNLTNEYLHHKCLVSEGKYKPGSKYEYI diff --git a/tests/test_fasta2speclib.py b/tests/test_fasta2speclib.py deleted file mode 100644 index 60930c1c..00000000 --- a/tests/test_fasta2speclib.py +++ /dev/null @@ -1,185 +0,0 @@ -"""Tests for fasta2speclib.""" - -from pyteomics.fasta import Protein - -from fasta2speclib.fasta2speclib import Fasta2SpecLib, ModificationConfig, Peptide - -MODIFICATION_CONFIG = [ - { - "name": "Oxidation", - "mass_shift": 15.9994, - "amino_acid": "M", - }, - { - "name": "Carbamidomethyl", - "mass_shift": 57.0513, - "amino_acid": "C", - "fixed": True, - }, - { - "name": "Glu->pyro-Glu", - "mass_shift": -18.010565, - "amino_acid": "E", - "peptide_n_term": True, - }, - { - "name": "Acetyl", - "mass_shift": 42.010565, - "amino_acid": None, - "protein_n_term": True, - }, -] -MODIFICATION_CONFIG = [ModificationConfig(**mod) for mod in MODIFICATION_CONFIG] - - -def test_get_modifications_by_target(): - modifications_by_target = Fasta2SpecLib._get_modifications_by_target(MODIFICATION_CONFIG) - assert modifications_by_target["sidechain"] == {"M": [None] + MODIFICATION_CONFIG[0:1]} - assert modifications_by_target["peptide_n_term"] == {"E": [None] + MODIFICATION_CONFIG[2:3]} - assert modifications_by_target["peptide_c_term"] == {} - assert modifications_by_target["protein_n_term"] == {"any": [None] + MODIFICATION_CONFIG[3:4]} - assert modifications_by_target["protein_c_term"] == {} - - -def test_get_modification_versions(): - modification_config = [ - ModificationConfig( - **{ - "name": "Oxidation", - "mass_shift": 15.9994, - "amino_acid": "M", - } - ), - ModificationConfig( - **{ - "name": "Carbamidomethyl", - "mass_shift": 57.0513, - "amino_acid": "C", - "fixed": True, - } - ), - ModificationConfig( - **{ - "name": "Glu->pyro-Glu", - "mass_shift": -18.010565, - "amino_acid": "E", - "protein_n_term": True, - } - ), - ] - modifications_by_target = Fasta2SpecLib._get_modifications_by_target(modification_config) - - test_cases = [ - ("ADEF", {""}), # None - ("ACDE", {"2|Carbamidomethyl"}), # Single fixed - ("ACCDE", {"2|Carbamidomethyl|3|Carbamidomethyl"}), # Double fixed - ("ADME", {"", "3|Oxidation"}), # Single variable - ( - "ADMME", - {"", "3|Oxidation", "4|Oxidation", "3|Oxidation|4|Oxidation"}, - ), # Double variable - ( - "ADMMME", - { - "", - "3|Oxidation", - "4|Oxidation", - "5|Oxidation", - "3|Oxidation|4|Oxidation", - "4|Oxidation|5|Oxidation", - "3|Oxidation|5|Oxidation", - }, - ), # More than maximum simultaneous mods should be ignored - ("EDEF", {"", "0|Glu->pyro-Glu"}), # N-term and AA-specific - ] - - for peptide, expected_output in test_cases: - output = Fasta2SpecLib._get_modification_versions( - Peptide(sequence=peptide, is_n_term=True, proteins=[]), - modification_config, - modifications_by_target, - max_variable_modifications=2, - ) - assert set(output) == expected_output - - -def test_digest_protein(): - test_input = { - "protein": Protein( - description="P12345", - sequence="MYSSCSLLQRLVWFPFLALVATQLLFIRNVSSLNLTNEYLHHKCLVSEGKYKPGSKYEYI", - ), - "min_length": 8, - "max_length": 30, - "cleavage_rule": "trypsin", - "missed_cleavages": 2, - "semi_specific": False, - } - - test_output = [ - Peptide( - sequence="MYSSCSLLQR", - proteins=["P12345"], - modification_options=None, - is_n_term=True, - is_c_term=False, - ), - Peptide( - sequence="MYSSCSLLQRLVWFPFLALVATQLLFIR", - proteins=["P12345"], - modification_options=None, - is_n_term=True, - is_c_term=False, - ), - Peptide( - sequence="LVWFPFLALVATQLLFIR", - proteins=["P12345"], - modification_options=None, - is_n_term=False, - is_c_term=False, - ), - Peptide( - sequence="NVSSLNLTNEYLHHK", - proteins=["P12345"], - modification_options=None, - is_n_term=False, - is_c_term=False, - ), - Peptide( - sequence="NVSSLNLTNEYLHHKCLVSEGK", - proteins=["P12345"], - modification_options=None, - is_n_term=False, - is_c_term=False, - ), - Peptide( - sequence="NVSSLNLTNEYLHHKCLVSEGKYKPGSK", - proteins=["P12345"], - modification_options=None, - is_n_term=False, - is_c_term=False, - ), - Peptide( - sequence="CLVSEGKYKPGSK", - proteins=["P12345"], - modification_options=None, - is_n_term=False, - is_c_term=False, - ), - Peptide( - sequence="CLVSEGKYKPGSKYEYI", - proteins=["P12345"], - modification_options=None, - is_n_term=False, - is_c_term=True, - ), - Peptide( - sequence="YKPGSKYEYI", - proteins=["P12345"], - modification_options=None, - is_n_term=False, - is_c_term=True, - ), - ] - - assert test_output == Fasta2SpecLib._digest_protein(**test_input) diff --git a/tests/test_predictions.py b/tests/test_predictions.py deleted file mode 100644 index e6eed081..00000000 --- a/tests/test_predictions.py +++ /dev/null @@ -1,79 +0,0 @@ -import os - -import pandas as pd - -from ms2pip.ms2pipC import MS2PIP - - -TEST_DIR = os.path.dirname(__file__) - - -def run_ms2pip(): - """Run ms2pipC to predict peak intensities from a PEPREC file (HCD model). """ - params = { - "ms2pip": { - "ptm": [ - "Oxidation,15.994915,opt,M", - "Carbamidomethyl,57.021464,opt,C", - "Acetyl,42.010565,opt,N-term", - ], - "sptm": [], - "gptm": [], - "frag_method": "HCD2019", - "frag_error": 0.02, - "out": "csv", - } - } - ms2pip = MS2PIP(os.path.join(TEST_DIR, "test_data/test.peprec"), params=params) - ms2pip.run() - - test_data = pd.read_csv( - os.path.join(TEST_DIR, "test_data/test_HCD2019_predictions.csv") - ) - target_data = pd.read_csv( - os.path.join(TEST_DIR, "test_data/target_HCD2019_predictions.csv") - ) - pepfile = pd.read_csv( - os.path.join(TEST_DIR, "test_data/test.peprec"), - sep=" ", - index_col=False, - dtype={"spec_id": str, "modifications": str}, - ) - return test_data, target_data, pepfile - - -TEST_DATA, TARGET_DATA, PEPFILE = run_ms2pip() - - -class TestPredictions: - def test_all_spec(self): - assert set(TEST_DATA.spec_id.unique()) == set(PEPFILE.spec_id) - - def test_amount_peaks(self): - for pep in ["peptide1", "peptide2", "peptide3"]: - peplen = len(PEPFILE[PEPFILE.spec_id == pep].peptide.values[0]) - assert len(TEST_DATA[TEST_DATA.spec_id == pep]) == (2 * peplen) - 2 - - def test_peak_ints_b(self): - for pep in TARGET_DATA.spec_id.unique(): - tmp_test = TEST_DATA[TEST_DATA.spec_id == pep] - tmp_test = tmp_test[tmp_test.ion == "b"] - tmp_target = TARGET_DATA[TARGET_DATA.spec_id == pep] - tmp_target = tmp_target[tmp_target.ion == "b"] - for no in tmp_target.ionnumber: - assert ( - tmp_test[tmp_test.ionnumber == no]["prediction"].values[0] - == tmp_target[tmp_target.ionnumber == no]["prediction"].values[0] - ) - - def test_peak_ints_y(self): - for pep in TARGET_DATA.spec_id.unique(): - tmp_test = TEST_DATA[TEST_DATA.spec_id == pep] - tmp_test = tmp_test[tmp_test.ion == "y"] - tmp_target = TARGET_DATA[TARGET_DATA.spec_id == pep] - tmp_target = tmp_target[tmp_target.ion == "y"] - for no in tmp_target.ionnumber: - assert ( - tmp_test[tmp_test.ionnumber == no]["prediction"].values[0] - == tmp_target[tmp_target.ionnumber == no]["prediction"].values[0] - ) diff --git a/tests/test_retention_time.py b/tests/test_retention_time.py deleted file mode 100644 index 8be7fbaa..00000000 --- a/tests/test_retention_time.py +++ /dev/null @@ -1,35 +0,0 @@ -import os - -import numpy as np -import pandas as pd - -from ms2pip.retention_time import RetentionTime -from ms2pip.config_parser import ConfigParser - - -TEST_DIR = os.path.dirname(__file__) - -class TestRetentionTime: - def test_prepare_deeplc_peptide_df(self): - peprec = pd.read_csv(os.path.join(TEST_DIR, "test_data/test.peprec"), sep=" ") - config = { - "deeplc": { - "calibration_file": False, - "verbose": False, - "path_model": False, - "split_cal": 25, - "batch_num": 350000, - } - } - - rt_predictor = RetentionTime(config=config) - rt_predictor.peprec = peprec - rt_predictor._prepare_deeplc_peptide_df() - dlc_df = rt_predictor.deeplc_pep_df - - assert dlc_df.equals( - pd.DataFrame({ - "seq": {0: "ACDE", 1: "ACDEFGHI", 2: "ACDEFGHIKMNPQ"}, - "modifications": {0: np.nan, 1: np.nan, 2: np.nan}, - }) - ) diff --git a/tests/test_search_space.py b/tests/test_search_space.py new file mode 100644 index 00000000..d3479093 --- /dev/null +++ b/tests/test_search_space.py @@ -0,0 +1,234 @@ +from ms2pip import search_space + +OXIDATION = search_space.ModificationConfig( + label="Oxidation", + amino_acid="M", +) +CARBAMIDOMETHYL = search_space.ModificationConfig( + label="Carbamidomethyl", + amino_acid="C", + fixed=True, +) +PYROGLU = search_space.ModificationConfig( + label="Glu->pyro-Glu", + amino_acid="E", + peptide_n_term=True, +) +ACETYL = search_space.ModificationConfig( + label="Acetyl", + amino_acid=None, + protein_n_term=True, +) +PHOSPHO = search_space.ModificationConfig( + label="Phospho", + amino_acid="T", + fixed=False, +) + +MODIFICATION_CONFIG = [OXIDATION, CARBAMIDOMETHYL, PYROGLU, ACETYL] + + +def test_restructure_modifications_by_target(): + test_cases = [ + { + "modifications": [PHOSPHO, ACETYL], + "expected": { + "sidechain": {"T": [PHOSPHO]}, + "peptide_n_term": {}, + "peptide_c_term": {}, + "protein_n_term": {"any": [ACETYL]}, + "protein_c_term": {}, + }, + }, + { + "modifications": [CARBAMIDOMETHYL, ACETYL], + "expected": { + "sidechain": {}, + "peptide_n_term": {}, + "peptide_c_term": {}, + "protein_n_term": {"any": [ACETYL]}, + "protein_c_term": {}, + }, + }, + ] + + for case in test_cases: + test_out = search_space._restructure_modifications_by_target(case["modifications"]) + assert test_out == case["expected"] + + +def test_get_peptidoform_modification_versions(): + test_cases = [ + # None + { + "sequence": "PEPTIDE", + "modifications": [], + "expected": [{}], + }, + # Single fixed + { + "sequence": "ACDE", + "modifications": [CARBAMIDOMETHYL], + "expected": [{1: CARBAMIDOMETHYL}], + }, + # Double fixed + { + "sequence": "ACCDE", + "modifications": [CARBAMIDOMETHYL], + "expected": [{1: CARBAMIDOMETHYL, 2: CARBAMIDOMETHYL}], + }, + # Single variable + { + "sequence": "ADME", + "modifications": [OXIDATION], + "expected": [{}, {2: OXIDATION}], + }, + # Double variable + { + "sequence": "ADMME", + "modifications": [OXIDATION], + "expected": [{}, {2: OXIDATION}, {3: OXIDATION}, {2: OXIDATION, 3: OXIDATION}], + }, + # More than maximum simultaneous mods should be ignored + { + "sequence": "ADMMME", + "modifications": [OXIDATION], + "expected": [ + {}, + {2: OXIDATION}, + {3: OXIDATION}, + {4: OXIDATION}, + {2: OXIDATION, 3: OXIDATION}, + {2: OXIDATION, 4: OXIDATION}, + {3: OXIDATION, 4: OXIDATION}, + ], + }, + # N-term and AA-specific + { + "sequence": "EDEF", + "modifications": [PYROGLU], + "expected": [{}, {"N": PYROGLU}], + }, + { + "sequence": "PEPTIDE", + "modifications": [PHOSPHO, ACETYL], + "expected": [{}, {3: PHOSPHO}, {"N": ACETYL}, {"N": ACETYL, 3: PHOSPHO}], + }, + { + "sequence": "ACDEK", + "modifications": [CARBAMIDOMETHYL, ACETYL], + "expected": [ + {1: CARBAMIDOMETHYL}, + {1: CARBAMIDOMETHYL, "N": ACETYL}, + ], + }, + ] + + for case in test_cases: + peptide = search_space._PeptidoformSearchSpace( + sequence=case["sequence"], proteins=[], is_n_term=True + ) + modifications_by_target = search_space._restructure_modifications_by_target( + case["modifications"] + ) + test_out = search_space._get_peptidoform_modification_versions( + peptide, + case["modifications"], + modifications_by_target, + max_variable_modifications=2, + ) + + assert test_out == case["expected"] + + +def test_get_modifications_by_target(): + modifications_by_target = search_space._restructure_modifications_by_target( + MODIFICATION_CONFIG + ) + assert modifications_by_target["sidechain"] == {"M": MODIFICATION_CONFIG[0:1]} + assert modifications_by_target["peptide_n_term"] == {"E": MODIFICATION_CONFIG[2:3]} + assert modifications_by_target["peptide_c_term"] == {} + assert modifications_by_target["protein_n_term"] == {"any": MODIFICATION_CONFIG[3:4]} + assert modifications_by_target["protein_c_term"] == {} + + +class TestProteomeSearchSpace: + def test_digest_fasta(self): + test_input = { + "fasta_file": "tests/test_data/test.fasta", + "min_length": 8, + "max_length": 30, + "cleavage_rule": "trypsin", + "missed_cleavages": 2, + "semi_specific": False, + } + + test_output = [ + search_space._PeptidoformSearchSpace( + sequence="MYSSCSLLQR", + proteins=["P12345"], + modification_options=[], + is_n_term=True, + is_c_term=False, + ), + search_space._PeptidoformSearchSpace( + sequence="MYSSCSLLQRLVWFPFLALVATQLLFIR", + proteins=["P12345"], + modification_options=[], + is_n_term=True, + is_c_term=False, + ), + search_space._PeptidoformSearchSpace( + sequence="LVWFPFLALVATQLLFIR", + proteins=["P12345"], + modification_options=[], + is_n_term=False, + is_c_term=False, + ), + search_space._PeptidoformSearchSpace( + sequence="NVSSLNLTNEYLHHK", + proteins=["P12345"], + modification_options=[], + is_n_term=False, + is_c_term=False, + ), + search_space._PeptidoformSearchSpace( + sequence="NVSSLNLTNEYLHHKCLVSEGK", + proteins=["P12345"], + modification_options=[], + is_n_term=False, + is_c_term=False, + ), + search_space._PeptidoformSearchSpace( + sequence="NVSSLNLTNEYLHHKCLVSEGKYKPGSK", + proteins=["P12345"], + modification_options=[], + is_n_term=False, + is_c_term=False, + ), + search_space._PeptidoformSearchSpace( + sequence="CLVSEGKYKPGSK", + proteins=["P12345"], + modification_options=[], + is_n_term=False, + is_c_term=False, + ), + search_space._PeptidoformSearchSpace( + sequence="CLVSEGKYKPGSKYEYI", + proteins=["P12345"], + modification_options=[], + is_n_term=False, + is_c_term=True, + ), + search_space._PeptidoformSearchSpace( + sequence="YKPGSKYEYI", + proteins=["P12345"], + modification_options=[], + is_n_term=False, + is_c_term=True, + ), + ] + + sp = search_space.ProteomeSearchSpace(**test_input) + sp._digest_fasta() + assert test_output == sp._peptidoform_spaces