diff --git a/MANIFEST.in b/MANIFEST.in index 5520e14..f709b94 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -7,4 +7,5 @@ include bedboss/bedmaker/* include bedboss/bedqc/* include bedboss/qdrant_index/* include bedboss/bedbuncher/* -include bedboss/bedbuncher/tools/* \ No newline at end of file +include bedboss/bedbuncher/tools/* +include bedboss/bedclassifier/* \ No newline at end of file diff --git a/bedboss/bedclassifier/__init__.py b/bedboss/bedclassifier/__init__.py new file mode 100644 index 0000000..b8eb0d5 --- /dev/null +++ b/bedboss/bedclassifier/__init__.py @@ -0,0 +1 @@ +from bedboss.bedclassifier.bedclassifier import BedClassifier, get_bed_type diff --git a/bedboss/bedclassifier/bedclassifier.py b/bedboss/bedclassifier/bedclassifier.py new file mode 100644 index 0000000..4251b05 --- /dev/null +++ b/bedboss/bedclassifier/bedclassifier.py @@ -0,0 +1,234 @@ +import gzip +import logging +import os +import shutil +from typing import Optional, Tuple + +import pandas.errors +import pypiper +import pandas as pd + +from bedboss.const import STANDARD_CHROM_LIST +from bedboss.exceptions import BedTypeException + +_LOGGER = logging.getLogger("bedboss") + + +class BedClassifier: + """ + This will take the input of either a .bed or a .bed.gz and classify the type of BED file. 
+ + Types: + BED, BED2 - BED12, narrowPeak, broadPeak + UnknownType + + """ + + def __init__( + self, + input_file: str, + output_dir: Optional[str] = None, + bed_digest: Optional[str] = None, + input_type: Optional[str] = None, + pm: pypiper.PipelineManager = None, + report_to_database: Optional[bool] = False, + ): + # Raise Exception if input_type is given and it is NOT a BED file + # Raise Exception if the input file cannot be resolved + self.input_file = input_file + self.bed_digest = bed_digest + self.input_type = input_type + + self.abs_bed_path = os.path.abspath(self.input_file) + self.file_name = os.path.splitext(os.path.basename(self.abs_bed_path))[0] + self.file_extension = os.path.splitext(self.abs_bed_path)[-1] + + # we need this only if unzipping a file + self.output_dir = output_dir or os.path.join( + os.path.dirname(self.abs_bed_path), "temp_processing" + ) + # Use existing Pipeline Manager or Construct New one + # Want to use Pipeline Manager to log work AND cleanup unzipped gz files. 
+ if pm is not None: + self.pm = pm + self.pm_created = False + else: + self.logs_dir = os.path.join(self.output_dir, "logs") + self.pm = pypiper.PipelineManager( + name="bedclassifier", + outfolder=self.logs_dir, + recover=True, + pipestat_sample_name=bed_digest, + ) + self.pm.start_pipeline() + self.pm_created = True + + if self.file_extension == ".gz": + unzipped_input_file = os.path.join(self.output_dir, self.file_name) + + with gzip.open(self.input_file, "rb") as f_in: + _LOGGER.info( + f"Unzipping file:{self.input_file} and Creating Unzipped file: {unzipped_input_file}" + ) + with open(unzipped_input_file, "wb") as f_out: + shutil.copyfileobj(f_in, f_out) + self.input_file = unzipped_input_file + self.pm.clean_add(unzipped_input_file) + + self.bed_type = get_bed_type(self.input_file) + + if self.input_type is not None: + if self.bed_type != self.input_type: + _LOGGER.warning( + f"BED file classified as different type than given input: {self.bed_type} vs {self.input_type}" + ) + + self.pm.report_result(key="bedtype", value=self.bed_type) + + if self.pm_created is True: + self.pm.stop_pipeline() + + +def get_bed_type( + bed: str, standard_chrom: Optional[str] = None, no_fail: Optional[bool] = True +) -> Tuple[str, str]: + """ + get the bed file type (ex. bed3, bed3+n ) + standardize chromosomes if necessary: + filter the input file to contain only the standard chromosomes, + remove regions on ChrUn chromosomes + + :param bed: path to the bed file + :param no_fail: should the function (and pipeline) continue if this function fails to parse BED file + :param standard_chrom: + :return bedtype: tuple[option ["bed{bedtype}+{n}", "unknown_bedtype"], option [bed, narrowpeak, broadpeak, unknown_bedtype]] + """ + # column format for bed12 + # string chrom; "Reference sequence chromosome or scaffold" + # uint chromStart; "Start position in chromosome" + # uint chromEnd; "End position in chromosome" + # string name; "Name of item." 
+ # uint score; "Score (0-1000)" + # char[1] strand; "+ or - for strand" + # uint thickStart; "Start of where display should be thick (start codon)" + # uint thickEnd; "End of where display should be thick (stop codon)" + # uint reserved; "Used as itemRgb as of 2004-11-22" + # int blockCount; "Number of blocks" + # int[blockCount] blockSizes; "Comma separated list of block sizes" + # int[blockCount] chromStarts; "Start positions relative to chromStart" + + df = None + + max_rows = 5 + row_count = 0 + while row_count <= max_rows: + print(f"ROW COUNT: {row_count}") + try: + df = pd.read_csv(bed, sep="\t", header=None, nrows=4, skiprows=row_count) + break + except (pandas.errors.ParserError, pandas.errors.EmptyDataError) as e: + if row_count <= max_rows: + row_count += 1 + else: + if no_fail: + _LOGGER.warning( + f"Unable to parse bed file {bed}, due to error {e}, setting bed_type = unknown_bedtype" + ) + return ("unknown_bedtype", "unknown_bedtype") + else: + raise BedTypeException( + reason=f"Bed type could not be determined due to CSV parse error {e}" + ) + + if df is not None: + df = df.dropna(axis=1) + + # standardizing chromosome + # remove regions on ChrUn chromosomes + if standard_chrom: + _LOGGER.info("Standardizing chromosomes...") + df = df[df.loc[:, 0].isin(STANDARD_CHROM_LIST)] + df.to_csv(bed, compression="gzip", sep="\t", header=False, index=False) + + num_cols = len(df.columns) + bedtype = 0 + + for col in df: + if col <= 2: + if col == 0: + if df[col].dtype == "O": + bedtype += 1 + elif df[col].dtype == "int" or df[col].dtype == "float": + bedtype += 1 + else: + if no_fail: + _LOGGER.warning( + f"Bed type could not be determined at column {0} with data type: {df[col].dtype}" + ) + return ("unknown_bedtype", "unknown_bedtype") + else: + raise BedTypeException( + reason=f"Bed type could not be determined at column {0} with data type: {df[col].dtype}" + ) + + else: + if df[col].dtype == "int" and (df[col] >= 0).all(): + bedtype += 1 + else: + if no_fail: 
+ _LOGGER.warning( + f"Bed type could not be determined at column {col} with data type: {df[col].dtype}" + ) + return ("unknown_bedtype", "unknown_bedtype") + else: + raise BedTypeException( + reason=f"Bed type could not be determined at column 0 with data type: {df[col].dtype}" + ) + else: + if col == 3: + if df[col].dtype == "O": + bedtype += 1 + else: + n = num_cols - bedtype + return (f"bed{bedtype}+{n}", "bed") + elif col == 4: + if df[col].dtype == "int" and df[col].between(0, 1000).all(): + bedtype += 1 + else: + n = num_cols - bedtype + return (f"bed{bedtype}+{n}", "bed") + elif col == 5: + if df[col].isin(["+", "-", "."]).all(): + bedtype += 1 + else: + n = num_cols - bedtype + return (f"bed{bedtype}+{n}", "bed") + elif 6 <= col <= 8: + if df[col].dtype == "int" and (df[col] >= 0).all(): + bedtype += 1 + else: + n = num_cols - bedtype + return (f"bed{bedtype}+{n}", "bed") + elif col == 9: + if df[col].dtype == "int": + bedtype += 1 + else: + n = num_cols - bedtype + if "broadpeak" in bed or "broadPeak" in bed: + return (f"bed{bedtype}+{n}", "broadpeak") + else: + return (f"bed{bedtype}+{n}", "bed") + elif col == 10 or col == 11: + if df[col].str.match(r"^(\d+(,\d+)*)?$").all(): + bedtype += 1 + else: + n = num_cols - bedtype + if "narrowpeak" in bed or "narrowPeak" in bed: + return (f"bed{bedtype}+{n}", "narrowpeak") + else: + return (f"bed{bedtype}+{n}", "bed") + else: + n = num_cols - bedtype + return (f"bed{bedtype}+{n}", "bed") + else: + return ("unknown_bedtype", "unknown_bedtype") diff --git a/bedboss/bedmaker/bedmaker.py b/bedboss/bedmaker/bedmaker.py index e8538ec..553119b 100755 --- a/bedboss/bedmaker/bedmaker.py +++ b/bedboss/bedmaker/bedmaker.py @@ -20,6 +20,7 @@ from yacman.exceptions import UndefinedAliasError from ubiquerg import is_command_callable +from bedboss.bedclassifier.bedclassifier import get_bed_type from bedboss.bedqc.bedqc import bedqc from bedboss.exceptions import RequirementsException @@ -336,7 +337,7 @@ def make_bigbed(self) 
-> NoReturn: temp = os.path.join(self.output_bigbed, next(tempfile._get_candidate_names())) if not os.path.exists(big_narrow_peak): - bedtype = self.get_bed_type(self.output_bed) + bedtype = get_bed_type(self.output_bed, standard_chrom=self.standard_chrom) self.pm.clean_add(temp) if not is_command_callable(f"{BED_TO_BIGBED_PROGRAM}"): @@ -455,91 +456,3 @@ def get_chrom_sizes(self) -> str: _LOGGER.info(f"Determined path to chrom.sizes asset: {chrom_sizes}") return chrom_sizes - - def get_bed_type(self, bed: str) -> str: - """ - get the bed file type (ex. bed3, bed3+n ) - standardize chromosomes if necessary: - filter the input file to contain only the standard chromosomes, - remove regions on ChrUn chromosomes - - :param bed: path to the bed file - :return bed type - """ - # column format for bed12 - # string chrom; "Reference sequence chromosome or scaffold" - # uint chromStart; "Start position in chromosome" - # uint chromEnd; "End position in chromosome" - # string name; "Name of item." 
- # uint score; "Score (0-1000)" - # char[1] strand; "+ or - for strand" - # uint thickStart; "Start of where display should be thick (start codon)" - # uint thickEnd; "End of where display should be thick (stop codon)" - # uint reserved; "Used as itemRgb as of 2004-11-22" - # int blockCount; "Number of blocks" - # int[blockCount] blockSizes; "Comma separated list of block sizes" - # int[blockCount] chromStarts; "Start positions relative to chromStart" - df = pd.read_csv(bed, sep="\t", header=None) - df = df.dropna(axis=1) - - # standardizing chromosome - # remove regions on ChrUn chromosomes - if self.standard_chrom: - _LOGGER.info("Standardizing chromosomes...") - df = df[df.loc[:, 0].isin(STANDARD_CHROM_LIST)] - df.to_csv(bed, compression="gzip", sep="\t", header=False, index=False) - - num_cols = len(df.columns) - bedtype = 0 - for col in df: - if col <= 2: - if col == 0: - if df[col].dtype == "O": - bedtype += 1 - else: - return None - else: - if df[col].dtype == "int" and (df[col] >= 0).all(): - bedtype += 1 - else: - return None - else: - if col == 3: - if df[col].dtype == "O": - bedtype += 1 - else: - n = num_cols - bedtype - return f"bed{bedtype}+{n}" - elif col == 4: - if df[col].dtype == "int" and df[col].between(0, 1000).all(): - bedtype += 1 - else: - n = num_cols - bedtype - return f"bed{bedtype}+{n}" - elif col == 5: - if df[col].isin(["+", "-", "."]).all(): - bedtype += 1 - else: - n = num_cols - bedtype - return f"bed{bedtype}+{n}" - elif 6 <= col <= 8: - if df[col].dtype == "int" and (df[col] >= 0).all(): - bedtype += 1 - else: - n = num_cols - bedtype - return f"bed{bedtype}+{n}" - elif col == 9: - if df[col].dtype == "int": - bedtype += 1 - else: - n = num_cols - bedtype - return f"bed{bedtype}+{n}" - elif col == 10 or col == 11: - if df[col].str.match(r"^(\d+(,\d+)*)?$").all(): - bedtype += 1 - else: - n = num_cols - bedtype - return f"bed{bedtype}+{n}" - else: - n = num_cols - bedtype - return f"bed{bedtype}+{n}" diff --git 
a/bedboss/exceptions.py b/bedboss/exceptions.py index d84d06d..afd6f03 100644 --- a/bedboss/exceptions.py +++ b/bedboss/exceptions.py @@ -46,3 +46,16 @@ def __init__(self, reason: str = ""): :param str reason: additional info about requirements exception """ super(RequirementsException, self).__init__(reason) + + +class BedTypeException(BedBossException): + """Exception when Bed Type could not be determined.""" + + def __init__(self, reason: str = ""): + """ + Optionally provide explanation for exceptional condition. + + :param str reason: some context about why the BED file type + could not be determined + """ + super(BedTypeException, self).__init__(reason) diff --git a/test/test_bedclassifier.py b/test/test_bedclassifier.py new file mode 100644 index 0000000..aac980e --- /dev/null +++ b/test/test_bedclassifier.py @@ -0,0 +1,55 @@ +import os +import pytest +from tempfile import TemporaryDirectory + +from bedboss.bedclassifier import BedClassifier, get_bed_type + + +FILE_DIR = os.path.dirname(os.path.realpath(__file__)) +HG19_CORRECT_DIR = os.path.join(FILE_DIR, "test_data", "bed", "hg19", "correct") +FILE_PATH = f"{HG19_CORRECT_DIR}/sample1.bed.gz" +FILE_PATH_UNZIPPED = f"{HG19_CORRECT_DIR}/hg19_example1.bed" + + +@pytest.mark.skip(reason="Illegal seek during teardown.") +def test_classification(): + with TemporaryDirectory() as d: + bedclass = BedClassifier(input_file=FILE_PATH, output_dir=d) + + +def test_get_bed_type(): + bedtype = get_bed_type(bed=FILE_PATH_UNZIPPED) + assert bedtype == ("bed6+3", "bed") + + +@pytest.mark.skip(reason="Not implemented") +def test_from_PEPhub_beds(): + """""" + # TODO implement testing from pephub + pass + + +# def test_manual_dir_beds(): +# """This test is currently just for local manual testing""" +# # local_dir = "/home/drc/Downloads/test_beds_BED_classifier/" +# # local_dir = "/home/drc/Downloads/individual_beds/" +# local_dir = "/home/drc/Downloads/only_narrowpeaks/" +# output_dir = 
"/home/drc/Downloads/BED_CLASSIFIER_OUTPUT/" +# +# for root, dirs, files in os.walk(local_dir): +# for file in files: +# print(file) +# file_path = os.path.join(root, file) +# print(file_path) +# bedclass = BedClassifier( +# input_file=file_path, output_dir=output_dir, bed_digest=file +# ) +# print("\nDEBUG BEDCLASS\n") +# print(bedclass.bed_type) +# print("+++++++++++++++++++") + + +# if __name__ == "__main__": +# test_get_bed_type() +# test_classification() +# test_manual_dir_beds()