From d362e21ed5a472fb4d2660bc62c55367457258f4 Mon Sep 17 00:00:00 2001 From: Joel Welling Date: Sat, 29 Aug 2020 22:08:03 -0400 Subject: [PATCH] Add multi-parent support to salmon_rnaseq_snareseq and sc_atac_seq_snare --- .../airflow/dags/bulk_atacseq.py | 1 - .../airflow/dags/salmon_rnaseq_snareseq.py | 19 ++++++++++++------- .../airflow/dags/sc_atac_seq_snare.py | 19 +++++++++++-------- .../airflow/dags/workflow_map.yml | 6 ++++++ 4 files changed, 29 insertions(+), 16 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/bulk_atacseq.py b/src/ingest-pipeline/airflow/dags/bulk_atacseq.py index 03885351..75f707d4 100644 --- a/src/ingest-pipeline/airflow/dags/bulk_atacseq.py +++ b/src/ingest-pipeline/airflow/dags/bulk_atacseq.py @@ -73,7 +73,6 @@ def build_cwltool_cmd1(**kwargs): run_id = kwargs['run_id'] tmpdir = Path(utils.get_tmp_dir_path(run_id)) - # TODO: expand this to multiple directories in whatever way is appropriate data_dirs = ctx['parent_lz_path'] data_dirs = [data_dirs] if isinstance(data_dirs, str) else data_dirs diff --git a/src/ingest-pipeline/airflow/dags/salmon_rnaseq_snareseq.py b/src/ingest-pipeline/airflow/dags/salmon_rnaseq_snareseq.py index 80594bf7..f42859fa 100644 --- a/src/ingest-pipeline/airflow/dags/salmon_rnaseq_snareseq.py +++ b/src/ingest-pipeline/airflow/dags/salmon_rnaseq_snareseq.py @@ -62,10 +62,12 @@ cwl_workflow1 = os.path.join(pipeline_name, 'pipeline.cwl') cwl_workflow2 = os.path.join('portal-containers', 'h5ad-to-arrow.cwl') + def build_dataset_name(**kwargs): - return '{}__{}__{}'.format(dag.dag_id, - kwargs['dag_run'].conf['parent_submission_id'], - pipeline_name) + id_l = kwargs['dag_run'].conf['parent_submission_id'] + inner_str = id_l if isinstance(id_l, str) else '_'.join(id_l) + return f'{dag.dag_id}__{inner_str}__{pipeline_name}' + # prepare_cwl1 = PythonOperator( # python_callable=utils.clone_or_update_pipeline, @@ -92,8 +94,10 @@ def build_cwltool_cmd1(**kwargs): run_id = kwargs['run_id'] tmpdir = Path(utils.get_tmp_dir_path(run_id)) print('tmpdir: ', tmpdir) - data_dir = ctx['parent_lz_path'] - print('data_dir: ', data_dir) + + data_dirs = ctx['parent_lz_path'] + data_dirs = [data_dirs] if isinstance(data_dirs, str) else data_dirs + print('data_dirs: ', data_dirs) command = [ *get_cwltool_base_cmd(tmpdir), @@ -102,11 +106,12 @@ def build_cwltool_cmd1(**kwargs): os.path.join(tmpdir, 'cwl_out'), '--parallel', os.fspath(PIPELINE_BASE_DIR / cwl_workflow1), - '--fastq_dir', - data_dir, '--threads', str(THREADS), ] + for data_dir in data_dirs: + command.append('--fastq_dir') + command.append(data_dir) # command = [ # 'cp', diff --git a/src/ingest-pipeline/airflow/dags/sc_atac_seq_snare.py b/src/ingest-pipeline/airflow/dags/sc_atac_seq_snare.py index c9ea302e..00eb316a 100644 --- a/src/ingest-pipeline/airflow/dags/sc_atac_seq_snare.py +++ b/src/ingest-pipeline/airflow/dags/sc_atac_seq_snare.py @@ -66,11 +66,12 @@ for workflow in cwl_workflows ] + def build_dataset_name(**kwargs): - return '{}__{}__{}'.format(dag.dag_id, - kwargs['dag_run'].conf['parent_submission_id'], - pipeline_name - ) + id_l = kwargs['dag_run'].conf['parent_submission_id'] + inner_str = id_l if isinstance(id_l, str) else '_'.join(id_l) + return f'{dag.dag_id}__{inner_str}__{pipeline_name}' + prepare_cwl1 = DummyOperator( task_id='prepare_cwl1' @@ -85,8 +86,9 @@ def build_cwltool_cmd1(**kwargs): run_id = kwargs['run_id'] tmpdir = Path(utils.get_tmp_dir_path(run_id)) print('tmpdir: ', tmpdir) - data_dir = ctx['parent_lz_path'] - print('data_dir: ', data_dir) + data_dirs = ctx['parent_lz_path'] + data_dirs = [data_dirs] if isinstance(data_dirs, str) else data_dirs + print('data_dirs: ', data_dirs) command = [ *get_cwltool_base_cmd(tmpdir), @@ -94,11 +96,12 @@ def build_cwltool_cmd1(**kwargs): os.path.join(tmpdir, 'cwl_out'), '--parallel', os.fspath(cwl_workflows_absolute[0]), - '--sequence_directory', - data_dir, '--threads', str(THREADS), ] + for data_dir in data_dirs: + command.append('--sequence_directory') + command.append(data_dir) command_str = ' '.join(shlex.quote(piece) for piece in command) print('final command_str: {!r}'.format(command_str)) diff --git a/src/ingest-pipeline/airflow/dags/workflow_map.yml b/src/ingest-pipeline/airflow/dags/workflow_map.yml index 0f572aff..0b58475a 100644 --- a/src/ingest-pipeline/airflow/dags/workflow_map.yml +++ b/src/ingest-pipeline/airflow/dags/workflow_map.yml @@ -26,6 +26,9 @@ workflow_map: - 'collection_type': 'single_metadatatsv' 'assay_type': 'SNARE-seq2' 'workflow': 'sc_atac_seq_snare' + - 'collection_type': 'snare_atac_collection' + 'assay_type': 'SNAREseq' + 'workflow': 'sc_atac_seq_snare' - 'collection_type': 'single_metadatatsv' 'assay_type': 'sciATACseq' 'workflow': 'sc_atac_seq_sci' @@ -38,6 +41,9 @@ workflow_map: - 'collection_type': 'single_metadatatsv' 'assay_type': 'SNARE2-RNAseq' 'workflow': 'salmon_rnaseq_snareseq' + - 'collection_type': 'snare_rnaseq_collection' + 'assay_type': 'SNAREseq' + 'workflow': 'salmon_rnaseq_snareseq' - 'collection_type': '.*' 'assay_type': 'Imaging Mass Cytometry' 'workflow': 'ometiff_pyramid'