Remove hard coding of wf details from cleanup functions #115

Merged · 8 commits · Aug 9, 2024
CHANGELOG.rst (8 additions, 0 deletions)
@@ -7,6 +7,14 @@ Change Log
------


+3.2.0
+=====
+
+* Refactored cleanup.py to get accepted workflow versions and run times from the database instead of a hard-coded array
+* Updated the find_and_release notebook to use this; not backward compatible
+* Fixed a small bug in delete_wfrs that failed to clean up errored or duplicated workflow runs on user-submitted files, since those files are not outputs of workflow runs
+
+
3.1.1
=====

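A minimal sketch of the new calling convention these notes imply (not the notebook's actual code; the ff_env value, the files list, and the import path are assumptions): workflow details are fetched once from the database and passed into delete_wfrs, which previously read them from a module-level array.

from dcicutils import ff_utils
from functions.cleanup import get_workflow_details, delete_wfrs  # assumed import path

my_key = ff_utils.get_authentication_with_server(ff_env='data')  # assumed environment
workflow_details = get_workflow_details(my_key)  # one db query replaces the hard-coded array
for file_resp in files:  # 'files' assumed: file metadata in embedded frame
    delete_wfrs(file_resp, my_key, workflow_details, delete=False)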
functions/cleanup.py (33 additions, 70 deletions)
@@ -1,63 +1,33 @@
from dcicutils import ff_utils
from datetime import datetime

-# accepted workflows
-# workflow name, accepted revision numbers (0 if none), accetable run time (hours)
-workflow_details = [
-    # TODO: take this info from foursight
-    # common ones
-    ['md5', ['0.0.4', '0.2.6'], 12],
-    ['fastqc-0-11-4-1', ['0.2.0'], 50],
-    ['fastqc', ['v1', 'v2'], 50],
-    # 4dn ones
-    ['bwa-mem', ['0.2.6'], 50],
-    ['pairsqc-single', ['0.2.5', '0.2.6'], 100],
-    ['hi-c-processing-bam', ['0.2.6'], 50],
-    ['hi-c-processing-pairs', ['0.2.6', '0.2.7'], 200],
-    ['hi-c-processing-pairs-nore', ['0.2.6'], 200],
-    ['hi-c-processing-pairs-nonorm', ['0.2.6'], 200],
-    ['hi-c-processing-pairs-nore-nonorm', ['0.2.6'], 200],
-    ['imargi-processing-fastq', ["1.1.1_dcic_4"], 200],
-    ['imargi-processing-bam', ["1.1.1_dcic_4"], 200],
-    ['imargi-processing-pairs', ["1.1.1_dcic_4"], 200],
-    ['repliseq-parta', ['v13.1', 'v14', 'v16', 'v16.1'], 200],
-    ['bedGraphToBigWig', ['v4'], 24],
-    ['bedtobeddb', ['v2', 'v3'], 24],
-    ['encode-chipseq-aln-chip', ['1.1.1', '2.1.6'], 200],
-    ['encode-chipseq-aln-ctl', ['1.1.1','2.1.6'], 200],
-    ['encode-chipseq-postaln', ['1.1.1','2.1.6'], 200],
-    ['encode-atacseq-aln', ['1.1.1'], 200],
-    ['encode-atacseq-postaln', ['1.1.1'], 200],
-    ['mergebed', ['v1'], 200],
-    ['merge-fastq', ['v1'], 200],
-    ['bamqc', ['v2', 'v3'], 200],
-    ['encode-rnaseq-stranded', ['1.1'], 200],
-    ['encode-rnaseq-unstranded', ['1.1'], 200],
-    ['rna-strandedness', ['v2'], 200],
-    ['fastq-first-line', ['v2'], 200],
-    ['re_checker_workflow', ['v1.1', 'v1.2'], 200],
-    ['mad_qc_workflow', ['1.1_dcic_2'], 200],
-    ['insulation-scores-and-boundaries-caller', ['v1'], 200],
-    ['compartments-caller', ['v1.2'], 200],
-    ['mcoolQC', ['v1'], 200],
-    # cgap ones
-    ['workflow_bwa-mem_no_unzip-check', ['v9', 'v10', 'v11', 'v12', 'v13'], 48],
-    ['workflow_add-readgroups-check', ['v9', 'v10', 'v11', 'v12', 'v13'], 12],
-    ['workflow_merge-bam-check', ['v9', 'v10', 'v11', 'v12', 'v13'], 12],
-    ['workflow_picard-MarkDuplicates-check', ['v9', 'v10', 'v11', 'v12', 'v13'], 12],
-    ['workflow_sort-bam-check', ['v9', 'v10', 'v11', 'v12', 'v13'], 12],
-    ['workflow_gatk-BaseRecalibrator', ['v9', 'v10', 'v11', 'v12', 'v13'], 12],
-    ['workflow_gatk-ApplyBQSR-check', ['v9', 'v10', 'v11', 'v12', 'v13'], 12],
-    ['workflow_index-sorted-bam', ['v9'], 12],
-    ['workflow_gatk-HaplotypeCaller', ['v10', 'v11', 'v12', 'v13'], 12],
-    ['workflow_gatk-CombineGVCFs', ['v10', 'v11', 'v12', 'v13'], 12],
-    ['workflow_gatk-GenotypeGVCFs-check', ['v10', 'v11', 'v12', 'v13'], 12],
-    ['workflow_gatk-VQSR-check', ['v10', 'v11', 'v12', 'v13'], 12],
-    ['workflow_qcboard-bam', ['v9'], 12],
-    ['workflow_cram2fastq', ['v12', 'v13'], 12],
-]
-
-workflow_names = [i[0] for i in workflow_details]
+# function to get workflow_details info from the db
+# the initial data structure (the same one used to get info for foursight) is
+# transformed into the format used by the cleanup functions:
+# workflow name, accepted revision numbers (0 if none), acceptable run time (hours)
+def get_workflow_details(my_auth):
+    wf_details = {}
+    wf_query = "search/?type=Workflow&tags=current&tags=accepted&field=max_runtime" \
+               "&app_name!=No value&app_version!=No value&field=app_name&field=app_version"
+    workflows = ff_utils.search_metadata(wf_query, my_auth)
+    for wf in workflows:
+        app_name = wf.get('app_name')
+        app_version = wf.get('app_version')
+        run_time = wf.get('max_runtime', 0)
+        wf_details.setdefault(app_name, {})
+        wf_details[app_name].setdefault('accepted_versions', []).append(app_version)
+        wf_details[app_name].setdefault('run_time', run_time)
+        # for the unexpected case of different wf items with the same app_name
+        # having different run times - use the max value
+        if run_time > wf_details[app_name].get('run_time'):
+            wf_details[app_name]['run_time'] = run_time
+    # here is the transformation
+    # workflow_details = []
+    # for wfname, wf_info in wf_details.items():
+    #     workflow_details.append((wfname, wf_info.get('accepted_versions'), [], wf_info.get('run_time')))
+    return [(wfname, wf_details[wfname].get('accepted_versions', []), wf_details[wfname].get('run_time'))
+            for wfname in wf_details.keys()]
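As an illustration of the aggregation above, here is the same setdefault pattern run on hand-made search results (the fake Workflow items are assumptions, not real portal data; the run-time comparison is condensed into max()): accepted versions accumulate under one app_name and run_time keeps the largest value.

fake_workflows = [  # stand-ins for ff_utils.search_metadata results
    {'app_name': 'md5', 'app_version': '0.0.4', 'max_runtime': 12},
    {'app_name': 'md5', 'app_version': '0.2.6', 'max_runtime': 12},
    {'app_name': 'fastqc', 'app_version': 'v1', 'max_runtime': 50},
]
wf_details = {}
for wf in fake_workflows:
    name = wf['app_name']
    wf_details.setdefault(name, {})
    wf_details[name].setdefault('accepted_versions', []).append(wf['app_version'])
    wf_details[name]['run_time'] = max(wf_details[name].get('run_time', 0), wf['max_runtime'])
print([(n, d['accepted_versions'], d['run_time']) for n, d in wf_details.items()])
# [('md5', ['0.0.4', '0.2.6'], 12), ('fastqc', ['v1'], 50)]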


def fetch_pf_associated(pf_id_or_dict, my_key):
@@ -116,19 +86,14 @@ def get_wfr_report(wfrs):
        # skip all style awsem runs
        try:
            wfr_type_base, wfr_version = wfr_type.strip().split(' ')
-        except:
+        except Exception:
            continue
        time_info = time_info.strip('on').strip()
        try:
            wfr_time = datetime.strptime(time_info, '%Y-%m-%d %H:%M:%S.%f')
        except ValueError:
            wfr_time = datetime.strptime(time_info, '%Y-%m-%d %H:%M:%S')
        run_hours = (datetime.utcnow() - wfr_time).total_seconds() / 3600
-        # try:
-        #     wfr_time = datetime.strptime(wfr_data['date_created'], '%Y-%m-%dT%H:%M:%S.%f+00:00')
-        # except ValueError:  # if it was exact second, no fraction is in value
-        #     print("wfr time bingo", wfr_uuid)
-        #     wfr_time = datetime.strptime(wfr_data['date_created'], '%Y-%m-%dT%H:%M:%S+00:00')
        output_files = wfr_data.get('output_files', None)
        output_uuids = []
        qc_uuids = []
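The fallback above exists because strptime raises ValueError when the format does not match exactly, and display_title timestamps may or may not carry fractional seconds. A self-contained sketch of the same pattern (the timestamps are made up):

from datetime import datetime

def parse_wfr_time(time_info):
    # Try fractional seconds first, then whole seconds, as in get_wfr_report.
    for fmt in ('%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S'):
        try:
            return datetime.strptime(time_info, fmt)
        except ValueError:
            continue
    raise ValueError('unrecognized timestamp: %s' % time_info)

print(parse_wfr_time('2024-08-09 12:34:56.789012'))
print(parse_wfr_time('2024-08-09 12:34:56'))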
Expand Down Expand Up @@ -158,7 +123,7 @@ def get_wfr_report(wfrs):
    return wfr_report


-def delete_wfrs(file_resp, my_key, delete=False, stash=None):
+def delete_wfrs(file_resp, my_key, workflow_details, delete=False, stash=None):
    # file_resp in embedded frame
    # stash: all related wfrs for file_resp
    deleted_wfrs = []  # reports WorkflowRun items deleted by this function
@@ -170,16 +135,13 @@ def delete_wfrs(file_resp, my_key, delete=False, stash=None):
    # do not delete output wfrs of control files
    output_wfrs = file_resp.get('workflow_run_outputs')
    if not output_wfrs:
-        if file_type == 'files-processed':
-            # user submtted processed files
-            return
-        else:
-            # raw files:
-            pass
+        # user submitted and raw files generally lack wfr_outputs but they can still have
+        # duplicate and errored runs so changed return (for file_processed) to pass for all
+        pass
    else:
        output_wfr = output_wfrs[0]
        wfr_type, _ = output_wfr['display_title'].split(' run ')
-        if wfr_type in ['encode-chipseq-aln-ctl 1.1.1', 'encode-chipseq-aln-ctl 2.1.6'] :
+        if wfr_type in ['encode-chipseq-aln-ctl 1.1.1', 'encode-chipseq-aln-ctl 2.1.6']:
            print('skipping control file for wfr check', file_resp['accession'])
            return

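The control-file check above keys off the WorkflowRun display_title, which embeds the workflow name and version before the literal ' run ' separator; a tiny sketch with a made-up title:

display_title = 'encode-chipseq-aln-ctl 1.1.1 run 2024-08-09 12:00:00.123456'  # assumed format
wfr_type, _ = display_title.split(' run ')
print(wfr_type)  # encode-chipseq-aln-ctl 1.1.1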
@@ -221,6 +183,7 @@ def _delete_action(wfr_to_del):
        return

    # CLEAN UP IF FILE IS DELETED
+    workflow_names = [wfinfo[0] for wfinfo in workflow_details]
    if file_resp['status'] == 'deleted':
        if file_resp.get('quality_metric'):
            if delete: