diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ebfcb2a..77d793ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,13 +9,14 @@ Initial release of genomic-medicine-sweden/skierfe, created with the [nf-core](h ### `Added` +- Refactored conditionally required parameters validation [#69](https://github.com/genomic-medicine-sweden/skierfe/pull/69) - Added CNV workflow to test profile [#68](https://github.com/genomic-medicine-sweden/skierfe/pull/68) -- Update and rename output directories [#65](https://github.com/genomic-medicine-sweden/skierfe/pull/65) -- Update GLNexus version [#58](https://github.com/genomic-medicine-sweden/skierfe/pull/58) +- Updated and rename output directories [#65](https://github.com/genomic-medicine-sweden/skierfe/pull/65) +- Updated GLNexus version [#58](https://github.com/genomic-medicine-sweden/skierfe/pull/58) - Added uBAM support and multisample test [#51](https://github.com/genomic-medicine-sweden/skierfe/pull/51) - Added Revio BAM test data [#50](https://github.com/genomic-medicine-sweden/skierfe/pull/50) -- Update template to 2.13.1 [#38](https://github.com/genomic-medicine-sweden/skierfe/pull/38) -- Update pipeline to run with a small test dataset [#35](https://github.com/genomic-medicine-sweden/skierfe/pull/35) +- Updated template to 2.13.1 [#38](https://github.com/genomic-medicine-sweden/skierfe/pull/38) +- Updated pipeline to run with a small test dataset [#35](https://github.com/genomic-medicine-sweden/skierfe/pull/35) - Added test data and test profile [#33](https://github.com/genomic-medicine-sweden/skierfe/pull/33) ### `Fixed` diff --git a/nextflow.config b/nextflow.config index 4f69e5ed..c36733dc 100644 --- a/nextflow.config +++ b/nextflow.config @@ -15,7 +15,7 @@ params { dipcall_par = null extra_snfs = null extra_gvcfs = null - skip_qc = null + skip_qc = false skip_assembly_wf = false skip_mapping_wf = false skip_methylation_wf = false diff --git a/subworkflows/local/utils_nfcore_skierfe_pipeline/main.nf 
b/subworkflows/local/utils_nfcore_skierfe_pipeline/main.nf index 71c35663..5929b9ce 100644 --- a/subworkflows/local/utils_nfcore_skierfe_pipeline/main.nf +++ b/subworkflows/local/utils_nfcore_skierfe_pipeline/main.nf @@ -20,6 +20,90 @@ include { imNotification } from '../../nf-core/utils_nfcore_pipeline' include { UTILS_NFCORE_PIPELINE } from '../../nf-core/utils_nfcore_pipeline' include { workflowCitation } from '../../nf-core/utils_nfcore_pipeline' +/* +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + DEFINE DEPENDENCIES (FILES AND WORKFLOWS) FOR OTHER WORKFLOWS +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +*/ + +// +// nf-validation does not support contitional file and params validation, +// add these here. +// + +// +// Define subworkflows and their associated "--skip" +// +def workflowSkips = [ + assembly : "skip_assembly_wf", + qc : "skip_qc", + mapping : "skip_mapping_wf", + snv_calling : "skip_short_variant_calling", + snv_annotation: "skip_snv_annotation", + cnv_calling : "skip_cnv_calling", + phasing : "skip_phasing_wf", + repeat_calling: "skip_repeat_wf", + methylation : "skip_methylation_wf", +] + +// +// E.g., the CNV-calling workflow depends on mapping and snv_calling and can't run without them. 
+// +def workflowDependencies = [ + snv_calling : ["mapping"], + snv_annotation : ["mapping", "snv_calling"], + cnv_calling : ["mapping", "snv_calling"], + phasing : ["mapping", "snv_calling"], + repeat_calling : ["mapping", "snv_calling", "phasing"], + methylation : ["mapping", "snv_calling", "phasing"], +] + +// +// E.g., the dipcall_par file is required by the assembly workflow and the assembly workflow can't run without dipcall_par +// +def fileDependencies = [ + assembly : ["dipcall_par"], + snv_annotation: ["snp_db", "vep_cache"], + cnv_calling : ["hificnv_xy", "hificnv_xx", "hificnv_exclude"], + repeat_calling: ["trgt_repeats"] +] + +// +// E.g., pacbio can't run with the methylation workflow +// +def presetIncompatibilities = [ + pacbio : ["methylation"], + ONT_R10: ["assembly", "cnv_calling"], +] + +def parameterStatus = [ + workflow: [ + skip_short_variant_calling: params.skip_short_variant_calling, + skip_phasing_wf : params.skip_phasing_wf, + skip_methylation_wf : params.skip_methylation_wf, + skip_repeat_wf : params.skip_repeat_wf, + skip_snv_annotation : params.skip_snv_annotation, + skip_cnv_calling : params.skip_cnv_calling, + skip_mapping_wf : params.skip_mapping_wf, + skip_qc : params.skip_qc, + skip_assembly_wf : params.skip_assembly_wf, + ], + files: [ + dipcall_par : params.dipcall_par, + snp_db : params.snp_db, + vep_cache : params.vep_cache, + hificnv_xy : params.hificnv_xy, + hificnv_xx : params.hificnv_xx, + hificnv_exclude: params.hificnv_exclude, + trgt_repeats : params.trgt_repeats, + ], + preset: [ + pacbio : params.preset == "pacbio", + revio : params.preset == "revio", + ONT_R10: params.preset == "ONT_R10", + ] +] + /* ======================================================================================== SUBWORKFLOW TO INITIALISE PIPELINE @@ -75,7 +159,7 @@ workflow PIPELINE_INITIALISATION { // // Custom validation for pipeline parameters // - validateInputParameters() + validateInputParameters(parameterStatus, workflowSkips, 
workflowDependencies, fileDependencies, presetIncompatibilities) // // Create channel from input file provided through params.input @@ -137,8 +221,10 @@ workflow PIPELINE_COMPLETION { // // Check and validate pipeline parameters // -def validateInputParameters() { + +def validateInputParameters(statusMap, workflowMap, workflowDependencies, fileDependencies, presetDependencies) { genomeExistsError() + validateParameterCombinations(statusMap, workflowMap, workflowDependencies, fileDependencies, presetDependencies) } // @@ -228,3 +314,165 @@ def methodsDescriptionText(mqc_methods_yaml) { return description_html.toString() } + +// +// Validate preset and workflow skip combinations +// +def validateParameterCombinations(statusMap, workflowMap, workflowDependencies, fileDependencies, presetIncompatibilities) { + // Array to store errors + def errors = [] + // For each of the "workflow", "files", "preset" + statusMap.each { paramsType, allParams -> + // Go through all params and their status + statusMap[paramsType].each { param, paramStatus -> + switch (paramsType) { + case "files": + checkFileDependencies(param, fileDependencies, statusMap, workflowMap, errors) + break + case "workflow": + checkWorkflowDependencies(param, workflowDependencies, statusMap, workflowMap, errors) + break + case "preset": + checkPresetDependencies(param, presetIncompatibilities, statusMap, workflowMap, errors) + break + default: + break + } + } + } + // Give error if there are any + if(errors) { + def error_string = + "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n" + + " " + errors.join("\n ") + "\n" + + "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + error(error_string) + } +} + +// +// Lookup all workflows that needs to be active for a certain preset +// +def checkPresetDependencies(String preset, Map combinationsMap, Map statusMap, Map workflowMap, List errors) { + + // If preset is not active, then give no error + 
presetIsActive = statusMap["preset"][preset] + if(!presetIsActive) { + return + } + + // Get all required workflows for a preset + def requiredWorkflows = combinationsMap[preset] as Set + // If no direct dependencies are found, return an empty list + if (!requiredWorkflows) { + return [] + } + // Collect the required --skips that are not active for the current preset + def dependencyString = findRequiredSkips("preset", requiredWorkflows, statusMap, workflowMap) + .collect { [ '--', it ].join('') } + .join(" ") + // If all required skips are set, give no error + if (!dependencyString) { + return + } + errors << "--preset $preset is active, the pipeline has to be run with: $dependencyString" + return errors +} + +// +// Lookup all workflows that need to be active for another workflow +// +def checkWorkflowDependencies(String skip, Map combinationsMap, Map statusMap, Map workflowMap, List errors) { + + // Lookup the workflow associated with the --skip_xxx parameter + currentWorkflow = workflowMap.find { key, mapValue -> mapValue == skip }?.key + + // If the --skip is not set, then the workflow is active, give no error + workflowIsActive = !statusMap["workflow"][skip] + if(workflowIsActive) { + return + } + + // Get all other workflows that are required for a certain workflow + def requiredWorkflows = combinationsMap.findAll { it.value.contains(currentWorkflow) }.keySet() + // If no direct dependencies are found or combinationsMap does not contain the workflow, return an empty list + if (!requiredWorkflows) { + return [] + } + // Collect the required --skips that are not active for the current workflow + def dependencyString = findRequiredSkips("workflow", requiredWorkflows, statusMap, workflowMap) + .collect { [ '--', it ].join('') } + .join(" ") + // If all required skips are set, give no error + if (!dependencyString) { + return + } + errors << "--$skip is active, the pipeline has to be run with: $dependencyString" + return errors +} + +// +// Lookup if a file is 
required by any workflows, and add to errors +// +def checkFileDependencies(String file, Map combinationsMap, Map statusMap, Map workflowMap, List errors) { + // Get the workflow required by file + def workflowThatRequiresFile = findKeyForValue(file, combinationsMap) + // Get the "--skip" for that workflow + def workflowSkip = workflowMap[workflowThatRequiresFile] + // Get the status of the "--skip", if false then workflow is active + def WorkflowIsActive = !statusMap["workflow"][workflowSkip] + // Get the file path + def FilePath = statusMap["files"][file] + // If the workflow that requires the file is active & there's no file available + if(WorkflowIsActive && FilePath == null) { + errors << "--$workflowSkip is NOT active, the following files are required: --$file" + } + return errors +} + +// +// Find the workflow skips that are not currently active +// +def findRequiredSkips(paramType, Set requiredWorkflows, Map statusMap, Map workflowMap) { + + def requiredSkips = [] + + for (currentWorkflow in requiredWorkflows) { + // Get the skip associated with the workflow + skip = workflowMap[currentWorkflow] + + workflowIsSkipped = !statusMap[paramType][skip] + + println("$requiredWorkflows $skip $workflowIsSkipped") + + if(paramType == "workflow") { + if(workflowIsSkipped) { + requiredSkips << skip + } + } else if(paramType == "preset") { + if(!workflowIsSkipped) { + requiredSkips << skip + } + } + } + return requiredSkips +} + +def findKeyForValue(def valueToFind, Map map) { + for (entry in map) { + def key = entry.key + def value = entry.value + + if (value instanceof List) { + if (value.contains(valueToFind)) { + return key + } + } else { + if (value == valueToFind) { + return key + } + } + } + return null // Value not found +} + diff --git a/workflows/skierfe.nf b/workflows/skierfe.nf index 88a00f7a..3b9178da 100644 --- a/workflows/skierfe.nf +++ b/workflows/skierfe.nf @@ -1,89 +1,5 @@ -/* 
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - VALIDATE INPUTS -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -*/ - include { fromSamplesheet } from 'plugin/nf-validation' -// Check mandatory input files -ch_fasta = Channel.fromPath(params.fasta).map { it -> [it.simpleName, it] }.collect() - -// Check optional input files -ch_extra_snfs = params.extra_snfs ? Channel.fromSamplesheet('extra_snfs' , immutable_meta: false) : Channel.empty() -ch_extra_gvcfs = params.extra_gvcfs ? Channel.fromSamplesheet('extra_gvcfs', immutable_meta: false) : Channel.empty() -ch_tandem_repeats = params.tandem_repeats ? Channel.fromPath(params.tandem_repeats).collect() : Channel.value([]) -ch_bed = params.bed ? Channel.fromPath(params.bed).map{ [ it.getSimpleName(), it]}.collect() : Channel.empty() -ch_input_bed = params.bed ? Channel.fromPath(params.bed).map{ [ it.getSimpleName(), it]}.collect() : Channel.value([[],[]]) - -// This should be able to in schema? 
-if (params.split_fastq < 250 & params.split_fastq > 0 ) { exit 1, '--split_fastq must be 0 or >= 250'} -if (params.parallel_snv == 0 ) { exit 1, '--parallel_snv must be > 0'} - -def checkUnsupportedCombinations() { - if (params.skip_short_variant_calling) { - if (params.skip_phasing_wf & !params.skip_methylation_wf) { - exit 1, 'Cannot run methylation analysis without short variant calling and phasing' - } else if (params.skip_phasing_wf & !params.skip_repeat_wf) { - exit 1, 'Cannot run repeat analysis without short variant calling and phasing' - } else if (!params.skip_phasing_wf) { - exit 1, 'Cannot run phasing analysis without short variant calling' - } else if (!params.skip_repeat_wf ) { - exit 1, 'Cannot run repeat analysis without short variant calling' - } else if (!params.skip_snv_annotation ) { - exit 1, 'Cannot run snv annotation without short variant calling' - } else if (!params.skip_cnv_calling) { - exit 1, 'Cannot run CNV-calling without short variant calling' - } - } - if (!params.skip_assembly_wf) { - // TODO: should be one assembly wf, and one assembly variant calling wf - if(params.dipcall_par) { ch_par = Channel.fromPath(params.dipcall_par).collect() } else { exit 1, 'Not skipping genome assembly: missing input PAR-file (--dipcall_par)' } - } - if (!params.skip_short_variant_calling & !params.skip_repeat_wf) { - if (params.trgt_repeats) { ch_trgt_bed = Channel.fromPath(params.trgt_repeats).collect() } else { exit 1, 'Not skipping repeat calling: missing TGT repeat BED (--trgt_repeats)' } - } - if (!params.skip_short_variant_calling & !params.skip_snv_annotation) { - // TODO: no duplicate dbs should be allowed, although echtvar gives pretty clear error - if(params.snp_db) { ch_databases = Channel.fromSamplesheet('snp_db', immutable_meta: false).map{it[1]}.collect() } else { exit 1, 'Not skipping SNV Annotation: Missing Echtvar-DB samplesheet (--snp_db)'} - if(params.vep_cache) { ch_vep_cache = Channel.fromPath(params.vep_cache).collect() } else { 
exit 1, 'Not skipping SNV Annotation: missing path to VEP cache-dir (--vep_cache)'} - } - if (!params.skip_short_variant_calling & !params.skip_cnv_calling) { - if(params.hificnv_xy) { ch_expected_xy_bed = Channel.fromPath(params.hificnv_xy).collect() } else { exit 1, 'Not skipping CNV-calling: Missing --hificnv_xy'} - if(params.hificnv_xx) { ch_expected_xx_bed = Channel.fromPath(params.hificnv_xx).collect() } else { exit 1, 'Not skipping CNV-calling: Missing --hificnv_xx'} - if(params.hificnv_exclude) { ch_exclude_bed = Channel.fromPath(params.hificnv_exclude).collect() } else { ch_exclude_bed = Channel.value([]) } - } -} - -// Check and set input files that are mandatory for some analyses -checkUnsupportedCombinations() - -// Validate workflows for different presets -def getValidCallers(preset) { - switch(preset) { - case "revio": - return ["deepvariant"] - case "pacbio": - return ["deepvariant"] - case "ONT_R10": - return ["deepvariant"] - } -} - -def getValidWorkflows(preset) { - switch(preset) { - case "pacbio": - return ["skip_methylation_wf"] - case "ONT_R10": - return ["skip_cnv_calling", "skip_assembly_wf"] - } -} - -if( (params.preset == "pacbio" & !params.skip_methylation_wf) | - (params.preset == "ONT_R10" & (!params.skip_cnv_calling | !params.skip_assembly_wf))) { - exit 1, "Preset \'$params.preset\' cannot be run wih: " + getValidWorkflows(params.preset) -} - /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ IMPORT LOCAL SUBWORKFLOWS @@ -111,10 +27,10 @@ include { SNV_ANNOTATION } from '../subworkflows/local/snv_annotatio */ // local -include { FQCRS } from '../modules/local/fqcrs' -include { CONVERT_ONT_READ_NAMES } from '../modules/local/convert_ont_read_names' -include { BUILD_INTERVALS } from '../modules/local/build_intervals/main' -include { SPLIT_BED_CHUNKS } from '../modules/local/split_bed_chunks/main' +include { FQCRS } from '../modules/local/fqcrs' +include { CONVERT_ONT_READ_NAMES } from 
'../modules/local/convert_ont_read_names' +include { BUILD_INTERVALS } from '../modules/local/build_intervals/main' +include { SPLIT_BED_CHUNKS } from '../modules/local/split_bed_chunks/main' // nf-core include { FASTQC } from '../modules/nf-core/fastqc/main' @@ -136,17 +52,52 @@ workflow SKIERFE { ch_input main: - ch_versions = Channel.empty() ch_multiqc_files = Channel.empty() + // Mandatory input files + ch_fasta = Channel.fromPath(params.fasta).map { it -> [it.simpleName, it] }.collect() + + // Optional input files + ch_extra_snfs = params.extra_snfs ? Channel.fromSamplesheet('extra_snfs' , immutable_meta: false) + : Channel.empty() + ch_extra_gvcfs = params.extra_gvcfs ? Channel.fromSamplesheet('extra_gvcfs', immutable_meta: false) + : Channel.empty() + ch_tandem_repeats = params.tandem_repeats ? Channel.fromPath(params.tandem_repeats).collect() + : Channel.value([]) + ch_bed = params.bed ? Channel.fromPath(params.bed).map{ [ it.getSimpleName(), it]}.collect() + : Channel.empty() + ch_input_bed = params.bed ? Channel.fromPath(params.bed).map{ [ it.getSimpleName(), it]}.collect() + : Channel.value([]) + + // Conditional input files that has to be set depending on which workflow is run + ch_par = params.dipcall_par ? Channel.fromPath(params.dipcall_par).collect() + : '' + ch_trgt_bed = params.trgt_repeats ? Channel.fromPath(params.trgt_repeats).collect() + : '' + ch_databases = params.snp_db ? Channel.fromSamplesheet('snp_db', immutable_meta: false).map{it[1]}.collect() + : '' + ch_vep_cache = params.vep_cache ? Channel.fromPath(params.vep_cache).collect() + : '' + ch_expected_xy_bed = params.hificnv_xy ? Channel.fromPath(params.hificnv_xy).collect() + : '' + ch_expected_xx_bed = params.hificnv_xx ? Channel.fromPath(params.hificnv_xx).collect() + : '' + ch_exclude_bed = params.hificnv_exclude ? 
Channel.fromPath(params.hificnv_exclude).collect() + : '' + // Check parameter that doesn't conform to schema validation here + if (params.split_fastq < 250 & params.split_fastq > 0 ) { exit 1, '--split_fastq must be 0 or >= 250'} + if (params.parallel_snv == 0 ) { exit 1, '--parallel_snv must be > 0'} + + // + // Main workflow + // BAM_TO_FASTQ ( ch_input ) ch_versions = ch_versions.mix(BAM_TO_FASTQ.out.versions) BAM_TO_FASTQ.out.fastq .set { ch_sample } - if(!params.skip_qc) { // Fastq QC @@ -202,7 +153,6 @@ workflow SKIERFE { bai = ALIGN_READS.out.bai bam_bai = ALIGN_READS.out.bam_bai - // TODO: parallel_snv should only be allowed when snv calling is active // TODO: move inside PREPARE GENOME, but only run if(parallel_snv > 1) // Split BED/Genome into equal chunks