From e9b42d7ba665c615cfbf6175491970c967a70ba2 Mon Sep 17 00:00:00 2001
From: Markus Johansson
Date: Fri, 3 Jan 2025 16:21:31 +0100
Subject: [PATCH] WIP new sample config input

---
 prp/cli.py            |  49 +++++++++++-
 prp/models/config.py  |  58 ++++++++++++++
 prp/parse/__init__.py |   2 +
 prp/parse/sample.py   | 170 ++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 278 insertions(+), 1 deletion(-)
 create mode 100644 prp/models/config.py
 create mode 100644 prp/parse/sample.py

diff --git a/prp/cli.py b/prp/cli.py
index edbe6f0..be58d47 100644
--- a/prp/cli.py
+++ b/prp/cli.py
@@ -10,6 +10,7 @@
 import pysam
 from cyvcf2 import VCF, Writer
 from pydantic import TypeAdapter, ValidationError
+import yaml
 
 from prp import VERSION as __version__
 
@@ -17,6 +18,7 @@
 from .models.phenotype import ElementType
 from .models.qc import QcMethodIndex, QcSoftware
 from .models.sample import MethodIndex, PipelineResult, ReferenceGenome, IgvAnnotationTrack
+from .models.config import SampleConfig
 from .parse import (
     load_variants,
     parse_alignment_results,
@@ -37,6 +39,7 @@
     parse_tbprofiler_lineage_results,
     parse_virulencefinder_stx_typing,
     parse_virulencefinder_vir_pred,
+    parse_sample,
 )
 from .parse.phenotype.tbprofiler import (
     EXPECTED_SCHEMA_VERSION as EXPECTED_TBPROFILER_SCHEMA_VERSION,
@@ -50,6 +53,23 @@
 OUTPUT_SCHEMA_VERSION = 1
 
 
+class SampleConfigFile(click.ParamType):
+    name = "config"
+
+    def convert(self, value, param, ctx):
+        """Convert a string path into a SampleConfig object."""
+        # verify that the input is a path to an existing file
+        try:
+            cnf_path = Path(value)
+            if not cnf_path.is_file():
+                raise FileNotFoundError(f"{cnf_path}: file not found, please check the path.")
+        except TypeError as error:
+            raise TypeError(f"value should be a str, not '{type(value)}'") from error
+        # load yaml and cast to pydantic model
+        with cnf_path.open() as cfile:
+            data = yaml.safe_load(cfile)
+        return SampleConfig(**data)
+
 
 @click.group()
 @click.version_option(__version__)
@@ -69,6 +89,33 @@ def cli(silent, debug):
     )
 
+
+@cli.command()
+@click.option("-s", "--sample", "sample_cnf", type=SampleConfigFile(), required=True, help="Sample configuration with results.")
+@click.option("-o", "--output", type=click.Path(), help="Path to result.")
+def parse(sample_cnf, output):
+    """Parse JASEN results and write them as a combined JSON file."""
+    LOG.info("Start generating pipeline result json")
+    try:
+        sample_obj = parse_sample(sample_cnf)
+    except ValidationError as err:
+        click.secho("Generated result failed validation", fg="red")
+        click.secho(err)
+        raise click.Abort()
+
+    # Either write to stdout or to a file
+    dump = sample_obj.model_dump_json(indent=2)
+    if output is None:
+        print(dump)
+    else:
+        LOG.info("Storing results to: %s", output)
+        try:
+            with open(output, "w", encoding="utf-8") as fout:
+                fout.write(dump)
+        except Exception as error:
+            raise click.ClickException(f"Error writing results file: {error}") from error
+    click.secho("Finished generating pipeline output", fg="green")
+
 
 @cli.command()
 @click.option("-i", "--sample-id", required=True, help="Sample identifier")
 @click.option(
@@ -296,7 +343,7 @@ def create_bonsai_input(
             )
         )
         # parse mykrobe result
-        amr_res = parse_mykrobe_amr_pred(pred_res)
+        amr_res = parse_mykrobe_amr_pred(mykrobe)
         if amr_res is not None:
             results["element_type_result"].append(amr_res)
 
