From 6b10519cb3964682894ed9a2b3ce2c6d81abe727 Mon Sep 17 00:00:00 2001 From: Julian Geiger Date: Mon, 19 Feb 2024 18:29:10 +0100 Subject: [PATCH] CLI: First proof-of-concept implementation of file dump for workchains. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit builds on the [pull request](https://github.com/aiidateam/aiida-core/pull/6276) by @qiaojunfeng. It implements the following changes: - `_get_input_filename` function is removed - `workchain inputsave` is split up and modified, with the recursive logic to traverse the `ProcessNodes` of the workchain moved to `_recursive_get_node_path`, the directory creation moved to `_workchain_maketree`, and the file dumping moved to `workchain_filedump` - `_recursive_get_node_path` builds up a list of tuples of "paths" and `CalcJobNodes`. The "paths" are based on the layout of the workchain, and utilize the `link_labels`, `process_labels`, as well as the "iteration" counter in the `link_labels` -> This might not be the best data structure here, but allows for extending the return value during recursion -> Rather than using the `ProcessNodes` directly, one could also only use the `pks` and load the nodes when needed - In the `PwBandsWorkChain` used for development, the "top level", processes had the `link_labels` set, so they were missing any numbering. Thus, I added it via `_number_path_elements`. Right now, this is just a quick fix, as it just works for the top-level, though, such a function could possibly take care of the numbering of all levels. Ideally, one would extract it directly from the data contained in the `WorkChain`, but I think that's difficult if some steps might be missing the iteration counter in their label. 
- Eventually, I think it would be nice to be able to create just the empty directory tree, without dumping any input/output files, so `_workchain_maketree` is somewhat of a placeholder for that
- `calcjob_inputdump` and `calcjob_outputdump` were added to `cmd_calcjob`

So far, there is not really any error handling, and the code probably contains quite a few issues (for example, the "path" naming breaks in complex cases like the `SelfConsistentHubbardWorkChain`). Still, I wanted to get some feedback and make sure I am on a reasonable trajectory before generalizing and improving things.

Regarding our discussion in PR #6276: working on an implementation of a *complete* version that makes the steps fully re-submittable might be an additional, future step, for which @sphuber could hopefully provide me with some pointers (for now, I added a warning about that).

The current commands don't require any plugin, only `core` and the data.

The result of `verdi workchain filedump --path ./wc-` from an exemplary `PwBandsWorkChain`:

```shell
Warning: Caution: No provenance. The retrieved input/output files are not guaranteed to be complete for a full restart of the given workchain. Instead, this utility is intended for easy inspection of the files that were involved in its execution. For restarting workchains, see the `get_builder_restart` method instead.
./wc-3057/ ├── 01-relax │   ├── 01-PwBaseWC │   │   └── 01-PwCalc │   │   ├── aiida.in │   │   ├── aiida.out │   │   ├── _aiidasubmit.sh │   │   ├── data-file-schema.xml │   │   ├── _scheduler-stderr.txt │   │   └── _scheduler-stdout.txt │   └── 02-PwBaseWC │   └── 01-PwCalc │   ├── aiida.in │   ├── aiida.out │   ├── _aiidasubmit.sh │   ├── data-file-schema.xml │   ├── _scheduler-stderr.txt │   └── _scheduler-stdout.txt ├── 02-scf │   └── 01-PwCalc │   ├── aiida.in │   ├── aiida.out │   ├── _aiidasubmit.sh │   ├── data-file-schema.xml │   ├── _scheduler-stderr.txt │   └── _scheduler-stdout.txt └── 03-bands └── 01-PwCalc ├── aiida.in ├── aiida.out ├── _aiidasubmit.sh ├── data-file-schema.xml ├── _scheduler-stderr.txt └── _scheduler-stdout.txt 9 directories, 24 files ``` --- src/aiida/cmdline/commands/cmd_calcjob.py | 44 ++++ src/aiida/cmdline/commands/cmd_workchain.py | 210 +++++++++++++++----- 2 files changed, 201 insertions(+), 53 deletions(-) diff --git a/src/aiida/cmdline/commands/cmd_calcjob.py b/src/aiida/cmdline/commands/cmd_calcjob.py index edb7bb5f7b..7a788b337e 100644 --- a/src/aiida/cmdline/commands/cmd_calcjob.py +++ b/src/aiida/cmdline/commands/cmd_calcjob.py @@ -347,3 +347,47 @@ def get_remote_and_path(calcjob, path=None): f'nor does its associated process class `{calcjob.process_class.__class__.__name__}`\n' 'Please specify a path explicitly.' ) + + +@verdi_calcjob.command('inputdump') +@arguments.CALCULATION('calcjob', type=CalculationParamType(sub_classes=('aiida.node:process.calculation.calcjob',))) +@click.option( + '--path', + '-p', + type=click.Path(), + default=None, + show_default=True, + help='The directory to save the dumped input files.', +) +def calcjob_inputdump(calcjob, path): + from pathlib import Path + + if path is None: + path = '.' 
/ Path(f'cj-{calcjob.pk}') + + try: + calcjob.base.repository.copy_tree(Path(path).resolve()) + except: + raise + + +@verdi_calcjob.command('outputdump') +@arguments.CALCULATION('calcjob', type=CalculationParamType(sub_classes=('aiida.node:process.calculation.calcjob',))) +@click.option( + '--path', + '-p', + type=click.Path(), + default=None, + show_default=True, + help='The directory to save the dumped output files.', +) +def calcjob_outputdump(calcjob, path): + from pathlib import Path + + if path is None: + path = '.' / Path(f'cj-{calcjob}') + + try: + calcjob.outputs.retrieved.copy_tree(path.resolve()) + except: + raise diff --git a/src/aiida/cmdline/commands/cmd_workchain.py b/src/aiida/cmdline/commands/cmd_workchain.py index 9f9dc36312..91c756d2be 100644 --- a/src/aiida/cmdline/commands/cmd_workchain.py +++ b/src/aiida/cmdline/commands/cmd_workchain.py @@ -8,14 +8,23 @@ ########################################################################### """`verdi workchain` commands.""" +import re +from pathlib import Path +from typing import List, Optional, Tuple + import click from aiida import orm +from aiida.cmdline.commands.cmd_calcjob import calcjob_inputdump, calcjob_outputdump from aiida.cmdline.commands.cmd_verdi import verdi from aiida.cmdline.params import arguments from aiida.cmdline.params.types import WorkflowParamType from aiida.cmdline.utils import echo from aiida.common import LinkType +from aiida.engine.processes.calcjobs.calcjob import CalcJob +from aiida.engine.processes.workchains.workchain import WorkChainNode +from aiida.orm import ProcessNode +from aiida.orm.nodes.process.calculation.calcjob import CalcJobNode # TODO I have several other cli functions that are useful for # my own work, but somehow it's not easy to merge them @@ -37,38 +46,122 @@ # workchain cli commands are similar, we should try to merge them into one. 
def _recursive_get_node_path(
    called: ProcessNode,
    recursion_path: Optional[Path] = Path('.'),
    path_node_list: Optional[List[Tuple[Path, ProcessNode]]] = None,
) -> List[Tuple[Path, ProcessNode]]:
    """Recursively collect the CalcJobNodes called by a process, with a "path" for each.

    The outgoing ``CALL_CALC`` / ``CALL_WORK`` links of ``called`` are followed in order
    of the creation time of the linked nodes. Each link contributes one path component:
    its link label, or, when the label contains ``iteration_``, the label with that
    marker removed followed by the shortened process label (``WorkChain`` -> ``WC``,
    ``Calculation`` -> ``Calc``). ``WorkChainNode`` children are descended into,
    ``CalcJobNode`` children are recorded, and any other node type is ignored.

    Taken from: https://github.com/aiidateam/aiida-core/pull/6276/ and modified.

    Args:
        called (ProcessNode): Process whose called sub-processes are traversed.
        recursion_path (Optional[Path], optional): Path accumulated so far, used as the
            parent of every component generated at this level. ``None`` is treated like
            the default, the relative path ``.``.
        path_node_list (Optional[list], optional): Accumulator that is extended in place
            during the recursion. A new list is created when it is ``None``.

    Returns:
        List[Tuple[Path, ProcessNode]]: One ``(path, CalcJobNode)`` tuple per CalcJobNode found.
    """
    if path_node_list is None:
        path_node_list = []

    # The annotation allows ``None``; without this guard the ``/`` below would raise a TypeError.
    parent_path = Path('.') if recursion_path is None else Path(recursion_path)

    links = called.base.links.get_outgoing(link_type=(LinkType.CALL_CALC, LinkType.CALL_WORK)).all()
    links.sort(key=lambda link: link.node.ctime)

    for link in links:
        # Use a dedicated name for the child so the ``called`` argument is not overwritten.
        child = link.node
        link_label = link.link_label
        process_label = child.process_label.replace('WorkChain', 'WC').replace('Calculation', 'Calc')

        if 'iteration_' in link_label:
            path_label = f"{link_label.replace('iteration_', '')}-{process_label}"
        else:
            path_label = link_label

        save_path = parent_path / path_label

        if isinstance(child, WorkChainNode):
            _recursive_get_node_path(child, recursion_path=save_path, path_node_list=path_node_list)
        elif isinstance(child, CalcJobNode):
            path_node_list.append((save_path, child))

    return path_node_list


