diff --git a/ci/check.sh b/ci/check.sh index dca833f7..456b8588 100755 --- a/ci/check.sh +++ b/ci/check.sh @@ -44,6 +44,7 @@ if [[ "$1" == "--check" ]]; then fi banner "Executing in conda environment ${CONDA_DEFAULT_ENV} in directory ${root}" +run "Checking poetry" "poetry check" run "Unit Tests" "python -m pytest -vv -r sx fgpyo" run "Import Sorting" "isort --force-single-line-imports --profile black fgpyo" run "Style Checking" "black --line-length 99 $black_extra_args fgpyo" diff --git a/fgpyo/io/tests/test_io.py b/fgpyo/io/tests/test_io.py index 1e4bc559..11fe86df 100644 --- a/fgpyo/io/tests/test_io.py +++ b/fgpyo/io/tests/test_io.py @@ -86,8 +86,8 @@ def test_assert_path_is_writeable_pass() -> None: @pytest.mark.parametrize( "suffix, expected", [ - (".gz", io._io.TextIOWrapper), - (".fa", io._io.TextIOWrapper), + (".gz", io.TextIOWrapper), + (".fa", io.TextIOWrapper), ], ) def test_reader( @@ -103,8 +103,8 @@ def test_reader( @pytest.mark.parametrize( "suffix, expected", [ - (".gz", io._io.TextIOWrapper), - (".fa", io._io.TextIOWrapper), + (".gz", io.TextIOWrapper), + (".fa", io.TextIOWrapper), ], ) def test_writer( diff --git a/fgpyo/sam/__init__.py b/fgpyo/sam/__init__.py index caceadac..64e5ec57 100644 --- a/fgpyo/sam/__init__.py +++ b/fgpyo/sam/__init__.py @@ -198,13 +198,14 @@ class SamFileType(enum.Enum): ext (str): The standard file extension for this file type. """ - def __init__(self, mode: str, ext: str) -> None: + def __init__(self, mode: str, ext: str, index_ext: Optional[str]) -> None: self.mode = mode self.extension = ext + self.index_extension = index_ext - SAM = ("", ".sam") - BAM = ("b", ".bam") - CRAM = ("c", ".cram") + SAM = ("", ".sam", None) + BAM = ("b", ".bam", ".bai") + CRAM = ("c", ".cram", ".crai") @classmethod def from_path(cls, path: Union[Path, str]) -> "SamFileType": @@ -911,3 +912,5 @@ class SamOrder(enum.Enum): Coordinate = "coordinate" #: coordinate sorted QueryName = "queryname" #: queryname sorted Unknown = "unknown" # Unknown SAM / BAM / CRAM sort order + # Sort by template-coordinate, SO is unsorted, GO is query, SS is template-coordinate: + TemplateCoordinate = "unsorted" diff --git a/fgpyo/sam/builder.py b/fgpyo/sam/builder.py index 69f0db1e..39c6e1a2 100755 --- a/fgpyo/sam/builder.py +++ b/fgpyo/sam/builder.py @@ -27,6 +27,7 @@ from pysam import AlignmentHeader from fgpyo import sam +from fgpyo import samtools from fgpyo.sam import SamOrder @@ -136,6 +137,11 @@ def __init__( "SQ": (sd if sd is not None else SamBuilder.default_sd()), "RG": [(rg if rg is not None else SamBuilder.default_rg())], } + if sort_order == SamOrder.TemplateCoordinate: + self._header["HD"] = { + **self._header["HD"], + **{"GO": "query", "SS": "template-coordinate"}, + } if extra_header is not None: self._header = {**self._header, **extra_header} self._samheader = AlignmentHeader.from_dict(self._header) @@ -588,18 +594,14 @@ def to_path( if pred(rec): writer.write(rec) - samtools_sort_args = ["-o", str(path), fp.name] - file_handle.close() - if self._sort_order == SamOrder.QueryName: - # Ignore type hints for now until we have wrappers to use here. - pysam.sort("-n", *samtools_sort_args) # type: ignore - elif self._sort_order == SamOrder.Coordinate: - # Ignore type hints for now until we have wrappers to use here. - if index: - samtools_sort_args.insert(0, "--write-index") - pysam.sort(*samtools_sort_args) # type: ignore - + if self._sort_order not in {SamOrder.Unsorted, SamOrder.Unknown}: + samtools.sort( + alignment_file=Path(fp.name), + output=path, + index_output=index, + sort_order=self._sort_order, + ) return path def __len__(self) -> int: diff --git a/fgpyo/samtools.py b/fgpyo/samtools.py new file mode 100644 index 00000000..3188e74b --- /dev/null +++ b/fgpyo/samtools.py @@ -0,0 +1,375 @@ +""" +Type-Hinted Wrappers around pysam samtools dispatch functions. +-------------------------------------------------------------- + +Calls pysam samtools functions, with type hints for relevant parameters and return types. + +Examples +~~~~~~~~ +.. code-block:: python + + >>> from fgpyo import samtools + >>> Path("example1.sorted.bam.bai").exists() + False + >>> samtools.sort(alignment_file=Path("example1.bam"), output=Path("example1.sorted.bam")) + '' + >>> samtools.index(alignment_file=Path("example1.sorted.bam")) + '' + >>> Path("example1.sorted.bam.bai").exists() + True + +Module Contents +~~~~~~~~~~~~~~~ +The module contains the following public method definitions: + - :class:`~fgpyo.samtools.dict` -- Wrapper around pysam call to samtools dict with type hints + on inputs and return type + - :class:`~fgpyo.samtools.sort` -- Wrapper around pysam call to samtools sort with type hints + on inputs and return type + - :class:`~fgpyo.samtools.faidx` -- Wrapper around pysam call to samtools faidx with type hints + on inputs and return type + - :class:`~fgpyo.samtools.index` -- Wrapper around pysam call to samtools index with type hints + on inputs and return type +""" +import enum +from pathlib import Path +from typing import TYPE_CHECKING +from typing import List +from typing import Optional +from typing import Sequence +from typing import Tuple +from typing import Union + +from fgpyo.sam import SamFileType +from fgpyo.sam import SamOrder + + +def pysam_fn(*args: str) -> str: + """ + Type hinted import of an example function from the pysam samtools dispatcher. + """ + pass + + +if TYPE_CHECKING: + _pysam_dict = pysam_fn + _pysam_sort = pysam_fn + _pysam_faidx = pysam_fn + _pysam_index = pysam_fn +else: + from pysam import dict as _pysam_dict + from pysam import faidx as _pysam_faidx + from pysam import index as _pysam_index + from pysam import sort as _pysam_sort + + +def dict( + fasta_file: Path, + output: Path, + no_header: bool = False, + alias: bool = False, + assembly: Optional[str] = None, + alt: Optional[Path] = None, + species: Optional[str] = None, + uri: Optional[str] = None, +) -> str: + """ + Calls the `samtools` function `dict` using the pysam samtools dispatcher. + Create a dictionary file from an input FASTA file. + + Arguments will be formatted into the appropriate command-line call which will be invoked + using the pysam dispatcher. + + Returns the stdout of the resulting call to pysam.dict() + + Args: + fasta_file: Path to the FASTA files to generate a sequence dictionary for. + output: Path to the file to write output to. + no_header: If true will not print the @HD header line. + assembly: The assembly for the AS tag. + alias: Adds an AN tag with the same value as the SN tag, except that a 'chr' prefix is + removed if SN has one or added if it does not. For mitochondria (i.e., when SN is “M” + or “MT”, with or without a “chr” prefix), also adds the remaining combinations of + “chr/M/MT” to the AN tag. + alt: Add an AH tag to each sequence listed in the specified bwa-style .alt file. These + files use SAM records to represent alternate locus sequences (as named in the QNAME + field) and their mappings to the primary assembly. + species: Specify the species for the SP tag. + uri: Specify the URI for the UR tag. Defaults to the absolute path of ref.fasta. + """ + + args: List[str] = ["--output", str(output)] + + if no_header: + args.append("--no-header") + + if alias: + args.append("--alias") + + if assembly is not None: + args.extend(["--assembly", assembly]) + + if alt is not None: + args.extend(["--alt", str(alt)]) + + if species is not None: + args.extend(["--species", species]) + + if uri is not None: + args.extend(["--uri", uri]) + + args.append(str(fasta_file)) + + return _pysam_dict(*args) + + +@enum.unique +class FaidxMarkStrand(enum.Enum): + RevComp = "rc" + No = "no" + Sign = "sign" + Custom = "custom" + + +def faidx( + sequence_file: Path, + output: Optional[Path] = None, + length: int = 60, + regions: Optional[List[str]] = None, + region_file: Optional[Path] = None, + fai_idx: Optional[Path] = None, + gzi_idx: Optional[Path] = None, + continue_if_non_existent: bool = False, + fastq: bool = False, + reverse_complement: bool = False, + mark_strand: FaidxMarkStrand = FaidxMarkStrand.RevComp, + custom_mark_strand: Optional[Tuple[str, str]] = None, +) -> str: + """ + Calls the `samtools` function `faidx` using the pysam samtools dispatcher. + Index an input FASTA or FASTQ file. + + Arguments will be formatted into the appropriate command-line call which will be invoked + using the pysam dispatcher. + + Returns the stdout of the resulting call to pysam.faidx() + + Args: + sequence_file: Path to the FASTA/FASTQ file to index. + output: Path to the file to write output index file. + length: Length of FASTA sequence line. + regions: regions to extract from the FASTA file in samtools region format + (:<1-based start>-<1-based end>) + region_file: Path to file containing regions to extract from the FASTA file in samtools + region format (:<1-based start>-<1-based end>). 1 region descriptor per line. + fai_idx: Read/Write to specified FAI index file. + gzi_idx: Read/Write to specified compressed file index (used with .gz files). + continue_if_non_existent: If true continue working if a non-existent region is requested. + fastq: Read FASTQ files and output extracted sequences in FASTQ format. Same as using + samtools fqidx. + reverse_complement: Output the sequence as the reverse complement. When this option is + used, “/rc” will be appended to the sequence names. To turn this off or change the + string appended, use the mark_strand parameter. + mark_strand: Append strand indicator to sequence name. Type to append can be one of: + FaidxMarkStrand.RevComp - Append '/rc' when writing the reverse complement. This is the + default. + + FaidxMarkStrand.No - Do not append anything. + + FaidxMarkStrand.Sign - Append '(+)' for forward strand or '(-)' for reverse + complement. This matches the output of “bedtools getfasta -s”. + + FaidxMarkStrand.Custom - custom,, + Append string to names when writing the forward strand and when writing the + reverse strand. Spaces are preserved, so it is possible to move the indicator into the + comment part of the description line by including a leading space in the strings + and . + custom_mark_strand: The custom strand indicators to use in in the Custom MarkStrand + setting. The first value of the tuple will be used as the positive strand indicator, + the second value will be used as the negative strand indicator. + """ + + mark_strand_str: str + if mark_strand == FaidxMarkStrand.Custom: + assert custom_mark_strand is not None, ( + "Cannot use custom mark strand without providing the custom strand indicators to " + + "`custom_mark_string`" + ) + mark_strand_str = f"{mark_strand.value},{custom_mark_strand[0]},{custom_mark_strand[1]}" + else: + mark_strand_str = mark_strand.value + + args: List[str] = ["--length", str(length), "--mark-strand", mark_strand_str] + + if output is not None: + args.extend(["--output", str(output)]) + + if continue_if_non_existent: + args.append("--continue") + if reverse_complement: + args.append("--reverse-complement") + if fastq: + args.append("--fastq") + + if fai_idx is not None: + args.extend(["--fai-idx", str(fai_idx)]) + + if gzi_idx is not None: + args.extend(["--gzi-idx", str(gzi_idx)]) + + if region_file is not None: + args.extend(["--region-file", str(region_file)]) + + args.append(str(sequence_file)) + + if regions is not None: + args.extend(regions) + + return _pysam_faidx(*args) + + +@enum.unique +class SamIndexType(enum.Enum): + BAI = "-b" + CSI = "-c" + + +def index( + alignment_file: Union[Path, Sequence[Path]], + output: Optional[Path] = None, + threads: int = 0, + index_type: SamIndexType = SamIndexType.BAI, + csi_index_size: int = 14, +) -> str: + """ + Calls the `samtools` function `index` using the pysam samtools dispatcher. + + Arguments will be formatted into the appropriate command-line call which will be invoked + using the pysam dispatcher. + + Returns the stdout of the resulting call to pysam.index() + + Args: + alignment_file: Path to the SAM/BAM/CRAM file to index. + output: Path to the file to write output. (Currently may only be used when exactly one + alignment file is being indexed.) + threads: Number of input/output compression threads to use in addition to main thread. + index_type: The type of index file to produce when indexing. + csi_index_size: Sets the minimum interval size of CSI indices to 2^INT. + """ + + if isinstance(alignment_file, Path): + alignment_file = [alignment_file] + if output is not None and len(alignment_file) != 1: + raise ValueError("Only one output may be specified, for indexing only one alignment file") + + args = [] if len(alignment_file) == 1 else ["-M"] + + if index_type != SamIndexType.BAI: + args.append(index_type.value) + + if index_type == SamIndexType.CSI: + args.extend(["-m", str(csi_index_size)]) + args.extend(["-@", str(threads)]) + args.extend(f"{input_path}" for input_path in alignment_file) + if output is not None: + args.extend(["-o", str(output)]) + + return _pysam_index(*args) + + +def sort( + alignment_file: Path, + output: Path, + index_output: bool = True, + sort_unmapped_reads: bool = False, + kmer_size: int = 20, + compression_level: Optional[int] = None, + memory_per_thread: str = "768MB", + sort_order: SamOrder = SamOrder.Coordinate, + sort_tag: Optional[str] = None, + output_format: SamFileType = SamFileType.BAM, + tempfile_prefix: Optional[str] = None, + threads: int = 1, + no_pg: bool = False, +) -> str: + """ + Calls the `samtools` function sort using the pysam samtools dispatcher. + + Arguments will be formatted into the appropriate command-line call which will be invoked + using the pysam dispatcher. + + Returns the stdout of the resulting call to pysam.sort() + + Args: + alignment_file: Path to the SAM/BAM/CRAM file to sort. + output: Path to the file to write output. + index_output: If true, creates an index for the output file. + sort_unmapped_reads: If true, sort unmapped reads by their sequence minimizer, reverse + complementing where appropriate. This has the effect of collating some similar data + together, improving the compressibility of the unmapped sequence. The minimiser kmer + size is adjusted using the ``kmer_size`` option. Note data compressed in this manner + may need to be name collated prior to conversion back to fastq. + + Mapped sequences are sorted by chromosome and position. + kmer_size: the kmer-size to be used if sorting unmapped reads. + compression_level: The compression level to be used in the final output file. + memory_per_thread: Approximately the maximum required memory per thread, specified either + in bytes or with a K, M, or G suffix. + + To prevent sort from creating a huge number of temporary files, it enforces a minimum + value of 1M for this setting. + sort_order: The sort order to use when sorting the file. + sort_tag: The tag to use to use to sort the SAM/BAM/CRAM records. Will be sorted by + this tag first, followed by position (or name, depending on ``sort_order`` + provided). + output_format: the output file format to write the results as. + By default, will try to select a format based on the ``output`` filename extension; + if no format can be deduced, bam is selected. + tempfile_prefix: The prefix to use for temporary files. Resulting files will be in + format PREFIX.nnnn.bam, or if the specified PREFIX is an existing directory, to + PREFIX/samtools.mmm.mmm.tmp.nnnn.bam, where mmm is unique to this invocation of the + sort command. + + By default, any temporary files are written alongside the output file, as + out.bam.tmp.nnnn.bam, or if output is to standard output, in the current directory + as samtools.mmm.mmm.tmp.nnnn.bam. + threads: The number of threads to use when sorting. By default, operation is + single-threaded. + no_pg: If true, will not add a @PG line to the header of the output file. + """ + + output_string = ( + f"{output}##idx##{output}{output_format.index_extension}" + if index_output and output_format.index_extension is not None + else str(output) + ) + + args = ["-m", memory_per_thread, "-O", output_format._name_, "-@", str(threads)] + + if sort_unmapped_reads: + args.extend(["-M", "-K", str(kmer_size)]) + + if compression_level is not None: + args.extend(["-I", str(compression_level)]) + + if sort_order == SamOrder.QueryName: + args.append("-n") + elif sort_order == SamOrder.TemplateCoordinate: + args.append("--template-coordinate") + else: + assert ( + sort_order == SamOrder.Coordinate + ), "Sort order to samtools sort cannot be Unknown or Unsorted" + + if sort_tag is not None: + args.extend(["-t", sort_tag]) + + if tempfile_prefix is not None: + args.extend(["-T", tempfile_prefix]) + + if no_pg: + args.append("--no-PG") + + args.extend(["-o", output_string, str(alignment_file)]) + + return _pysam_sort(*args) diff --git a/fgpyo/tests/test_samtools.py b/fgpyo/tests/test_samtools.py new file mode 100644 index 00000000..d678f491 --- /dev/null +++ b/fgpyo/tests/test_samtools.py @@ -0,0 +1,667 @@ +from pathlib import Path +from typing import List +from typing import Optional + +import pysam +import pytest +from pysam.utils import SamtoolsError + +from fgpyo import sam +from fgpyo import samtools +from fgpyo.sam import SamFileType +from fgpyo.sam import SamOrder +from fgpyo.sam.builder import SamBuilder +from fgpyo.samtools import FaidxMarkStrand +from fgpyo.samtools import SamIndexType + +EXAMPLE_DICT_FASTA: str = """\ +>chr1 +GATTACATTTGAGAGA +>chr2 +CCCCTACCCACCC +>chr1_alt +GATTACATGAGAGA +>chr2_alt +CCCCTACCACCC +""" + +EXPECTED_DICT: str = """\ +@HD VN:1.0 SO:unsorted +@SQ SN:chr1 LN:16 M5:0b2bf1b29cf5338d75a8feb8c8a3784b UR:consistent_location +@SQ SN:chr2 LN:13 M5:87afe3395654b1fe1443b54490c47871 UR:consistent_location +@SQ SN:chr1_alt LN:14 M5:23ce480f016f77dfb29d3c17aa98f567 UR:consistent_location +@SQ SN:chr2_alt LN:12 M5:2ee663b21d249ebecaa9ffbdb6e0b970 UR:consistent_location +""" + +HEADERLESS_DICT: str = """\ +@SQ SN:chr1 LN:16 M5:0b2bf1b29cf5338d75a8feb8c8a3784b UR:consistent_location +@SQ SN:chr2 LN:13 M5:87afe3395654b1fe1443b54490c47871 UR:consistent_location +@SQ SN:chr1_alt LN:14 M5:23ce480f016f77dfb29d3c17aa98f567 UR:consistent_location +@SQ SN:chr2_alt LN:12 M5:2ee663b21d249ebecaa9ffbdb6e0b970 UR:consistent_location +""" + +OTHER_TAG_DICT: str = """\ +@HD VN:1.0 SO:unsorted +@SQ SN:chr1 LN:16 M5:0b2bf1b29cf5338d75a8feb8c8a3784b AN:1 UR:consistent_location AS:test1 SP:test3 +@SQ SN:chr2 LN:13 M5:87afe3395654b1fe1443b54490c47871 AN:2 UR:consistent_location AS:test1 SP:test3 +@SQ SN:chr1_alt LN:14 M5:23ce480f016f77dfb29d3c17aa98f567 AH:* AN:1_alt UR:consistent_location AS:test1 SP:test3 +@SQ SN:chr2_alt LN:12 M5:2ee663b21d249ebecaa9ffbdb6e0b970 AH:* AN:2_alt UR:consistent_location AS:test1 SP:test3 +""" # noqa: E501 + + +@pytest.fixture +def example_dict_fasta(tmp_path: Path) -> Path: + outfile = tmp_path / "example.fa" + + with outfile.open("w") as out_fh: + out_fh.write(EXAMPLE_DICT_FASTA) + + return outfile + + +def test_dict_produces_sequence_dict( + tmp_path: Path, + example_dict_fasta: Path, +) -> None: + output_access = tmp_path / "example.dict" + assert not output_access.exists() + samtools.dict(fasta_file=example_dict_fasta, output=output_access, uri="consistent_location") + assert output_access.exists() + + output_contents: str + with output_access.open("r") as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == EXPECTED_DICT + + +def test_dict_no_header_works( + tmp_path: Path, + example_dict_fasta: Path, +) -> None: + output_access = tmp_path / "example.dict" + assert not output_access.exists() + samtools.dict( + fasta_file=example_dict_fasta, + output=output_access, + no_header=True, + uri="consistent_location", + ) + assert output_access.exists() + + output_contents: str + with output_access.open("r") as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == HEADERLESS_DICT + + +ALT_FILE: str = """\ +chr1_alt 0 chr1 1 30 7M2D7M * 0 0 * * NM:i:2 +chr2_alt 0 chr2 1 30 6M1D6M * 0 0 * * NM:i:1 +""" + + +@pytest.fixture +def bwa_style_alt(tmp_path: Path) -> Path: + outfile = tmp_path / "example.alt" + + with outfile.open("w") as out_fh: + out_fh.write(ALT_FILE) + + return outfile + + +def test_dict_other_tags_work( + tmp_path: Path, + example_dict_fasta: Path, + bwa_style_alt: Path, +) -> None: + output_access = tmp_path / "example.dict" + assert not output_access.exists() + samtools.dict( + fasta_file=example_dict_fasta, + output=output_access, + alias=True, + assembly="test1", + alt=bwa_style_alt, + species="test3", + uri="consistent_location", + ) + assert output_access.exists() + + output_contents: str + with output_access.open("r") as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == OTHER_TAG_DICT + + +EXAMPLE_FAIDX_FASTA: str = """\ +>chr1 +GATTACATTTGAGAGA +>chr2 +CCCCTACCCACCC +""" + +SUBSET_FASTA_TEMPLATE: str = """\ +>chr1:1-7{mark_strand} +GATTACA +>chr2:8-13{mark_strand} +CCACCC +""" +SUBSET_FASTA: str = SUBSET_FASTA_TEMPLATE.format(mark_strand="") + +WRAPPED_SUBSET_FASTA: str = """\ +>chr1:1-7 +GATTA +CA +>chr2:8-13 +CCACC +C +""" + + +@pytest.fixture +def example_faidx_fasta(tmp_path: Path) -> Path: + outfile = tmp_path / "example.fa" + + with outfile.open("w") as out_fh: + out_fh.write(EXAMPLE_FAIDX_FASTA) + + return outfile + + +def test_faidx_produces_functional_index( + tmp_path: Path, + example_faidx_fasta: Path, +) -> None: + output_index_expected = Path(f"{example_faidx_fasta}.fai") + + # Make sure we're producing the index + assert not output_index_expected.exists() + samtools.faidx(sequence_file=example_faidx_fasta) + assert output_index_expected.exists() + + output_access = tmp_path / "output_subset.fa" + # Make sure the index is functional + samtools.faidx( + sequence_file=example_faidx_fasta, output=output_access, regions=["chr1:1-7", "chr2:8-13"] + ) + + output_contents: str + with output_access.open("r") as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == SUBSET_FASTA + + +def test_faidx_fails_if_non_existent_region_requested( + tmp_path: Path, + example_faidx_fasta: Path, +) -> None: + with pytest.raises(SamtoolsError): + output_index_expected = Path(f"{example_faidx_fasta}.fai") + + # Make sure we're producing the index + assert not output_index_expected.exists() + samtools.faidx(sequence_file=example_faidx_fasta) + assert output_index_expected.exists() + + output_access = tmp_path / "output_subset.fa" + # Make sure the index is functional + samtools.faidx( + sequence_file=example_faidx_fasta, + output=output_access, + regions=["chr3:1-4", "chr1:1-7"], + ) + + +def test_faidx_passes_if_non_existent_region_requested_when_continue_passed( + tmp_path: Path, + example_faidx_fasta: Path, +) -> None: + output_index_expected = Path(f"{example_faidx_fasta}.fai") + + assert not output_index_expected.exists() + samtools.faidx(sequence_file=example_faidx_fasta) + assert output_index_expected.exists() + + output_access = tmp_path / "output_subset.fa" + samtools.faidx( + sequence_file=example_faidx_fasta, + output=output_access, + continue_if_non_existent=True, + regions=["chr3:1-4", "chr1:1-7"], + ) + + +def test_faidx_regions_and_regions_file_result_in_same_thing( + tmp_path: Path, + example_faidx_fasta: Path, +) -> None: + output_index_expected = Path(f"{example_faidx_fasta}.fai") + + # Make sure we're producing the index + assert not output_index_expected.exists() + samtools.faidx(sequence_file=example_faidx_fasta) + assert output_index_expected.exists() + + output_access = tmp_path / "output_subset.fa" + # Make sure the index is functional + + regions = ["chr1:1-7", "chr2:8-13"] + + region_file = tmp_path / "regions.txt" + with region_file.open("w") as region_fh: + region_fh.writelines([f"{region}\n" for region in regions]) + + samtools.faidx(sequence_file=example_faidx_fasta, output=output_access, regions=regions) + + manually_passed_output_contents: str + with output_access.open("r") as subset_fasta: + manually_passed_output_contents = subset_fasta.read() + + samtools.faidx( + sequence_file=example_faidx_fasta, output=output_access, region_file=region_file + ) + + file_passed_output_contents: str + with output_access.open("r") as subset_fasta: + file_passed_output_contents = subset_fasta.read() + + assert manually_passed_output_contents == file_passed_output_contents + + +def test_length_parameter( + tmp_path: Path, + example_faidx_fasta: Path, +) -> None: + output_index_expected = Path(f"{example_faidx_fasta}.fai") + + # Make sure we're producing the index + assert not output_index_expected.exists() + samtools.faidx( + sequence_file=example_faidx_fasta, + ) + assert output_index_expected.exists() + + output_access = tmp_path / "output_subset.fa" + # Make sure the index is functional + samtools.faidx( + sequence_file=example_faidx_fasta, + output=output_access, + regions=["chr1:1-7", "chr2:8-13"], + length=5, + ) + + output_contents: str + with output_access.open() as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == WRAPPED_SUBSET_FASTA + + +RC_SUBSET_FASTA: str = """\ +>chr1:1-7{mark_strand} +TGTAATC +>chr2:8-13{mark_strand} +GGGTGG +""" + + +def test_rc_parameter( + tmp_path: Path, + example_faidx_fasta: Path, +) -> None: + output_index_expected = Path(f"{example_faidx_fasta}.fai") + + # Make sure we're producing the index + assert not output_index_expected.exists() + samtools.faidx( + sequence_file=example_faidx_fasta, + ) + assert output_index_expected.exists() + + output_access = tmp_path / "output_subset.fa" + # Make sure the index is functional + samtools.faidx( + sequence_file=example_faidx_fasta, + output=output_access, + reverse_complement=True, + regions=["chr1:1-7", "chr2:8-13"], + ) + + output_contents: str + with output_access.open() as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == RC_SUBSET_FASTA.format(mark_strand="/rc") + + +@pytest.mark.parametrize( + argnames=["mark_strand", "expected_fwd_mark_strand", "expected_rev_mark_strand"], + argvalues=[ + (FaidxMarkStrand.RevComp, "", "/rc"), + (FaidxMarkStrand.No, "", ""), + (FaidxMarkStrand.Sign, "(+)", "(-)"), + (FaidxMarkStrand.Custom, "ex1a", "ex1b"), + (FaidxMarkStrand.Custom, "ex2a", "ex2b"), + ], + ids=["rev comp", "no", "sign", "custom1", "custom2"], +) +def test_mark_strand_parameters( + tmp_path: Path, + example_faidx_fasta: Path, + mark_strand: FaidxMarkStrand, + expected_fwd_mark_strand: str, + expected_rev_mark_strand: str, +) -> None: + output_index_expected = Path(f"{example_faidx_fasta}.fai") + + # Make sure we're producing the index + assert not output_index_expected.exists() + samtools.faidx( + sequence_file=example_faidx_fasta, + ) + assert output_index_expected.exists() + + output_access = tmp_path / "output_subset.fa" + # Make sure the index is functional + samtools.faidx( + sequence_file=example_faidx_fasta, + output=output_access, + reverse_complement=False, + mark_strand=mark_strand, + custom_mark_strand=(expected_fwd_mark_strand, expected_rev_mark_strand), + regions=["chr1:1-7", "chr2:8-13"], + ) + output_contents: str + with output_access.open() as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == SUBSET_FASTA_TEMPLATE.format(mark_strand=expected_fwd_mark_strand) + + samtools.faidx( + sequence_file=example_faidx_fasta, + output=output_access, + reverse_complement=True, + mark_strand=mark_strand, + custom_mark_strand=(expected_fwd_mark_strand, expected_rev_mark_strand), + regions=["chr1:1-7", "chr2:8-13"], + ) + + with output_access.open() as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == RC_SUBSET_FASTA.format(mark_strand=expected_rev_mark_strand) + + +EXAMPLE_FASTQ: str = """\ +@chr1 +GATTACATTTGAGAGA ++ +;;;;;;;;;;;;;;;; +@chr2 +CCCCTACCCACCC ++ +;;;;;;;;;;;;; +""" + +SUBSET_FASTQ: str = """\ +@chr1:1-7 +GATTACA ++ +;;;;;;; +@chr2:8-13 +CCACCC ++ +;;;;;; +""" + + +@pytest.fixture +def example_fastq(tmp_path: Path) -> Path: + outfile = tmp_path / "example.fq" + + with outfile.open("w") as out_fh: + out_fh.write(EXAMPLE_FASTQ) + + return outfile + + +def test_fastq_parameter( + tmp_path: Path, + example_fastq: Path, +) -> None: + output_index_expected = Path(f"{example_fastq}.fai") + + # Make sure we're producing the index + assert not output_index_expected.exists() + samtools.faidx( + sequence_file=example_fastq, + fastq=True, + ) + assert output_index_expected.exists() + + output_access = tmp_path / "output_subset.fq" + # Make sure the index is functional + samtools.faidx( + sequence_file=example_fastq, + output=output_access, + regions=["chr1:1-7", "chr2:8-13"], + fastq=True, + ) + + output_contents: str + with output_access.open() as subset_fasta: + output_contents = subset_fasta.read() + + assert output_contents == SUBSET_FASTQ + + +@pytest.fixture +def example_faidx_fasta_gz(tmp_path: Path) -> Path: + outfile = tmp_path / "example.fa.gz" + + with pysam.BGZFile(f"{outfile}", mode="wb", index=None) as fh: + fh.write(bytes(EXAMPLE_FAIDX_FASTA, "Utf-8")) + + return outfile + + +def test_index_outputs( + tmp_path: Path, + example_faidx_fasta: Path, + example_faidx_fasta_gz: Path, +) -> None: + example_fai = Path(f"{example_faidx_fasta}.fai") + + # Make sure we're producing the index + assert not example_fai.exists() + samtools.faidx( + sequence_file=example_faidx_fasta, + fai_idx=example_fai, + ) + assert example_fai.exists() + + output_access = tmp_path / "output_subset.fa" + # Make sure the index is functional + samtools.faidx( + sequence_file=example_faidx_fasta, + output=output_access, + regions=["chr1:1-7", "chr2:8-13"], + fai_idx=example_fai, + ) + + output_contents: str + with output_access.open() as subset_fasta: + output_contents = subset_fasta.read() + assert output_contents == SUBSET_FASTA + + example_gzi = Path(f"{example_faidx_fasta_gz}.gzi") + assert not example_gzi.exists() + samtools.faidx( + sequence_file=example_faidx_fasta_gz, + gzi_idx=example_gzi, + ) + assert example_gzi.exists() + + output_access = tmp_path / "output_subset.fa" + # Make sure the index is functional + samtools.faidx( + sequence_file=example_faidx_fasta, + output=output_access, + regions=["chr1:1-7", "chr2:8-13"], + gzi_idx=example_gzi, + ) + + compressed_output_contents: str + with output_access.open() as subset_fasta: + compressed_output_contents = subset_fasta.read() + + assert compressed_output_contents == SUBSET_FASTA + + +@pytest.mark.parametrize( + argnames=["index_type"], + argvalues=[ + (SamIndexType.BAI,), + (SamIndexType.CSI,), + ], + ids=["BAI", "CSI"], +) +def test_index_works_with_one_input( + tmp_path: Path, + index_type: SamIndexType, +) -> None: + builder = SamBuilder(sort_order=SamOrder.Coordinate) + builder.add_pair(name="test1", chrom="chr1", start1=4000, start2=4300) + builder.add_pair( + name="test2", chrom="chr1", start1=5000, start2=4700, strand1="-", strand2="+" + ) + builder.add_pair(name="test3", chrom="chr2", start1=4000, start2=4300) + builder.add_pair(name="test4", chrom="chr5", start1=4000, start2=4300) + + # At the moment sam builder doesn't support generating CRAM and SAM formats, so for now we're + # only testing on BAMs + input_file = tmp_path / "test_input.bam" + builder.to_path(input_file) + + output_index_expected = Path(f"{input_file}.{index_type._name_.lower()}") + + # Make sure we're producing the index + assert not output_index_expected.exists() + # samtools.index(inputs=[input_file], index_type=index_type) + samtools.index(alignment_file=input_file, index_type=index_type) + assert output_index_expected.exists() + + +@pytest.mark.parametrize( + argnames=["index_type"], + argvalues=[ + (SamIndexType.BAI,), + (SamIndexType.CSI,), + ], + ids=["BAI", "CSI"], +) +def test_index_works_with_multiple_inputs( + tmp_path: Path, + index_type: SamIndexType, +) -> None: + builder = SamBuilder(sort_order=SamOrder.Coordinate) + builder.add_pair(name="test1", chrom="chr1", start1=4000, start2=4300) + builder.add_pair( + name="test2", chrom="chr1", start1=5000, start2=4700, strand1="-", strand2="+" + ) + builder.add_pair(name="test3", chrom="chr2", start1=4000, start2=4300) + builder.add_pair(name="test4", chrom="chr5", start1=4000, start2=4300) + + # At the moment sam builder doesn't support generating CRAM and SAM formats, so for now we're + # only testing on BAMs + input_file1 = tmp_path / "test_input1.bam" + builder.to_path(input_file1) + input_file2 = tmp_path / "test_input2.bam" + builder.to_path(input_file2) + + inputs = [ + input_file1, + input_file2, + ] + + # Make sure we're producing the indices + for alignment_file in inputs: + output_index_expected = Path(f"{alignment_file}.{index_type._name_.lower()}") + assert not output_index_expected.exists() + + samtools.index(alignment_file=inputs, index_type=index_type) + for alignment_file in inputs: + output_index_expected = Path(f"{alignment_file}.{index_type._name_.lower()}") + assert output_index_expected.exists() + + +@pytest.mark.parametrize( + argnames=["file_type"], + argvalues=[ + (SamFileType.SAM,), + (SamFileType.BAM,), + (SamFileType.CRAM,), + ], + ids=["SAM", "BAM", "CRAM"], +) +@pytest.mark.parametrize( + argnames=["index_output"], + argvalues=[ + (True,), + (False,), + ], + ids=["indexed", "not_indexed"], +) +@pytest.mark.parametrize( + argnames=["sort_order", "expected_name_order"], + argvalues=[ + (SamOrder.Coordinate, ["test2", "test3", "test4", "test1"]), + (SamOrder.QueryName, ["test1", "test2", "test3", "test4"]), + (SamOrder.TemplateCoordinate, ["test2", "test3", "test4", "test1"]), + ], + ids=["Coordinate sorting", "Query name sorting", "Template Sorted"], +) +def test_sort_types( + tmp_path: Path, + file_type: SamFileType, + index_output: bool, + sort_order: Optional[SamOrder], + expected_name_order: List[str], +) -> None: + builder = SamBuilder(sort_order=SamOrder.Unsorted) + builder.add_pair( + name="test3", chrom="chr1", start1=5000, start2=4700, strand1="-", strand2="+" + ) + builder.add_pair(name="test2", chrom="chr1", start1=4000, start2=4300) + builder.add_pair(name="test1", chrom="chr5", start1=4000, start2=4300) + builder.add_pair(name="test4", chrom="chr2", start1=4000, start2=4300) + + input_file = tmp_path / "test_input.bam" + output_file = tmp_path / f"test_output{file_type.extension}" + + builder.to_path(input_file) + + samtools.sort( + alignment_file=input_file, + output=output_file, + index_output=index_output, + sort_order=sort_order, + ) + with sam.reader(output_file) as in_bam: + for name in expected_name_order: + read1 = next(in_bam) + assert ( + name == read1.query_name + ), "Position based read sort order did not match expectation" + read2 = next(in_bam) + assert ( + name == read2.query_name + ), "Position based read sort order did not match expectation" + + if index_output and file_type != SamFileType.SAM: + assert Path(f"{output_file}{file_type.index_extension}") diff --git a/fgpyo/util/inspect.py b/fgpyo/util/inspect.py index 6a69a6b5..1a7a32d4 100644 --- a/fgpyo/util/inspect.py +++ b/fgpyo/util/inspect.py @@ -1,3 +1,4 @@ +import sys from typing import Any from typing import Dict from typing import List @@ -5,9 +6,9 @@ from typing import Type from typing import Union -try: # py>=38 +if sys.version_info >= (3, 8): from typing import Literal -except ImportError: # py<38 +else: from typing_extensions import Literal import functools