Transport: AsyncTransport plugin #6626

Open · wants to merge 3 commits into base: main
1 change: 1 addition & 0 deletions environment.yml
@@ -8,6 +8,7 @@ dependencies:
- python~=3.9
- alembic~=1.2
- archive-path~=0.4.2
- asyncssh~=2.19.0
- circus~=0.18.0
- click-spinner~=0.1.8
- click~=8.1
4 changes: 4 additions & 0 deletions pyproject.toml
@@ -20,6 +20,7 @@ classifiers = [
dependencies = [
'alembic~=1.2',
'archive-path~=0.4.2',
"asyncssh~=2.19.0",
'circus~=0.18.0',
'click-spinner~=0.1.8',
'click~=8.1',
@@ -175,6 +176,7 @@ requires-python = '>=3.9'
[project.entry-points.'aiida.transports']
'core.local' = 'aiida.transports.plugins.local:LocalTransport'
'core.ssh' = 'aiida.transports.plugins.ssh:SshTransport'
'core.ssh_async' = 'aiida.transports.plugins.ssh_async:AsyncSshTransport'
'core.ssh_auto' = 'aiida.transports.plugins.ssh_auto:SshAutoTransport'

[project.entry-points.'aiida.workflows']
@@ -308,6 +310,7 @@ module = 'tests.*'
ignore_missing_imports = true
module = [
'ase.*',
'asyncssh.*',
'bpython.*',
'bs4.*',
'CifFile.*',
@@ -388,6 +391,7 @@ testpaths = [
'tests'
]
timeout = 240
timeout_method = "thread"
xfail_strict = true

[tool.ruff]
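With the new `core.ssh_async` entry point registered above, the plugin becomes discoverable through the regular transport factory. A minimal sketch, assuming the standard `aiida.plugins.TransportFactory` API and that this branch is installed:

```python
# Minimal sketch: resolve the new entry point; assumes this branch of
# aiida-core is installed so that 'core.ssh_async' is registered.
from aiida.plugins import TransportFactory

AsyncSshTransport = TransportFactory('core.ssh_async')
print(AsyncSshTransport)  # -> the class registered as aiida.transports.plugins.ssh_async:AsyncSshTransport
```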
64 changes: 32 additions & 32 deletions src/aiida/engine/daemon/execmanager.py
@@ -105,7 +105,7 @@
if dry_run:
workdir = Path(folder.abspath)
else:
remote_user = transport.whoami()
remote_user = await transport.whoami_async()
remote_working_directory = computer.get_workdir().format(username=remote_user)
if not remote_working_directory.strip():
raise exceptions.ConfigurationError(
@@ -114,13 +113,13 @@
)

# If it already exists, no exception is raised
if not transport.path_exists(remote_working_directory):
if not await transport.path_exists_async(remote_working_directory):
logger.debug(
f'[submission of calculation {node.pk}] Path '
f'{remote_working_directory} does not exist, trying to create it'
)
try:
transport.makedirs(remote_working_directory)
await transport.makedirs_async(remote_working_directory)

Codecov / codecov/patch: added line #L123 was not covered by tests
except EnvironmentError as exc:
raise exceptions.ConfigurationError(
f'[submission of calculation {node.pk}] '
@@ -133,14 +133,14 @@
# and I do not have to know the logic, but I just need to
# read the absolute path from the calculation properties.
workdir = Path(remote_working_directory).joinpath(calc_info.uuid[:2], calc_info.uuid[2:4])
transport.makedirs(str(workdir), ignore_existing=True)
await transport.makedirs_async(workdir, ignore_existing=True)

try:
# The final directory may already exist, most likely because this function was already executed once, but
# failed and as a result was rescheduled by the engine. In this case it would be fine to delete the folder
# and create it from scratch, except that we cannot be sure that this the actual case. Therefore, to err on
# the safe side, we move the folder to the lost+found directory before recreating the folder from scratch
transport.mkdir(str(workdir.joinpath(calc_info.uuid[4:])))
await transport.mkdir_async(workdir.joinpath(calc_info.uuid[4:]))
except OSError:
# Move the existing directory to lost+found, log a warning and create a clean directory anyway
path_existing = os.path.join(str(workdir), calc_info.uuid[4:])
@@ -151,12 +151,12 @@
)

# Make sure the lost+found directory exists, then copy the existing folder there and delete the original
transport.mkdir(path_lost_found, ignore_existing=True)
transport.copytree(path_existing, path_target)
transport.rmtree(path_existing)
await transport.mkdir_async(path_lost_found, ignore_existing=True)
await transport.copytree_async(path_existing, path_target)
await transport.rmtree_async(path_existing)

Codecov / codecov/patch: added lines #L154-L156 were not covered by tests

# Now we can create a clean folder for this calculation
transport.mkdir(str(workdir.joinpath(calc_info.uuid[4:])))
await transport.mkdir_async(workdir.joinpath(calc_info.uuid[4:]))

Codecov / codecov/patch: added line #L159 was not covered by tests
finally:
workdir = workdir.joinpath(calc_info.uuid[4:])

@@ -171,11 +171,11 @@
# Note: this will possibly overwrite files
for root, dirnames, filenames in code.base.repository.walk():
# mkdir of root
transport.makedirs(str(workdir.joinpath(root)), ignore_existing=True)
await transport.makedirs_async(workdir.joinpath(root), ignore_existing=True)

# remotely mkdir first
for dirname in dirnames:
transport.makedirs(str(workdir.joinpath(root, dirname)), ignore_existing=True)
await transport.makedirs_async(workdir.joinpath(root, dirname), ignore_existing=True)

# Note, once #2579 is implemented, use the `node.open` method instead of the named temporary file in
# combination with the new `Transport.put_object_from_filelike`
@@ -185,11 +185,11 @@
content = code.base.repository.get_object_content(Path(root) / filename, mode='rb')
handle.write(content)
handle.flush()
transport.put(handle.name, str(workdir.joinpath(root, filename)))
await transport.put_async(handle.name, workdir.joinpath(root, filename))
if code.filepath_executable.is_absolute():
transport.chmod(str(code.filepath_executable), 0o755) # rwxr-xr-x
await transport.chmod_async(code.filepath_executable, 0o755) # rwxr-xr-x

Codecov / codecov/patch: added line #L190 was not covered by tests
else:
transport.chmod(str(workdir.joinpath(code.filepath_executable)), 0o755) # rwxr-xr-x
await transport.chmod_async(workdir.joinpath(code.filepath_executable), 0o755) # rwxr-xr-x

# local_copy_list is a list of tuples, each with (uuid, dest_path, rel_path)
# NOTE: validation of these lists are done inside calculation.presubmit()
@@ -288,7 +288,7 @@
f'remotely, directly on the machine {computer.label}'
)
try:
transport.copy(remote_abs_path, str(workdir.joinpath(dest_rel_path)))
await transport.copy_async(remote_abs_path, workdir.joinpath(dest_rel_path))
except FileNotFoundError:
logger.warning(
f'[submission of calculation {node.pk}] Unable to copy remote '
@@ -314,8 +314,8 @@
)
remote_dirname = Path(dest_rel_path).parent
try:
transport.makedirs(str(workdir.joinpath(remote_dirname)), ignore_existing=True)
transport.symlink(remote_abs_path, str(workdir.joinpath(dest_rel_path)))
await transport.makedirs_async(workdir.joinpath(remote_dirname), ignore_existing=True)
await transport.symlink_async(remote_abs_path, workdir.joinpath(dest_rel_path))
except OSError:
logger.warning(
f'[submission of calculation {node.pk}] Unable to create remote symlink '
@@ -356,14 +356,14 @@
# The logic below takes care of an edge case where the source is a file but the target is a directory. In
# this case, the v2.5.1 implementation would raise an `IsADirectoryError` exception, because it would try
# to open the directory in the sandbox folder as a file when writing the contents.
if file_type_source == FileType.FILE and target and transport.isdir(str(workdir.joinpath(target))):
if file_type_source == FileType.FILE and target and await transport.isdir_async(workdir.joinpath(target)):
raise IsADirectoryError

# In case the source filename is specified and it is a directory that already exists in the remote, we
# want to avoid nested directories in the target path to replicate the behavior of v2.5.1. This is done by
# setting the target filename to '.', which means the contents of the node will be copied in the top level
# of the temporary directory, whose contents are then copied into the target directory.
if filename and transport.isdir(str(workdir.joinpath(filename))):
if filename and await transport.isdir_async(workdir.joinpath(filename)):
filename_target = '.'

filepath_target = (dirpath / filename_target).resolve().absolute()
@@ -372,25 +372,25 @@
if file_type_source == FileType.DIRECTORY:
# If the source object is a directory, we copy its entire contents
data_node.base.repository.copy_tree(filepath_target, filename_source)
transport.put(
await transport.put_async(
f'{dirpath}/*',
str(workdir.joinpath(target)) if target else str(workdir.joinpath('.')),
workdir.joinpath(target) if target else workdir.joinpath('.'),
overwrite=True,
)
else:
# Otherwise, simply copy the file
with filepath_target.open('wb') as handle:
with data_node.base.repository.open(filename_source, 'rb') as source:
shutil.copyfileobj(source, handle)
transport.makedirs(str(workdir.joinpath(Path(target).parent)), ignore_existing=True)
transport.put(str(filepath_target), str(workdir.joinpath(target)))
await transport.makedirs_async(workdir.joinpath(Path(target).parent), ignore_existing=True)
await transport.put_async(filepath_target, workdir.joinpath(target))


async def _copy_sandbox_files(logger, node, transport, folder, workdir: Path):
"""Copy the contents of the sandbox folder to the working directory."""
for filename in folder.get_content_list():
logger.debug(f'[submission of calculation {node.pk}] copying file/folder {filename}...')
transport.put(folder.get_abs_path(filename), str(workdir.joinpath(filename)))
await transport.put_async(folder.get_abs_path(filename), workdir.joinpath(filename))


def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str | ExitCode:
@@ -461,7 +461,7 @@
for source_filename in source_list:
if transport.has_magic(source_filename):
copy_instructions = []
for globbed_filename in transport.glob(str(source_basepath / source_filename)):
for globbed_filename in await transport.glob_async(source_basepath / source_filename):

Codecov / codecov/patch: added line #L464 was not covered by tests
target_filepath = target_basepath / Path(globbed_filename).relative_to(source_basepath)
copy_instructions.append((globbed_filename, target_filepath))
else:
@@ -470,10 +470,10 @@
for source_filepath, target_filepath in copy_instructions:
# If the source file is in a (nested) directory, create those directories first in the target directory
target_dirname = target_filepath.parent
transport.makedirs(str(target_dirname), ignore_existing=True)
await transport.makedirs_async(target_dirname, ignore_existing=True)

Codecov / codecov/patch: added line #L473 was not covered by tests

try:
transport.copy(str(source_filepath), str(target_filepath))
await transport.copy_async(source_filepath, target_filepath)

Codecov / codecov/patch: added line #L476 was not covered by tests
except (OSError, ValueError) as exception:
EXEC_LOGGER.warning(f'failed to stash {source_filepath} to {target_filepath}: {exception}')
else:
Expand Down Expand Up @@ -612,7 +612,7 @@
upto what level of the original remotepath nesting the files will be copied.

:param transport: the Transport instance.
:param folder: an absolute path to a folder that contains the files to copy.
:param folder: an absolute path to a folder that contains the files to retrieve.
:param retrieve_list: the list of files to retrieve.
"""
workdir = Path(calculation.get_remote_workdir())
@@ -621,7 +621,7 @@
tmp_rname, tmp_lname, depth = item
# if there are more than one file I do something differently
if transport.has_magic(tmp_rname):
remote_names = transport.glob(str(workdir.joinpath(tmp_rname)))
remote_names = await transport.glob_async(workdir.joinpath(tmp_rname))
local_names = []
for rem in remote_names:
# get the relative path so to make local_names relative
@@ -644,7 +644,7 @@
abs_item = item if item.startswith('/') else str(workdir.joinpath(item))

if transport.has_magic(abs_item):
remote_names = transport.glob(abs_item)
remote_names = await transport.glob_async(abs_item)
local_names = [os.path.split(rem)[1] for rem in remote_names]
else:
remote_names = [abs_item]
@@ -656,6 +656,6 @@
if rem.startswith('/'):
to_get = rem
else:
to_get = str(workdir.joinpath(rem))
to_get = workdir.joinpath(rem)

transport.get(to_get, os.path.join(folder, loc), ignore_nonexisting=True)
await transport.get_async(to_get, os.path.join(folder, loc), ignore_nonexisting=True)
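The changes above swap every blocking call (`transport.put`, `transport.makedirs`, `transport.copy`, ...) for an awaited `*_async` counterpart, so `execmanager` now works uniformly against the async interface. Purely as an illustration of the pattern (not the actual `BlockingTransport`/`AsyncTransport` implementation in this PR), a synchronous transport could satisfy these call sites with thin wrappers such as:

```python
# Illustrative sketch only -- the real base classes in this PR may implement
# the *_async methods differently.
import asyncio
from functools import partial


class SyncToAsyncMixin:
    """Expose *_async wrappers that delegate to blocking transport methods."""

    def makedirs(self, path, ignore_existing: bool = False):
        raise NotImplementedError  # provided by the concrete transport

    def put(self, localpath, remotepath, **kwargs):
        raise NotImplementedError  # provided by the concrete transport

    async def makedirs_async(self, path, ignore_existing: bool = False):
        # Run the blocking implementation in a worker thread so the daemon's
        # event loop is not blocked while waiting on the remote machine.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, partial(self.makedirs, path, ignore_existing=ignore_existing)
        )

    async def put_async(self, localpath, remotepath, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, partial(self.put, localpath, remotepath, **kwargs)
        )
```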
4 changes: 2 additions & 2 deletions src/aiida/orm/computers.py
@@ -626,12 +626,12 @@ def get_transport(self, user: Optional['User'] = None) -> 'Transport':
"""Return a Transport class, configured with all correct parameters.
The Transport is closed (meaning that if you want to run any operation with
it, you have to open it first (i.e., e.g. for a SSH transport, you have
to open a connection). To do this you can call ``transports.open()``, or simply
to open a connection). To do this you can call ``transport.open()``, or simply
run within a ``with`` statement::
transport = Computer.get_transport()
with transport:
print(transports.whoami())
print(transport.whoami())
:param user: if None, try to obtain a transport for the default user.
Otherwise, pass a valid User.
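For context, the corrected docstring usage expanded into a self-contained snippet; the profile and computer label are placeholders:

```python
# Expanded from the docstring above; assumes a configured profile and a
# computer labelled 'my-computer' already exist.
from aiida import load_profile, orm

load_profile()

computer = orm.load_computer('my-computer')
transport = computer.get_transport()
with transport:  # opens the connection, as described in the docstring
    print(transport.whoami())
```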
3 changes: 2 additions & 1 deletion src/aiida/orm/nodes/data/remote/base.py
@@ -125,7 +125,8 @@ def listdir_withattributes(self, path='.'):
"""Connects to the remote folder and lists the directory content.

:param relpath: If 'relpath' is specified, lists the content of the given subfolder.
:return: a list of dictionaries, where the documentation is in :py:class:Transport.listdir_withattributes.
:return: a list of dictionaries, where the documentation
is in :py:class:Transport.listdir_withattributes.
"""
authinfo = self.get_authinfo()

3 changes: 2 additions & 1 deletion src/aiida/orm/nodes/process/calculation/calcjob.py
@@ -453,7 +453,8 @@ def get_authinfo(self) -> 'AuthInfo':
def get_transport(self) -> 'Transport':
"""Return the transport for this calculation.

:return: `Transport` configured with the `AuthInfo` associated to the computer of this node
:return: Transport configured
with the `AuthInfo` associated to the computer of this node
"""
return self.get_authinfo().get_transport()

4 changes: 2 additions & 2 deletions src/aiida/plugins/factories.py
@@ -418,7 +418,7 @@ def TransportFactory(entry_point_name: str, load: Literal[False]) -> EntryPoint:


def TransportFactory(entry_point_name: str, load: bool = True) -> Union[EntryPoint, Type['Transport']]:
"""Return the `Transport` sub class registered under the given entry point.
"""Return the Transport sub class registered under the given entry point.

:param entry_point_name: the entry point name.
:param load: if True, load the matched entry point and return the loaded resource instead of the entry point itself.
@@ -435,7 +435,7 @@ def TransportFactory(entry_point_name: str, load: bool = True) -> Union[EntryPoi
if not load:
return entry_point

if isclass(entry_point) and issubclass(entry_point, Transport):
if isclass(entry_point) and (issubclass(entry_point, Transport)):
return entry_point

raise_invalid_type_error(entry_point_name, entry_point_group, valid_classes)
2 changes: 1 addition & 1 deletion src/aiida/schedulers/plugins/direct.py
@@ -192,7 +192,7 @@ def _get_submit_command(self, submit_script):
directory.
IMPORTANT: submit_script should be already escaped.
"""
submit_command = f'bash {submit_script} > /dev/null 2>&1 & echo $!'
submit_command = f'(bash {submit_script} > /dev/null 2>&1 & echo $!) &'
Member:
I don't think this change is related, can you move it to another PR?

Contributor (author):
Command execution through the asyncssh library requires this change, otherwise the command is not awaited, so the change is related to this PR. I have checked that it has no effect on the expected behavior of command execution with paramiko, so it is safe.

Member:
Can you add a comment on why this change does not affect the behavior of bash? If a scheduler other than `direct` still runs the bash command, will it conflict with asyncssh?

Contributor (@agoscinski, Jan 17, 2025):
The only thing I can see happening is that the PID printed with `echo $!` appears after the next command, because it is run concurrently due to this change. That would be critical if we relied on the printed PID, since we would parse the PID from the echo command but get a different output instead. As far as I checked the schedulers, however, we do not rely on the printed PID; we retrieve it with a long `ps` command (see `_get_joblist_command`).

The gist is I don't think it interferes.


self.logger.info(f'submitting with: {submit_command}')

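A hedged sketch of why the extra `( ... ) &` wrapper matters when the command is executed through asyncssh, following the discussion above; the host name and script path are placeholders, and the real `AsyncSshTransport` in this PR may execute commands differently:

```python
# Sketch under stated assumptions: asyncssh's conn.run() stays attached to the
# remote command, so backgrounding the whole subshell lets the call return
# promptly even though the submitted job keeps running.
import asyncio
import asyncssh


async def submit(host: str, submit_script: str) -> str:
    async with asyncssh.connect(host) as conn:
        command = f'(bash {submit_script} > /dev/null 2>&1 & echo $!) &'
        result = await conn.run(command)
        return result.stdout


if __name__ == '__main__':
    print(asyncio.run(submit('localhost', 'submit_script.sh')))
```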
2 changes: 2 additions & 0 deletions src/aiida/tools/pytest_fixtures/__init__.py
@@ -22,6 +22,7 @@
aiida_computer,
aiida_computer_local,
aiida_computer_ssh,
aiida_computer_ssh_async,
aiida_localhost,
ssh_key,
)
@@ -33,6 +34,7 @@
'aiida_computer',
'aiida_computer_local',
'aiida_computer_ssh',
'aiida_computer_ssh_async',
'aiida_config',
'aiida_config_factory',
'aiida_config_tmp',
32 changes: 32 additions & 0 deletions src/aiida/tools/pytest_fixtures/orm.py
@@ -191,6 +191,38 @@ def factory(label: str | None = None, configure: bool = True) -> 'Computer':
return factory


@pytest.fixture
def aiida_computer_ssh_async(aiida_computer) -> t.Callable[[], 'Computer']:
"""Factory to return a :class:`aiida.orm.computers.Computer` instance with ``core.ssh_async`` transport.

Usage::

def test(aiida_computer_ssh_async):
computer = aiida_computer_ssh_async(label='some-label', configure=True)
assert computer.transport_type == 'core.ssh_async'
assert computer.is_configured

The factory has the following signature:

:param label: The computer label. If not specified, a random UUID4 is used.
:param configure: Boolean, if ``True``, ensures the computer is configured, otherwise the computer is returned
as is. Note that if a computer with the given label already exists and it was configured before, the
computer will not be "un-"configured. If an unconfigured computer is absolutely required, make sure to first
delete the existing computer or specify another label.
:return: A stored computer instance.
"""

def factory(label: str | None = None, configure: bool = True) -> 'Computer':
computer = aiida_computer(label=label, hostname='localhost', transport_type='core.ssh_async')

if configure:
computer.configure()

return computer

return factory


@pytest.fixture
def aiida_localhost(aiida_computer_local) -> 'Computer':
"""Return a :class:`aiida.orm.computers.Computer` instance representing localhost with ``core.local`` transport.
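A short sketch of a test using the new fixture, mirroring the usage block in its docstring; the label and the extra assertion on the hostname are illustrative:

```python
# Illustrative test sketch; assumes the aiida.tools.pytest_fixtures plugin is
# loaded so the fixture is available.
def test_ssh_async_computer(aiida_computer_ssh_async):
    computer = aiida_computer_ssh_async(label='localhost-async', configure=True)
    assert computer.transport_type == 'core.ssh_async'
    assert computer.is_configured
    assert computer.hostname == 'localhost'
```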
3 changes: 3 additions & 0 deletions src/aiida/transports/__init__.py
@@ -16,8 +16,11 @@
from .transport import *

__all__ = (
'AsyncTransport',
'BlockingTransport',
'SshTransport',
'Transport',
'TransportPath',
'convert_to_bool',
'parse_sshconfig',
)
2 changes: 1 addition & 1 deletion src/aiida/transports/cli.py
@@ -140,7 +140,7 @@ def transport_options(transport_type):
"""Decorate a command with all options for a computer configure subcommand for transport_type."""

def apply_options(func):
"""Decorate the command functionn with the appropriate options for the transport type."""
"""Decorate the command function with the appropriate options for the transport type."""
options_list = list_transport_options(transport_type)
options_list.reverse()
func = arguments.COMPUTER(callback=partial(match_comp_transport, transport_type=transport_type))(func)