def _number_path_elements(path_list: List[Path], parent_path: Path) -> List[Path]:
    """Prefix the top-level component of each path with a running two-digit counter.

    The counter is incremented every time the first component differs from the first
    component of the previous entry, so consecutive paths below the same top-level step
    share one number. A first component that already starts with two digits is left
    unchanged, although it still advances the counter.

    Only the top level is numbered here; the components below it are used as given.

    Args:
        path_list (List[Path]): Relative "paths" as produced by ``_recursive_get_node_path``.
        parent_path (Path): Directory that is prepended to every returned path.

    Returns:
        List[Path]: The paths with the numbering added, in the same order as ``path_list``.
    """
    # TODO: Turn this into a general function that takes care of the numbering at variable depth.
    main_counter = 0
    top_label = ''
    modified_paths = []

    for relative_path in path_list:
        path_parts = relative_path.parts

        if not path_parts:
            # An empty relative path (``Path('.')``) has nothing to number.
            modified_paths.append(Path(parent_path))
            continue

        current_label = path_parts[0]
        if current_label != top_label:
            main_counter += 1
            top_label = current_label

        # Match any two leading digits, not only ``0x``, so that labels with a counter of
        # 10 or more are also recognised as already numbered.
        if re.match(r'^\d{2}', current_label):
            modified_paths.append(Path(parent_path, *path_parts))
        else:
            numbered_parent = f'{main_counter:02d}-{current_label}'
            modified_paths.append(Path(parent_path, numbered_parent, *path_parts[1:]))

    return modified_paths


