From 47d01688e074f56e7492c06de0d303259df95797 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Mon, 27 Mar 2023 14:01:09 -0500 Subject: [PATCH 01/13] Add initial task graph and metadata json file Signed-off-by: Ben Sherman --- docs/config.rst | 1 + .../src/main/groovy/nextflow/Session.groovy | 9 +++ .../groovy/nextflow/dag/ConcreteDAG.groovy | 76 +++++++++++++++++++ .../nextflow/dag/CytoscapeHtmlRenderer.groovy | 2 +- .../nextflow/dag/CytoscapeJsRenderer.groovy | 2 +- .../src/main/groovy/nextflow/dag/DAG.groovy | 2 +- .../groovy/nextflow/dag/DagRenderer.groovy | 15 +++- .../groovy/nextflow/dag/DotRenderer.groovy | 2 +- .../groovy/nextflow/dag/GexfRenderer.groovy | 2 +- .../nextflow/dag/GraphVizRenderer.groovy | 2 +- .../nextflow/dag/MermaidRenderer.groovy | 29 +++++-- .../groovy/nextflow/dag/NodeMarker.groovy | 18 ++++- .../nextflow/processor/TaskHandler.groovy | 29 +++++++ .../nextflow/processor/TaskProcessor.groovy | 10 ++- .../groovy/nextflow/processor/TaskRun.groovy | 1 + .../trace/DefaultObserverFactory.groovy | 1 + .../nextflow/trace/GraphObserver.groovy | 50 +++++------- .../nextflow/dag/DotRendererTest.groovy | 2 +- .../nextflow/dag/GexfRendererTest.groovy | 2 +- .../nextflow/dag/MermaidRendererTest.groovy | 2 +- .../nextflow/processor/TaskHandlerTest.groovy | 26 +++++++ .../main/nextflow/extension/FilesEx.groovy | 8 ++ 22 files changed, 239 insertions(+), 52 deletions(-) create mode 100644 modules/nextflow/src/main/groovy/nextflow/dag/ConcreteDAG.groovy diff --git a/docs/config.rst b/docs/config.rst index 3be4e6b518..744b814da4 100644 --- a/docs/config.rst +++ b/docs/config.rst @@ -254,6 +254,7 @@ Name Description enabled When ``true`` turns on the generation of the DAG file (default: ``false``). file Graph file name (default: ``dag-.dot``). overwrite When ``true`` overwrites any existing DAG file with the same name. +type Can be ``abstract`` to render the abstract DAG or ``concrete`` to render the concrete (task) DAG (default: ``abstract``). 
================== ================ The above options can be used by prefixing them with the ``dag`` scope or surrounding them by curly diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 9fc4248502..55ef2cf6cc 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -40,6 +40,7 @@ import nextflow.conda.CondaConfig import nextflow.config.Manifest import nextflow.container.ContainerConfig import nextflow.dag.DAG +import nextflow.dag.ConcreteDAG import nextflow.exception.AbortOperationException import nextflow.exception.AbortSignalException import nextflow.exception.IllegalConfigException @@ -193,6 +194,8 @@ class Session implements ISession { private DAG dag + private ConcreteDAG concreteDag + private CacheDB cache private Barrier processesBarrier = new Barrier() @@ -345,6 +348,7 @@ class Session implements ISession { // -- DAG object this.dag = new DAG() + this.concreteDag = new ConcreteDAG() // -- init work dir this.workDir = ((config.workDir ?: 'work') as Path).complete() @@ -800,6 +804,8 @@ class Session implements ISession { DAG getDag() { this.dag } + ConcreteDAG getConcreteDAG() { this.concreteDag } + ExecutorService getExecService() { execService } /** @@ -1008,6 +1014,9 @@ class Session implements ISession { final trace = handler.safeTraceRecord() cache.putTaskAsync(handler, trace) + // save the task meta file to the task directory + handler.writeMetaFile() + // notify the event to the observers for( int i=0; i + */ +@Slf4j +class ConcreteDAG { + + Map nodes = new HashMap<>(100) + + /** + * Create a new node for a task + * + * @param task + * @param hash + */ + synchronized void addTaskNode( TaskRun task, String hash ) { + final label = "[${hash.substring(0,2)}/${hash.substring(2,8)}] ${task.name}" + final preds = task.getInputFilesMap().values() + .collect { p -> getPredecessorHash(p) } + .findAll 
{ h -> h != null } + + nodes[hash] = new Node( + index: nodes.size(), + label: label, + predecessors: preds + ) + } + + static public String getPredecessorHash(Path path) { + final pattern = Pattern.compile('.*/([a-z0-9]{2}/[a-z0-9]{30})') + final matcher = pattern.matcher(path.toString()) + + matcher.find() ? matcher.group(1).replace('/', '') : null + } + + @MapConstructor + @ToString(includeNames = true, includes = 'label', includePackage=false) + protected class Node { + + int index + + String label + + List predecessors + + String getSlug() { "t${index}" } + + } + +} diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeHtmlRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeHtmlRenderer.groovy index 1d0bbe117f..7cd207807b 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeHtmlRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeHtmlRenderer.groovy @@ -29,7 +29,7 @@ import java.nio.file.Path class CytoscapeHtmlRenderer implements DagRenderer { @Override - void renderDocument(DAG dag, Path file) { + void renderAbstractGraph(DAG dag, Path file) { String tmplPage = readTemplate() String network = CytoscapeJsRenderer.renderNetwork(dag) file.text = tmplPage.replaceAll(~/\/\* REPLACE_WITH_NETWORK_DATA \*\//, network) diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeJsRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeJsRenderer.groovy index d6eff29552..1486f4648b 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeJsRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeJsRenderer.groovy @@ -29,7 +29,7 @@ import java.nio.file.Path class CytoscapeJsRenderer implements DagRenderer { @Override - void renderDocument(DAG dag, Path file) { + void renderAbstractGraph(DAG dag, Path file) { file.text = renderNetwork(dag) } diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy 
b/modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy index 808bc86660..34e703c0e3 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy @@ -41,7 +41,7 @@ import nextflow.script.params.OutputsList import nextflow.script.params.TupleInParam import nextflow.script.params.TupleOutParam /** - * Model a direct acyclic graph of the pipeline execution. + * Model the abstract graph of a pipeline execution. * * @author Paolo Di Tommaso */ diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy index 4b7e487937..f526d5f765 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy @@ -24,10 +24,19 @@ import java.nio.file.Path * @author Paolo Di Tommaso * @author Mike Smoot */ -interface DagRenderer { +trait DagRenderer { /** - * Render the dag to the specified file. + * Render an abstract (process/operator) DAG. */ - void renderDocument(DAG dag, Path file); + void renderAbstractGraph(DAG dag, Path file) { + throw new UnsupportedOperationException("Abstract graph rendering is not supported for this file format") + } + + /** + * Render a concrete (task) DAG. 
+ */ + void renderConcreteGraph(ConcreteDAG dag, Path file) { + throw new UnsupportedOperationException("Concrete graph rendering is not supported for this file format") + } } diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/DotRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/DotRenderer.groovy index 65184f62c2..817d47a3d2 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/DotRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/DotRenderer.groovy @@ -46,7 +46,7 @@ class DotRenderer implements DagRenderer { static String normalise(String str) { str.replaceAll(/[^0-9_A-Za-z]/,'') } @Override - void renderDocument(DAG dag, Path file) { + void renderAbstractGraph(DAG dag, Path file) { file.text = renderNetwork(dag) } diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/GexfRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/GexfRenderer.groovy index c2bac51110..43aaba6e9c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/GexfRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/GexfRenderer.groovy @@ -41,7 +41,7 @@ class GexfRenderer implements DagRenderer { } @Override - void renderDocument(DAG dag, Path file) { + void renderAbstractGraph(DAG dag, Path file) { final Charset charset = Charset.defaultCharset() Writer bw = Files.newBufferedWriter(file, charset) final XMLOutputFactory xof = XMLOutputFactory.newFactory() diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/GraphVizRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/GraphVizRenderer.groovy index 5d119adbdc..7ce79b839c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/GraphVizRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/GraphVizRenderer.groovy @@ -42,7 +42,7 @@ class GraphvizRenderer implements DagRenderer { * See http://www.graphviz.org for more info. 
*/ @Override - void renderDocument(DAG dag, Path target) { + void renderAbstractGraph(DAG dag, Path target) { def result = Files.createTempFile('nxf-',".$format") def temp = Files.createTempFile('nxf-','.dot') // save the DAG as `dot` to a temp file diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/MermaidRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/MermaidRenderer.groovy index 4b3ec56c2a..6e55674054 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/MermaidRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/MermaidRenderer.groovy @@ -27,11 +27,7 @@ import java.nio.file.Path class MermaidRenderer implements DagRenderer { @Override - void renderDocument(DAG dag, Path file) { - file.text = renderNetwork(dag) - } - - String renderNetwork(DAG dag) { + void renderAbstractGraph(DAG dag, Path file) { def lines = [] lines << "flowchart TD" @@ -45,7 +41,7 @@ class MermaidRenderer implements DagRenderer { lines << "" - return lines.join('\n') + file.text = lines.join('\n') } private String renderVertex(DAG.Vertex vertex) { @@ -76,4 +72,25 @@ class MermaidRenderer implements DagRenderer { return "${edge.from.name} -->${label} ${edge.to.name}" } + + @Override + void renderConcreteGraph(ConcreteDAG graph, Path file) { + def lines = [] + lines << "flowchart TD" + + graph.nodes.values().each { node -> + lines << " ${node.getSlug()}[\"${node.label}\"]" + + node.predecessors.each { key -> + final pred = graph.nodes[key] + + if( pred ) + lines << " ${pred.getSlug()} --> ${node.getSlug()}" + } + } + + lines << "" + + file.text = lines.join('\n') + } } diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/NodeMarker.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/NodeMarker.groovy index 4e5dc60c1c..d9f8f14990 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/NodeMarker.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/NodeMarker.groovy @@ -22,6 +22,7 @@ import 
groovyx.gpars.dataflow.operator.DataflowProcessor import nextflow.Global import nextflow.Session import nextflow.processor.TaskProcessor +import nextflow.processor.TaskRun import nextflow.script.params.InputsList import nextflow.script.params.OutputsList /** @@ -41,7 +42,7 @@ class NodeMarker { static private Session getSession() { Global.session as Session } /** - * Creates a new vertex in the DAG representing a computing `process` + * Creates a vertex in the abstract DAG representing a computing `process` * * @param label The label associated to the process * @param inputs The list of inputs entering in the process @@ -53,7 +54,7 @@ class NodeMarker { } /** - * Creates a new DAG vertex representing a dataflow operator + * Creates a vertex in the abstract DAG representing a dataflow operator * * @param label The operator label * @param inputs The operator input(s). It can be either a single channel or a list of channels. @@ -67,7 +68,7 @@ class NodeMarker { } /** - * Creates a vertex in the DAG representing a dataflow channel source. + * Creates a vertex in the abstract DAG representing a dataflow channel source. * * @param label The node description * @param source Either a dataflow channel or a list of channel. 
@@ -88,4 +89,15 @@ class NodeMarker { session.dag.addDataflowBroadcastPair(readChannel, broadcastChannel) } + /** + * Creates a vertex in the concrete DAG representing a task + * + * @param task + * @param hash + */ + static void addTaskNode( TaskRun task, String hash ) { + if( session?.concreteDAG && !session.aborted ) + session.concreteDAG.addTaskNode( task, hash ) + } + } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy index e14b612566..61efe65c18 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy @@ -21,7 +21,11 @@ import static nextflow.processor.TaskStatus.* import java.nio.file.NoSuchFileException +import groovy.json.JsonBuilder import groovy.util.logging.Slf4j +import nextflow.dag.ConcreteDAG +import nextflow.extension.FilesEx +import nextflow.script.params.FileOutParam import nextflow.trace.TraceRecord /** * Actions to handle the underlying job running the user task. @@ -214,6 +218,31 @@ abstract class TaskHandler { return record } + void writeMetaFile() { + final record = [ + hash: task.hash.toString(), + inputs: task.getInputFilesMap().collect { name, path -> + [ + name: name, + path: path.toString(), + predecessor: ConcreteDAG.getPredecessorHash(path) + ] + }, + outputs: task.getOutputsByType(FileOutParam).values().flatten().collect { path -> + final file = path.toFile() + + [ + name: path.name, + path: path.toString(), + size: file.size(), + checksum: file.isFile() ? FilesEx.getChecksum(path) : null + ] + } + ] + + task.workDir.resolve(TaskRun.CMD_META).text = new JsonBuilder(record).toString() + } + /** * Determine if a process can be forked i.e. can launch * a parallel task execution. 
This is only enforced when diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index 2b547f27c0..05e4643928 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -524,7 +524,7 @@ class TaskProcessor { def invoke = new InvokeTaskAdapter(this, opInputs.size()) session.allOperators << (operator = new DataflowOperator(group, params, invoke)) - // notify the creation of a new vertex the execution DAG + // notify the creation of a new process in the abstract DAG NodeMarker.addProcessNode(this, config.getInputs(), config.getOutputs()) // fix issue #41 @@ -756,8 +756,11 @@ class TaskProcessor { log.trace "[${safeTaskName(task)}] Cacheable folder=${resumeDir?.toUriString()} -- exists=$exists; try=$tries; shouldTryCache=$shouldTryCache; entry=$entry" def cached = shouldTryCache && exists && checkCachedOutput(task.clone(), resumeDir, hash, entry) - if( cached ) + if( cached ) { + // add cached task to the task graph + NodeMarker.addTaskNode(task, hash.toString()) break + } } catch (Throwable t) { log.warn1("[${safeTaskName(task)}] Unable to resume cached task -- See log file for details", causedBy: t) @@ -780,6 +783,9 @@ class TaskProcessor { lock.release() } + // add submitted task to the task graph + NodeMarker.addTaskNode(task, hash.toString()) + // submit task for execution submitTask( task, hash, workDir ) break diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy index af98f7f372..dd0942fd0e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy @@ -543,6 +543,7 @@ class TaskRun implements Cloneable { static final public String CMD_RUN = '.command.run' static 
final public String CMD_TRACE = '.command.trace' static final public String CMD_ENV = '.command.env' + static final public String CMD_META = '.command.meta.json' String toString( ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy index 42325c723a..832e73ede2 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy @@ -93,6 +93,7 @@ class DefaultObserverFactory implements TraceObserverFactory { if( !fileName ) fileName = GraphObserver.DEF_FILE_NAME def traceFile = (fileName as Path).complete() def observer = new GraphObserver(traceFile) + config.navigate('dag.type') { observer.dagType = it ?: 'abstract' } config.navigate('dag.overwrite') { observer.overwrite = it } result << observer } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy index 2b10d3d244..07e2c01b12 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy @@ -24,6 +24,7 @@ import groovy.util.logging.Slf4j import nextflow.Session import nextflow.dag.CytoscapeHtmlRenderer import nextflow.dag.DAG +import nextflow.dag.ConcreteDAG import nextflow.dag.DagRenderer import nextflow.dag.DotRenderer import nextflow.dag.GexfRenderer @@ -46,7 +47,11 @@ class GraphObserver implements TraceObserver { private Path file - private DAG dag + private String type + + private DAG abstractDag + + private ConcreteDAG concreteDag private String name @@ -67,7 +72,8 @@ class GraphObserver implements TraceObserver { @Override void onFlowCreate(Session session) { - this.dag = session.dag + this.abstractDag = session.dag + this.concreteDag = session.concreteDag // check file existance final 
attrs = FileHelper.readAttributes(file) if( attrs ) { @@ -80,14 +86,22 @@ class GraphObserver implements TraceObserver { @Override void onFlowComplete() { - // -- normalise the DAG - dag.normalize() - // -- render it to a file - createRender().renderDocument(dag,file) + if( type == 'abstract' ) { + // -- normalise the DAG + abstractDag.normalize() + // -- render it to a file + createRenderer().renderAbstractGraph(abstractDag,file) + } + else if( type == 'concrete' ) { + createRenderer().renderConcreteGraph(concreteDag,file) + } + else { + log.warn("Invalid DAG type '${type}', should be 'abstract' or 'concrete'") + } } @PackageScope - DagRenderer createRender() { + DagRenderer createRenderer() { if( format == 'dot' ) new DotRenderer(name) @@ -104,28 +118,6 @@ class GraphObserver implements TraceObserver { new GraphvizRenderer(name, format) } - - @Override - void onProcessCreate(TaskProcessor process) { - - } - - - @Override - void onProcessSubmit(TaskHandler handler, TraceRecord trace) { - - } - - @Override - void onProcessStart(TaskHandler handler, TraceRecord trace) { - - } - - @Override - void onProcessComplete(TaskHandler handler, TraceRecord trace) { - - } - @Override boolean enableMetrics() { return false diff --git a/modules/nextflow/src/test/groovy/nextflow/dag/DotRendererTest.groovy b/modules/nextflow/src/test/groovy/nextflow/dag/DotRendererTest.groovy index 499a68105a..5fbab45413 100644 --- a/modules/nextflow/src/test/groovy/nextflow/dag/DotRendererTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/dag/DotRendererTest.groovy @@ -55,7 +55,7 @@ class DotRendererTest extends Specification { dag.normalize() when: - new DotRenderer('TheGraph').renderDocument(dag, file) + new DotRenderer('TheGraph').renderAbstractGraph(dag, file) then: file.text == ''' diff --git a/modules/nextflow/src/test/groovy/nextflow/dag/GexfRendererTest.groovy b/modules/nextflow/src/test/groovy/nextflow/dag/GexfRendererTest.groovy index f112732b2e..6d4a9e3539 100644 --- 
a/modules/nextflow/src/test/groovy/nextflow/dag/GexfRendererTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/dag/GexfRendererTest.groovy @@ -47,7 +47,7 @@ class GexfRendererTest extends Specification { dag.normalize() when: - new GexfRenderer('TheGraph').renderDocument(dag, file.toPath()) + new GexfRenderer('TheGraph').renderAbstractGraph(dag, file.toPath()) then: def graph = new XmlSlurper().parse(file); assert graph.name() == 'gexf' diff --git a/modules/nextflow/src/test/groovy/nextflow/dag/MermaidRendererTest.groovy b/modules/nextflow/src/test/groovy/nextflow/dag/MermaidRendererTest.groovy index 43255e4c88..0b46bd3f84 100644 --- a/modules/nextflow/src/test/groovy/nextflow/dag/MermaidRendererTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/dag/MermaidRendererTest.groovy @@ -47,7 +47,7 @@ class MermaidRendererTest extends Specification { dag.normalize() when: - new MermaidRenderer().renderDocument(dag, file) + new MermaidRenderer().renderAbstractGraph(dag, file) then: file.text == ''' diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy index de052de06a..559be4ef2c 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy @@ -17,10 +17,13 @@ package nextflow.processor +import java.nio.file.Paths import java.util.concurrent.atomic.LongAdder +import com.google.common.hash.HashCode import nextflow.Session import nextflow.executor.Executor +import nextflow.script.params.FileOutParam import nextflow.util.Duration import nextflow.util.MemoryUnit import spock.lang.Specification @@ -136,6 +139,29 @@ class TaskHandlerTest extends Specification { } + def 'should write meta file' () { + + given: + def folder = File.createTempDir() + def outputFile = new File(folder, 'bar.txt') ; outputFile.text = 'bar' + def task = Mock(TaskRun) { + 
hash >> HashCode.fromString("aabbccddeeff00112233445566778899") + workDir >> folder.toPath() + getInputFilesMap() >> [ 'foo.txt': Paths.get('/tmp/00/112233445566778899aabbccddeeff/foo.txt') ] + getOutputsByType(FileOutParam) >> [ 'bar.txt': outputFile.toPath() ] + } + def handler = [:] as TaskHandler + handler.task = task + + when: + handler.writeMetaFile() + then: + task.workDir.resolve(TaskRun.CMD_META).text == """{"hash":"aabbccddeeff00112233445566778899","inputs":[{"name":"foo.txt","path":"/tmp/00/112233445566778899aabbccddeeff/foo.txt","predecessor":"00112233445566778899aabbccddeeff"}],"outputs":[{"name":"bar.txt","path":"${folder}/bar.txt","size":3,"checksum":"37b51d194a7513e45b56f6524f2d51f2"}]}""" + + cleanup: + folder.delete() + } + LongAdder _adder(Integer x) { if( x != null ) { def adder = new LongAdder() diff --git a/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy b/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy index 94447e677f..9d79cd0a1b 100644 --- a/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy +++ b/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy @@ -35,6 +35,7 @@ import java.nio.file.attribute.FileAttribute import java.nio.file.attribute.FileTime import java.nio.file.attribute.PosixFilePermission import java.nio.file.attribute.PosixFilePermissions +import java.security.MessageDigest import groovy.transform.CompileStatic import groovy.transform.PackageScope @@ -1600,4 +1601,11 @@ class FilesEx { static String getScheme(Path path) { path.getFileSystem().provider().getScheme() } + + static String getChecksum(Path path) { + final data = Files.readAllBytes(path) + final hash = MessageDigest.getInstance("MD5").digest(data) + + new BigInteger(1, hash).toString(16) + } } From ae6702734b61615922f3956124a9f091ec7c20f5 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Tue, 28 Mar 2023 14:09:43 -0500 Subject: [PATCH 02/13] Add task inputs and outputs to conrete DAG Signed-off-by: Ben Sherman --- 
.../groovy/nextflow/dag/ConcreteDAG.groovy | 59 +++++++++---- .../nextflow/dag/MermaidRenderer.groovy | 38 ++++++-- .../groovy/nextflow/dag/NodeMarker.groovy | 12 --- .../nextflow/processor/TaskProcessor.groovy | 8 +- .../trace/DefaultObserverFactory.groovy | 2 +- .../nextflow/trace/GraphObserver.groovy | 16 ++++ .../nextflow/dag/ConcreteDAGTest.groovy | 86 +++++++++++++++++++ .../nextflow/dag/MermaidRendererTest.groovy | 68 ++++++++++++++- .../nextflow/processor/TaskHandlerTest.groovy | 5 +- 9 files changed, 246 insertions(+), 48 deletions(-) create mode 100644 modules/nextflow/src/test/groovy/nextflow/dag/ConcreteDAGTest.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/ConcreteDAG.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/ConcreteDAG.groovy index 735a7cf76b..5768916d16 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/ConcreteDAG.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/ConcreteDAG.groovy @@ -23,6 +23,7 @@ import groovy.transform.MapConstructor import groovy.transform.ToString import groovy.util.logging.Slf4j import nextflow.processor.TaskRun +import nextflow.script.params.FileOutParam /** * Model the conrete (task) graph of a pipeline execution. 
* @@ -31,46 +32,74 @@ import nextflow.processor.TaskRun @Slf4j class ConcreteDAG { - Map nodes = new HashMap<>(100) + Map nodes = new HashMap<>(100) /** - * Create a new node for a task + * Add a task to the graph * * @param task - * @param hash */ - synchronized void addTaskNode( TaskRun task, String hash ) { + synchronized void addTask( TaskRun task ) { + final hash = task.hash.toString() final label = "[${hash.substring(0,2)}/${hash.substring(2,8)}] ${task.name}" - final preds = task.getInputFilesMap().values() - .collect { p -> getPredecessorHash(p) } - .findAll { h -> h != null } + final inputs = task.getInputFilesMap() + .collect { name, path -> + new Input(name: name, path: path, predecessor: getPredecessorHash(path)) + } - nodes[hash] = new Node( + nodes[hash] = new Task( index: nodes.size(), label: label, - predecessors: preds + inputs: inputs ) } static public String getPredecessorHash(Path path) { - final pattern = Pattern.compile('.*/([a-z0-9]{2}/[a-z0-9]{30})') + final pattern = Pattern.compile('.*/([0-9a-f]{2}/[0-9a-f]{30})') final matcher = pattern.matcher(path.toString()) matcher.find() ? 
matcher.group(1).replace('/', '') : null } + /** + * Add a task's outputs to the graph + * + * @param task + */ + synchronized void addTaskOutputs( TaskRun task ) { + final hash = task.hash.toString() + final outputs = task.getOutputsByType(FileOutParam) + .values() + .flatten() + .collect { path -> + new Output(name: path.name, path: path) + } + + nodes[hash].outputs = outputs + } + @MapConstructor @ToString(includeNames = true, includes = 'label', includePackage=false) - protected class Node { - + static protected class Task { int index - String label - - List predecessors + List inputs + List outputs String getSlug() { "t${index}" } + } + @MapConstructor + static protected class Input { + String name + Path path + String predecessor + } + + @MapConstructor + static protected class Output { + String name + Path path } } diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/MermaidRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/MermaidRenderer.groovy index 6e55674054..706a93f5d9 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/MermaidRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/MermaidRenderer.groovy @@ -75,17 +75,41 @@ class MermaidRenderer implements DagRenderer { @Override void renderConcreteGraph(ConcreteDAG graph, Path file) { + def renderedOutputs = [] as Set + def numInputs = 0 + def numOutputs = 0 + def lines = [] lines << "flowchart TD" - graph.nodes.values().each { node -> - lines << " ${node.getSlug()}[\"${node.label}\"]" - - node.predecessors.each { key -> - final pred = graph.nodes[key] + // render tasks and task inputs + graph.nodes.values().each { task -> + // render task node + lines << " ${task.getSlug()}[\"${task.label}\"]" + + task.inputs.each { input -> + // render task input from predecessor + if( input.predecessor != null ) { + final pred = graph.nodes[input.predecessor] + lines << " ${pred.getSlug()} -->|${input.name}| ${task.getSlug()}" + renderedOutputs << input.path + } + + // render 
task input from source node + else { + numInputs += 1 + lines << " i${numInputs}(( )) -->|${input.name}| ${task.getSlug()}" + } + } + } - if( pred ) - lines << " ${pred.getSlug()} --> ${node.getSlug()}" + // render task outputs with sink nodes + graph.nodes.values().each { task -> + task.outputs.each { output -> + if( output.path !in renderedOutputs ) { + numOutputs += 1 + lines << " ${task.getSlug()} -->|${output.name}| o${numOutputs}(( ))" + } } } diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/NodeMarker.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/NodeMarker.groovy index d9f8f14990..0d543130e9 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/NodeMarker.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/NodeMarker.groovy @@ -22,7 +22,6 @@ import groovyx.gpars.dataflow.operator.DataflowProcessor import nextflow.Global import nextflow.Session import nextflow.processor.TaskProcessor -import nextflow.processor.TaskRun import nextflow.script.params.InputsList import nextflow.script.params.OutputsList /** @@ -89,15 +88,4 @@ class NodeMarker { session.dag.addDataflowBroadcastPair(readChannel, broadcastChannel) } - /** - * Creates a vertex in the concrete DAG representing a task - * - * @param task - * @param hash - */ - static void addTaskNode( TaskRun task, String hash ) { - if( session?.concreteDAG && !session.aborted ) - session.concreteDAG.addTaskNode( task, hash ) - } - } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index 05e4643928..0032b8e9ae 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -756,11 +756,8 @@ class TaskProcessor { log.trace "[${safeTaskName(task)}] Cacheable folder=${resumeDir?.toUriString()} -- exists=$exists; try=$tries; shouldTryCache=$shouldTryCache; entry=$entry" def 
cached = shouldTryCache && exists && checkCachedOutput(task.clone(), resumeDir, hash, entry) - if( cached ) { - // add cached task to the task graph - NodeMarker.addTaskNode(task, hash.toString()) + if( cached ) break - } } catch (Throwable t) { log.warn1("[${safeTaskName(task)}] Unable to resume cached task -- See log file for details", causedBy: t) @@ -783,9 +780,6 @@ class TaskProcessor { lock.release() } - // add submitted task to the task graph - NodeMarker.addTaskNode(task, hash.toString()) - // submit task for execution submitTask( task, hash, workDir ) break diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy index 832e73ede2..72a8b31d5f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy @@ -93,7 +93,7 @@ class DefaultObserverFactory implements TraceObserverFactory { if( !fileName ) fileName = GraphObserver.DEF_FILE_NAME def traceFile = (fileName as Path).complete() def observer = new GraphObserver(traceFile) - config.navigate('dag.type') { observer.dagType = it ?: 'abstract' } + config.navigate('dag.type') { observer.type = it ?: 'abstract' } config.navigate('dag.overwrite') { observer.overwrite = it } result << observer } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy index 07e2c01b12..2be9444056 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy @@ -84,6 +84,22 @@ class GraphObserver implements TraceObserver { } } + @Override + void onProcessSubmit(TaskHandler handler, TraceRecord trace) { + concreteDag.addTask( handler.task ) + } + + @Override + void onProcessComplete(TaskHandler handler, TraceRecord trace) 
{ + concreteDag.addTaskOutputs( handler.task ) + } + + @Override + void onProcessCached(TaskHandler handler, TraceRecord trace) { + concreteDag.addTask( handler.task ) + concreteDag.addTaskOutputs( handler.task ) + } + @Override void onFlowComplete() { if( type == 'abstract' ) { diff --git a/modules/nextflow/src/test/groovy/nextflow/dag/ConcreteDAGTest.groovy b/modules/nextflow/src/test/groovy/nextflow/dag/ConcreteDAGTest.groovy new file mode 100644 index 0000000000..a666372169 --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/dag/ConcreteDAGTest.groovy @@ -0,0 +1,86 @@ +/* + * Copyright 2013-2023, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package nextflow.dag + +import java.nio.file.Paths + +import com.google.common.hash.HashCode +import nextflow.processor.TaskRun +import spock.lang.Specification +/** + * + * @author Ben Sherman + */ +class ConcreteDAGTest extends Specification { + + def 'should add task nodes and outputs' () { + + given: + def task1 = Mock(TaskRun) { + getHash() >> HashCode.fromString('00112233445566778899aabbccddeeff') + getName() >> 'foo' + getInputFilesMap() >> [ + 'data.txt': Paths.get('/inputs/data.txt') + ] + getOutputsByType(_) >> [ + 'data.foo': Paths.get('/work/00/112233445566778899aabbccddeeff/data.foo') + ] + } + def task2 = Mock(TaskRun) { + getHash() >> HashCode.fromString('aabbccddeeff00112233445566778899') + getName() >> 'bar' + getInputFilesMap() >> [ + 'data.foo': Paths.get('/work/00/112233445566778899aabbccddeeff/data.foo') + ] + getOutputsByType(_) >> [ + 'data.bar': Paths.get('/work/aa/bbccddeeff00112233445566778899/data.bar') + ] + } + def dag = new ConcreteDAG() + + when: + dag.addTask( task1 ) + dag.addTask( task2 ) + def node1 = dag.nodes['00112233445566778899aabbccddeeff'] + def node2 = dag.nodes['aabbccddeeff00112233445566778899'] + then: + node1.index == 0 + node1.label == '[00/112233] foo' + node1.inputs.size() == 1 + node1.inputs[0].name == 'data.txt' + node1.inputs[0].path == Paths.get('/inputs/data.txt') + node1.inputs[0].predecessor == null + node2.index == 1 + node2.label == '[aa/bbccdd] bar' + node2.inputs.size() == 1 + node2.inputs[0].name == 'data.foo' + node2.inputs[0].path == Paths.get('/work/00/112233445566778899aabbccddeeff/data.foo') + node2.inputs[0].predecessor == '00112233445566778899aabbccddeeff' + + when: + dag.addTaskOutputs( task1 ) + dag.addTaskOutputs( task2 ) + then: + node1.outputs.size() == 1 + node1.outputs[0].name == 'data.foo' + node1.outputs[0].path == Paths.get('/work/00/112233445566778899aabbccddeeff/data.foo') + node2.outputs.size() == 1 + node2.outputs[0].name == 'data.bar' + node2.outputs[0].path == 
Paths.get('/work/aa/bbccddeeff00112233445566778899/data.bar') + } + +} diff --git a/modules/nextflow/src/test/groovy/nextflow/dag/MermaidRendererTest.groovy b/modules/nextflow/src/test/groovy/nextflow/dag/MermaidRendererTest.groovy index 0b46bd3f84..95d408fdc3 100644 --- a/modules/nextflow/src/test/groovy/nextflow/dag/MermaidRendererTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/dag/MermaidRendererTest.groovy @@ -16,12 +16,13 @@ */ package nextflow.dag + import java.nio.file.Files +import java.nio.file.Paths import groovyx.gpars.dataflow.DataflowQueue -import spock.lang.Specification - import nextflow.Session +import spock.lang.Specification /** * @@ -33,7 +34,7 @@ class MermaidRendererTest extends Specification { new Session() } - def 'should render a graph using the `mmd` format' () { + def 'should render an abstract graph using the `mmd` format' () { given: def file = Files.createTempFile('test', null) def ch1 = new DataflowQueue() @@ -65,4 +66,65 @@ class MermaidRendererTest extends Specification { cleanup: file.delete() } + + def 'should render a concrete graph using the `mmd` format' () { + given: + def file = Files.createTempFile('test', null) + + def dag = Mock(ConcreteDAG) { + nodes >> [ + '012345': new ConcreteDAG.Task( + index: 1, + label: 'foo', + inputs: [ + new ConcreteDAG.Input( + name: 'data.txt', + path: Paths.get('/inputs/data.txt'), + predecessor: null + ) + ], + outputs: [ + new ConcreteDAG.Output( + name: 'data.foo', + path: Paths.get('/work/012345/data.foo'), + ) + ] + ), + 'abcdef': new ConcreteDAG.Task( + index: 2, + label: 'bar', + inputs: [ + new ConcreteDAG.Input( + name: 'data.foo', + path: Paths.get('/work/012345/data.foo'), + predecessor: '012345' + ) + ], + outputs: [ + new ConcreteDAG.Output( + name: 'data.bar', + path: Paths.get('/work/abcdef/data.bar'), + ) + ] + ) + ] + } + + when: + new MermaidRenderer().renderConcreteGraph(dag, file) + then: + file.text == + ''' + flowchart TD + t1["foo"] + i1(( )) -->|data.txt| t1 
+ t2["bar"] + t1 -->|data.foo| t2 + t2 -->|data.bar| o1(( )) + ''' + .stripIndent().leftTrim() + + cleanup: + file.delete() + } } diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy index 559be4ef2c..cc73df6c00 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy @@ -23,7 +23,6 @@ import java.util.concurrent.atomic.LongAdder import com.google.common.hash.HashCode import nextflow.Session import nextflow.executor.Executor -import nextflow.script.params.FileOutParam import nextflow.util.Duration import nextflow.util.MemoryUnit import spock.lang.Specification @@ -145,10 +144,10 @@ class TaskHandlerTest extends Specification { def folder = File.createTempDir() def outputFile = new File(folder, 'bar.txt') ; outputFile.text = 'bar' def task = Mock(TaskRun) { - hash >> HashCode.fromString("aabbccddeeff00112233445566778899") + hash >> HashCode.fromString('aabbccddeeff00112233445566778899') workDir >> folder.toPath() getInputFilesMap() >> [ 'foo.txt': Paths.get('/tmp/00/112233445566778899aabbccddeeff/foo.txt') ] - getOutputsByType(FileOutParam) >> [ 'bar.txt': outputFile.toPath() ] + getOutputsByType(_) >> [ 'bar.txt': outputFile.toPath() ] } def handler = [:] as TaskHandler handler.task = task From 8f95cd6466e5d79aadc68e7ff18f17aaa530978b Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Tue, 28 Mar 2023 14:33:44 -0500 Subject: [PATCH 03/13] Fix failing tests Signed-off-by: Ben Sherman --- .../nextflow/trace/GraphObserver.groovy | 2 +- .../nextflow/trace/GraphObserverTest.groovy | 34 +++++++++---------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy index 2be9444056..3826e0b6b5 100644 --- 
a/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy @@ -47,7 +47,7 @@ class GraphObserver implements TraceObserver { private Path file - private String type + private String type = 'abstract' private DAG abstractDag diff --git a/modules/nextflow/src/test/groovy/nextflow/trace/GraphObserverTest.groovy b/modules/nextflow/src/test/groovy/nextflow/trace/GraphObserverTest.groovy index bc1efe8214..baca8d5270 100644 --- a/modules/nextflow/src/test/groovy/nextflow/trace/GraphObserverTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/trace/GraphObserverTest.groovy @@ -36,7 +36,7 @@ import test.TestHelper */ class GraphObserverTest extends Specification { - DAG test_dag + DAG dag def setup() { new Session() @@ -46,28 +46,28 @@ class GraphObserverTest extends Specification { def ch2 = new DataflowQueue() def ch3 = new DataflowQueue() - test_dag = new DAG() + dag = new DAG() - test_dag.addVertex( + dag.addVertex( DAG.Type.ORIGIN, 'Source', null, [ new DAG.ChannelHandler(channel: src, label: 'src') ] ) - test_dag.addVertex( + dag.addVertex( DAG.Type.PROCESS, 'Process 1', [ new DAG.ChannelHandler(channel: src, label: 'Source') ], [ new DAG.ChannelHandler(channel: ch1, label: 'Channel 1') ] ) - test_dag.addVertex( + dag.addVertex( DAG.Type.OPERATOR, 'Filter', [ new DAG.ChannelHandler(channel: ch1, label: 'Channel 1') ], [ new DAG.ChannelHandler(channel: ch2, label: 'Channel 2') ] ) - test_dag.addVertex( + dag.addVertex( DAG.Type.PROCESS, 'Process 2', [ new DAG.ChannelHandler(channel: ch2, label: 'Channel 2') ], @@ -79,7 +79,7 @@ class GraphObserverTest extends Specification { given: def file = Files.createTempFile('nxf_','.dot') def gr = new GraphObserver(file) - gr.dag = test_dag + gr.abstractDag = dag when: gr.onFlowComplete() @@ -106,7 +106,7 @@ class GraphObserverTest extends Specification { given: def file = Files.createTempFile('nxf-','.html') def gr = new 
GraphObserver(file) - gr.dag = test_dag + gr.abstractDag = dag when: gr.onFlowComplete() @@ -134,7 +134,7 @@ class GraphObserverTest extends Specification { given: def file = Files.createTempFile('nxf-','.svg') def gr = new GraphObserver(file) - gr.dag = test_dag + gr.abstractDag = dag when: gr.onFlowComplete() @@ -152,7 +152,7 @@ class GraphObserverTest extends Specification { given: def file = Files.createTempFile('nxf-','.png') def gr = new GraphObserver(file) - gr.dag = test_dag + gr.abstractDag = dag when: gr.onFlowComplete() @@ -169,7 +169,7 @@ class GraphObserverTest extends Specification { given: def file = Files.createTempFile('nxf-','.pdf') def gr = new GraphObserver(file) - gr.dag = test_dag + gr.abstractDag = dag when: gr.onFlowComplete() @@ -186,7 +186,7 @@ class GraphObserverTest extends Specification { def folder = Files.createTempDirectory('test') def file = folder.resolve('nope') def gr = new GraphObserver(file) - gr.dag = test_dag + gr.abstractDag = dag when: gr.onFlowComplete() @@ -217,34 +217,34 @@ class GraphObserverTest extends Specification { then: observer.name == 'hello-world' observer.format == 'dot' - observer.createRender() instanceof DotRenderer + observer.createRenderer() instanceof DotRenderer when: observer = new GraphObserver(Paths.get('/path/to/TheGraph.html')) then: observer.name == 'TheGraph' observer.format == 'html' - observer.createRender() instanceof CytoscapeHtmlRenderer + observer.createRenderer() instanceof CytoscapeHtmlRenderer when: observer = new GraphObserver(Paths.get('/path/to/TheGraph.mmd')) then: observer.name == 'TheGraph' observer.format == 'mmd' - observer.createRender() instanceof MermaidRenderer + observer.createRenderer() instanceof MermaidRenderer when: observer = new GraphObserver(Paths.get('/path/to/TheGraph.SVG')) then: observer.name == 'TheGraph' observer.format == 'svg' - observer.createRender() instanceof GraphvizRenderer + observer.createRenderer() instanceof GraphvizRenderer when: observer = new 
GraphObserver(Paths.get('/path/to/anonymous')) then: observer.name == 'anonymous' observer.format == 'dot' - observer.createRender() instanceof DotRenderer + observer.createRenderer() instanceof DotRenderer } } From 9f11e4b193b9187a928992416115df1bbd6b4a61 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Wed, 29 Mar 2023 11:18:40 -0500 Subject: [PATCH 04/13] Use path-based APIs to get file metadata Signed-off-by: Ben Sherman --- .../nextflow/processor/TaskHandler.groovy | 7 ++--- .../main/nextflow/extension/FilesEx.groovy | 17 +++++++++-- .../main/nextflow/file/ETagAwareFile.groovy | 29 +++++++++++++++++++ .../src/main/com/upplication/s3fs/S3Path.java | 11 ++++++- .../cloud/azure/nio/AzFileAttributes.groovy | 8 +++++ .../nextflow/cloud/azure/nio/AzPath.groovy | 8 ++++- 6 files changed, 71 insertions(+), 9 deletions(-) create mode 100644 modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy index 61efe65c18..44212e8cd8 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy @@ -19,6 +19,7 @@ package nextflow.processor import static nextflow.processor.TaskStatus.* +import java.nio.file.Files import java.nio.file.NoSuchFileException import groovy.json.JsonBuilder @@ -229,13 +230,11 @@ abstract class TaskHandler { ] }, outputs: task.getOutputsByType(FileOutParam).values().flatten().collect { path -> - final file = path.toFile() - [ name: path.name, path: path.toString(), - size: file.size(), - checksum: file.isFile() ? 
FilesEx.getChecksum(path) : null + size: Files.size(path), + checksum: FilesEx.getChecksum(path) ] } ] diff --git a/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy b/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy index 9d79cd0a1b..1bac3c19db 100644 --- a/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy +++ b/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy @@ -35,6 +35,7 @@ import java.nio.file.attribute.FileAttribute import java.nio.file.attribute.FileTime import java.nio.file.attribute.PosixFilePermission import java.nio.file.attribute.PosixFilePermissions +import java.security.DigestInputStream import java.security.MessageDigest import groovy.transform.CompileStatic @@ -42,6 +43,7 @@ import groovy.transform.PackageScope import groovy.transform.stc.ClosureParams import groovy.transform.stc.FromString import groovy.util.logging.Slf4j +import nextflow.file.ETagAwareFile import nextflow.file.FileHelper import nextflow.file.FileSystemPathFactory import nextflow.io.ByteBufferBackedInputStream @@ -1603,9 +1605,18 @@ class FilesEx { } static String getChecksum(Path path) { - final data = Files.readAllBytes(path) - final hash = MessageDigest.getInstance("MD5").digest(data) + if( Files.isDirectory(path) ) + return null + + if( path instanceof ETagAwareFile ) + return ((ETagAwareFile)path).getETag() + + final md = MessageDigest.getInstance('MD5') + final is = Files.newInputStream(path) + final dis = new DigestInputStream(is, md) + + while( dis.read() != -1 ) {} - new BigInteger(1, hash).toString(16) + new BigInteger(1, md.digest()).toString(16) } } diff --git a/modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy b/modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy new file mode 100644 index 0000000000..f1c40073b3 --- /dev/null +++ b/modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy @@ -0,0 +1,29 @@ +/* + * Copyright 2013-2023, Seqera Labs + * + * Licensed under the Apache License, 
Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.file + +/** + * Defines the interface for files that have an ETag + * + * @author Ben Sherman + */ +interface ETagAwareFile { + + String getETag() + +} diff --git a/plugins/nf-amazon/src/main/com/upplication/s3fs/S3Path.java b/plugins/nf-amazon/src/main/com/upplication/s3fs/S3Path.java index dfd12cea08..9e9688945e 100644 --- a/plugins/nf-amazon/src/main/com/upplication/s3fs/S3Path.java +++ b/plugins/nf-amazon/src/main/com/upplication/s3fs/S3Path.java @@ -66,13 +66,14 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import nextflow.file.ETagAwareFile; import nextflow.file.TagAwareFile; import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Iterables.transform; import static java.lang.String.format; -public class S3Path implements Path, TagAwareFile { +public class S3Path implements Path, ETagAwareFile, TagAwareFile { public static final String PATH_SEPARATOR = "/"; /** @@ -571,6 +572,14 @@ public String getContentType() { return contentType; } + @Override + public String getETag() { + return fileSystem + .getClient() + .getObjectMetadata(getBucket(), getKey()) + .getETag(); + } + // ~ helpers methods private static Function strip(final String ... 
strs) { diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy index a955888d70..ba07cda22f 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy @@ -46,6 +46,8 @@ class AzFileAttributes implements BasicFileAttributes { private String objectId + private String etag + static AzFileAttributes root() { new AzFileAttributes(size: 0, objectId: '/', directory: true) } @@ -60,6 +62,7 @@ class AzFileAttributes implements BasicFileAttributes { updateTime = time(props.getLastModified()) directory = client.blobName.endsWith('/') size = props.getBlobSize() + etag = props.getETag() } AzFileAttributes(String containerName, BlobItem item) { @@ -69,6 +72,7 @@ class AzFileAttributes implements BasicFileAttributes { creationTime = time(item.properties.getCreationTime()) updateTime = time(item.properties.getLastModified()) size = item.properties.getContentLength() + etag = item.properties.getETag() } } @@ -144,6 +148,10 @@ class AzFileAttributes implements BasicFileAttributes { return objectId } + String getETag() { + return etag + } + @Override boolean equals( Object obj ) { if( this.class != obj?.class ) return false diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy index 2f654b4ad8..3bdd222f6b 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy @@ -29,6 +29,7 @@ import com.azure.storage.blob.models.BlobItem import groovy.transform.CompileStatic import groovy.transform.EqualsAndHashCode import groovy.transform.PackageScope +import nextflow.file.ETagAwareFile /** * Implements Azure path object @@ -37,7 +38,7 @@ import groovy.transform.PackageScope */ @CompileStatic 
@EqualsAndHashCode(includes = 'fs,path,directory', includeFields = true) -class AzPath implements Path { +class AzPath implements Path, ETagAwareFile { private AzFileSystem fs @@ -305,6 +306,11 @@ class AzPath implements Path { return this.toString() <=> other.toString() } + @Override + String getETag() { + return attributes.getETag() + } + String getContainerName() { if( path.isAbsolute() ) { path.nameCount==0 ? '/' : path.getName(0) From 84568928fc483ec4c1c55ae415b20f006a52f4d0 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Thu, 30 Mar 2023 09:57:20 -0500 Subject: [PATCH 05/13] Use buffer to compute checksum Signed-off-by: Ben Sherman --- .../src/main/nextflow/extension/FilesEx.groovy | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy b/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy index df77ee2023..026d0dbc92 100644 --- a/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy +++ b/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy @@ -34,7 +34,6 @@ import java.nio.file.attribute.FileAttribute import java.nio.file.attribute.FileTime import java.nio.file.attribute.PosixFilePermission import java.nio.file.attribute.PosixFilePermissions -import java.security.DigestInputStream import java.security.MessageDigest import groovy.transform.CompileStatic @@ -1610,11 +1609,13 @@ class FilesEx { if( path instanceof ETagAwareFile ) return ((ETagAwareFile)path).getETag() - final md = MessageDigest.getInstance('MD5') final is = Files.newInputStream(path) - final dis = new DigestInputStream(is, md) + final md = MessageDigest.getInstance('MD5') + final buf = new byte[16 << 10] - while( dis.read() != -1 ) {} + int len + while( (len=is.read(buf)) != -1 ) + md.update(buf, 0, len) new BigInteger(1, md.digest()).toString(16) } From 77f2cdc316b105dba4d9a42c9abec5a47397d575 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 31 Mar 2023 15:57:59 -0500 Subject: 
[PATCH 06/13] Add support for temporary output paths Signed-off-by: Ben Sherman --- docs/process.rst | 33 ++++ .../script/params/FileOutParam.groovy | 5 + .../trace/DefaultObserverFactory.groovy | 5 + .../trace/TemporaryFileObserver.groovy | 184 ++++++++++++++++++ tests/temporary-outputs.nf | 49 +++++ 5 files changed, 276 insertions(+) create mode 100644 modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy create mode 100644 tests/temporary-outputs.nf diff --git a/docs/process.rst b/docs/process.rst index e24de605c8..082660ee18 100644 --- a/docs/process.rst +++ b/docs/process.rst @@ -1026,6 +1026,7 @@ Name Description ``type`` Type of paths returned, either ``file``, ``dir`` or ``any`` (default: ``any``, or ``file`` if the specified file name pattern contains a double star (``**``)) ``maxDepth`` Maximum number of directory levels to visit (default: no limit) ``includeInputs`` When ``true`` any input files matching an output file glob pattern are included. +``temporary`` When ``true`` the file will be "emptied" once it is no longer needed by downstream tasks. ================== ===================== The parenthesis are optional for input and output qualifiers, but when you want to set an additional option and there @@ -1150,6 +1151,38 @@ on the actual value of the ``species`` input. because it will result in simpler and more portable code. +Temporary output files +---------------------- + +.. warning:: + This feature is experimental and may change in a future release. + +When ``path`` output is declared with ``temporary: true``, any file associated with this output will be automatically "emptied" +during pipeline execution, as soon as it is no longer needed by downstream tasks. This feature is useful for deleting large +intermediate files that can be discarded once a pipeline run is finished. 
+ +The lifetime of a temporary file is determined by the processes that are downstream of the file's originating process, +either directly or indirectly through channel operators. When all of these processes finish (i.e. all of their tasks finish), +all temporary files produced by the original process can be deleted. + +"Emptying" a file means that the file contents are deleted, but the file metadata (size, last modified timestamp) is preserved, +such that it will be cached on a resumed run. + +The following caveats apply when using temporary outputs: + +- A pipeline run with temporary outputs should not be resumed if any regular downstream outputs have also been deleted. If a + temporary output is cached but a downstream output is not cached (e.g. because it was deleted or the process script was modified), + the downstream task will fail or produce incorrect output. + +- Temporary outputs will not be cached when the ``cache`` directive is set to ``'deep'``, because this mode includes the file + contents in the cache key. + +- A temporary output should not be forwarded by a downstream process using the ``includeInputs`` option. In this case, the temporary + output will be cleaned prematurely, and any process that consumes the forwarded output channel may fail or produce incorrect output. + +- Directories and remote paths (e.g. S3) are not currently supported. + + .. 
_process-env: Output type ``env`` diff --git a/modules/nextflow/src/main/groovy/nextflow/script/params/FileOutParam.groovy b/modules/nextflow/src/main/groovy/nextflow/script/params/FileOutParam.groovy index e305c46c3e..58bc52a446 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/params/FileOutParam.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/params/FileOutParam.groovy @@ -82,6 +82,11 @@ class FileOutParam extends BaseOutParam implements OutParam, OptionalParam, Path boolean glob = true + /** + * When true the file will be "cleaned" once it is no longer needed by downstream tasks. + */ + boolean temporary = false + private GString gstring private Closure dynamicObj private String filePattern diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy index 72a8b31d5f..cd7bd72d7e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy @@ -26,6 +26,7 @@ class DefaultObserverFactory implements TraceObserverFactory { createDagObserver(result) createWebLogObserver(result) createAnsiLogObserver(result) + createTemporaryFileObserver(result) return result } @@ -117,4 +118,8 @@ class DefaultObserverFactory implements TraceObserverFactory { result << observer } + protected void createTemporaryFileObserver(Collection result) { + result << new TemporaryFileObserver() + } + } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy new file mode 100644 index 0000000000..62a1ba2415 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy @@ -0,0 +1,184 @@ +/* + * Copyright 2013-2023, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you 
may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.trace + +import java.nio.file.Files +import java.nio.file.Path +import java.nio.file.attribute.BasicFileAttributes +import java.util.concurrent.ConcurrentHashMap + +import groovy.transform.MapConstructor +import groovy.util.logging.Slf4j +import nextflow.Session +import nextflow.dag.DAG +import nextflow.dag.ConcreteDAG +import nextflow.processor.TaskHandler +import nextflow.processor.TaskProcessor +import nextflow.script.params.FileOutParam +/** + * Watch for temporary output files and "clean" them once they + * are no longer needed. + * + * @author Ben Sherman + */ +@Slf4j +class TemporaryFileObserver implements TraceObserver { + + private DAG dag + + private Map statusMap + + @Override + void onFlowCreate(Session session) { + this.dag = session.dag + this.statusMap = new ConcurrentHashMap<>() + } + + /** + * When a task is completed, track any temporary output files + * for automatic cleanup. 
+ * + * @param handler + * @param trace + */ + @Override + void onProcessComplete(TaskHandler handler, TraceRecord trace) { + // find all temporary output files + final task = handler.task + final temporaryOutputs = task + .getOutputsByType(FileOutParam) + .findAll { param, paths -> param.temporary } + .values() + .flatten() + + if( temporaryOutputs.isEmpty() ) + return + + // update status tracker for the task's process + final processName = task.processor.name + + if( processName !in statusMap ) { + log.trace "Process ${processName} has temporary output files, tracking for automatic cleanup" + + statusMap[processName] = new Status( + paths: [] as Set, + processBarriers: findAllConsumers(processName) + ) + } + + statusMap[processName].paths.addAll(temporaryOutputs) + } + + /** + * Find all processes which are consumers of a given process. + * + * @param processName + */ + Set findAllConsumers(String processName) { + + // find the task's process node in the abstract dag + final processNode = dag.vertices + .find { node -> node.process?.name == processName } + + // find all downstream processes in the abstract dag + def consumers = [] as Set + def queue = [ processNode ] + + while( !queue.isEmpty() ) { + // remove a node from the search queue + final sourceNode = queue.remove(0) + + // search each outgoing edge from the source node + for( def edge : dag.edges ) { + if( edge.from != sourceNode ) + continue + + def node = edge.to + + // add process nodes to the list of consumers + if( node.process != null ) + consumers.add(node.process.name) + // add operator nodes to the queue to keep searching + else + queue.add(node) + } + } + + log.trace "Process ${processName} has the following consumers: ${consumers.join(', ')}" + + return consumers + } + + /** + * When a process is completed, update the status of any processes + * that are waiting on it in order to cleanup temporary outputs. 
+ * + * If, after this update is removed, a process has no more barriers, + * then clean all temporary files for that process. + * + * @param process + */ + @Override + void onProcessTerminate(TaskProcessor process) { + for( def entry : statusMap ) { + // remove barrier from each upstream process + final producer = entry.key + final status = entry.value + final consumers = status.processBarriers + + consumers.remove(process.name) + + // if a process has no more barriers, trigger the cleanup + if( consumers.isEmpty() ) { + log.trace "All consumers of process ${producer} are complete, time to clean up temporary files" + + cleanTemporaryFiles(status.paths) + statusMap.remove(producer) + } + } + } + + void cleanTemporaryFiles(Collection paths) { + for( Path path : paths ) { + log.trace "Cleaning temporary file: ${path}" + + // get original file metadata + final attrs = Files.readAttributes(path, BasicFileAttributes.class) + + // delete file contents + try { + final file = new RandomAccessFile(path.toFile(), 'rw') + file.setLength(0) + file.setLength(attrs.size()) + file.close() + } + catch( UnsupportedOperationException e ) { + log.warn "Unable to truncate file, skipping: ${path.toUriString()}" + } + + // restore original file metadata + Files.setLastModifiedTime(path, attrs.lastModifiedTime()) + } + } + + @MapConstructor + static protected class Status { + Set paths + Set processBarriers + } + +} diff --git a/tests/temporary-outputs.nf b/tests/temporary-outputs.nf new file mode 100644 index 0000000000..5c73ab19ae --- /dev/null +++ b/tests/temporary-outputs.nf @@ -0,0 +1,49 @@ + +process foo { + input: + val meta_id + output: + tuple val(meta_id), path('a.txt', temporary: true) + + script: + """ + touch a.txt + echo 'foo was here' >> a.txt + """ +} + +process bar { + input: + tuple val(meta_id), path('a.txt') + output: + tuple val(meta_id), path('b.txt', temporary: true) + + script: + """ + cat a.txt > b.txt + echo 'bar was here' >> b.txt + """ +} + +process baz { + 
publishDir '.' + + input: + tuple val(meta_id), path('a.txt'), path('b.txt') + output: + tuple val(meta_id), path('c.txt') + + script: + """ + cat b.txt > c.txt + echo 'baz was here' >> c.txt + """ +} + +workflow { + meta_ids = Channel.of( '1', '2', '3' ) + ch_a = foo(meta_ids) + ch_b = bar(ch_a) + ch_ab = ch_a.join(ch_b) + ch_c = baz(ch_ab) +} From 3e55ad5d303042f8f3107d9bdcda51ded62e9665 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 31 Mar 2023 16:43:05 -0500 Subject: [PATCH 07/13] Fix failing test Signed-off-by: Ben Sherman --- .../src/test/groovy/nextflow/SessionTest.groovy | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/modules/nextflow/src/test/groovy/nextflow/SessionTest.groovy b/modules/nextflow/src/test/groovy/nextflow/SessionTest.groovy index ea1ce7e1dc..98efce7139 100644 --- a/modules/nextflow/src/test/groovy/nextflow/SessionTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/SessionTest.groovy @@ -25,9 +25,10 @@ import nextflow.container.ContainerConfig import nextflow.exception.AbortOperationException import nextflow.script.ScriptFile import nextflow.script.WorkflowMetadata +import nextflow.trace.TemporaryFileObserver +import nextflow.trace.TraceFileObserver import nextflow.trace.TraceHelper import nextflow.trace.WorkflowStatsObserver -import nextflow.trace.TraceFileObserver import nextflow.util.Duration import nextflow.util.VersionNumber import spock.lang.Specification @@ -248,16 +249,16 @@ class SessionTest extends Specification { session = [:] as Session result = session.createObservers() then: - result.size()==1 + result.size() == 2 result.any { it instanceof WorkflowStatsObserver } + result.any { it instanceof TemporaryFileObserver } when: session = [:] as Session session.config = [trace: [enabled: true, file:'name.txt']] result = session.createObservers() - observer = result[1] as TraceFileObserver + observer = result.find { it instanceof TraceFileObserver } then: - result.size() == 2 
observer.tracePath == Paths.get('name.txt').complete() observer.separator == '\t' @@ -265,9 +266,8 @@ class SessionTest extends Specification { session = [:] as Session session.config = [trace: [enabled: true, sep: 'x', fields: 'task_id,name,exit', file: 'alpha.txt']] result = session.createObservers() - observer = result[1] as TraceFileObserver + observer = result.find { it instanceof TraceFileObserver } then: - result.size() == 2 observer.tracePath == Paths.get('alpha.txt').complete() observer.separator == 'x' observer.fields == ['task_id','name','exit'] @@ -283,9 +283,8 @@ class SessionTest extends Specification { session = [:] as Session session.config = [trace: [enabled: true, fields: 'task_id,name,exit,vmem']] result = session.createObservers() - observer = result[1] as TraceFileObserver + observer = result.find { it instanceof TraceFileObserver } then: - result.size() == 2 observer.tracePath == Paths.get('trace-20221001.txt').complete() observer.separator == '\t' observer.fields == ['task_id','name','exit','vmem'] From e307f750b6bbbd6ada72ea7e166515e1c4c0f98b Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 31 Mar 2023 17:09:19 -0500 Subject: [PATCH 08/13] Add caveat about overlapping output channels [ci skip] Signed-off-by: Ben Sherman --- docs/process.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/process.rst b/docs/process.rst index 082660ee18..e801395304 100644 --- a/docs/process.rst +++ b/docs/process.rst @@ -1180,6 +1180,10 @@ The following caveats apply when using temporary outputs: - A temporary output should not be forwarded by a downstream process using the ``includeInputs`` option. In this case, the temporary output will be cleaned prematurely, and any process that consumes the forwarded output channel may fail or produce incorrect output. +- If a file captured by a temporary output path is also captured by a regular output path, it will still be treated as a temporary + file. 
While it is safe to declare multiple output channels in this way, if the regular output path is also published, it may lead + to some workflow outputs being empty. + - Directories and remote paths (e.g. S3) are not currently supported. From 08881b020848cbb9a6c98d851291dbeb8bdd75ca Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 31 Mar 2023 23:33:02 -0500 Subject: [PATCH 09/13] Delete files instead of emptying them (now supports directories and remote paths) Signed-off-by: Ben Sherman --- docs/process.rst | 24 ++++++------------ .../trace/TemporaryFileObserver.groovy | 25 ++++++------------- 2 files changed, 14 insertions(+), 35 deletions(-) diff --git a/docs/process.rst b/docs/process.rst index e801395304..5bbb73d179 100644 --- a/docs/process.rst +++ b/docs/process.rst @@ -1157,34 +1157,24 @@ Temporary output files .. warning:: This feature is experimental and may change in a future release. -When ``path`` output is declared with ``temporary: true``, any file associated with this output will be automatically "emptied" -during pipeline execution, as soon as it is no longer needed by downstream tasks. This feature is useful for deleting large -intermediate files that can be discarded once a pipeline run is finished. +When a ``path`` output is declared with ``temporary: true``, any file associated with this output will be automatically deleted +during pipeline execution, as soon as it is no longer needed by downstream tasks. This feature is useful for cleaning up large +intermediate files in order to free up disk storage. The lifetime of a temporary file is determined by the processes that are downstream of the file's originating process, either directly or indirectly through channel operators. When all of these processes finish (i.e. all of their tasks finish), all temporary files produced by the original process can be deleted. 
-"Emptying" a file means that the file contents are deleted, but the file metadata (size, last modified timestamp) is preserved, -such that it will be cached on a resumed run. - The following caveats apply when using temporary outputs: -- A pipeline run with temporary outputs should not be resumed if any regular downstream outputs have also been deleted. If a - temporary output is cached but a downstream output is not cached (e.g. because it was deleted or the process script was modified), - the downstream task will fail or produce incorrect output. - -- Temporary outputs will not be cached when the ``cache`` directive is set to ``'deep'``, because this mode includes the file - contents in the cache key. +- This feature will break the resumability of your pipeline. If you try to resume a run with temporary outputs, any tasks that + were cleaned will have to be re-run. - A temporary output should not be forwarded by a downstream process using the ``includeInputs`` option. In this case, the temporary - output will be cleaned prematurely, and any process that consumes the forwarded output channel may fail or produce incorrect output. + output will be deleted prematurely, and any process that consumes the forwarded output channel may fail or produce incorrect output. - If a file captured by a temporary output path is also captured by a regular output path, it will still be treated as a temporary - file. While it is safe to declare multiple output channels in this way, if the regular output path is also published, it may lead - to some workflow outputs being empty. - -- Directories and remote paths (e.g. S3) are not currently supported. + file. Declaring multiple output channels in this way is safe as long as the regular output path isn't also published. ..
_process-env: diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy index 62a1ba2415..db6ccde226 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy @@ -26,6 +26,7 @@ import groovy.util.logging.Slf4j import nextflow.Session import nextflow.dag.DAG import nextflow.dag.ConcreteDAG +import nextflow.extension.FilesEx import nextflow.processor.TaskHandler import nextflow.processor.TaskProcessor import nextflow.script.params.FileOutParam @@ -146,32 +147,20 @@ class TemporaryFileObserver implements TraceObserver { if( consumers.isEmpty() ) { log.trace "All consumers of process ${producer} are complete, time to clean up temporary files" - cleanTemporaryFiles(status.paths) + deleteTemporaryFiles(status.paths) statusMap.remove(producer) } } } - void cleanTemporaryFiles(Collection paths) { + void deleteTemporaryFiles(Collection paths) { for( Path path : paths ) { log.trace "Cleaning temporary file: ${path}" - // get original file metadata - final attrs = Files.readAttributes(path, BasicFileAttributes.class) - - // delete file contents - try { - final file = new RandomAccessFile(path.toFile(), 'rw') - file.setLength(0) - file.setLength(attrs.size()) - file.close() - } - catch( UnsupportedOperationException e ) { - log.warn "Unable to truncate file, skipping: ${path.toUriString()}" - } - - // restore original file metadata - Files.setLastModifiedTime(path, attrs.lastModifiedTime()) + if( Files.isDirectory(path) ) + FilesEx.deleteDir(path) + else + Files.delete(path) } } From e81e584c27172f76dd65d8ed3b189e4b3e61ed9f Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Wed, 26 Apr 2023 11:54:54 -0500 Subject: [PATCH 10/13] Replace synchronized with lock Signed-off-by: Ben Sherman --- docs/config.md | 3 +- .../groovy/nextflow/dag/ConcreteDAG.groovy | 38 
++++++++++++++----- .../groovy/nextflow/dag/DagRenderer.groovy | 2 +- 3 files changed, 31 insertions(+), 12 deletions(-) diff --git a/docs/config.md b/docs/config.md index fec1b46930..a6bccaca22 100644 --- a/docs/config.md +++ b/docs/config.md @@ -484,8 +484,7 @@ The following settings are available: : When `true` overwrites any existing DAG file with the same name. `dag.type` -: Can be `abstract` to render the abstract DAG or `concrete` to render the concrete (task) DAG (default: `abstract`). - +: Can be `abstract` to render the abstract (process) DAG or `concrete` to render the concrete (task) DAG (default: `abstract`). Read the {ref}`dag-visualisation` page to learn more about the execution graph that can be generated by Nextflow. diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/ConcreteDAG.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/ConcreteDAG.groovy index 5768916d16..c580f62147 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/ConcreteDAG.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/ConcreteDAG.groovy @@ -18,6 +18,8 @@ package nextflow.dag import java.nio.file.Path import java.util.regex.Pattern +import java.util.concurrent.locks.Lock +import java.util.concurrent.locks.ReentrantLock import groovy.transform.MapConstructor import groovy.transform.ToString @@ -32,14 +34,20 @@ import nextflow.script.params.FileOutParam @Slf4j class ConcreteDAG { - Map nodes = new HashMap<>(100) + private Lock sync = new ReentrantLock() + + private Map nodes = new HashMap<>(100) + + Map getNodes() { + nodes + } /** * Add a task to the graph * * @param task */ - synchronized void addTask( TaskRun task ) { + void addTask(TaskRun task) { final hash = task.hash.toString() final label = "[${hash.substring(0,2)}/${hash.substring(2,8)}] ${task.name}" final inputs = task.getInputFilesMap() @@ -47,11 +55,17 @@ class ConcreteDAG { new Input(name: name, path: path, predecessor: getPredecessorHash(path)) } - nodes[hash] = new Task( - index: 
nodes.size(), - label: label, - inputs: inputs - ) + sync.lock() + try { + nodes[hash] = new Task( + index: nodes.size(), + label: label, + inputs: inputs + ) + } + finally { + sync.unlock() + } } static public String getPredecessorHash(Path path) { @@ -66,7 +80,7 @@ class ConcreteDAG { * * @param task */ - synchronized void addTaskOutputs( TaskRun task ) { + void addTaskOutputs(TaskRun task) { final hash = task.hash.toString() final outputs = task.getOutputsByType(FileOutParam) .values() @@ -75,7 +89,13 @@ class ConcreteDAG { new Output(name: path.name, path: path) } - nodes[hash].outputs = outputs + sync.lock() + try { + nodes[hash].outputs = outputs + } + finally { + sync.unlock() + } } @MapConstructor diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy index ccdb8fecc0..768c9ef11e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy @@ -26,7 +26,7 @@ import java.nio.file.Path trait DagRenderer { /** - * Render an abstract (process/operator) DAG. + * Render an abstract (process) DAG. 
*/ void renderAbstractGraph(DAG dag, Path file) { throw new UnsupportedOperationException("Abstract graph rendering is not supported for this file format") From 6fa9e9281f9acfeabaf72c4964a427f3eb46bb50 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Wed, 26 Apr 2023 16:08:52 -0500 Subject: [PATCH 11/13] Use lock in temp file observer Signed-off-by: Ben Sherman --- .../script/params/FileOutParam.groovy | 2 +- .../trace/TemporaryFileObserver.groovy | 91 ++++++++++--------- 2 files changed, 50 insertions(+), 43 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/script/params/FileOutParam.groovy b/modules/nextflow/src/main/groovy/nextflow/script/params/FileOutParam.groovy index 58bc52a446..2cf6f44f61 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/params/FileOutParam.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/params/FileOutParam.groovy @@ -83,7 +83,7 @@ class FileOutParam extends BaseOutParam implements OutParam, OptionalParam, Path boolean glob = true /** - * When true the file will be "cleaned" once it is no longer needed by downstream tasks. + * When true the target files will be deleted once they are no longer needed by downstream tasks. 
*/ boolean temporary = false diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy index db6ccde226..eafd6ebd07 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy @@ -16,22 +16,21 @@ package nextflow.trace -import java.nio.file.Files import java.nio.file.Path -import java.nio.file.attribute.BasicFileAttributes import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.locks.Lock +import java.util.concurrent.locks.ReentrantLock -import groovy.transform.MapConstructor import groovy.util.logging.Slf4j import nextflow.Session import nextflow.dag.DAG import nextflow.dag.ConcreteDAG -import nextflow.extension.FilesEx +import nextflow.file.FileHelper import nextflow.processor.TaskHandler import nextflow.processor.TaskProcessor import nextflow.script.params.FileOutParam /** - * Watch for temporary output files and "clean" them once they + * Track temporary output files and delete them once they * are no longer needed. 
* * @author Ben Sherman @@ -41,12 +40,13 @@ class TemporaryFileObserver implements TraceObserver { private DAG dag - private Map statusMap + private Map statusMap = new ConcurrentHashMap<>() + + private Lock sync = new ReentrantLock() @Override void onFlowCreate(Session session) { this.dag = session.dag - this.statusMap = new ConcurrentHashMap<>() } /** @@ -60,28 +60,29 @@ class TemporaryFileObserver implements TraceObserver { void onProcessComplete(TaskHandler handler, TraceRecord trace) { // find all temporary output files final task = handler.task - final temporaryOutputs = task + final tempOutputs = task .getOutputsByType(FileOutParam) .findAll { param, paths -> param.temporary } .values() .flatten() - if( temporaryOutputs.isEmpty() ) + if( tempOutputs.isEmpty() ) return // update status tracker for the task's process final processName = task.processor.name - if( processName !in statusMap ) { - log.trace "Process ${processName} has temporary output files, tracking for automatic cleanup" - - statusMap[processName] = new Status( - paths: [] as Set, - processBarriers: findAllConsumers(processName) - ) + sync.lock() + try { + if( processName !in statusMap ) { + log.trace "Process ${processName} has temporary output files, tracking for automatic cleanup" + statusMap[processName] = new Status(findAllConsumers(processName)) + } + statusMap[processName].paths.addAll(tempOutputs) + } + finally { + sync.unlock() } - - statusMap[processName].paths.addAll(temporaryOutputs) } /** @@ -89,7 +90,7 @@ class TemporaryFileObserver implements TraceObserver { * * @param processName */ - Set findAllConsumers(String processName) { + private Set findAllConsumers(String processName) { // find the task's process node in the abstract dag final processNode = dag.vertices @@ -135,39 +136,45 @@ class TemporaryFileObserver implements TraceObserver { */ @Override void onProcessTerminate(TaskProcessor process) { - for( def entry : statusMap ) { - // remove barrier from each upstream process - 
final producer = entry.key - final status = entry.value - final consumers = status.processBarriers - - consumers.remove(process.name) - - // if a process has no more barriers, trigger the cleanup - if( consumers.isEmpty() ) { - log.trace "All consumers of process ${producer} are complete, time to clean up temporary files" - - deleteTemporaryFiles(status.paths) - statusMap.remove(producer) + sync.lock() + try { + for( def entry : statusMap ) { + // remove barrier from each upstream process + final producer = entry.key + final status = entry.value + final consumers = status.processBarriers + + consumers.remove(process.name) + + // if a process has no more barriers, trigger the cleanup + if( consumers.isEmpty() ) { + log.trace "All consumers of process ${producer} are complete, deleting temporary files" + + deleteTemporaryFiles(status.paths) + statusMap.remove(producer) + } } } + finally { + sync.unlock() + } } - void deleteTemporaryFiles(Collection paths) { + private void deleteTemporaryFiles(Collection paths) { for( Path path : paths ) { - log.trace "Cleaning temporary file: ${path}" - - if( Files.isDirectory(path) ) - FilesEx.deleteDir(path) - else - Files.delete(path) + log.trace "Deleting temporary file: ${path}" + FileHelper.deletePath(path) } } - @MapConstructor - static protected class Status { + static private class Status { Set paths Set processBarriers + + Status(Set processes) { + this.paths = [] as Set + this.processBarriers = processes + } } } From f46f5065a79f723615bcb36a93666b1d6b08cb05 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 28 Apr 2023 12:08:37 -0500 Subject: [PATCH 12/13] Improve tracking of temporary files Signed-off-by: Ben Sherman --- docs/process.md | 6 +- .../src/main/groovy/nextflow/Session.groovy | 11 ++ .../nextflow/processor/TaskProcessor.groovy | 5 + .../trace/TemporaryFileObserver.groovy | 134 ++++++++++++------ .../nextflow/trace/TraceObserver.groovy | 5 + 5 files changed, 111 insertions(+), 50 deletions(-) diff --git 
a/docs/process.md b/docs/process.md index 06ef28dfaa..81fa754b82 100644 --- a/docs/process.md +++ b/docs/process.md @@ -1020,13 +1020,13 @@ This feature is experimental and may change in a future release. When a `path` output is declared with `temporary: true`, the target files for this output will be automatically deleted during pipeline execution, as soon as they are no longer needed by downstream tasks. This feature is useful for cleaning up large intermediate files in order to free up disk storage. -The lifetime of a temporary file is determined by the processes that are downstream of the file's originating process, either directly or indirectly through channel operators. When all of these processes finish (i.e. all of their tasks finish), all temporary files produced by the original process can be deleted. +The lifetime of a temporary file is determined by the downstream tasks that take the file as an input. When all of these tasks finish, the temporary file can be deleted. The following caveats apply when using temporary outputs: -- This feature will break the resumability of your pipeline. If you try to resume a run with temporary outputs, any tasks whose outputs were deleted will have to be re-run. +- Resumability is not currently supported for tasks with temporary outputs. If you try to resume a run with temporary outputs, any tasks whose outputs were deleted will have to be re-run. -- A temporary output should not be forwarded by a downstream process using the `includeInputs` option. In this case, the temporary output will be deleted prematurely, and any process that consumes the forwarded output channel may fail or produce incorrect output. +- A temporary output should not be forwarded by a downstream process using the `includeInputs` option. In this case, the temporary output will be deleted prematurely, and any process that consumes the forwarded output channel may fail or produce incorrect output. 
To avoid this problem, only the most downstream output should be declared as temporary. - If a file captured by a temporary output path is also captured by a regular output path, it will still be treated as a temporary file. If the regular output path is also published, some outputs may be deleted before they can be published. diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 87660008ac..836d2f50e7 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -941,6 +941,17 @@ class Session implements ISession { } } + void notifyProcessClose(TaskProcessor process) { + observers.each { observer -> + try { + observer.onProcessClose(process) + } + catch( Exception e ) { + log.debug(e.getMessage(), e) + } + } + } + void notifyProcessTerminate(TaskProcessor process) { for( int i=0; i it.incCompleted() } } + protected void closeProcess() { + session.notifyProcessClose(this) + } + protected void terminateProcess() { log.trace "<${name}> Sending poison pills and terminating process" sendPoisonPill() @@ -2440,6 +2444,7 @@ class TaskProcessor { @Override void afterStop(final DataflowProcessor processor) { log.trace "<${name}> After stop" + closeProcess() } /** diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy index eafd6ebd07..e487d8746d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy @@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.locks.Lock import java.util.concurrent.locks.ReentrantLock +import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.Session import nextflow.dag.DAG @@ -28,6 +29,7 @@ import 
nextflow.dag.ConcreteDAG import nextflow.file.FileHelper import nextflow.processor.TaskHandler import nextflow.processor.TaskProcessor +import nextflow.processor.TaskRun import nextflow.script.params.FileOutParam /** * Track temporary output files and delete them once they @@ -36,11 +38,18 @@ import nextflow.script.params.FileOutParam * @author Ben Sherman */ @Slf4j +@CompileStatic class TemporaryFileObserver implements TraceObserver { private DAG dag - private Map statusMap = new ConcurrentHashMap<>() + private Map pathStatuses = new HashMap<>() + + private Set completedTasks = new HashSet<>() + + private Map> processConsumers = new HashMap<>() + + private Set closedProcesses = new HashSet<>() private Lock sync = new ReentrantLock() @@ -49,6 +58,29 @@ class TemporaryFileObserver implements TraceObserver { this.dag = session.dag } + /** + * When a task is created, add it to the set of consumers for any temporary + * file that it takes as input. + * + * @param handler + * @param trace + */ + @Override + void onProcessPending(TaskHandler handler, TraceRecord trace) { + sync.lock() + try { + final task = handler.task + for( def entry : task.getInputFilesMap() ) { + final storePath = entry.value + if( storePath in pathStatuses ) + pathStatuses[storePath].consumers.add(task) + } + } + finally { + sync.unlock() + } + } + /** * When a task is completed, track any temporary output files * for automatic cleanup. 
@@ -58,27 +90,25 @@ class TemporaryFileObserver implements TraceObserver { */ @Override void onProcessComplete(TaskHandler handler, TraceRecord trace) { - // find all temporary output files + // query all temporary files produced by task final task = handler.task final tempOutputs = task .getOutputsByType(FileOutParam) .findAll { param, paths -> param.temporary } .values() - .flatten() - - if( tempOutputs.isEmpty() ) - return - - // update status tracker for the task's process - final processName = task.processor.name + .flatten() as Set sync.lock() try { - if( processName !in statusMap ) { - log.trace "Process ${processName} has temporary output files, tracking for automatic cleanup" - statusMap[processName] = new Status(findAllConsumers(processName)) - } - statusMap[processName].paths.addAll(tempOutputs) + // mark task as completed + completedTasks.add(task) + + // scan temporary files for cleanup + cleanup0() + + // add new temporary outputs to status map + for( Path path : tempOutputs ) + pathStatuses[path] = new Status(task) } finally { sync.unlock() @@ -86,11 +116,18 @@ class TemporaryFileObserver implements TraceObserver { } /** - * Find all processes which are consumers of a given process. + * Get the consumers of a process. * * @param processName */ - private Set findAllConsumers(String processName) { + private Set getProcessConsumers(String processName) { + if( processName !in processConsumers ) + processConsumers[processName] = getProcessConsumers0(processName) + + return processConsumers[processName] + } + + private Set getProcessConsumers0(String processName) { // find the task's process node in the abstract dag final processNode = dag.vertices @@ -126,54 +163,57 @@ class TemporaryFileObserver implements TraceObserver { } /** - * When a process is completed, update the status of any processes - * that are waiting on it in order to cleanup temporary outputs. 
- * - * If, after this update is removed, a process has no more barriers, - * then clean all temporary files for that process. + * When a process is closed (all tasks of the process have been created), + * mark the process as closed and scan for automatic cleanup. * * @param process */ @Override - void onProcessTerminate(TaskProcessor process) { + void onProcessClose(TaskProcessor process) { sync.lock() try { - for( def entry : statusMap ) { - // remove barrier from each upstream process - final producer = entry.key - final status = entry.value - final consumers = status.processBarriers - - consumers.remove(process.name) - - // if a process has no more barriers, trigger the cleanup - if( consumers.isEmpty() ) { - log.trace "All consumers of process ${producer} are complete, deleting temporary files" - - deleteTemporaryFiles(status.paths) - statusMap.remove(producer) - } - } + closedProcesses.add(process.name) + cleanup0() } finally { sync.unlock() } } - private void deleteTemporaryFiles(Collection paths) { - for( Path path : paths ) { - log.trace "Deleting temporary file: ${path}" - FileHelper.deletePath(path) + /** + * Delete any temporary file that has no more barriers. + */ + private void cleanup0() { + for( Path path : pathStatuses.keySet() ) { + final status = pathStatuses[path] + if( !status.deleted && canDelete(path) ) { + log.trace "Deleting temporary file: ${path}" + FileHelper.deletePath(path) + status.deleted = true + } } } + /** + * Determine whether a path can be deleted. + * + * A path can be deleted if all of its process consumers + * are closed and all of its task consumers are completed. 
+ */ + private boolean canDelete(Path path) { + final status = pathStatuses[path] + final processConsumers = getProcessConsumers(status.task.processor.name) + final taskConsumers = status.consumers + processConsumers.every { p -> p in closedProcesses } && taskConsumers.every { t -> t in completedTasks } + } + static private class Status { - Set paths - Set processBarriers + TaskRun task + Set consumers = [] as Set + boolean deleted = false - Status(Set processes) { - this.paths = [] as Set - this.processBarriers = processes + Status(TaskRun task) { + this.task = task } } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy index 45e414764e..aacdbe95de 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy @@ -50,6 +50,11 @@ trait TraceObserver { */ void onProcessCreate( TaskProcessor process ){} + /** + * Invoked when the process is closed (all tasks have been created). + */ + void onProcessClose( TaskProcessor process ){} + /* * Invoked when all tak have been executed and process ends. 
*/ From 9637e34b7a81ef5054428f8b17e36d4f427ec243 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 28 Apr 2023 15:15:16 -0500 Subject: [PATCH 13/13] Remove dependency on task graph branch Signed-off-by: Ben Sherman --- docs/config.md | 3 - .../src/main/groovy/nextflow/Session.groovy | 9 -- .../groovy/nextflow/dag/ConcreteDAG.groovy | 125 ------------------ .../nextflow/dag/CytoscapeHtmlRenderer.groovy | 2 +- .../nextflow/dag/CytoscapeJsRenderer.groovy | 2 +- .../src/main/groovy/nextflow/dag/DAG.groovy | 2 +- .../groovy/nextflow/dag/DagRenderer.groovy | 15 +-- .../groovy/nextflow/dag/DotRenderer.groovy | 2 +- .../groovy/nextflow/dag/GexfRenderer.groovy | 2 +- .../nextflow/dag/GraphVizRenderer.groovy | 2 +- .../nextflow/dag/MermaidRenderer.groovy | 53 +------- .../groovy/nextflow/dag/NodeMarker.groovy | 6 +- .../nextflow/processor/TaskHandler.groovy | 28 ---- .../nextflow/processor/TaskProcessor.groovy | 2 +- .../groovy/nextflow/processor/TaskRun.groovy | 1 - .../trace/DefaultObserverFactory.groovy | 1 - .../nextflow/trace/GraphObserver.groovy | 66 ++++----- .../trace/TemporaryFileObserver.groovy | 1 - .../nextflow/dag/ConcreteDAGTest.groovy | 86 ------------ .../nextflow/dag/DotRendererTest.groovy | 2 +- .../nextflow/dag/GexfRendererTest.groovy | 2 +- .../nextflow/dag/MermaidRendererTest.groovy | 70 +--------- .../nextflow/processor/TaskHandlerTest.groovy | 25 ---- .../nextflow/trace/GraphObserverTest.groovy | 34 ++--- .../main/nextflow/extension/FilesEx.groovy | 20 --- .../main/nextflow/file/ETagAwareFile.groovy | 29 ---- .../main/nextflow/cloud/aws/nio/S3Path.java | 11 +- .../cloud/azure/nio/AzFileAttributes.groovy | 8 -- .../nextflow/cloud/azure/nio/AzPath.groovy | 8 +- 29 files changed, 73 insertions(+), 544 deletions(-) delete mode 100644 modules/nextflow/src/main/groovy/nextflow/dag/ConcreteDAG.groovy delete mode 100644 modules/nextflow/src/test/groovy/nextflow/dag/ConcreteDAGTest.groovy delete mode 100644 
modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy diff --git a/docs/config.md b/docs/config.md index a6bccaca22..b729fb48cd 100644 --- a/docs/config.md +++ b/docs/config.md @@ -483,9 +483,6 @@ The following settings are available: `dag.overwrite` : When `true` overwrites any existing DAG file with the same name. -`dag.type` -: Can be `abstract` to render the abstract (process) DAG or `concrete` to render the concrete (task) DAG (default: `abstract`). - Read the {ref}`dag-visualisation` page to learn more about the execution graph that can be generated by Nextflow. (config-docker)= diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 836d2f50e7..0020c7e11d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -39,7 +39,6 @@ import nextflow.conda.CondaConfig import nextflow.config.Manifest import nextflow.container.ContainerConfig import nextflow.dag.DAG -import nextflow.dag.ConcreteDAG import nextflow.exception.AbortOperationException import nextflow.exception.AbortSignalException import nextflow.exception.IllegalConfigException @@ -194,8 +193,6 @@ class Session implements ISession { private DAG dag - private ConcreteDAG concreteDag - private CacheDB cache private Barrier processesBarrier = new Barrier() @@ -348,7 +345,6 @@ class Session implements ISession { // -- DAG object this.dag = new DAG() - this.concreteDag = new ConcreteDAG() // -- init work dir this.workDir = ((config.workDir ?: 'work') as Path).complete() @@ -803,8 +799,6 @@ class Session implements ISession { DAG getDag() { this.dag } - ConcreteDAG getConcreteDAG() { this.concreteDag } - ExecutorService getExecService() { execService } /** @@ -1024,9 +1018,6 @@ class Session implements ISession { final trace = handler.safeTraceRecord() cache.putTaskAsync(handler, trace) - // save the task meta file to the task directory - 
handler.writeMetaFile() - // notify the event to the observers for( int i=0; i - */ -@Slf4j -class ConcreteDAG { - - private Lock sync = new ReentrantLock() - - private Map nodes = new HashMap<>(100) - - Map getNodes() { - nodes - } - - /** - * Add a task to the graph - * - * @param task - */ - void addTask(TaskRun task) { - final hash = task.hash.toString() - final label = "[${hash.substring(0,2)}/${hash.substring(2,8)}] ${task.name}" - final inputs = task.getInputFilesMap() - .collect { name, path -> - new Input(name: name, path: path, predecessor: getPredecessorHash(path)) - } - - sync.lock() - try { - nodes[hash] = new Task( - index: nodes.size(), - label: label, - inputs: inputs - ) - } - finally { - sync.unlock() - } - } - - static public String getPredecessorHash(Path path) { - final pattern = Pattern.compile('.*/([0-9a-f]{2}/[0-9a-f]{30})') - final matcher = pattern.matcher(path.toString()) - - matcher.find() ? matcher.group(1).replace('/', '') : null - } - - /** - * Add a task's outputs to the graph - * - * @param task - */ - void addTaskOutputs(TaskRun task) { - final hash = task.hash.toString() - final outputs = task.getOutputsByType(FileOutParam) - .values() - .flatten() - .collect { path -> - new Output(name: path.name, path: path) - } - - sync.lock() - try { - nodes[hash].outputs = outputs - } - finally { - sync.unlock() - } - } - - @MapConstructor - @ToString(includeNames = true, includes = 'label', includePackage=false) - static protected class Task { - int index - String label - List inputs - List outputs - - String getSlug() { "t${index}" } - } - - @MapConstructor - static protected class Input { - String name - Path path - String predecessor - } - - @MapConstructor - static protected class Output { - String name - Path path - } - -} diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeHtmlRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeHtmlRenderer.groovy index e839824da8..5aff50ffe3 100644 --- 
a/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeHtmlRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeHtmlRenderer.groovy @@ -28,7 +28,7 @@ import java.nio.file.Path class CytoscapeHtmlRenderer implements DagRenderer { @Override - void renderAbstractGraph(DAG dag, Path file) { + void renderDocument(DAG dag, Path file) { String tmplPage = readTemplate() String network = CytoscapeJsRenderer.renderNetwork(dag) file.text = tmplPage.replaceAll(~/\/\* REPLACE_WITH_NETWORK_DATA \*\//, network) diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeJsRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeJsRenderer.groovy index af2a09b98e..dfcb8246df 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeJsRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeJsRenderer.groovy @@ -28,7 +28,7 @@ import java.nio.file.Path class CytoscapeJsRenderer implements DagRenderer { @Override - void renderAbstractGraph(DAG dag, Path file) { + void renderDocument(DAG dag, Path file) { file.text = renderNetwork(dag) } diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy index da4e5887ce..d1ef0c0666 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy @@ -43,7 +43,7 @@ import nextflow.script.params.TupleOutParam import java.util.concurrent.atomic.AtomicLong /** - * Model the abstract graph of a pipeline execution. + * Model a direct acyclic graph of the pipeline execution. 
* * @author Paolo Di Tommaso */ diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy index 768c9ef11e..0d3bfcd252 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy @@ -23,19 +23,10 @@ import java.nio.file.Path * @author Paolo Di Tommaso * @author Mike Smoot */ -trait DagRenderer { +interface DagRenderer { /** - * Render an abstract (process) DAG. + * Render the dag to the specified file. */ - void renderAbstractGraph(DAG dag, Path file) { - throw new UnsupportedOperationException("Abstract graph rendering is not supported for this file format") - } - - /** - * Render a concrete (task) DAG. - */ - void renderConcreteGraph(ConcreteDAG dag, Path file) { - throw new UnsupportedOperationException("Concrete graph rendering is not supported for this file format") - } + void renderDocument(DAG dag, Path file); } diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/DotRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/DotRenderer.groovy index c5411f820b..f9b5a36e41 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/DotRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/DotRenderer.groovy @@ -45,7 +45,7 @@ class DotRenderer implements DagRenderer { static String normalise(String str) { str.replaceAll(/[^0-9_A-Za-z]/,'') } @Override - void renderAbstractGraph(DAG dag, Path file) { + void renderDocument(DAG dag, Path file) { file.text = renderNetwork(dag) } diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/GexfRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/GexfRenderer.groovy index 43aaba6e9c..c2bac51110 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/GexfRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/GexfRenderer.groovy @@ -41,7 +41,7 @@ class GexfRenderer implements DagRenderer { } 
@Override - void renderAbstractGraph(DAG dag, Path file) { + void renderDocument(DAG dag, Path file) { final Charset charset = Charset.defaultCharset() Writer bw = Files.newBufferedWriter(file, charset) final XMLOutputFactory xof = XMLOutputFactory.newFactory() diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/GraphVizRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/GraphVizRenderer.groovy index 0fddcee58d..3106890b9c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/GraphVizRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/GraphVizRenderer.groovy @@ -41,7 +41,7 @@ class GraphvizRenderer implements DagRenderer { * See http://www.graphviz.org for more info. */ @Override - void renderAbstractGraph(DAG dag, Path target) { + void renderDocument(DAG dag, Path target) { def result = Files.createTempFile('nxf-',".$format") def temp = Files.createTempFile('nxf-','.dot') // save the DAG as `dot` to a temp file diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/MermaidRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/MermaidRenderer.groovy index fa8f6f602f..3afdb7cf42 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/MermaidRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/MermaidRenderer.groovy @@ -26,7 +26,11 @@ import java.nio.file.Path class MermaidRenderer implements DagRenderer { @Override - void renderAbstractGraph(DAG dag, Path file) { + void renderDocument(DAG dag, Path file) { + file.text = renderNetwork(dag) + } + + String renderNetwork(DAG dag) { def lines = [] lines << "flowchart TD" @@ -40,7 +44,7 @@ class MermaidRenderer implements DagRenderer { lines << "" - file.text = lines.join('\n') + return lines.join('\n') } private String renderVertex(DAG.Vertex vertex) { @@ -71,49 +75,4 @@ class MermaidRenderer implements DagRenderer { return "${edge.from.name} -->${label} ${edge.to.name}" } - - @Override - void renderConcreteGraph(ConcreteDAG graph, Path file) { - def 
renderedOutputs = [] as Set - def numInputs = 0 - def numOutputs = 0 - - def lines = [] - lines << "flowchart TD" - - // render tasks and task inputs - graph.nodes.values().each { task -> - // render task node - lines << " ${task.getSlug()}[\"${task.label}\"]" - - task.inputs.each { input -> - // render task input from predecessor - if( input.predecessor != null ) { - final pred = graph.nodes[input.predecessor] - lines << " ${pred.getSlug()} -->|${input.name}| ${task.getSlug()}" - renderedOutputs << input.path - } - - // render task input from source node - else { - numInputs += 1 - lines << " i${numInputs}(( )) -->|${input.name}| ${task.getSlug()}" - } - } - } - - // render task outputs with sink nodes - graph.nodes.values().each { task -> - task.outputs.each { output -> - if( output.path !in renderedOutputs ) { - numOutputs += 1 - lines << " ${task.getSlug()} -->|${output.name}| o${numOutputs}(( ))" - } - } - } - - lines << "" - - file.text = lines.join('\n') - } } diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/NodeMarker.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/NodeMarker.groovy index 18550b4cc6..68d56a12d5 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/NodeMarker.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/NodeMarker.groovy @@ -40,7 +40,7 @@ class NodeMarker { static private Session getSession() { Global.session as Session } /** - * Creates a vertex in the abstract DAG representing a computing `process` + * Creates a new vertex in the DAG representing a computing `process` * * @param label The label associated to the process * @param inputs The list of inputs entering in the process @@ -52,7 +52,7 @@ class NodeMarker { } /** - * Creates a vertex in the abstract DAG representing a dataflow operator + * Creates a new DAG vertex representing a dataflow operator * * @param label The operator label * @param inputs The operator input(s). It can be either a single channel or a list of channels. 
@@ -66,7 +66,7 @@ class NodeMarker { } /** - * Creates a vertex in the abstract DAG representing a dataflow channel source. + * Creates a vertex in the DAG representing a dataflow channel source. * * @param label The node description * @param source Either a dataflow channel or a list of channel. diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy index 42ee522185..a94ded210f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy @@ -18,14 +18,9 @@ package nextflow.processor import static nextflow.processor.TaskStatus.* -import java.nio.file.Files import java.nio.file.NoSuchFileException -import groovy.json.JsonBuilder import groovy.util.logging.Slf4j -import nextflow.dag.ConcreteDAG -import nextflow.extension.FilesEx -import nextflow.script.params.FileOutParam import nextflow.trace.TraceRecord /** * Actions to handle the underlying job running the user task. @@ -218,29 +213,6 @@ abstract class TaskHandler { return record } - void writeMetaFile() { - final record = [ - hash: task.hash.toString(), - inputs: task.getInputFilesMap().collect { name, path -> - [ - name: name, - path: path.toString(), - predecessor: ConcreteDAG.getPredecessorHash(path) - ] - }, - outputs: task.getOutputsByType(FileOutParam).values().flatten().collect { path -> - [ - name: path.name, - path: path.toString(), - size: Files.size(path), - checksum: FilesEx.getChecksum(path) - ] - } - ] - - task.workDir.resolve(TaskRun.CMD_META).text = new JsonBuilder(record).toString() - } - /** * Determine if a process can be forked i.e. can launch * a parallel task execution. 
This is only enforced when diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index 00f38a443a..88e173d33d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -543,7 +543,7 @@ class TaskProcessor { def invoke = new InvokeTaskAdapter(this, opInputs.size()) session.allOperators << (operator = new DataflowOperator(group, params, invoke)) - // notify the creation of a new process in the abstract DAG + // notify the creation of a new vertex the execution DAG NodeMarker.addProcessNode(this, config.getInputs(), config.getOutputs()) // fix issue #41 diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy index 703109d9c6..a45382a144 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy @@ -550,7 +550,6 @@ class TaskRun implements Cloneable { static final public String CMD_STAGE = '.command.stage' static final public String CMD_TRACE = '.command.trace' static final public String CMD_ENV = '.command.env' - static final public String CMD_META = '.command.meta.json' String toString( ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy index cd7bd72d7e..dac68d9adb 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy @@ -94,7 +94,6 @@ class DefaultObserverFactory implements TraceObserverFactory { if( !fileName ) fileName = GraphObserver.DEF_FILE_NAME def traceFile = (fileName as Path).complete() def observer = new 
GraphObserver(traceFile) - config.navigate('dag.type') { observer.type = it ?: 'abstract' } config.navigate('dag.overwrite') { observer.overwrite = it } result << observer } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy index 23c565277d..0a61d43eda 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy @@ -23,7 +23,6 @@ import groovy.util.logging.Slf4j import nextflow.Session import nextflow.dag.CytoscapeHtmlRenderer import nextflow.dag.DAG -import nextflow.dag.ConcreteDAG import nextflow.dag.DagRenderer import nextflow.dag.DotRenderer import nextflow.dag.GexfRenderer @@ -46,11 +45,7 @@ class GraphObserver implements TraceObserver { private Path file - private String type = 'abstract' - - private DAG abstractDag - - private ConcreteDAG concreteDag + private DAG dag private String name @@ -71,8 +66,7 @@ class GraphObserver implements TraceObserver { @Override void onFlowCreate(Session session) { - this.abstractDag = session.dag - this.concreteDag = session.concreteDag + this.dag = session.dag // check file existance final attrs = FileHelper.readAttributes(file) if( attrs ) { @@ -83,40 +77,16 @@ class GraphObserver implements TraceObserver { } } - @Override - void onProcessSubmit(TaskHandler handler, TraceRecord trace) { - concreteDag.addTask( handler.task ) - } - - @Override - void onProcessComplete(TaskHandler handler, TraceRecord trace) { - concreteDag.addTaskOutputs( handler.task ) - } - - @Override - void onProcessCached(TaskHandler handler, TraceRecord trace) { - concreteDag.addTask( handler.task ) - concreteDag.addTaskOutputs( handler.task ) - } - @Override void onFlowComplete() { - if( type == 'abstract' ) { - // -- normalise the DAG - abstractDag.normalize() - // -- render it to a file - createRenderer().renderAbstractGraph(abstractDag,file) - } - else 
if( type == 'concrete' ) { - createRenderer().renderConcreteGraph(concreteDag,file) - } - else { - log.warn("Invalid DAG type '${type}', should be 'abstract' or 'concrete'") - } + // -- normalise the DAG + dag.normalize() + // -- render it to a file + createRender().renderDocument(dag,file) } @PackageScope - DagRenderer createRenderer() { + DagRenderer createRender() { if( format == 'dot' ) new DotRenderer(name) @@ -133,6 +103,28 @@ class GraphObserver implements TraceObserver { new GraphvizRenderer(name, format) } + + @Override + void onProcessCreate(TaskProcessor process) { + + } + + + @Override + void onProcessSubmit(TaskHandler handler, TraceRecord trace) { + + } + + @Override + void onProcessStart(TaskHandler handler, TraceRecord trace) { + + } + + @Override + void onProcessComplete(TaskHandler handler, TraceRecord trace) { + + } + @Override boolean enableMetrics() { return false diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy index e487d8746d..33a4327853 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy @@ -25,7 +25,6 @@ import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.Session import nextflow.dag.DAG -import nextflow.dag.ConcreteDAG import nextflow.file.FileHelper import nextflow.processor.TaskHandler import nextflow.processor.TaskProcessor diff --git a/modules/nextflow/src/test/groovy/nextflow/dag/ConcreteDAGTest.groovy b/modules/nextflow/src/test/groovy/nextflow/dag/ConcreteDAGTest.groovy deleted file mode 100644 index a666372169..0000000000 --- a/modules/nextflow/src/test/groovy/nextflow/dag/ConcreteDAGTest.groovy +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2013-2023, Seqera Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this 
file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package nextflow.dag - -import java.nio.file.Paths - -import com.google.common.hash.HashCode -import nextflow.processor.TaskRun -import spock.lang.Specification -/** - * - * @author Ben Sherman - */ -class ConcreteDAGTest extends Specification { - - def 'should add task nodes and outputs' () { - - given: - def task1 = Mock(TaskRun) { - getHash() >> HashCode.fromString('00112233445566778899aabbccddeeff') - getName() >> 'foo' - getInputFilesMap() >> [ - 'data.txt': Paths.get('/inputs/data.txt') - ] - getOutputsByType(_) >> [ - 'data.foo': Paths.get('/work/00/112233445566778899aabbccddeeff/data.foo') - ] - } - def task2 = Mock(TaskRun) { - getHash() >> HashCode.fromString('aabbccddeeff00112233445566778899') - getName() >> 'bar' - getInputFilesMap() >> [ - 'data.foo': Paths.get('/work/00/112233445566778899aabbccddeeff/data.foo') - ] - getOutputsByType(_) >> [ - 'data.bar': Paths.get('/work/aa/bbccddeeff00112233445566778899/data.bar') - ] - } - def dag = new ConcreteDAG() - - when: - dag.addTask( task1 ) - dag.addTask( task2 ) - def node1 = dag.nodes['00112233445566778899aabbccddeeff'] - def node2 = dag.nodes['aabbccddeeff00112233445566778899'] - then: - node1.index == 0 - node1.label == '[00/112233] foo' - node1.inputs.size() == 1 - node1.inputs[0].name == 'data.txt' - node1.inputs[0].path == Paths.get('/inputs/data.txt') - node1.inputs[0].predecessor == null - node2.index == 1 - node2.label == '[aa/bbccdd] bar' - node2.inputs.size() == 1 - node2.inputs[0].name == 'data.foo' - node2.inputs[0].path == 
Paths.get('/work/00/112233445566778899aabbccddeeff/data.foo') - node2.inputs[0].predecessor == '00112233445566778899aabbccddeeff' - - when: - dag.addTaskOutputs( task1 ) - dag.addTaskOutputs( task2 ) - then: - node1.outputs.size() == 1 - node1.outputs[0].name == 'data.foo' - node1.outputs[0].path == Paths.get('/work/00/112233445566778899aabbccddeeff/data.foo') - node2.outputs.size() == 1 - node2.outputs[0].name == 'data.bar' - node2.outputs[0].path == Paths.get('/work/aa/bbccddeeff00112233445566778899/data.bar') - } - -} diff --git a/modules/nextflow/src/test/groovy/nextflow/dag/DotRendererTest.groovy b/modules/nextflow/src/test/groovy/nextflow/dag/DotRendererTest.groovy index 7e5921aeef..2ff86ebaf7 100644 --- a/modules/nextflow/src/test/groovy/nextflow/dag/DotRendererTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/dag/DotRendererTest.groovy @@ -54,7 +54,7 @@ class DotRendererTest extends Specification { dag.normalize() when: - new DotRenderer('TheGraph').renderAbstractGraph(dag, file) + new DotRenderer('TheGraph').renderDocument(dag, file) then: file.text == ''' diff --git a/modules/nextflow/src/test/groovy/nextflow/dag/GexfRendererTest.groovy b/modules/nextflow/src/test/groovy/nextflow/dag/GexfRendererTest.groovy index 6d4a9e3539..f112732b2e 100644 --- a/modules/nextflow/src/test/groovy/nextflow/dag/GexfRendererTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/dag/GexfRendererTest.groovy @@ -47,7 +47,7 @@ class GexfRendererTest extends Specification { dag.normalize() when: - new GexfRenderer('TheGraph').renderAbstractGraph(dag, file.toPath()) + new GexfRenderer('TheGraph').renderDocument(dag, file.toPath()) then: def graph = new XmlSlurper().parse(file); assert graph.name() == 'gexf' diff --git a/modules/nextflow/src/test/groovy/nextflow/dag/MermaidRendererTest.groovy b/modules/nextflow/src/test/groovy/nextflow/dag/MermaidRendererTest.groovy index b0d1226727..220422ac77 100644 --- 
a/modules/nextflow/src/test/groovy/nextflow/dag/MermaidRendererTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/dag/MermaidRendererTest.groovy @@ -15,14 +15,13 @@ */ package nextflow.dag - import java.nio.file.Files -import java.nio.file.Paths import groovyx.gpars.dataflow.DataflowQueue -import nextflow.Session import spock.lang.Specification +import nextflow.Session + /** * * @author Ben Sherman @@ -33,7 +32,7 @@ class MermaidRendererTest extends Specification { new Session() } - def 'should render an abstract graph using the `mmd` format' () { + def 'should render a graph using the `mmd` format' () { given: def file = Files.createTempFile('test', null) def ch1 = new DataflowQueue() @@ -47,7 +46,7 @@ class MermaidRendererTest extends Specification { dag.normalize() when: - new MermaidRenderer().renderAbstractGraph(dag, file) + new MermaidRenderer().renderDocument(dag, file) then: file.text == ''' @@ -65,65 +64,4 @@ class MermaidRendererTest extends Specification { cleanup: file.delete() } - - def 'should render a concrete graph using the `mmd` format' () { - given: - def file = Files.createTempFile('test', null) - - def dag = Mock(ConcreteDAG) { - nodes >> [ - '012345': new ConcreteDAG.Task( - index: 1, - label: 'foo', - inputs: [ - new ConcreteDAG.Input( - name: 'data.txt', - path: Paths.get('/inputs/data.txt'), - predecessor: null - ) - ], - outputs: [ - new ConcreteDAG.Output( - name: 'data.foo', - path: Paths.get('/work/012345/data.foo'), - ) - ] - ), - 'abcdef': new ConcreteDAG.Task( - index: 2, - label: 'bar', - inputs: [ - new ConcreteDAG.Input( - name: 'data.foo', - path: Paths.get('/work/012345/data.foo'), - predecessor: '012345' - ) - ], - outputs: [ - new ConcreteDAG.Output( - name: 'data.bar', - path: Paths.get('/work/abcdef/data.bar'), - ) - ] - ) - ] - } - - when: - new MermaidRenderer().renderConcreteGraph(dag, file) - then: - file.text == - ''' - flowchart TD - t1["foo"] - i1(( )) -->|data.txt| t1 - t2["bar"] - t1 -->|data.foo| t2 - t2 
-->|data.bar| o1(( )) - ''' - .stripIndent().leftTrim() - - cleanup: - file.delete() - } } diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy index ad94ce47a7..1846fb0838 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy @@ -16,10 +16,8 @@ package nextflow.processor -import java.nio.file.Paths import java.util.concurrent.atomic.LongAdder -import com.google.common.hash.HashCode import nextflow.Session import nextflow.executor.Executor import nextflow.util.Duration @@ -137,29 +135,6 @@ class TaskHandlerTest extends Specification { } - def 'should write meta file' () { - - given: - def folder = File.createTempDir() - def outputFile = new File(folder, 'bar.txt') ; outputFile.text = 'bar' - def task = Mock(TaskRun) { - hash >> HashCode.fromString('aabbccddeeff00112233445566778899') - workDir >> folder.toPath() - getInputFilesMap() >> [ 'foo.txt': Paths.get('/tmp/00/112233445566778899aabbccddeeff/foo.txt') ] - getOutputsByType(_) >> [ 'bar.txt': outputFile.toPath() ] - } - def handler = [:] as TaskHandler - handler.task = task - - when: - handler.writeMetaFile() - then: - task.workDir.resolve(TaskRun.CMD_META).text == """{"hash":"aabbccddeeff00112233445566778899","inputs":[{"name":"foo.txt","path":"/tmp/00/112233445566778899aabbccddeeff/foo.txt","predecessor":"00112233445566778899aabbccddeeff"}],"outputs":[{"name":"bar.txt","path":"${folder}/bar.txt","size":3,"checksum":"37b51d194a7513e45b56f6524f2d51f2"}]}""" - - cleanup: - folder.delete() - } - LongAdder _adder(Integer x) { if( x != null ) { def adder = new LongAdder() diff --git a/modules/nextflow/src/test/groovy/nextflow/trace/GraphObserverTest.groovy b/modules/nextflow/src/test/groovy/nextflow/trace/GraphObserverTest.groovy index 384c737944..f3d13bc507 100644 --- 
a/modules/nextflow/src/test/groovy/nextflow/trace/GraphObserverTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/trace/GraphObserverTest.groovy @@ -35,7 +35,7 @@ import test.TestHelper */ class GraphObserverTest extends Specification { - DAG dag + DAG test_dag def setup() { new Session() @@ -45,28 +45,28 @@ class GraphObserverTest extends Specification { def ch2 = new DataflowQueue() def ch3 = new DataflowQueue() - dag = new DAG() + test_dag = new DAG() - dag.addVertex( + test_dag.addVertex( DAG.Type.ORIGIN, 'Source', null, [ new DAG.ChannelHandler(channel: src, label: 'src') ] ) - dag.addVertex( + test_dag.addVertex( DAG.Type.PROCESS, 'Process 1', [ new DAG.ChannelHandler(channel: src, label: 'Source') ], [ new DAG.ChannelHandler(channel: ch1, label: 'Channel 1') ] ) - dag.addVertex( + test_dag.addVertex( DAG.Type.OPERATOR, 'Filter', [ new DAG.ChannelHandler(channel: ch1, label: 'Channel 1') ], [ new DAG.ChannelHandler(channel: ch2, label: 'Channel 2') ] ) - dag.addVertex( + test_dag.addVertex( DAG.Type.PROCESS, 'Process 2', [ new DAG.ChannelHandler(channel: ch2, label: 'Channel 2') ], @@ -78,7 +78,7 @@ class GraphObserverTest extends Specification { given: def file = Files.createTempFile('nxf_','.dot') def gr = new GraphObserver(file) - gr.abstractDag = dag + gr.dag = test_dag when: gr.onFlowComplete() @@ -105,7 +105,7 @@ class GraphObserverTest extends Specification { given: def file = Files.createTempFile('nxf-','.html') def gr = new GraphObserver(file) - gr.abstractDag = dag + gr.dag = test_dag when: gr.onFlowComplete() @@ -133,7 +133,7 @@ class GraphObserverTest extends Specification { given: def file = Files.createTempFile('nxf-','.svg') def gr = new GraphObserver(file) - gr.abstractDag = dag + gr.dag = test_dag when: gr.onFlowComplete() @@ -151,7 +151,7 @@ class GraphObserverTest extends Specification { given: def file = Files.createTempFile('nxf-','.png') def gr = new GraphObserver(file) - gr.abstractDag = dag + gr.dag = test_dag when: 
gr.onFlowComplete() @@ -168,7 +168,7 @@ class GraphObserverTest extends Specification { given: def file = Files.createTempFile('nxf-','.pdf') def gr = new GraphObserver(file) - gr.abstractDag = dag + gr.dag = test_dag when: gr.onFlowComplete() @@ -185,7 +185,7 @@ class GraphObserverTest extends Specification { def folder = Files.createTempDirectory('test') def file = folder.resolve('nope') def gr = new GraphObserver(file) - gr.abstractDag = dag + gr.dag = test_dag when: gr.onFlowComplete() @@ -216,34 +216,34 @@ class GraphObserverTest extends Specification { then: observer.name == 'hello-world' observer.format == 'dot' - observer.createRenderer() instanceof DotRenderer + observer.createRender() instanceof DotRenderer when: observer = new GraphObserver(Paths.get('/path/to/TheGraph.html')) then: observer.name == 'TheGraph' observer.format == 'html' - observer.createRenderer() instanceof CytoscapeHtmlRenderer + observer.createRender() instanceof CytoscapeHtmlRenderer when: observer = new GraphObserver(Paths.get('/path/to/TheGraph.mmd')) then: observer.name == 'TheGraph' observer.format == 'mmd' - observer.createRenderer() instanceof MermaidRenderer + observer.createRender() instanceof MermaidRenderer when: observer = new GraphObserver(Paths.get('/path/to/TheGraph.SVG')) then: observer.name == 'TheGraph' observer.format == 'svg' - observer.createRenderer() instanceof GraphvizRenderer + observer.createRender() instanceof GraphvizRenderer when: observer = new GraphObserver(Paths.get('/path/to/anonymous')) then: observer.name == 'anonymous' observer.format == 'dot' - observer.createRenderer() instanceof DotRenderer + observer.createRender() instanceof DotRenderer } } diff --git a/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy b/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy index 026d0dbc92..389dab5107 100644 --- a/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy +++ b/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy 
@@ -34,14 +34,12 @@ import java.nio.file.attribute.FileAttribute import java.nio.file.attribute.FileTime import java.nio.file.attribute.PosixFilePermission import java.nio.file.attribute.PosixFilePermissions -import java.security.MessageDigest import groovy.transform.CompileStatic import groovy.transform.PackageScope import groovy.transform.stc.ClosureParams import groovy.transform.stc.FromString import groovy.util.logging.Slf4j -import nextflow.file.ETagAwareFile import nextflow.file.FileHelper import nextflow.file.FileSystemPathFactory import nextflow.io.ByteBufferBackedInputStream @@ -1601,22 +1599,4 @@ class FilesEx { static String getScheme(Path path) { path.getFileSystem().provider().getScheme() } - - static String getChecksum(Path path) { - if( Files.isDirectory(path) ) - return null - - if( path instanceof ETagAwareFile ) - return ((ETagAwareFile)path).getETag() - - final is = Files.newInputStream(path) - final md = MessageDigest.getInstance('MD5') - final buf = new byte[16 << 10] - - int len - while( (len=is.read(buf)) != -1 ) - md.update(buf, 0, len) - - new BigInteger(1, md.digest()).toString(16) - } } diff --git a/modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy b/modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy deleted file mode 100644 index f1c40073b3..0000000000 --- a/modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2013-2023, Seqera Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package nextflow.file - -/** - * Defines the interface for files that have an ETag - * - * @author Ben Sherman - */ -interface ETagAwareFile { - - String getETag() - -} diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java index a7b7a4e1ed..5ce117ac6f 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java @@ -42,14 +42,13 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import nextflow.file.ETagAwareFile; import nextflow.file.TagAwareFile; import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Iterables.transform; import static java.lang.String.format; -public class S3Path implements Path, ETagAwareFile, TagAwareFile { +public class S3Path implements Path, TagAwareFile { public static final String PATH_SEPARATOR = "/"; /** @@ -555,14 +554,6 @@ public String getContentType() { return contentType; } - @Override - public String getETag() { - return fileSystem - .getClient() - .getObjectMetadata(getBucket(), getKey()) - .getETag(); - } - public String getStorageClass() { return storageClass; } diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy index ba07cda22f..a955888d70 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy @@ -46,8 +46,6 @@ class AzFileAttributes implements BasicFileAttributes { private String objectId - private String etag - static AzFileAttributes root() { 
new AzFileAttributes(size: 0, objectId: '/', directory: true) } @@ -62,7 +60,6 @@ class AzFileAttributes implements BasicFileAttributes { updateTime = time(props.getLastModified()) directory = client.blobName.endsWith('/') size = props.getBlobSize() - etag = props.getETag() } AzFileAttributes(String containerName, BlobItem item) { @@ -72,7 +69,6 @@ class AzFileAttributes implements BasicFileAttributes { creationTime = time(item.properties.getCreationTime()) updateTime = time(item.properties.getLastModified()) size = item.properties.getContentLength() - etag = item.properties.getETag() } } @@ -148,10 +144,6 @@ class AzFileAttributes implements BasicFileAttributes { return objectId } - String getETag() { - return etag - } - @Override boolean equals( Object obj ) { if( this.class != obj?.class ) return false diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy index 3bdd222f6b..2f654b4ad8 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy @@ -29,7 +29,6 @@ import com.azure.storage.blob.models.BlobItem import groovy.transform.CompileStatic import groovy.transform.EqualsAndHashCode import groovy.transform.PackageScope -import nextflow.file.ETagAwareFile /** * Implements Azure path object @@ -38,7 +37,7 @@ import nextflow.file.ETagAwareFile */ @CompileStatic @EqualsAndHashCode(includes = 'fs,path,directory', includeFields = true) -class AzPath implements Path, ETagAwareFile { +class AzPath implements Path { private AzFileSystem fs @@ -306,11 +305,6 @@ class AzPath implements Path, ETagAwareFile { return this.toString() <=> other.toString() } - @Override - String getETag() { - return attributes.getETag() - } - String getContainerName() { if( path.isAbsolute() ) { path.nameCount==0 ? '/' : path.getName(0)