Skip to content

Commit

Permalink
Correct number of MTs loaded
Browse files Browse the repository at this point in the history
  • Loading branch information
Jakob37 committed Sep 2, 2024
1 parent 8647b7b commit 549342a
Showing 1 changed file with 76 additions and 60 deletions.
136 changes: 76 additions & 60 deletions bin/reference_tools/update_bed.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@

# import pdb

CLNSIG = "CLNSIG"
CLNACC = "CLNACC"
CLNDN = "CLNDN"
CLNSIGINCL = "CLNSIGINCL"
CLNSIGCONF = "CLNSIGCONF"


def main(
clinvar_vcf_path: Path,
clinvar_vcf_old_path: Path,
clinvardate: str,
out_dir: Path,
release: str,
ensembl: str,
skip_download: bool,
incl_bed: list[str],
):
Expand All @@ -25,61 +30,55 @@ def main(
if not out_dir.exists():
out_dir.mkdir(parents=True, exist_ok=True)

# gtf_fp = f'Homo_sapiens.GRCh38.{release}.gtf.gz'
ensembl_bed_fp = f"{out_dir}/exons_hg38_{release}.bed"

# FIXME: What is the difference here?
small_pad = 5
pad = 20
variant_pad = 5
exon_pad = 20
print("Write initial base")
write_ensembl(ensembl, ensembl_bed_fp, release, skip_download, pad)
write_ensembl(ensembl_bed_fp, release, skip_download, exon_pad)

final_bed_path = Path(
f"exons_{release}.padded{pad}bp_clinvar-{clinvardate}padded{small_pad}bp.bed"
f"exons_{release}.padded{exon_pad}bp_clinvar-{clinvardate}padded{variant_pad}bp.bed"
)

if final_bed_path.exists() and final_bed_path.is_file():
print(f"Removing file: {final_bed_path}")
print(f"Removing previous result file: {final_bed_path}")
final_bed_path.unlink()
final_bed_path.write_text("")

append_to_bed(str(final_bed_path), ensembl_bed_fp, f"EXONS-{release}")

if len(incl_bed) > 0:
print(f"Include {incl_bed}")
for bed_fp in incl_bed:
print(f"Additional included BED file: {bed_fp}")
suffix = bed_fp
append_to_bed(str(final_bed_path), bed_fp, suffix)
else:
print("No extra bed files to include")
print("No extra BED files to include")

(clinvar_new, new_benign) = read_clinvar(str(clinvar_vcf_path))
(clinvar_old, _old_benign) = read_clinvar(str(clinvar_vcf_old_path))

print(f"Number new: {len(clinvar_new)}")
print(f"Number old: {len(clinvar_old)}")
print(f"Number new benign: {len(new_benign)}")
print(f"Number old benign: {len(_old_benign)}")

clinvar_all: dict[str, Variant] = {**clinvar_new, **clinvar_old}

# clinvar_final_bed_fp = f"clinvar_{clinvardate}.bed"
(new_to_add, old_to_remove) = compare_clinvar(
clinvar_new, clinvar_old, str(final_bed_path), small_pad, out_dir
clinvar_new, clinvar_old, str(final_bed_path), variant_pad, out_dir
)

clinvar_log_path = Path(f"{out_dir}/clinvar_{clinvardate}.log")
# clinvar_final_bed_path = Path(f"{out_dir}/clinvar_{clinvardate}.bed")
if clinvar_log_path.exists():
clinvar_log_path.unlink()
# if clinvar_final_bed_path.exists():
# clinvar_final_bed_path.unlink()

log_changes(
str(clinvar_log_path), clinvar_all, new_to_add, old_to_remove, new_benign
)

# log_clinvar_bed_and_info(new_to_add, 'new', clinvar_log_path, new_benign)
# log_clinvar_bed_and_info(old_to_remove, 'old', clinvar_log_path, new_benign)
# log_clinvar_bed_and_info(new_to_add, 'bed', clinvar_final_bed_fp, new_benign)

# append_to_bed(str(final_bed_path), str(clinvar_final_bed_path), ".")

sort_merge_output(str(final_bed_path))

# sort_merge_output()
Expand Down Expand Up @@ -107,8 +106,6 @@ class Variant:
@staticmethod
def from_line(vcf_line: str) -> "Variant":

print(vcf_line)

vcf_line = vcf_line.rstrip()
fields = vcf_line.split("\t")
chrom = fields[0]
Expand Down Expand Up @@ -141,35 +138,27 @@ def __init__(
self.clndn = self.info["CLNDN"]

self.key = "FIXME"
# clnacc = self.info['CLNACC'] if self.info.get('CLNACC') is not None else ""
# self.fourth = '~'.join([self.key, self.reason, clnacc])

def get_bed_str(self, padding: int) -> str:
return f"{self.chrom}\t{self.pos - padding}\t{self.pos + padding}\t<INFO PLACEHOLDER>"

def get_bed_annot(self):
reason = self.reason
clnacc = (
self.info["CLNACC"] if self.info.get("CLNDN") is not None else "Undefined"
)
clndn = (
self.info["CLNDN"] if self.info.get("CLNDN") is not None else "Undefined"
)
clnacc = self.info[CLNACC] if self.info.get(CLNACC) is not None else "Undefined"
clndn = self.info[CLNDN] if self.info.get(CLNDN) is not None else "Undefined"
return (
f"{self.chrom}:{self.pos}_{self.ref}_{self.alt}~{reason}~{clnacc}~{clndn}"
)

def get_clnsig(self) -> str:
clnsig = self.info.get("CLNSIG")
clnsig = self.info.get(CLNSIG)
if clnsig != None:
return clnsig
else:
return "MISSING"


# fourth = [key, variant.reason, variant.info['CLNACC']]


# FIXME: Needed?
class BedEntry:
def __init__(self, line: str):
line = line.rstrip()
Expand All @@ -186,6 +175,7 @@ def __str__(self) -> str:
return "\t".join(out_fields)


# FIXME: Needed?
class GtfEntry:
def __init__(self, line: str):
line = line.rstrip()
Expand All @@ -198,12 +188,27 @@ def __init__(self, line: str):


def read_clinvar(vcf_fp: str) -> tuple[dict[str, Variant], dict[str, Variant]]:
"""
CLNSIG: Clinical significance of variant
CLNSIGINCL: Clinical significance for a haplotype or genotype that includes
this variant. Reported as pairs (VariantID:ClinSig).
CLNSIGCONF: Reviewer certainty
-
- Mitochondria variants are not included
"""

clinvar_variants: dict[str, Variant] = {}
benign_clinvar_variants: dict[str, Variant] = {}

nbr_skipped = 0
branch1_hits = 0
branch2_hits = 0
branch3_hits = 0

with open(vcf_fp, "r") as vcf_fh:
for line in vcf_fh:
if line.startswith("#"):
if line.startswith("#") or line.startswith("MT\t"):
continue
variant = Variant.from_line(line)

Expand All @@ -213,63 +218,75 @@ def read_clinvar(vcf_fp: str) -> tuple[dict[str, Variant], dict[str, Variant]]:

info = variant.info

if info.get("CLNSIG") is not None:
sig = info["CLNSIG"]
if info.get("CLNSIGINCL") is not None:
haplo = info["CLNSIGINCL"]
if info.get("CLNSIGCONF") is not None:
confidence = info["CLNSIGCONF"]
if info.get(CLNSIG) is not None:
sig = info[CLNSIG]
if info.get(CLNSIGINCL) is not None:
haplo = info[CLNSIGINCL]
if info.get(CLNSIGCONF) is not None:
confidence = info[CLNSIGCONF]

# FIXME: Annotate why
if sig != "" and haplo != "":
# Skipping entries where no significance is assigned while
# the haplotype is
if sig == "" and haplo != "":
nbr_skipped += 1
continue

keep = False
reason = None

# print("New entry")
# FIXME: Look over this part
if "Pathogenic" in sig or "Likely_pathogenic" in sig:
# print('First hit')
keep = True
reason = sig
elif sig in "Conflicting_interpretations_of_pathogenicity":
branch1_hits += 1
elif "Conflicting_interpretations_of_pathogenicity" in sig:
if "Pathogenic" in confidence or "Likely_pathogenic" in confidence:
# print('Second hit')
keep = True
reason = confidence
# FIXME: What is 'athogenic'
branch2_hits += 1
elif "athogenic" in haplo:
# print('Third hit')
keep = True
reason = haplo
branch3_hits += 1

key = f"{variant.chrom}:{variant.pos}_{variant.ref}_{variant.alt}"
if keep:
variant.reason = reason
# if key in clinvar_variants:
# print(f"I am already in clinvar_variants: {key}")
clinvar_variants[key] = variant
else:
variant.reason = reason
benign_clinvar_variants[key] = variant
# if key in benign_clinvar_variants:
# print(f"I am already in benign_clinvar_variants: {key}")

print(f"Nbr skipped: {nbr_skipped}")
print(f"Branch 1: {branch1_hits}")
print(f"Branch 2: {branch2_hits}")
print(f"Branch 3: {branch3_hits}")

return (clinvar_variants, benign_clinvar_variants)


def write_ensembl(
ensembl_fp: str, out_fp: str, release: str, skip_download: bool, padding: int
):
def write_ensembl(out_fp: str, release: str, skip_download: bool, exon_padding: int):

ensembl_tmp_gz_fp = "tmp.gtf.gz"
ensembl_tmp_fp = "tmp.gtf"

if not skip_download:
gtf_request = f"https://ftp.ensembl.org/pub/release-{release}/gtf/homo_sapiens/Homo_sapiens.GRCh38.{release}.gtf.gz"
tmp_fp = "tmp.gtf.gz"
download_gtf_cmd = ["wget", gtf_request, "-O", tmp_fp]
download_gtf_cmd = ["wget", gtf_request, "-O", ensembl_tmp_gz_fp]
subprocess.run(download_gtf_cmd, check=True)
gunzip_cmd = ["gunzip", "tmp.gtf.gz"]
gunzip_cmd = ["gunzip", ensembl_tmp_gz_fp]
subprocess.run(gunzip_cmd, check=True)
else:
# FIXME: Handle situation where not available
pass
if not Path(ensembl_tmp_fp).exists():
raise ValueError(
f"To skip download, the ENSEMBL GTF needs to be present at the location: {ensembl_tmp_fp}"
)

with open(ensembl_fp, "r") as gtf_fh, open(out_fp, "w") as out_fh:
with open(ensembl_tmp_fp, "r") as gtf_fh, open(out_fp, "w") as out_fh:
keep = False
for line in gtf_fh:
if line.startswith("#"):
Expand All @@ -287,7 +304,7 @@ def write_ensembl(
!= -1
)
if gtf_entry.molecule == "exon" and keep:
out_line = f"{gtf_entry.chr}\t{gtf_entry.start_pos - padding}\t{gtf_entry.end_pos + padding}"
out_line = f"{gtf_entry.chr}\t{gtf_entry.start_pos - exon_padding}\t{gtf_entry.end_pos + exon_padding}"
print(out_line, file=out_fh)


Expand Down Expand Up @@ -418,7 +435,6 @@ def append_clinvar_to_bed(bed_fp: str, clinvar_new: dict[str, Variant]):
clinvardate=args.clinvardate,
out_dir=Path(args.out_dir),
release=args.release,
ensembl=args.ensembl,
skip_download=args.skip_download,
incl_bed=args.incl_bed,
)

0 comments on commit 549342a

Please sign in to comment.