From 21f65405ed6a236ace1ec310b910f3af6737f0d4 Mon Sep 17 00:00:00 2001 From: Nathan Roach Date: Tue, 27 Dec 2022 15:09:19 -0500 Subject: [PATCH] Adding initial batch of samtools type hinted wrapper functions --- fgpyo/sam/__init__.py | 9 +- fgpyo/sam/builder.py | 19 +- fgpyo/samtools.py | 377 ++++++++++++++++++++ fgpyo/tests/test_samtools.py | 664 +++++++++++++++++++++++++++++++++++ pyproject.toml | 1 + 5 files changed, 1055 insertions(+), 15 deletions(-) create mode 100644 fgpyo/samtools.py create mode 100644 fgpyo/tests/test_samtools.py diff --git a/fgpyo/sam/__init__.py b/fgpyo/sam/__init__.py index caceadac..f77b75a0 100644 --- a/fgpyo/sam/__init__.py +++ b/fgpyo/sam/__init__.py @@ -198,13 +198,14 @@ class SamFileType(enum.Enum): ext (str): The standard file extension for this file type. """ - def __init__(self, mode: str, ext: str) -> None: + def __init__(self, mode: str, ext: str, index_ext: Optional[str]) -> None: self.mode = mode self.extension = ext + self.index_extension = index_ext - SAM = ("", ".sam") - BAM = ("b", ".bam") - CRAM = ("c", ".cram") + SAM = ("", ".sam", None) + BAM = ("b", ".bam", ".bai") + CRAM = ("c", ".cram", ".crai") @classmethod def from_path(cls, path: Union[Path, str]) -> "SamFileType": diff --git a/fgpyo/sam/builder.py b/fgpyo/sam/builder.py index 69f0db1e..b93f9b34 100755 --- a/fgpyo/sam/builder.py +++ b/fgpyo/sam/builder.py @@ -27,6 +27,7 @@ from pysam import AlignmentHeader from fgpyo import sam +from fgpyo import samtools from fgpyo.sam import SamOrder @@ -588,18 +589,14 @@ def to_path( if pred(rec): writer.write(rec) - samtools_sort_args = ["-o", str(path), fp.name] - file_handle.close() - if self._sort_order == SamOrder.QueryName: - # Ignore type hints for now until we have wrappers to use here. - pysam.sort("-n", *samtools_sort_args) # type: ignore - elif self._sort_order == SamOrder.Coordinate: - # Ignore type hints for now until we have wrappers to use here. - if index: - samtools_sort_args.insert(0, "--write-index") - pysam.sort(*samtools_sort_args) # type: ignore - + if self._sort_order not in {SamOrder.Unsorted, SamOrder.Unknown}: + samtools.sort( + input=Path(fp.name), + output=path, + index_output=index, + sort_order=self._sort_order, + ) return path def __len__(self) -> int: diff --git a/fgpyo/samtools.py b/fgpyo/samtools.py new file mode 100644 index 00000000..189dbc5b --- /dev/null +++ b/fgpyo/samtools.py @@ -0,0 +1,377 @@ +""" +Type-Hinted Wrappers around pysam samtools dispatch functions. +-------------------------------------------------------------- + +Calls pysam samtools functions, with type hints for relevant parameters and return types. + +Examples +~~~~~~~~ +.. code-block:: python + + >>> from fgpyo import samtools + >>> Path("example1.sorted.bam.bai").exists() + False + >>> samtools.sort(input=Path("example1.bam"), output=Path("example1.sorted.bam")) + '' + >>> samtools.index(input=Path("example1.sorted.bam")) + '' + >>> Path("example1.sorted.bam.bai").exists() + True + +Module Contents +~~~~~~~~~~~~~~~ +The module contains the following public method definitions: + - :class:`~fgpyo.samtools.dict` -- Wrapper around pysam call to samtools dict with type hints + on inputs and return type + - :class:`~fgpyo.samtools.sort` -- Wrapper around pysam call to samtools sort with type hints + on inputs and return type + - :class:`~fgpyo.samtools.faidx` -- Wrapper around pysam call to samtools faidx with type hints + on inputs and return type + - :class:`~fgpyo.samtools.index` -- Wrapper around pysam call to samtools index with type hints + on inputs and return type +""" +import enum +from pathlib import Path +from typing import TYPE_CHECKING +from typing import List +from typing import Optional +from typing import Tuple + +from fgpyo.sam import SamFileType +from fgpyo.sam import SamOrder + + +def pysam_fn(*args: str) -> str: + """ + Type hinted import of an example function from the pysam samtools dispatcher. + """ + pass + + +if TYPE_CHECKING: + _pysam_dict = pysam_fn + _pysam_sort = pysam_fn + _pysam_faidx = pysam_fn + _pysam_index = pysam_fn +else: + from pysam import dict as _pysam_dict + from pysam import faidx as _pysam_faidx + from pysam import index as _pysam_index + from pysam import sort as _pysam_sort + + +def dict( + input: Path, + output: Path, + no_header: bool = False, + alias: bool = False, + assembly: Optional[str] = None, + alt: Optional[Path] = None, + species: Optional[str] = None, + uri: Optional[str] = None, +) -> str: + """ + Calls the `samtools` function `dict` using the pysam samtools dispatcher. + + Arguments will be formatted into the appropriate command-line call which will be invoked + using the pysam dispatcher. + + Returns the stdout of the resulting call to pysam.dict() + + Args: + input: Path to the FASTA files to generate a sequence dictionary for. + output: Path to the file to write output to. + no_header: If true will not print the @HD header line. + assembly: The assembly for the AS tag. + alias: Adds an AN tag with the same value as the SN tag, except that a 'chr' prefix is + removed if SN has one or added if it does not. For mitochondria (i.e., when SN is “M” + or “MT”, with or without a “chr” prefix), also adds the remaining combinations of + “chr/M/MT” to the AN tag. + alt: Add an AH tag to each sequence listed in the specified bwa-style .alt file. These + files use SAM records to represent alternate locus sequences (as named in the QNAME + field) and their mappings to the primary assembly. + species: Specify the species for the SP tag. + uri: Specify the URI for the UR tag. Defaults to the absolute path of ref.fasta. + """ + + args: List[str] = ["--output", str(output)] + + if no_header: + args.append("--no-header") + + if alias: + args.append("--alias") + + if assembly is not None: + args.extend(["--assembly", assembly]) + + if alt is not None: + args.extend(["--alt", str(alt)]) + + if species is not None: + args.extend(["--species", species]) + + if uri is not None: + args.extend(["--uri", uri]) + + args.append(str(input)) + + return _pysam_dict(*args) + + +@enum.unique +class FaidxMarkStrand(enum.Enum): + RevComp = "rc" + No = "no" + Sign = "sign" + Custom = "custom" + + +def faidx( + input: Path, + output: Optional[Path] = None, + length: int = 60, + regions: Optional[List[str]] = None, + region_file: Optional[Path] = None, + fai_idx: Optional[Path] = None, + gzi_idx: Optional[Path] = None, + continue_if_non_existent: bool = False, + fastq: bool = False, + reverse_complement: bool = False, + mark_strand: FaidxMarkStrand = FaidxMarkStrand.RevComp, + custom_mark_strand: Optional[Tuple[str, str]] = None, +) -> str: + """ + Calls the `samtools` function `faidx` using the pysam samtools dispatcher. + + Arguments will be formatted into the appropriate command-line call which will be invoked + using the pysam dispatcher. + + Returns the stdout of the resulting call to pysam.faidx() + + Args: + input: Path to the FAIDX files to index / read from. + output: Path to the file to write FASTA output. + length: Length of FASTA sequence line. + regions: regions to extract from the FASTA file in samtools region format + (:<1-based start>-<1-based end>) + region_file: Path to file containing regions to extract from the FASTA file in samtools + region format (:<1-based start>-<1-based end>). 1 region descriptor per line. + fai_idx: Read/Write to specified FAI index file. + gzi_idx: Read/Write to specified compressed file index (used with .gz files). + continue_if_non_existent: If true continue qorking if a non-existent region is requested. + fastq: Read FASTQ files and output extracted sequences in FASTQ format. Same as using + samtools fqidx. + reverse_complement: Output the sequence as the reverse complement. When this option is + used, “/rc” will be appended to the sequence names. To turn this off or change the + string appended, use the mark_strand parameter. + mark_strand: Append strand indicator to sequence name. Type to append can be one of: + FaidxMarkStrand.RevComp - Append '/rc' when writing the reverse complement. This is the + default. + + FaidxMarkStrand.No - Do not append anything. + + FaidxMarkStrand.Sign - Append '(+)' for forward strand or '(-)' for reverse + complement. This matches the output of “bedtools getfasta -s”. + + FaidxMarkStrand.Custom - custom,, + Append string to names when writing the forward strand and when writing the + reverse strand. Spaces are preserved, so it is possible to move the indicator into the + comment part of the description line by including a leading space in the strings + and . + custom_mark_strand: The custom strand indicators to use in in the Custom MarkStrand + setting. The first value of the tuple will be used as the positive strand indicator, + the second value will be used as the negative strand indicator. + """ + + mark_strand_str: str + if mark_strand == FaidxMarkStrand.Custom: + assert custom_mark_strand is not None, ( + "Cannot use custom mark strand without providing the custom strand indicators to " + + "`custom_mark_string`" + ) + mark_strand_str = f"{mark_strand.value},{custom_mark_strand[0]},{custom_mark_strand[1]}" + else: + mark_strand_str = mark_strand.value + + args: List[str] = ["--length", str(length), "--mark-strand", mark_strand_str] + + if output is not None: + args.extend(["--output", str(output)]) + + if continue_if_non_existent: + args.append("--continue") + if reverse_complement: + args.append("--reverse-complement") + if fastq: + args.append("--fastq") + + if fai_idx is not None: + args.extend(["--fai-idx", str(fai_idx)]) + + if gzi_idx is not None: + args.extend(["--gzi-idx", str(gzi_idx)]) + + if region_file is not None: + args.extend(["--region-file", str(region_file)]) + + args.append(str(input)) + + if regions is not None: + args.extend(regions) + + return _pysam_faidx(*args) + + +@enum.unique +class SamIndexType(enum.Enum): + BAI = "-b" + CSI = "-c" + + +def index( + input: Path, + # See https://github.com/pysam-developers/pysam/issues/1155 + # inputs: List[Path], Cant currently accept multiple + output: Optional[Path] = None, + threads: int = 0, + index_type: SamIndexType = SamIndexType.BAI, + csi_index_size: int = 14, +) -> str: + """ + Calls the `samtools` function `index` using the pysam samtools dispatcher. + + Arguments will be formatted into the appropriate command-line call which will be invoked + using the pysam dispatcher. + + Returns the stdout of the resulting call to pysam.index() + + Args: + input: Path to the SAM/BAM/CRAM file to index. + output: Path to the file to write output. (Currently may only be used when exactly one + alignment file is being indexed.) + threads: Number of input/output compression threads to use in addition to main thread. + index_type: The type of index file to produce when indexing. + csi_index_size: Sets the minimum interval size of CSI indices to 2^INT. + """ + + # assert len(inputs) >= 1, "Must provide at least one input to samtools index." + + # if len(inputs) != 1: + # assert ( + # output is None + # ), "Output currently can only be used if there is exactly one input file being indexed" + # args = ["-M"] + # else: + # args = [] + args = [] + + if index_type != SamIndexType.BAI: + args.append(index_type.value) + + if index_type == SamIndexType.CSI: + args.extend(["-m", str(csi_index_size)]) + args.extend(["-@", str(threads)]) + args.append(str(input)) + if output is not None: + args.extend(["-o", str(output)]) + + return _pysam_index(*args) + + +def sort( + input: Path, + output: Path, + index_output: bool = True, + sort_unmapped_reads: bool = False, + kmer_size: int = 20, + compression_level: Optional[int] = None, + memory_per_thread: str = "768MB", + sort_order: SamOrder = SamOrder.Coordinate, + sort_tag: Optional[str] = None, + output_format: SamFileType = SamFileType.BAM, + tempfile_prefix: Optional[str] = None, + threads: int = 1, + no_pg: bool = False, +) -> str: + """ + Calls the `samtools` function sort using the pysam samtools dispatcher. + + Arguments will be formatted into the appropriate command-line call which will be invoked + using the pysam dispatcher. + + Returns the stdout of the resulting call to pysam.sort() + + Args: + input: Path to the SAM/BAM/CRAM file to sort. + output: Path to the file to write output. + index_output: If true, creates an index for the output file. + sort_unmapped_reads: If true, sort unmapped reads by their sequence minimizer, reverse + complementing where appropriate. This has the effect of collating some similar data + together, improving the compressibility of the unmapped sequence. The minimiser kmer + size is adjusted using the ``kmer_size`` option. Note data compressed in this manner + may need to be name collated prior to conversion back to fastq. + + Mapped sequences are sorted by chromosome and position. + kmer_size: the kmer-size to be used if sorting unmapped reads. + compression_level: The compression level to be used in the final output file. + memory_per_thread: Approximately the maximum required memory per thread, specified either + in bytes or with a K, M, or G suffix. + + To prevent sort from creating a huge number of temporary files, it enforces a minimum + value of 1M for this setting. + sort_order: The sort order to use when sorting the file. + sort_tag: The tag to use to use to sort the SAM/BAM/CRAM records. Will be sorted by + this tag first, followed by position (or name, depending on ``sort_order`` + provided). + output_format: the output file format to write the results as. + By default, will try to select a format based on the ``output`` filename extension; + if no format can be deduced, bam is selected. + tempfile_prefix: The prefix to use for temporary files. Resulting files will be in + format PREFIX.nnnn.bam, or if the specified PREFIX is an existing directory, to + PREFIX/samtools.mmm.mmm.tmp.nnnn.bam, where mmm is unique to this invocation of the + sort command. + + By default, any temporary files are written alongside the output file, as + out.bam.tmp.nnnn.bam, or if output is to standard output, in the current directory + as samtools.mmm.mmm.tmp.nnnn.bam. + threads: The number of threads to use when sorting. By default, operation is + single-threaded. + no_pg: If true, will not add a @PG line to the header of the output file. + """ + + output_string = ( + f"{output}##idx##{output}{output_format.index_extension}" + if index_output and output_format.index_extension is not None + else str(output) + ) + + args = ["-m", memory_per_thread, "-O", output_format._name_, "-@", str(threads)] + + if sort_unmapped_reads: + args.extend(["-M", "-K", str(kmer_size)]) + + if compression_level is not None: + args.extend(["-I", str(compression_level)]) + + if sort_order == SamOrder.QueryName: + args.append("-n") + elif sort_order == SamOrder.TemplateCoordinate: + args.append("--template-coordinate") + else: + assert ( + sort_order == SamOrder.Coordinate + ), "Sort order to samtools sort cannot be Unknown or Unsorted" + + if sort_tag is not None: + args.extend(["-t", sort_tag]) + + if tempfile_prefix is not None: + args.extend(["-T", tempfile_prefix]) + + if no_pg: + args.append("--no-PG") + + args.extend(["-o", output_string, str(input)]) + + return _pysam_sort(*args) diff --git a/fgpyo/tests/test_samtools.py b/fgpyo/tests/test_samtools.py new file mode 100644 index 00000000..ee0f97a9 --- /dev/null +++ b/fgpyo/tests/test_samtools.py @@ -0,0 +1,664 @@ +from pathlib import Path +from typing import List +from typing import Optional + +import bgzip +import pytest +from pysam.utils import SamtoolsError + +from fgpyo import sam +from fgpyo import samtools +from fgpyo.sam import SamFileType +from fgpyo.sam import SamOrder +from fgpyo.sam.builder import SamBuilder +from fgpyo.samtools import FaidxMarkStrand +from fgpyo.samtools import SamIndexType + +EXAMPLE_DICT_FASTA: str = """\ +>chr1 +GATTACATTTGAGAGA +>chr2 +CCCCTACCCACCC +>chr1_alt +GATTACATGAGAGA +>chr2_alt +CCCCTACCACCC +""" + +EXPECTED_DICT: str = """\ +@HD VN:1.0 SO:unsorted +@SQ SN:chr1 LN:16 M5:0b2bf1b29cf5338d75a8feb8c8a3784b UR:consistent_location +@SQ SN:chr2 LN:13 M5:87afe3395654b1fe1443b54490c47871 UR:consistent_location +@SQ SN:chr1_alt LN:14 M5:23ce480f016f77dfb29d3c17aa98f567 UR:consistent_location +@SQ SN:chr2_alt LN:12 M5:2ee663b21d249ebecaa9ffbdb6e0b970 UR:consistent_location +""" + +HEADERLESS_DICT: str = """\ +@SQ SN:chr1 LN:16 M5:0b2bf1b29cf5338d75a8feb8c8a3784b UR:consistent_location +@SQ SN:chr2 LN:13 M5:87afe3395654b1fe1443b54490c47871 UR:consistent_location +@SQ SN:chr1_alt LN:14 M5:23ce480f016f77dfb29d3c17aa98f567 UR:consistent_location +@SQ SN:chr2_alt LN:12 M5:2ee663b21d249ebecaa9ffbdb6e0b970 UR:consistent_location +""" + +OTHER_TAG_DICT: str = """\ +@HD VN:1.0 SO:unsorted +@SQ SN:chr1 LN:16 M5:0b2bf1b29cf5338d75a8feb8c8a3784b AN:1 UR:consistent_location AS:test1 SP:test3 +@SQ SN:chr2 LN:13 M5:87afe3395654b1fe1443b54490c47871 AN:2 UR:consistent_location AS:test1 SP:test3 +@SQ SN:chr1_alt LN:14 M5:23ce480f016f77dfb29d3c17aa98f567 AH:* AN:1_alt UR:consistent_location AS:test1 SP:test3 +@SQ SN:chr2_alt LN:12 M5:2ee663b21d249ebecaa9ffbdb6e0b970 AH:* AN:2_alt UR:consistent_location AS:test1 SP:test3 +""" # noqa: E501 + + +@pytest.fixture +def example_dict_fasta(tmp_path: Path) -> Path: + outfile = tmp_path / "example.fa" + + with outfile.open("w") as out_fh: + out_fh.write(EXAMPLE_DICT_FASTA) + + return outfile + + +def test_dict_produces_sequence_dict( + tmp_path: Path, + example_dict_fasta: Path, +) -> None: + output_access = tmp_path / "example.dict" + assert not output_access.exists() + samtools.dict(input=example_dict_fasta, output=output_access, uri="consistent_location") + assert output_access.exists() + + output_contents: str + with output_access.open("r") as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == EXPECTED_DICT + + +def test_dict_no_header_works( + tmp_path: Path, + example_dict_fasta: Path, +) -> None: + output_access = tmp_path / "example.dict" + assert not output_access.exists() + samtools.dict( + input=example_dict_fasta, output=output_access, no_header=True, uri="consistent_location" + ) + assert output_access.exists() + + output_contents: str + with output_access.open("r") as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == HEADERLESS_DICT + + +ALT_FILE: str = """\ +chr1_alt 0 chr1 1 30 7M2D7M * 0 0 * * NM:i:2 +chr2_alt 0 chr2 1 30 6M1D6M * 0 0 * * NM:i:1 +""" + + +@pytest.fixture +def bwa_style_alt(tmp_path: Path) -> Path: + outfile = tmp_path / "example.alt" + + with outfile.open("w") as out_fh: + out_fh.write(ALT_FILE) + + return outfile + + +def test_dict_other_tags_work( + tmp_path: Path, + example_dict_fasta: Path, + bwa_style_alt: Path, +) -> None: + output_access = tmp_path / "example.dict" + assert not output_access.exists() + samtools.dict( + input=example_dict_fasta, + output=output_access, + alias=True, + assembly="test1", + alt=bwa_style_alt, + species="test3", + uri="consistent_location", + ) + assert output_access.exists() + + output_contents: str + with output_access.open("r") as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == OTHER_TAG_DICT + + +EXAMPLE_FAIDX_FASTA: str = """\ +>chr1 +GATTACATTTGAGAGA +>chr2 +CCCCTACCCACCC +""" + +SUBSET_FASTA_TEMPLATE: str = """\ +>chr1:1-7{mark_strand} +GATTACA +>chr2:8-13{mark_strand} +CCACCC +""" +SUBSET_FASTA: str = SUBSET_FASTA_TEMPLATE.format(mark_strand="") + +WRAPPED_SUBSET_FASTA: str = """\ +>chr1:1-7 +GATTA +CA +>chr2:8-13 +CCACC +C +""" + + +@pytest.fixture +def example_faidx_fasta(tmp_path: Path) -> Path: + outfile = tmp_path / "example.fa" + + with outfile.open("w") as out_fh: + out_fh.write(EXAMPLE_FAIDX_FASTA) + + return outfile + + +def test_faidx_produces_functional_index( + tmp_path: Path, + example_faidx_fasta: Path, +) -> None: + output_index_expected = Path(f"{example_faidx_fasta}.fai") + + # Make sure we're producing the index + assert not output_index_expected.exists() + samtools.faidx(input=example_faidx_fasta) + assert output_index_expected.exists() + + output_access = tmp_path / "output_subset.fa" + # Make sure the index is functional + samtools.faidx( + input=example_faidx_fasta, output=output_access, regions=["chr1:1-7", "chr2:8-13"] + ) + + output_contents: str + with output_access.open("r") as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == SUBSET_FASTA + + +def test_faidx_fails_if_non_existent_region_requested( + tmp_path: Path, + example_faidx_fasta: Path, +) -> None: + with pytest.raises(SamtoolsError): + output_index_expected = Path(f"{example_faidx_fasta}.fai") + + # Make sure we're producing the index + assert not output_index_expected.exists() + samtools.faidx(input=example_faidx_fasta) + assert output_index_expected.exists() + + output_access = tmp_path / "output_subset.fa" + # Make sure the index is functional + samtools.faidx( + input=example_faidx_fasta, output=output_access, regions=["chr3:1-4", "chr1:1-7"] + ) + + +def test_faidx_passes_if_non_existent_region_requested_when_continue_passed( + tmp_path: Path, + example_faidx_fasta: Path, +) -> None: + output_index_expected = Path(f"{example_faidx_fasta}.fai") + + assert not output_index_expected.exists() + samtools.faidx(input=example_faidx_fasta) + assert output_index_expected.exists() + + output_access = tmp_path / "output_subset.fa" + samtools.faidx( + input=example_faidx_fasta, + output=output_access, + continue_if_non_existent=True, + regions=["chr3:1-4", "chr1:1-7"], + ) + + +def test_faidx_regions_and_regions_file_result_in_same_thing( + tmp_path: Path, + example_faidx_fasta: Path, +) -> None: + output_index_expected = Path(f"{example_faidx_fasta}.fai") + + # Make sure we're producing the index + assert not output_index_expected.exists() + samtools.faidx(input=example_faidx_fasta) + assert output_index_expected.exists() + + output_access = tmp_path / "output_subset.fa" + # Make sure the index is functional + + regions = ["chr1:1-7", "chr2:8-13"] + + region_file = tmp_path / "regions.txt" + with region_file.open("w") as region_fh: + region_fh.writelines([f"{region}\n" for region in regions]) + + samtools.faidx(input=example_faidx_fasta, output=output_access, regions=regions) + + manually_passed_output_contents: str + with output_access.open("r") as subset_fasta: + manually_passed_output_contents = subset_fasta.read() + + samtools.faidx(input=example_faidx_fasta, output=output_access, region_file=region_file) + + file_passed_output_contents: str + with output_access.open("r") as subset_fasta: + file_passed_output_contents = subset_fasta.read() + + assert manually_passed_output_contents == file_passed_output_contents + + +def test_length_parameter( + tmp_path: Path, + example_faidx_fasta: Path, +) -> None: + output_index_expected = Path(f"{example_faidx_fasta}.fai") + + # Make sure we're producing the index + assert not output_index_expected.exists() + samtools.faidx( + input=example_faidx_fasta, + ) + assert output_index_expected.exists() + + output_access = tmp_path / "output_subset.fa" + # Make sure the index is functional + samtools.faidx( + input=example_faidx_fasta, + output=output_access, + regions=["chr1:1-7", "chr2:8-13"], + length=5, + ) + + output_contents: str + with output_access.open() as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == WRAPPED_SUBSET_FASTA + + +RC_SUBSET_FASTA: str = """\ +>chr1:1-7{mark_strand} +TGTAATC +>chr2:8-13{mark_strand} +GGGTGG +""" + + +def test_rc_parameter( + tmp_path: Path, + example_faidx_fasta: Path, +) -> None: + output_index_expected = Path(f"{example_faidx_fasta}.fai") + + # Make sure we're producing the index + assert not output_index_expected.exists() + samtools.faidx( + input=example_faidx_fasta, + ) + assert output_index_expected.exists() + + output_access = tmp_path / "output_subset.fa" + # Make sure the index is functional + samtools.faidx( + input=example_faidx_fasta, + output=output_access, + reverse_complement=True, + regions=["chr1:1-7", "chr2:8-13"], + ) + + output_contents: str + with output_access.open() as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == RC_SUBSET_FASTA.format(mark_strand="/rc") + + +@pytest.mark.parametrize( + argnames=["mark_strand", "expected_fwd_mark_strand", "expected_rev_mark_strand"], + argvalues=[ + (FaidxMarkStrand.RevComp, "", "/rc"), + (FaidxMarkStrand.No, "", ""), + (FaidxMarkStrand.Sign, "(+)", "(-)"), + (FaidxMarkStrand.Custom, "ex1a", "ex1b"), + (FaidxMarkStrand.Custom, "ex2a", "ex2b"), + ], + ids=["rev comp", "no", "sign", "custom1", "custom2"], +) +def test_mark_strand_parameters( + tmp_path: Path, + example_faidx_fasta: Path, + mark_strand: FaidxMarkStrand, + expected_fwd_mark_strand: str, + expected_rev_mark_strand: str, +) -> None: + output_index_expected = Path(f"{example_faidx_fasta}.fai") + + # Make sure we're producing the index + assert not output_index_expected.exists() + samtools.faidx( + input=example_faidx_fasta, + ) + assert output_index_expected.exists() + + output_access = tmp_path / "output_subset.fa" + # Make sure the index is functional + samtools.faidx( + input=example_faidx_fasta, + output=output_access, + reverse_complement=False, + mark_strand=mark_strand, + custom_mark_strand=(expected_fwd_mark_strand, expected_rev_mark_strand), + regions=["chr1:1-7", "chr2:8-13"], + ) + output_contents: str + with output_access.open() as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == SUBSET_FASTA_TEMPLATE.format(mark_strand=expected_fwd_mark_strand) + + samtools.faidx( + input=example_faidx_fasta, + output=output_access, + reverse_complement=True, + mark_strand=mark_strand, + custom_mark_strand=(expected_fwd_mark_strand, expected_rev_mark_strand), + regions=["chr1:1-7", "chr2:8-13"], + ) + + with output_access.open() as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == RC_SUBSET_FASTA.format(mark_strand=expected_rev_mark_strand) + + +EXAMPLE_FASTQ: str = """\ +@chr1 +GATTACATTTGAGAGA ++ +;;;;;;;;;;;;;;;; +@chr2 +CCCCTACCCACCC ++ +;;;;;;;;;;;;; +""" + +SUBSET_FASTQ: str = """\ +@chr1:1-7 +GATTACA ++ +;;;;;;; +@chr2:8-13 +CCACCC ++ +;;;;;; +""" + + +@pytest.fixture +def example_fastq(tmp_path: Path) -> Path: + outfile = tmp_path / "example.fq" + + with outfile.open("w") as out_fh: + out_fh.write(EXAMPLE_FASTQ) + + return outfile + + +def test_fastq_parameter( + tmp_path: Path, + example_fastq: Path, +) -> None: + output_index_expected = Path(f"{example_fastq}.fai") + + # Make sure we're producing the index + assert not output_index_expected.exists() + samtools.faidx( + input=example_fastq, + fastq=True, + ) + assert output_index_expected.exists() + + output_access = tmp_path / "output_subset.fq" + # Make sure the index is functional + samtools.faidx( + input=example_fastq, + output=output_access, + regions=["chr1:1-7", "chr2:8-13"], + fastq=True, + ) + + output_contents: str + with output_access.open() as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == SUBSET_FASTQ + + +@pytest.fixture +def example_faidx_fasta_gz(tmp_path: Path) -> Path: + outfile = tmp_path / "example.fa.gz" + + with outfile.open(mode="wb") as out_fh: + with bgzip.BGZipWriter(out_fh) as fh: + fh.write(bytes(EXAMPLE_FAIDX_FASTA, "Utf-8")) + + return outfile + + +def test_index_outputs( + tmp_path: Path, + example_faidx_fasta: Path, + example_faidx_fasta_gz: Path, +) -> None: + example_fai = Path(f"{example_faidx_fasta}.fai") + + # Make sure we're producing the index + assert not example_fai.exists() + samtools.faidx( + input=example_faidx_fasta, + fai_idx=example_fai, + ) + assert example_fai.exists() + + output_access = tmp_path / "output_subset.fa" + # Make sure the index is functional + samtools.faidx( + input=example_faidx_fasta, + output=output_access, + regions=["chr1:1-7", "chr2:8-13"], + fai_idx=example_fai, + ) + + output_contents: str + with output_access.open() as subset_fasta: + output_contents = subset_fasta.read() + assert output_contents == SUBSET_FASTA + + example_gzi = Path(f"{example_faidx_fasta_gz}.gzi") + assert not example_gzi.exists() + samtools.faidx( + input=example_faidx_fasta_gz, + gzi_idx=example_gzi, + ) + assert example_gzi.exists() + + output_access = tmp_path / "output_subset.fa" + # Make sure the index is functional + samtools.faidx( + input=example_faidx_fasta, + output=output_access, + regions=["chr1:1-7", "chr2:8-13"], + gzi_idx=example_gzi, + ) + + compressed_output_contents: str + with output_access.open() as subset_fasta: + compressed_output_contents = subset_fasta.read() + + assert compressed_output_contents == SUBSET_FASTA + + +@pytest.mark.parametrize( + argnames=["index_type"], + argvalues=[ + (SamIndexType.BAI,), + (SamIndexType.CSI,), + ], + ids=["BAI", "CSI"], +) +def test_index_works_with_one_input( + tmp_path: Path, + index_type: SamIndexType, +) -> None: + builder = SamBuilder(sort_order=SamOrder.Coordinate) + builder.add_pair(name="test1", chrom="chr1", start1=4000, start2=4300) + builder.add_pair( + name="test2", chrom="chr1", start1=5000, start2=4700, strand1="-", strand2="+" + ) + builder.add_pair(name="test3", chrom="chr2", start1=4000, start2=4300) + builder.add_pair(name="test4", chrom="chr5", start1=4000, start2=4300) + + # At the moment sam builder doesnt support generating CRAM and SAM formats, so for now we're + # only testing on BAMs + input_file = tmp_path / "test_input.bam" + builder.to_path(input_file) + + output_index_expected = Path(f"{input_file}.{index_type._name_.lower()}") + + # Make sure we're producing the index + assert not output_index_expected.exists() + # samtools.index(inputs=[input_file], index_type=index_type) + samtools.index(input=input_file, index_type=index_type) + assert output_index_expected.exists() + + +# Can't accept multiple inputs at the moment. +# See https://github.com/pysam-developers/pysam/issues/1155 +# @pytest.mark.parametrize( +# argnames=["index_type"], +# argvalues=[ +# (SamIndexType.BAI,), +# (SamIndexType.CSI,), +# ], +# ids=["BAI", "CSI"], +# ) +# def test_index_works_with_multiple_inputs( +# tmp_path: Path, +# index_type: SamIndexType, +# ) -> None: +# builder = SamBuilder(sort_order=SamOrder.Coordinate) +# builder.add_pair(name="test1", chrom="chr1", start1=4000, start2=4300) +# builder.add_pair( +# name="test2", chrom="chr1", start1=5000, start2=4700, strand1="-", strand2="+" +# ) +# builder.add_pair(name="test3", chrom="chr2", start1=4000, start2=4300) +# builder.add_pair(name="test4", chrom="chr5", start1=4000, start2=4300) + +# # At the moment sam builder doesnt support generating CRAM and SAM formats, so for now we're +# # only testing on BAMs +# input_file1 = tmp_path / "test_input1.bam" +# builder.to_path(input_file1) +# input_file2 = tmp_path / "test_input2.bam" +# builder.to_path(input_file2) + +# inputs = [ +# input_file1, +# input_file2, +# ] + +# # Make sure we're producing the indices +# for input in inputs: +# output_index_expected = Path(f"{input}.{index_type._name_.lower()}") +# assert not output_index_expected.exists() + +# samtools.index(inputs=inputs, index_type=index_type) +# for input in inputs: +# output_index_expected = Path(f"{input}.{index_type._name_.lower()}") +# assert output_index_expected.exists() + + +@pytest.mark.parametrize( + argnames=["file_type"], + argvalues=[ + (SamFileType.SAM,), + (SamFileType.BAM,), + (SamFileType.CRAM,), + ], + ids=["SAM", "BAM", "CRAM"], +) +@pytest.mark.parametrize( + argnames=["index_output"], + argvalues=[ + (True,), + (False,), + ], + ids=["indexed", "not_indexed"], +) +@pytest.mark.parametrize( + argnames=["sort_order", "expected_name_order"], + argvalues=[ + (SamOrder.Coordinate, ["test2", "test3", "test4", "test1"]), + (SamOrder.QueryName, ["test1", "test2", "test3", "test4"]), + (SamOrder.TemplateCoordinate, ["test2", "test3", "test4", "test1"]), + ], + ids=["Coordinate sorting", "Query name sorting", "Template Sorted"], +) +def test_sort_types( + tmp_path: Path, + file_type: SamFileType, + index_output: bool, + sort_order: Optional[SamOrder], + expected_name_order: List[str], +) -> None: + + builder = SamBuilder(sort_order=SamOrder.Unsorted) + builder.add_pair( + name="test3", chrom="chr1", start1=5000, start2=4700, strand1="-", strand2="+" + ) + builder.add_pair(name="test2", chrom="chr1", start1=4000, start2=4300) + builder.add_pair(name="test1", chrom="chr5", start1=4000, start2=4300) + builder.add_pair(name="test4", chrom="chr2", start1=4000, start2=4300) + + input_file = tmp_path / "test_input.bam" + output_file = tmp_path / f"test_output{file_type.extension}" + + builder.to_path(input_file) + + samtools.sort( + input=input_file, + output=output_file, + index_output=index_output, + sort_order=sort_order, + ) + with sam.reader(output_file) as in_bam: + for name in expected_name_order: + read1 = next(in_bam) + assert ( + name == read1.query_name + ), "Position based read sort order did not match expectation" + read2 = next(in_bam) + assert ( + name == read2.query_name + ), "Position based read sort order did not match expectation" + + if index_output and file_type != SamFileType.SAM: + assert Path(f"{output_file}{file_type.index_extension}") diff --git a/pyproject.toml b/pyproject.toml index 783be7c5..1cea5b74 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ flake8 = [ { version = ">=6.1.0", python = ">=3.12.0" }, ] black = ">=19.10b0" +bgzip = ">=0.4.0" pytest-cov = ">=2.8.1" isort = ">=5.10.1"