Skip to content

Commit

Permalink
Merge pull request #141 from hubmapconsortium/mruffalo/workflow-function-cleanups
Browse files Browse the repository at this point in the history

Additional workflow function cleanups
  • Loading branch information
jswelling authored Aug 31, 2020
2 parents d362e21 + 69e4899 commit cf39e50
Show file tree
Hide file tree
Showing 12 changed files with 193 additions and 331 deletions.
22 changes: 9 additions & 13 deletions src/ingest-pipeline/airflow/dags/bulk_atacseq.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os
import json
import shlex
from pathlib import Path
from pprint import pprint
from datetime import datetime, timedelta
Expand Down Expand Up @@ -29,6 +28,7 @@
get_dataset_uuid,
get_parent_dataset_uuid,
get_uuid_for_error,
join_quote_command_str,
localized_assert_json_matches_schema as assert_json_matches_schema,
)

Expand Down Expand Up @@ -58,7 +58,7 @@
) as dag:
pipeline_name = 'bulk-atac-seq'
cwl_workflows = get_absolute_workflows(
[Path(pipeline_name, 'bulk-atac-seq-pipeline.cwl')],
Path(pipeline_name, 'bulk-atac-seq-pipeline.cwl'),
)

def build_dataset_name(**kwargs):
Expand All @@ -79,19 +79,17 @@ def build_cwltool_cmd1(**kwargs):
command = [
*get_cwltool_base_cmd(tmpdir),
'--outdir',
os.path.join(tmpdir, 'cwl_out'),
tmpdir / 'cwl_out',
'--parallel',
os.fspath(cwl_workflows[0]),
cwl_workflows[0],
'--threads',
str(THREADS),
THREADS,
]
for data_dir in data_dirs:
command.append('--sequence_directory')
command.append(data_dir)

command_str = ' '.join(shlex.quote(piece) for piece in command)
print('final command_str: {!r}'.format(command_str))
return command_str
return join_quote_command_str(command)