diff --git a/prp/models/config.py b/prp/models/config.py
new file mode 100644
index 0000000..35d805c
--- /dev/null
+++ b/prp/models/config.py
@@ -0,0 +1,58 @@
+"""Sample configuration with paths to output files."""
+
+from .base import RWModel
+
+from pydantic import Field, FilePath
+from typing import List
+from pathlib import Path
+
+
+class IgvAnnotation(RWModel):
+    name: str
+    type: str
+    uri: str
+    index_uri: str | None = None
+
+
+class SampleConfig(RWModel):
+    """Sample information with metadata and results files."""
+
+    # Sample information
+    sample_id: str = Field(..., alias="sampleId", min_length=3, max_length=100)
+    sample_name: str
+    lims_id: str
+
+    # Bonsai parameters
+    groups: List[str] = []
+
+    # Reference genome
+    ref_genome_sequence: Path
+    ref_genome_annotation: Path
+
+    igv_annotations: List[IgvAnnotation] = []
+
+    # Jasen result files
+    nextflow_run_info: FilePath
+    process_metadata: List[FilePath] = []  # stores versions of tools and databases
+    software_info: List[FilePath] = []  # stores software and database version info
+
+    ## Classification
+    kraken: FilePath
+
+    ## QC
+    quast: FilePath
+    postalnqc: FilePath | None = None
+
+    ## Typing
+    pymlst: FilePath | None = None
+    chewbbaca: FilePath | None = None
+    serotypefinder: FilePath | None = None
+    shigapass: FilePath | None = None
+    emmtyper: FilePath | None = None
+
+    ## Resistance, virulence etc.
+    amrfinder: FilePath | None = None
+    resfinder: FilePath | None = None
+    virulencefinder: FilePath | None = None
+    mykrobe: FilePath | None = None
+    tbprofiler: FilePath | None = None
\ No newline at end of file
diff --git a/prp/parse/__init__.py b/prp/parse/__init__.py
index b7eb5ae..5630c87 100644
--- a/prp/parse/__init__.py
+++ b/prp/parse/__init__.py
@@ -21,3 +21,5 @@
     parse_virulencefinder_stx_typing,
 )
 from .variant import load_variants
