Skip to content

Commit

Permalink
Add support for inputs and outputs arity
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso committed Sep 10, 2023
1 parent 5686bf1 commit 42504d3
Show file tree
Hide file tree
Showing 13 changed files with 503 additions and 33 deletions.
59 changes: 47 additions & 12 deletions docs/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -471,22 +471,41 @@ workflow {
}
```

The `stageAs` option allows you to control how the file should be named in the task work directory. You can provide a specific name or a pattern as described in the [Multiple input files](#multiple-input-files) section:
Available options:

```groovy
process foo {
`arity`
: :::{versionadded} 23.09.0-edge
:::
: Specify the number of expected files. Can be a number or a range:

```groovy
input:
path x, stageAs: 'data.txt'
path('one.txt', arity: '1') // exactly one file is expected
path('pair_*.txt', arity: '2') // exactly two files are expected
path('many_*.txt', arity: '1..*') // one or more files are expected
```

"""
your_command --in data.txt
"""
}
When a task is created, Nextflow will check whether the received files for each path input match the declared arity, and fail if they do not.

workflow {
foo('/some/data/file.txt')
}
```
`stageAs`
: Specify how the file should be named in the task work directory:

```groovy
process foo {
input:
path x, stageAs: 'data.txt'
"""
your_command --in data.txt
"""
}
workflow {
foo('/some/data/file.txt')
}
```

Can be a name or a pattern as described in the [Multiple input files](#multiple-input-files) section.

:::{note}
Process `path` inputs have nearly the same interface as described in {ref}`script-file-io`, with one difference which is relevant when files are staged into a subdirectory. Given the following input:
Expand Down Expand Up @@ -922,6 +941,22 @@ In the above example, the `randomNum` process creates a file named `result.txt`

Available options:

`arity`
: :::{versionadded} 23.09.0-edge
:::
: Specify the number of expected files. Can be a number or a range:

```groovy
output:
path('one.txt', arity: '1') // exactly one file is expected
path('pair_*.txt', arity: '2') // exactly two files are expected
path('many_*.txt', arity: '1..*') // one or more files are expected
```

When a task completes, Nextflow will check whether the produced files for each path output match the declared arity,
and fail if they do not. If the arity is `1`, a sole file object will be emitted. Otherwise, a list will always be emitted,
even if only one file is produced.

`followLinks`
: When `true` target files are return in place of any matching symlink (default: `true`)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.exception

import groovy.transform.CompileStatic
import groovy.transform.InheritConstructors

/**
* Exception thrown when input/output arity check fails
*
* @author Paolo Di Tommaso <[email protected]>
*/
@CompileStatic
@InheritConstructors
class IllegalArityException extends ProcessUnrecoverableException {
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@ import nextflow.ast.TaskTemplateVarsXform
import nextflow.cloud.CloudSpotTerminationException
import nextflow.dag.NodeMarker
import nextflow.exception.FailedGuardException
import nextflow.exception.IllegalArityException
import nextflow.exception.MissingFileException
import nextflow.exception.MissingValueException
import nextflow.exception.ProcessException
import nextflow.exception.ProcessFailedException
import nextflow.exception.ProcessSubmitTimeoutException
import nextflow.exception.ProcessRetryableException
import nextflow.exception.ProcessSubmitTimeoutException
import nextflow.exception.ProcessUnrecoverableException
import nextflow.exception.ShowOnlyExceptionMessage
import nextflow.exception.UnexpectedException
Expand Down Expand Up @@ -1570,7 +1571,6 @@ class TaskProcessor {
task.setOutput(param, stdout)
}


protected void collectOutFiles( TaskRun task, FileOutParam param, Path workDir, Map context ) {

final List<Path> allFiles = []
Expand All @@ -1594,7 +1594,7 @@ class TaskProcessor {
else {
def path = param.glob ? splitter.strip(filePattern) : filePattern
def file = workDir.resolve(path)
def exists = param.followLinks ? file.exists() : file.exists(LinkOption.NOFOLLOW_LINKS)
def exists = checkFileExists(file, param.followLinks)
if( exists )
result = [file]
else
Expand All @@ -1604,18 +1604,24 @@ class TaskProcessor {
if( result )
allFiles.addAll(result)

else if( !param.optional ) {
else if( !param.optional && (!param.arity || param.arity.min > 0) ) {
def msg = "Missing output file(s) `$filePattern` expected by process `${safeTaskName(task)}`"
if( inputsRemovedFlag )
msg += " (note: input files are not included in the default matching set)"
throw new MissingFileException(msg)
}
}

task.setOutput( param, allFiles.size()==1 ? allFiles[0] : allFiles )
if( !param.isValidArity(allFiles.size()) )
throw new IllegalArityException("Incorrect number of output files for process `${safeTaskName(task)}` -- expected ${param.arity}, found ${allFiles.size()}")

task.setOutput( param, allFiles.size()==1 && param.isSingle() ? allFiles[0] : allFiles )

}

protected boolean checkFileExists(Path file, boolean followLinks) {
followLinks ? file.exists() : file.exists(LinkOption.NOFOLLOW_LINKS)
}

protected void collectOutValues( TaskRun task, ValueOutParam param, Map ctx ) {

Expand Down Expand Up @@ -1814,7 +1820,7 @@ class TaskProcessor {
if( obj instanceof Path )
return obj

if( !obj == null )
if( obj == null )
throw new ProcessUnrecoverableException("Path value cannot be null")

if( !(obj instanceof CharSequence) )
Expand Down Expand Up @@ -1856,10 +1862,10 @@ class TaskProcessor {
return files
}

protected singleItemOrList( List<FileHolder> items, ScriptType type ) {
protected singleItemOrList( List<FileHolder> items, boolean single, ScriptType type ) {
assert items != null

if( items.size() == 1 ) {
if( items.size() == 1 && single ) {
return makePath(items[0],type)
}

Expand Down Expand Up @@ -2059,7 +2065,11 @@ class TaskProcessor {
final fileParam = param as FileInParam
final normalized = normalizeInputToFiles(val, count, fileParam.isPathQualifier(), batch)
final resolved = expandWildcards( fileParam.getFilePattern(ctx), normalized )
ctx.put( param.name, singleItemOrList(resolved, task.type) )

if( !param.isValidArity(resolved.size()) )
throw new IllegalArityException("Incorrect number of input files for process `${safeTaskName(task)}` -- expected ${param.arity}, found ${resolved.size()}")

ctx.put( param.name, singleItemOrList(resolved, param.isSingle(), task.type) )
count += resolved.size()
for( FileHolder item : resolved ) {
Integer num = allNames.getOrCreate(item.stageName, 0) +1
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.script.params

import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
import nextflow.exception.IllegalArityException

/**
* Implements an arity option for process inputs and outputs.
*
* @author Ben Sherman <[email protected]>
*/
@CompileStatic
trait ArityParam {

Range arity

Range getArity() { arity }

def setArity(String value) {
if( value.isInteger() ) {
def n = value.toInteger()
this.arity = new Range(n, n)
return this
}

final tokens = value.tokenize('..')
if( tokens.size() == 2 ) {
final min = tokens[0]
final max = tokens[1]
if( min.isInteger() && (max == '*' || max.isInteger()) ) {
this.arity = new Range(
min.toInteger(),
max == '*' ? Integer.MAX_VALUE : max.toInteger()
)
return this
}
}

throw new IllegalArityException("Path arity should be a number (e.g. '1') or a range (e.g. '1..*')")
}

/**
* Determine whether a single output file should be unwrapped.
*/
boolean isSingle() {
return !arity || arity.max == 1
}

boolean isValidArity(int size) {
return !arity || arity.contains(size)
}

@EqualsAndHashCode
static class Range {
int min
int max

Range(int min, int max) {
if( min<0 )
throw new IllegalArityException("Path arity min value must be greater or equals to 0")
if( max<1 )
throw new IllegalArityException("Path arity max value must be greater or equals to 1")
if( min==0 && max==1 )
throw new IllegalArityException("Path arity 0..1 is not allowed")
this.min = min
this.max = max
}

boolean contains(int value) {
min <= value && value <= max
}

@Override
String toString() {
min == max
? min.toString()
: "${min}..${max == Integer.MAX_VALUE ? '*' : max}".toString()
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ abstract class BaseInParam extends BaseParam implements InParam {
return inChannel
}

BaseInParam(ProcessConfig config ) {
BaseInParam( ProcessConfig config ) {
this(config.getOwnerScript().getBinding(), config.getInputs())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import nextflow.script.TokenVar
*/
@Slf4j
@InheritConstructors
class FileInParam extends BaseInParam implements PathQualifier {
class FileInParam extends BaseInParam implements ArityParam, PathQualifier {

protected filePattern

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import nextflow.util.BlankSeparatedList
*/
@Slf4j
@InheritConstructors
class FileOutParam extends BaseOutParam implements OutParam, OptionalParam, PathQualifier {
class FileOutParam extends BaseOutParam implements OutParam, ArityParam, OptionalParam, PathQualifier {

/**
* ONLY FOR TESTING DO NOT USE
Expand Down
Loading

0 comments on commit 42504d3

Please sign in to comment.