Skip to content

Commit

Permalink
CLI: First proof-of-concept implementation of file dump for workchains.
Browse files Browse the repository at this point in the history
This commit builds on the [pull request](aiidateam#6276) by @qiaojunfeng. It implements the following changes:
- `_get_input_filename` function is removed
- `workchain inputsave` is split up and modified, with the recursive logic to traverse the `ProcessNodes` of the workchain moved to `_recursive_get_node_path`, the directory creation moved to `_workchain_maketree`, and the file dumping moved to `workchain_filedump`
  - `_recursive_get_node_path` builds up a list of tuples of "paths" and `CalcJobNodes`. The "paths" are based on the layout of the workchain, and utilize the `link_labels`, `process_labels`, as well as the "iteration" counter in the `link_labels`
    -> This might not be the best data structure here, but allows for extending the return value during recursion
    -> Rather than using the `ProcessNodes` directly, one could also only use the `pks` and load the nodes when needed
  - In the `PwBandsWorkChain` used for development, the "top level", processes had the `link_labels` set, so they were missing any numbering. Thus, I added it via `_number_path_elements`. Right now, this is just a quick fix, as it just works for the top-level, though, such a function could possibly take care of the numbering of all levels. Ideally, one would extract it directly from the data contained in the `WorkChain`, but I think that's difficult if some steps might be missing the iteration counter in their label.
  - Eventually I think it would be nice to be able to just create the empty directory tree, without dumping input/output files, so the `_workchain_maketree` is somewhat of a placeholder for that
- `calcjob_inputdump` and `calcjob_outputdump` added to to `cmd_calcjob`

So far, there's not really any error handling, and the code contains probably quite some issues (for example, the "path" naming breaks in complex cases like the `SelfConsistentHubbardWorkChain`), though, I wanted to get some feedback, and ensure I'm somewhat on a reasonable trajectory before generalizing and improving things. Regarding our discussion in PR aiidateam#6276, for working on an implementation of a *complete* version that makes the steps fully re-submittable, that might be an additional, future step, in which @sphuber could hopefully provide me some pointers (for now, I added a warning that about that). The current commands don't require any plugin, only `core` and the data.

The result of `verdi workchain filedump <wc_pk> --path ./wc-<wc_pk>` from an exemplary `PwBandsWorkChain`:

```shell
Warning: Caution: No provenance. The retrieved input/output files are not guaranteed to be complete for a full restart of the given workchain. Instead, this utility is intended for easy inspection of the files that were involved in its execution. For restarting workchains, see the `get_builder_restart` method instead.
./wc-3057/
├── 01-relax
│   ├── 01-PwBaseWC
│   │   └── 01-PwCalc
│   │       ├── aiida.in
│   │       ├── aiida.out
│   │       ├── _aiidasubmit.sh
│   │       ├── data-file-schema.xml
│   │       ├── _scheduler-stderr.txt
│   │       └── _scheduler-stdout.txt
│   └── 02-PwBaseWC
│       └── 01-PwCalc
│           ├── aiida.in
│           ├── aiida.out
│           ├── _aiidasubmit.sh
│           ├── data-file-schema.xml
│           ├── _scheduler-stderr.txt
│           └── _scheduler-stdout.txt
├── 02-scf
│   └── 01-PwCalc
│       ├── aiida.in
│       ├── aiida.out
│       ├── _aiidasubmit.sh
│       ├── data-file-schema.xml
│       ├── _scheduler-stderr.txt
│       └── _scheduler-stdout.txt
└── 03-bands
    └── 01-PwCalc
        ├── aiida.in
        ├── aiida.out
        ├── _aiidasubmit.sh
        ├── data-file-schema.xml
        ├── _scheduler-stderr.txt
        └── _scheduler-stdout.txt

9 directories, 24 files
```
  • Loading branch information
GeigerJ2 committed Feb 19, 2024
1 parent 1410179 commit 6b10519
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 53 deletions.
44 changes: 44 additions & 0 deletions src/aiida/cmdline/commands/cmd_calcjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,3 +347,47 @@ def get_remote_and_path(calcjob, path=None):
f'nor does its associated process class `{calcjob.process_class.__class__.__name__}`\n'
'Please specify a path explicitly.'
)


@verdi_calcjob.command('inputdump')
@arguments.CALCULATION('calcjob', type=CalculationParamType(sub_classes=('aiida.node:process.calculation.calcjob',)))
@click.option(
'--path',
'-p',
type=click.Path(),
default=None,
show_default=True,
help='The directory to save the dumped input files.',
)
def calcjob_inputdump(calcjob, path):
from pathlib import Path

if path is None:
path = '.' / Path(f'cj-{calcjob.pk}')