+
+from .sample import parse_sample
\ No newline at end of file
diff --git a/prp/parse/sample.py b/prp/parse/sample.py
new file mode 100644
index 0000000..7d7511c
--- /dev/null
+++ b/prp/parse/sample.py
@@ -0,0 +1,170 @@
+"""Parse the sample input config using parsers from this module."""
+
+import json
+import logging
+from typing import Sequence
+
+from .metadata import parse_run_info
+from .qc import parse_quast_results, parse_postalignqc_results
+from .species import parse_kraken_result, get_mykrobe_spp_prediction, SppMethodIndex
+from .typing import (
+    parse_cgmlst_results,
+    parse_mlst_results,
+    parse_virulencefinder_stx_typing,
+    parse_serotypefinder_oh_typing,
+    parse_mykrobe_lineage_results,
+    parse_tbprofiler_lineage_results,
+)
+from .phenotype.resfinder import parse_resfinder_amr_pred, AMRMethodIndex
+from .phenotype.amrfinder import parse_amrfinder_amr_pred, parse_amrfinder_vir_pred
+from .phenotype.virulencefinder import parse_virulencefinder_vir_pred, VirulenceMethodIndex
+from .phenotype.shigapass import parse_shigapass_pred, ShigaTypingMethodIndex
+from .phenotype.emmtyper import parse_emmtyper_pred, EmmTypingMethodIndex
+from .phenotype import mykrobe
+from .phenotype import tbprofiler
+
+from ..models.phenotype import ElementType
+from ..models.sample import MethodIndex, QcMethodIndex, PipelineResult
+
+OUTPUT_SCHEMA_VERSION = 1
+
+LOG = logging.getLogger(__name__)
+
+def _read_qc(smp_cnf) -> Sequence[QcMethodIndex]:
+    """Read all QC-related info."""
+    qc_results = []
+    if smp_cnf.quast:
+        qc_results.append(parse_quast_results(smp_cnf.quast))
+
+    if smp_cnf.postalnqc:
+        qc_results.append(parse_postalignqc_results(smp_cnf.postalnqc))
+    return qc_results
+
+
+def _read_spp_prediction(smp_cnf) -> Sequence[SppMethodIndex]:
+    """Read all species prediction results."""
+    spp_results = []
+    if smp_cnf.kraken:
+        spp_results.append(parse_kraken_result(smp_cnf.kraken))
+
+    # TODO refactor lineage and species to use path instead of dict as input
+    if smp_cnf.mykrobe:
+        raw_result = mykrobe._read_result(smp_cnf.mykrobe)
+        spp_results.append(get_mykrobe_spp_prediction(raw_result))
+    return spp_results
+
+
+def _read_typing(smp_cnf) -> Sequence[MethodIndex | EmmTypingMethodIndex | ShigaTypingMethodIndex]:
+    """Read all typing information."""
+    typing_result = []
+    if smp_cnf.pymlst:
+        typing_result.append(parse_mlst_results(smp_cnf.pymlst))
+
+    if smp_cnf.chewbbaca:
+        # TODO add corrected_alleles to input
+        typing_result.append(parse_cgmlst_results(smp_cnf.chewbbaca))
+
+    if smp_cnf.emmtyper:
+        typing_result.extend(parse_emmtyper_pred(smp_cnf.emmtyper))
+
+    if smp_cnf.shigapass:
+        typing_result.append(parse_shigapass_pred(smp_cnf.shigapass))
+
+    # stx typing
+    if smp_cnf.virulencefinder:
+        tmp_virfinder_res: MethodIndex | None = parse_virulencefinder_stx_typing(smp_cnf.virulencefinder)
+        if tmp_virfinder_res is not None:
+            typing_result.append(tmp_virfinder_res)
+
+    if smp_cnf.serotypefinder:
+        LOG.info("Parse serotypefinder results")
+        # OH typing
+        tmp_serotype_res: MethodIndex | None = parse_serotypefinder_oh_typing(smp_cnf.serotypefinder)
+        if tmp_serotype_res is not None:
+            typing_result.append(tmp_serotype_res)
+
+    # TODO refactor lineage and species to use path instead of dict as input
+    if smp_cnf.mykrobe:
+        raw_result = mykrobe._read_result(smp_cnf.mykrobe)
+        lin_res: MethodIndex | None = parse_mykrobe_lineage_results(raw_result)
+        if lin_res is not None:
+            typing_result.append(lin_res)
+
+    if smp_cnf.tbprofiler:
+        raw_result = tbprofiler._read_result(smp_cnf.tbprofiler)
+        typing_result.append(parse_tbprofiler_lineage_results(raw_result))
+
+    return typing_result
+
+
+def _read_resistance(smp_cnf) -> Sequence[AMRMethodIndex]:
+    """Read resistance predictions."""
+    resistance = []
+    if smp_cnf.resfinder:
+        with smp_cnf.resfinder.open("r", encoding="utf-8") as resfinder_json:
+            pred_res = json.load(resfinder_json)
+        for method in [ElementType.AMR, ElementType.STRESS]:
+            resistance.append(parse_resfinder_amr_pred(pred_res, method))
+
+    if smp_cnf.amrfinder:
+        for method in [ElementType.AMR, ElementType.STRESS]:
+            resistance.append(parse_amrfinder_amr_pred(smp_cnf.amrfinder, method))
+
+    if smp_cnf.mykrobe:
+        tmp_res = mykrobe.parse_mykrobe_amr_pred(smp_cnf.mykrobe, smp_cnf.sample_id)
+        if tmp_res is not None:
+            resistance.append(tmp_res)
+
+    if smp_cnf.tbprofiler:
+        # tbprofiler amr predictions
+        resistance.append(
+            tbprofiler.parse_tbprofiler_amr_pred(smp_cnf.tbprofiler)
+        )
+    return resistance
+
+
+def _read_virulence(smp_cnf) -> Sequence[VirulenceMethodIndex]:
+    """Read virulence results."""
+    virulence = []
+    if smp_cnf.amrfinder:
+        virulence.append(parse_amrfinder_vir_pred(smp_cnf.amrfinder))
+
+    if smp_cnf.virulencefinder:
+        # virulence genes
+        raw_res: VirulenceMethodIndex | None = parse_virulencefinder_vir_pred(
+            smp_cnf.virulencefinder
+        )
+        if raw_res is not None:
+            virulence.append(raw_res)
+    return virulence
+
+
+def parse_sample(smp_cnf) -> PipelineResult:
+    """Parse a sample config object into a combined result object."""
+    sample_info, seq_info, pipeline_info = parse_run_info(
+        smp_cnf.nextflow_run_info, smp_cnf.process_metadata
+    )
+    results = {
+        "sequencing": seq_info,
+        "pipeline": pipeline_info,
+        "qc": _read_qc(smp_cnf),
+        "species_prediction": _read_spp_prediction(smp_cnf),
+        "typing_result": _read_typing(smp_cnf),
+        "element_type_result": [],
+        **sample_info,  # add sample_name & lims_id
+    }
+    # read software versions
+    if smp_cnf.mykrobe:
+        results["pipeline"].softwares.append(mykrobe.get_version(smp_cnf.mykrobe))
results["pipeline"].softwares.append(mykrobe.get_version(smp_cnf.mykrobe)) + if smp_cnf.tbprofiler: + results["pipeline"].softwares.extend(tbprofiler.get_version(smp_cnf.tbprofiler)) + + # add amr and virulence + results["element_type_result"].extend( + [*_read_resistance(smp_cnf), *_read_virulence(smp_cnf)] + ) + + # verify data consistancy + return PipelineResult( + sample_id=smp_cnf.sample_id, schema_version=OUTPUT_SCHEMA_VERSION, **results + )