Skip to content

Commit

Permalink
remove pypiper, just use pipestat and pephub
Browse files Browse the repository at this point in the history
  • Loading branch information
donaldcampbelljr committed May 29, 2024
1 parent 08f96d9 commit 8ca32cc
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 27 deletions.
23 changes: 23 additions & 0 deletions scripts/bedclassifier_tuning/bedclassifier_output_schema.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
title: Bed Classifier
description: Output for bed classification results
type: object
properties:
pipeline_name: "bedclassifier"
samples:
type: object
properties:
bedfile_named:
type: string
description: "reported bedfile name e.g. narrowpeak"
bedfile_type:
type: string
description: "reported bedfile type"
given_bedfile_type:
type: string
description: "given bed file type"
types_match:
type: boolean
description: "Do the types match?"
gsm:
type: string
description: "given gsm"
108 changes: 81 additions & 27 deletions scripts/bedclassifier_tuning/bedclassify.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import logging
import os
import shutil

import pipestat
import pypiper
from typing import Optional

Expand Down Expand Up @@ -30,9 +32,13 @@ def __init__(
input_type: Optional[str] = None,
pm: pypiper.PipelineManager = None,
report_to_database: Optional[bool] = False,
psm: pipestat.PipestatManager = None,
gsm: str = None,
):
# Raise Exception if input_type is given and it is NOT a BED file
# Raise Exception if the input file cannot be resolved

self.gsm = gsm
self.input_file = input_file
self.bed_digest = bed_digest
self.input_type = input_type
Expand All @@ -47,19 +53,28 @@ def __init__(
)
# Use existing Pipeline Manager or Construct New one
# Want to use Pipeline Manager to log work AND cleanup unzipped gz files.
if pm is not None:
self.pm = pm
self.pm_created = False
else:
self.logs_dir = os.path.join(self.output_dir, "logs")
self.pm = pypiper.PipelineManager(
name="bedclassifier",
outfolder=self.logs_dir,
recover=True,
pipestat_sample_name=bed_digest,
# if pm is not None:
# self.pm = pm
# self.pm_created = False
# else:
# self.logs_dir = os.path.join(self.output_dir, "logs")
# self.pm = pypiper.PipelineManager(
# name="bedclassifier",
# outfolder=self.logs_dir,
# recover=True,
# pipestat_sample_name=bed_digest,
# )
# self.pm.start_pipeline()
# self.pm_created = True

if psm is None:
pephuburl = "donaldcampbelljr/bedclassifier_tuning_geo:default"
self.psm = pipestat.PipestatManager(
pephub_path=pephuburl, schema_path="bedclassifier_output_schema.yaml"
)
self.pm.start_pipeline()
self.pm_created = True
# create pipestat manager
else:
self.psm = psm

if self.file_extension == ".gz":
unzipped_input_file = os.path.join(self.output_dir, self.file_name)
Expand All @@ -71,20 +86,48 @@ def __init__(
with open(unzipped_input_file, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
self.input_file = unzipped_input_file
self.pm.clean_add(unzipped_input_file)
# self.pm.clean_add(unzipped_input_file)

self.bed_type = get_bed_type(self.input_file)
self.bed_type, self.bed_type_named = get_bed_type(self.input_file)
# get_bed_type returns a tuple: (f"bed{bedtype}+{n}", bed_type_named)

if self.input_type is not None:
if self.bed_type != self.input_type:
if self.bed_type_named != self.input_type:
_LOGGER.warning(
f"BED file classified as different type than given input: {self.bed_type} vs {self.input_type}"
)
do_types_match = False
else:
do_types_match = True
else:
do_types_match = False

# Create Value Dict to report via pipestat

all_values = {}

if self.input_type:
all_values.update({"given_bedfile_type": self.input_type})
if self.bed_type:
all_values.update({"bedfile_type": self.bed_type})
if self.bed_type_named:
all_values.update({"bedfile_named": self.bed_type_named})
if self.gsm:
all_values.update({"gsm": self.gsm})

all_values.update({"types_match": do_types_match})

self.pm.report_result(key="bedtype", value=self.bed_type)
try:
psm.report(record_identifier=bed_digest, values=all_values)
# psm.set_status(record_identifier=bed_digest, status_identifier="completed")
except Exception as e:
_LOGGER.warning(msg=f"FAILED {bed_digest} Exception {e}")
# psm.set_status(record_identifier=bed_digest, status_identifier="failed")

if self.pm_created is True:
self.pm.stop_pipeline()
# self.pm.report_result(key="bedtype", value=self.bed_type)

# if self.pm_created is True:
# self.pm.stop_pipeline()


def main():
Expand Down Expand Up @@ -122,17 +165,28 @@ def main():

print(samples)

pephuburl = "donaldcampbelljr/bedclassifier_tuning_geo:default"
psm = pipestat.PipestatManager(
pephub_path=pephuburl, schema_path="bedclassifier_output_schema.yaml"
)

for sample in samples:
bedfile = sample.output_file_path[0]
if isinstance(sample.output_file_path, list):
bedfile = sample.output_file_path[0]
else:
bedfile = sample.output_file_path
geo_accession = sample.sample_geo_accession

# bed = BedClassifier(
# input_file="/home/drc/GITHUB/bedboss/bedboss/test/data/bed/simpleexamples/bed1.bed",
# bed_digest="bed1.bed",
# output_dir=os.path.abspath("results")
#
#
# )
sample_name = sample.sample_name
bed_type_from_geo = sample.type.lower()

bed = BedClassifier(
input_file=bedfile,
bed_digest=sample_name, # TODO FIX THIS IT SHOULD BE AN ACTUAL DIGEST
output_dir=os.path.abspath("results"),
input_type=bed_type_from_geo,
psm=psm,
gsm=geo_accession,
)

# Get list of Bed Files and Download them

Expand Down

0 comments on commit 8ca32cc

Please sign in to comment.