t_build_cmd1 = PythonOperator(
task_id='build_cmd1',
Expand Down Expand Up @@ -173,10 +171,8 @@ def send_status_msg(**kwargs):

if success:
md = {}
files_for_provenance = [
__file__,
*cwl_workflows,
]
files_for_provenance = [__file__, *cwl_workflows]

if 'dag_provenance' in kwargs['dag_run'].conf:
md['dag_provenance'] = kwargs['dag_run'].conf['dag_provenance'].copy()
new_prv_dct = utils.get_git_provenance_dict(files_for_provenance)
Expand All @@ -188,7 +184,7 @@ def send_status_msg(**kwargs):
dag_prv.extend(utils.get_git_provenance_list(files_for_provenance))
md['dag_provenance_list'] = dag_prv

manifest_files = find_pipeline_manifests(*cwl_workflows)
manifest_files = find_pipeline_manifests(cwl_workflows)
md.update(
utils.get_file_metadata_dict(
ds_dir,
Expand Down
66 changes: 23 additions & 43 deletions src/ingest-pipeline/airflow/dags/codex_cytokit.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os
import json
import shlex
from pathlib import Path
from pprint import pprint
from datetime import datetime, timedelta
Expand All @@ -22,14 +21,15 @@

import utils
from utils import (
PIPELINE_BASE_DIR,
decrypt_tok,
find_pipeline_manifests,
get_absolute_workflows,
get_cwltool_base_cmd,
get_dataset_uuid,
get_parent_dataset_uuid,
get_uuid_for_error,
localized_assert_json_matches_schema as assert_json_matches_schema,
decrypt_tok
join_quote_command_str,
)


Expand Down Expand Up @@ -57,9 +57,11 @@
) as dag:

pipeline_name = 'codex-pipeline'
cwl_workflow1 = os.path.join(pipeline_name, 'pipeline.cwl')
cwl_workflow2 = os.path.join('portal-containers', 'ome-tiff-offsets.cwl')
cwl_workflow3 = os.path.join('portal-containers', 'sprm-to-json.cwl')
cwl_workflows = get_absolute_workflows(
Path(pipeline_name, 'pipeline.cwl'),
Path('portal-containers', 'ome-tiff-offsets.cwl'),
Path('portal-containers', 'sprm-to-json.cwl'),
)

def build_dataset_name(**kwargs):
return '{}__{}__{}'.format(dag.dag_id,
Expand Down Expand Up @@ -87,23 +89,13 @@ def build_cwltool_cmd1(**kwargs):

command = [
*get_cwltool_base_cmd(tmpdir),
os.fspath(PIPELINE_BASE_DIR / cwl_workflow1),
cwl_workflows[0],
'--gpus=0,1',
'--data_dir',
data_dir,
]

# command = [
# 'cp',
# '-R',
# os.path.join(os.environ['AIRFLOW_HOME'],
# 'data', 'temp', 'std_salmon_out', 'cwl_out'),
# tmpdir
# ]

command_str = ' '.join(shlex.quote(piece) for piece in command)
print('final command_str: %s' % command_str)
return command_str

return join_quote_command_str(command)


t_build_cmd1 = PythonOperator(
Expand Down Expand Up @@ -157,14 +149,12 @@ def build_cwltool_cmd2(**kwargs):

command = [
*get_cwltool_base_cmd(tmpdir),
os.fspath(PIPELINE_BASE_DIR / cwl_workflow2),
cwl_workflows[1],
'--input_dir',
os.path.join(data_dir, 'output', 'extract', 'expressions', 'ome-tiff')
data_dir / 'output/extract/expressions/ome-tiff',
]

command_str = ' '.join(shlex.quote(piece) for piece in command)
print('final command_str: %s' % command_str)
return command_str
return join_quote_command_str(command)


t_build_cmd2 = PythonOperator(
Expand Down Expand Up @@ -211,19 +201,17 @@ def build_cwltool_cmd3(**kwargs):
print('tmpdir: ', tmpdir)
parent_data_dir = ctx['parent_lz_path']
print('parent_data_dir: ', parent_data_dir)
data_dir = os.path.join(tmpdir, 'cwl_out') # This stage reads input from stage 1
data_dir = tmpdir / 'cwl_out' # This stage reads input from stage 1
print('data_dir: ', data_dir)

command = [
*get_cwltool_base_cmd(tmpdir),
os.fspath(PIPELINE_BASE_DIR / cwl_workflow3),
cwl_workflows[2],
'--input_dir',
os.path.join(data_dir, 'sprm_outputs')
data_dir / 'sprm_outputs',
]

command_str = ' '.join(shlex.quote(piece) for piece in command)
print('final command_str: %s' % command_str)
return command_str
return join_quote_command_str(command)


t_build_cmd3 = PythonOperator(
Expand Down Expand Up @@ -319,27 +307,19 @@ def send_status_msg(**kwargs):

if success:
md = {}

workflows = [cwl_workflow1,
cwl_workflow2,
cwl_workflow3]
files_for_provenance = [__file__, *cwl_workflows]

if 'dag_provenance' in kwargs['dag_run'].conf:
md['dag_provenance'] = kwargs['dag_run'].conf['dag_provenance'].copy()
new_prv_dct = utils.get_git_provenance_dict([__file__]
+ [PIPELINE_BASE_DIR / cwl
for cwl in workflows])
new_prv_dct = utils.get_git_provenance_dict(files_for_provenance)
md['dag_provenance'].update(new_prv_dct)
else:
dag_prv = (kwargs['dag_run'].conf['dag_provenance_list']
if 'dag_provenance_list' in kwargs['dag_run'].conf
else [])
dag_prv.extend(utils.get_git_provenance_list([__file__]
+ [PIPELINE_BASE_DIR / cwl
for cwl in workflows]))
dag_prv.extend(utils.get_git_provenance_list(files_for_provenance))
md['dag_provenance_list'] = dag_prv
manifest_files = find_pipeline_manifests(
*[PIPELINE_BASE_DIR / cwl for cwl in workflows]
)
manifest_files = find_pipeline_manifests(cwl_workflows)
md.update(utils.get_file_metadata_dict(ds_dir,
utils.get_tmp_dir_path(kwargs['run_id']),
manifest_files))
Expand Down
49 changes: 20 additions & 29 deletions src/ingest-pipeline/airflow/dags/ometiff_pyramid.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os
import json
import shlex
from pathlib import Path
from pprint import pprint
from datetime import datetime, timedelta
Expand All @@ -24,14 +23,15 @@

import utils
from utils import (
PIPELINE_BASE_DIR,
decrypt_tok,
find_pipeline_manifests,
get_cwltool_base_cmd,
get_absolute_workflows,
get_dataset_uuid,
get_parent_dataset_uuid,
get_uuid_for_error,
join_quote_command_str,
localized_assert_json_matches_schema as assert_json_matches_schema,
decrypt_tok
)

# after running this DAG you should have on disk
Expand Down Expand Up @@ -60,14 +60,15 @@
user_defined_macros={'tmp_dir_path' : utils.get_tmp_dir_path}
) as dag:

#does the name need to match the filename?
# does the name need to match the filename?
pipeline_name = 'ometiff_pyramid'

#this workflow creates the image pyramid
cwl_workflow1 = os.path.join('ome-tiff-pyramid', 'pipeline.cwl')

#this workflow computes the offsets
cwl_workflow2 = os.path.join('portal-containers', 'ome-tiff-offsets.cwl')
cwl_workflows = get_absolute_workflows(
# this workflow creates the image pyramid
Path('ome-tiff-pyramid', 'pipeline.cwl'),
# this workflow computes the offsets
Path('portal-containers', 'ome-tiff-offsets.cwl'),
)

def build_dataset_name(**kwargs):
return '{}__{}__{}'.format(dag.dag_id,
Expand Down Expand Up @@ -95,14 +96,12 @@ def build_cwltool_cmd1(**kwargs):
#this is the call to the CWL
command = [
*get_cwltool_base_cmd(tmpdir),
os.fspath(PIPELINE_BASE_DIR / cwl_workflow1),
cwl_workflows[0],
'--ometiff_directory',
data_dir
data_dir,
]

command_str = ' '.join(shlex.quote(piece) for piece in command)
print('final command_str: %s' % command_str)
return command_str
return join_quote_command_str(command)

t_build_cmd1 = PythonOperator(
task_id='build_cmd1',
Expand Down Expand Up @@ -143,14 +142,12 @@ def build_cwltool_cmd2(**kwargs):
#this is the call to the CWL
command = [
*get_cwltool_base_cmd(tmpdir),
os.fspath(PIPELINE_BASE_DIR / cwl_workflow2),
cwl_workflows[1],
'--input_directory',
'./ometiff-pyramids'
'./ometiff-pyramids',
]

command_str = ' '.join(shlex.quote(piece) for piece in command)
print('final command_str: %s' % command_str)
return command_str
return join_quote_command_str(command)

t_build_cmd2 = PythonOperator(
task_id='build_cmd2',
Expand Down Expand Up @@ -251,25 +248,19 @@ def send_status_msg(**kwargs):

if success:
md = {}
files_for_provenance = [__file__, *cwl_workflows]

workflows = [cwl_workflow1, cwl_workflow2]
if 'dag_provenance' in kwargs['dag_run'].conf:
md['dag_provenance'] = kwargs['dag_run'].conf['dag_provenance'].copy()
new_prv_dct = utils.get_git_provenance_dict([__file__]
+ [PIPELINE_BASE_DIR / cwl
for cwl in workflows])
new_prv_dct = utils.get_git_provenance_dict(files_for_provenance)
md['dag_provenance'].update(new_prv_dct)
else:
dag_prv = (kwargs['dag_run'].conf['dag_provenance_list']
if 'dag_provenance_list' in kwargs['dag_run'].conf
else [])
dag_prv.extend(utils.get_git_provenance_list([__file__]
+ [PIPELINE_BASE_DIR / cwl
for cwl in workflows]))
dag_prv.extend(utils.get_git_provenance_list(files_for_provenance))
md['dag_provenance_list'] = dag_prv
manifest_files = find_pipeline_manifests(
*[PIPELINE_BASE_DIR / cwl for cwl in workflows]
)
manifest_files = find_pipeline_manifests(cwl_workflows)
md.update(utils.get_file_metadata_dict(ds_dir,
utils.get_tmp_dir_path(kwargs['run_id']),
manifest_files))
Expand Down
Loading

0 comments on commit cf39e50

Please sign in to comment.