try:
calcjob.base.repository.copy_tree(Path(path).resolve())
except:
raise


@verdi_calcjob.command('outputdump')
@arguments.CALCULATION('calcjob', type=CalculationParamType(sub_classes=('aiida.node:process.calculation.calcjob',)))
@click.option(
'--path',
'-p',
type=click.Path(),
default=None,
show_default=True,
help='The directory to save the dumped output files.',
)
def calcjob_outputdump(calcjob, path):
from pathlib import Path

if path is None:
path = '.' / Path(f'cj-{calcjob}')

try:
calcjob.outputs.retrieved.copy_tree(path.resolve())
except:
raise
210 changes: 157 additions & 53 deletions src/aiida/cmdline/commands/cmd_workchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,23 @@
###########################################################################
"""`verdi workchain` commands."""

import re
from pathlib import Path
from typing import List, Optional, Tuple

import click

from aiida import orm
from aiida.cmdline.commands.cmd_calcjob import calcjob_inputdump, calcjob_outputdump
from aiida.cmdline.commands.cmd_verdi import verdi
from aiida.cmdline.params import arguments
from aiida.cmdline.params.types import WorkflowParamType
from aiida.cmdline.utils import echo
from aiida.common import LinkType
from aiida.engine.processes.calcjobs.calcjob import CalcJob
from aiida.engine.processes.workchains.workchain import WorkChainNode
from aiida.orm import ProcessNode
from aiida.orm.nodes.process.calculation.calcjob import CalcJobNode

# TODO I have several other cli functions that are useful for
# my own work, but somehow it's not easy to merge them
Expand All @@ -37,73 +46,168 @@
# workchain cli commands are similar, we should try to merge them into one.


@verdi.group('workchain')
def verdi_workchain():
"""Inspect and manage workchains."""
def _recursive_get_node_path(
called: ProcessNode,
recursion_path: Optional[Path] = Path('.'),
path_node_list: Optional[List[Tuple[Path, ProcessNode]]] = None,
) -> List[Tuple[Path, ProcessNode]]:
"""Recursively retrieves CalcJobs of WorkChain and generates "paths".
This function recursively traces down a WorkChain to the lowest-level
CalcJobs and generates the corresponding "paths" based on the structure of
the WorkChain. These are returned together with the corresponding
CalcJobNodes as a list of tuples.
Taken from: https://github.com/aiidateam/aiida-core/pull/6276/ and modified.
Args:
called (ProcessNode): Current ProcessNode, can belong to Calculation
or WorkflowNode.
recursion_path (Optional[Path], optional): Parent path at each recursive
function call. Defaults to cwd.
path_node_list (Optional[tuple], optional): List of tuples containing
the "path" based on the workchain structure, and the actual CalcJobNode.
Is populated during the recursive function calls.
Returns:
List[Tuple[Path, ProcessNode]]: Filled path_node_list.
"""
if path_node_list is None:
path_node_list = []

links = called.base.links.get_outgoing(link_type=(LinkType.CALL_CALC, LinkType.CALL_WORK)).all()
links.sort(key=lambda x: x.node.ctime)

for link in links:
link_label = link.link_label
called = link.node

process_label = called.process_label.replace('WorkChain', 'WC').replace('Calculation', 'Calc')

if 'iteration_' in link_label:
link_label = link_label.replace('iteration_', '')
path_label = f'{link_label}-{process_label}'
else:
path_label = link_label

save_path = recursion_path / path_label

if isinstance(called, WorkChainNode):
_recursive_get_node_path(called, recursion_path=save_path, path_node_list=path_node_list)

elif isinstance(called, CalcJobNode):
path_node_list.append((save_path, called))

return path_node_list


def _get_input_filename(calcjob: orm.CalcJobNode) -> str:
"""Get the input filename of a CalcJob."""
def _number_path_elements(path_list: List[Path], parent_path: Path) -> List[Path]:
# Could make this function iterate through all path parts to create the
# actual numbering hierarchy, if that is needed. Right now, all numbers
# below the top-level are directoly taken from the `iteration_`
# care of by AiiDA via `iteration`

# TODO: Turn this into general function to take care of numbering with
# variable depth
"""Utility to add numbering of the steps of the WorkChain.
Returns:
List: Updated list of PosixPaths with the relative numbering of each
step added.
"""
main_counter = 0
top_label = ''
modified_paths = []

for posix_path in path_list:
current_label = posix_path.parts[0]
if current_label != top_label:
main_counter += 1
top_label = current_label

path_parts = posix_path.parts
if not re.match(r'^0\d', current_label):
numbered_parent = f'{main_counter:02d}-{path_parts[0]}'
modified_path_parts = [parent_path, numbered_parent, *path_parts[1:]]
else:
modified_path_parts = [parent_path, *path_parts]

