Skip to content

Commit

Permalink
Add multi-parent support to salmon_rnaseq_snareseq and sc_atac_seq_snare
Browse files Browse the repository at this point in the history
  • Loading branch information
jswelling committed Aug 30, 2020
1 parent 0e1b2f1 commit d362e21
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 16 deletions.
1 change: 0 additions & 1 deletion src/ingest-pipeline/airflow/dags/bulk_atacseq.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ def build_cwltool_cmd1(**kwargs):
run_id = kwargs['run_id']
tmpdir = Path(utils.get_tmp_dir_path(run_id))

# TODO: expand this to multiple directories in whatever way is appropriate
data_dirs = ctx['parent_lz_path']
data_dirs = [data_dirs] if isinstance(data_dirs, str) else data_dirs

Expand Down
19 changes: 12 additions & 7 deletions src/ingest-pipeline/airflow/dags/salmon_rnaseq_snareseq.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@
cwl_workflow1 = os.path.join(pipeline_name, 'pipeline.cwl')
cwl_workflow2 = os.path.join('portal-containers', 'h5ad-to-arrow.cwl')


def build_dataset_name(**kwargs):
return '{}__{}__{}'.format(dag.dag_id,
kwargs['dag_run'].conf['parent_submission_id'],
pipeline_name)
id_l = kwargs['dag_run'].conf['parent_submission_id']
inner_str = id_l if isinstance(id_l, str) else '_'.join(id_l)
return f'{dag.dag_id}__{inner_str}__{pipeline_name}'


# prepare_cwl1 = PythonOperator(
# python_callable=utils.clone_or_update_pipeline,
Expand All @@ -92,8 +94,10 @@ def build_cwltool_cmd1(**kwargs):
run_id = kwargs['run_id']
tmpdir = Path(utils.get_tmp_dir_path(run_id))
print('tmpdir: ', tmpdir)
data_dir = ctx['parent_lz_path']
print('data_dir: ', data_dir)

data_dirs = ctx['parent_lz_path']
data_dirs = [data_dirs] if isinstance(data_dirs, str) else data_dirs
print('data_dirs: ', data_dirs)

command = [
*get_cwltool_base_cmd(tmpdir),
Expand All @@ -102,11 +106,12 @@ def build_cwltool_cmd1(**kwargs):
os.path.join(tmpdir, 'cwl_out'),
'--parallel',
os.fspath(PIPELINE_BASE_DIR / cwl_workflow1),
'--fastq_dir',
data_dir,
'--threads',
str(THREADS),
]
for data_dir in data_dirs:
command.append('--fastq_dir')
command.append(data_dir)

# command = [
# 'cp',
Expand Down
19 changes: 11 additions & 8 deletions src/ingest-pipeline/airflow/dags/sc_atac_seq_snare.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,12 @@
for workflow in cwl_workflows
]


def build_dataset_name(**kwargs):
return '{}__{}__{}'.format(dag.dag_id,
kwargs['dag_run'].conf['parent_submission_id'],
pipeline_name
)
id_l = kwargs['dag_run'].conf['parent_submission_id']
inner_str = id_l if isinstance(id_l, str) else '_'.join(id_l)
return f'{dag.dag_id}__{inner_str}__{pipeline_name}'


prepare_cwl1 = DummyOperator(
task_id='prepare_cwl1'
Expand All @@ -85,20 +86,22 @@ def build_cwltool_cmd1(**kwargs):
run_id = kwargs['run_id']
tmpdir = Path(utils.get_tmp_dir_path(run_id))
print('tmpdir: ', tmpdir)
data_dir = ctx['parent_lz_path']
print('data_dir: ', data_dir)
data_dirs = ctx['parent_lz_path']
data_dirs = [data_dirs] if isinstance(data_dirs, str) else data_dirs
print('data_dirs: ', data_dirs)

command = [
*get_cwltool_base_cmd(tmpdir),
'--outdir',
os.path.join(tmpdir, 'cwl_out'),
'--parallel',
os.fspath(cwl_workflows_absolute[0]),
'--sequence_directory',
data_dir,
'--threads',
str(THREADS),
]
for data_dir in data_dirs:
command.append('--sequence_directory')
command.append(data_dir)

command_str = ' '.join(shlex.quote(piece) for piece in command)
print('final command_str: {!r}'.format(command_str))
Expand Down
6 changes: 6 additions & 0 deletions src/ingest-pipeline/airflow/dags/workflow_map.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ workflow_map:
- 'collection_type': 'single_metadatatsv'
'assay_type': 'SNARE-seq2'
'workflow': 'sc_atac_seq_snare'
- 'collection_type': 'snare_atac_collection'
'assay_type': 'SNAREseq'
'workflow': 'sc_atac_seq_snare'
- 'collection_type': 'single_metadatatsv'
'assay_type': 'sciATACseq'
'workflow': 'sc_atac_seq_sci'
Expand All @@ -38,6 +41,9 @@ workflow_map:
- 'collection_type': 'single_metadatatsv'
'assay_type': 'SNARE2-RNAseq'
'workflow': 'salmon_rnaseq_snareseq'
- 'collection_type': 'snare_rnaseq_collection'
'assay_type': 'SNAREseq'
'workflow': 'salmon_rnaseq_snareseq'
- 'collection_type': '.*'
'assay_type': 'Imaging Mass Cytometry'
'workflow': 'ometiff_pyramid'
Expand Down

0 comments on commit d362e21

Please sign in to comment.