def _workchain_maketree(numbered_paths: List[Path]):
    """Create one directory, including missing parents, for every given path.

    Directories that already exist are left untouched, so several paths may share
    parents and the function may be called again for the same tree.

    Args:
        numbered_paths (List[Path]): Labeled and numbered `CalcJobNode` "paths", which
            were obtained from the structure of a workchain.
    """
    # TODO: Eventually provide this as a cli function that only generates the directory
    # tree, without dumping the files. That would require repeating the traversal and the
    # numbering for the given WorkChainNode, so it is not done for now.
    for numbered_path in numbered_paths:
        Path(numbered_path).mkdir(parents=True, exist_ok=True)


@verdi.group('workchain')
def verdi_workchain():
    """Inspect and manage workchains."""


# NOTE(review): one option line is elided between the two diff hunks; the short flag '-p'
# is assumed here because the calcjob dump commands in the same patch use it -- confirm.
@verdi_workchain.command('filedump')
@arguments.WORKFLOW('workchain', type=WorkflowParamType(sub_classes=('aiida.node:process.workflow.workchain',)))
@click.option(
    '--path',
    '-p',
    type=click.Path(),
    default='.',
    show_default=True,
    help='The parent directory to save the data of the workchain.',
)
@click.option(
    '--mode',
    '-m',
    type=click.Choice(['input', 'output', 'all']),
    default='all',
    show_default=True,
    help='Which files to dump? Options: input/output/all.',
)
@click.pass_context
def workchain_filedump(ctx, workchain, path, mode):
    """Dump input/output files of calcjobs run by the given workchain."""
    mode_functions = {
        'input': [calcjob_inputdump],
        'output': [calcjob_outputdump],
        'all': [calcjob_inputdump, calcjob_outputdump],
    }

    # ``click.Choice`` validates command-line usage; this check covers programmatic
    # invocation and runs before anything is written to disk.
    if mode not in mode_functions:
        echo.echo_critical(f'Provided mode not available, must be one of: {"/".join(mode_functions)}.')

    echo.echo_warning(
        'Caution: No provenance. The retrieved input/output files are not guaranteed to be complete '
        'for a full restart of the given workchain. Instead, this utility is intended for easy inspection '
        'of the files that were involved in its execution. For restarting workchains, see the `get_builder_restart` '
        'method instead.'
    )

    path_node_list = _recursive_get_node_path(workchain)
    if not path_node_list:
        echo.echo_warning('No calcjobs were found for the given workchain, nothing to dump.')
        return

    relative_paths = [node_path for node_path, _ in path_node_list]
    nodes = [node for _, node in path_node_list]
    paths = _number_path_elements(path_list=relative_paths, parent_path=path)

    # Always create the tree: the parent directory existing (e.g. the default ``.``) says
    # nothing about whether the per-calcjob subdirectories exist.
    _workchain_maketree(numbered_paths=paths)

    for calcjob_node_path, calcjob_node in zip(paths, nodes):
        for func in mode_functions[mode]:
            ctx.invoke(func, calcjob=calcjob_node, path=calcjob_node_path)