# TODO copied from
# https://github.com/aiidateam/aiida-core/blob/06ea130df8854f621e25853af6ac723c37397ed0/src/aiida/cmdline/commands/cmd_calcjob.py#L90-L106
# should be deduplicated
modified_path = Path(*modified_path_parts)
modified_paths.append(modified_path)
return modified_paths

# Get path from the given CalcJobNode if not defined by user
path = calcjob.get_option('input_filename')

# Get path from current process class of CalcJobNode if still not defined
if path is None:
fname = calcjob.process_class.spec_options.get('input_filename')
if fname and fname.has_default():
path = fname.default
def _workchain_maketree(numbered_paths: Tuple[Path]):
"""Generate directory tree from a tuple of appropriately labeled and
numbered relative `CalcJobNode` "paths".
if path is None:
# Still no path available
echo.echo_critical(
'"{}" and its process class "{}" do not define a default input file '
'(option "input_filename" not found).\n'
'Please specify a path explicitly.'.format(calcjob.__class__.__name__, calcjob.process_class.__name__)
)
return path
Args:
numbered_paths (Tuple[Path]): Labeled and numbered relative
`CalcJobNode` "paths", which were obtained from the structure of a workchain.
"""
# TODO: Eventually provide this as cli function to only generate the
# directory tree, without dumping the files. This could be done
# by giving the WorkChainNode as argument, but would then require calling
# the recursive function `_get_path_node_tuples` again, as well as adding
# the numbering, so would lead to code repetition. Not sure how to avoid
# that, for now.

@verdi_workchain.command('inputsave')
for numbered_path in numbered_paths:
numbered_path.mkdir(parents=True)


@verdi.group('workchain')
def verdi_workchain():
"""Inspect and manage workchains."""


@verdi_workchain.command('filedump')
@arguments.WORKFLOW('workchain', type=WorkflowParamType(sub_classes=('aiida.node:process.workflow.workchain',)))
@click.option(
'--path',
'-p',
type=click.Path(),
default='.',
show_default=True,
help='The directory to save all the input files.',
help='The parent directory to save the data of the workchain.',
)
@click.option(
'--mode',
'-m',
type=str,
default='all',
show_default=True,
help='Which files to dump? Options: input/output/all.',
)
@click.pass_context
def workchain_inputsave(ctx, workchain, path):
"""Save input files of a workchain."""
from contextlib import redirect_stdout
from pathlib import Path

from aiida.cmdline.commands.cmd_calcjob import calcjob_inputcat

dir_path = Path(path)
if not dir_path.exists():
dir_path.mkdir()

links = workchain.base.links.get_outgoing(link_type=(LinkType.CALL_CALC, LinkType.CALL_WORK)).all()
links.sort(key=lambda x: x.node.ctime)

for i, link in enumerate(links):
link_label = link.link_label
called = link.node
subdir_path = dir_path / f'{i+1}-{link_label}'
subdir_path.mkdir()

if isinstance(called, orm.WorkChainNode):
ctx.invoke(workchain_inputsave, workchain=called, path=subdir_path)
else:
save_path = subdir_path / _get_input_filename(called)
with open(save_path, 'w', encoding='utf-8') as handle:
with redirect_stdout(handle):
ctx.invoke(calcjob_inputcat, calcjob=called)
echo.echo(f'Saved to {save_path}')
def workchain_filedump(ctx, workchain, path, mode):
"""Dump input/output files of calcjobs run by the given workchain."""

echo.echo_warning(
'Caution: No provenance. The retrieved input/output files are not guaranteed to be complete '
'for a full restart of the given workchain. Instead, this utility is intended for easy inspection '
'of the files that were involved in its execution. For restarting workchains, see the `get_builder_restart` '
'method instead.'
)

path_node_list = _recursive_get_node_path(workchain)
paths = [_[0] for _ in path_node_list]
paths = _number_path_elements(path_list=paths, parent_path=path)
nodes = [_[1] for _ in path_node_list]

if not Path(path).is_dir():
# ctx.invoke(_workchain_maketree, workchain=workchain, path=path)
_workchain_maketree(numbered_paths=paths)

mode_functions = {
'input': [calcjob_inputdump],
'output': [calcjob_outputdump],
'all': [calcjob_inputdump, calcjob_outputdump],
}

if mode not in mode_functions:
raise KeyError(f'Provided mode not available, must be one of: {"/".join(mode_functions.keys())}.')

for calcjob_node_path, calcjob_node in zip(paths, nodes):
for func in mode_functions[mode]:
ctx.invoke(func, calcjob=calcjob_node, path=calcjob_node_path)

0 comments on commit 6b10519

Please sign in to comment.