Skip to content

Commit

Permalink
First trial implementation of streaming to packed.
Browse files Browse the repository at this point in the history
  • Loading branch information
GeigerJ2 committed May 24, 2024
1 parent e952d77 commit 5a2aea5
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 2 deletions.
10 changes: 9 additions & 1 deletion src/aiida/cmdline/commands/cmd_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,12 @@ class ExtrasImportCode(Enum):
is_flag=True,
help='Determine entities to import, but do not actually import them. Deprecated, please use `--dry-run` instead.',
)
@click.option(
'--packed',
is_flag=True,
default=False,
help='Stream repository files directly to `packed`, rather than `loose`. Might provide speedup for large archives.',
)
@options.DRY_RUN(help='Determine entities to import, but do not actually import them.')
@decorators.with_dbenv()
@click.pass_context
Expand All @@ -375,6 +381,7 @@ def import_archive(
group,
test_run,
dry_run,
packed,
):
"""Import archived data to a profile.
Expand Down Expand Up @@ -407,6 +414,7 @@ def import_archive(
'create_group': import_group,
'group': group,
'test_run': dry_run,
'packed': packed,
}

for archive, web_based in all_archives:
Expand Down Expand Up @@ -466,7 +474,7 @@ def _gather_imports(archives, webpages) -> List[Tuple[str, bool]]:


def _import_archive_and_migrate(
ctx: click.Context, archive: str, web_based: bool, import_kwargs: dict, try_migration: bool
ctx: click.Context, archive: str, web_based: bool, import_kwargs: dict, try_migration: bool, packed: bool = False
):
"""Perform the archive import.
Expand Down
41 changes: 41 additions & 0 deletions src/aiida/cmdline/commands/cmd_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,44 @@ def profile_delete(force, delete_data, profiles):

get_config().delete_profile(profile.name, delete_storage=delete_data)
echo.echo_success(f'Profile `{profile.name}` was deleted.')

@verdi_profile.command('flush')
@arguments.PROFILE(required=True)
@options.FORCE(help='Skip any prompts for confirmation.')
def profile_flush(force, profile):
    """Delete all data stored in a profile, keeping the profile itself.

    The PROFILE argument is the name of the profile whose stored data (nodes and
    groups) will be deleted. This operation cannot be undone.
    """
    from aiida.manage.manager import get_manager
    from aiida.orm import Group, Node, QueryBuilder
    from aiida.tools import delete_nodes

    manager = get_manager()
    # NOTE(review): this returns the storage of the *currently loaded* profile, not
    # necessarily the one named by the PROFILE argument — confirm the CLI context
    # loads `profile` before this point, or load it explicitly here.
    storage = manager.get_profile_storage()

    if not force:
        echo.echo_warning('This operation cannot be undone, are you sure you want to continue?', nl=False)
        if not click.confirm(''):
            echo.echo_report(f'Deleting of `{profile.name}` cancelled.')
            return

    # Delete all nodes. Project only the pk so we do not load full ORM entities
    # into memory just to read their ids.
    node_ids = QueryBuilder().append(Node, project='id').all(flat=True)
    if node_ids:
        delete_nodes(node_ids, dry_run=False)

    # Delete all groups.
    for group in Group.collection.all():
        Group.collection.delete(group.pk)

    # Repository cleanup: removes objects no longer referenced by any node.
    storage.maintain(full=True, dry_run=False)

    # TODO(review): decide whether Users and Computers should also be removed.
18 changes: 18 additions & 0 deletions src/aiida/repository/backend/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,24 @@ def put_object_from_filelike(self, handle: BinaryIO) -> str:
def _put_object_from_filelike(self, handle: BinaryIO) -> str:
pass

def put_objects_from_filelike_packed(self, handle_list):
    """Store the byte contents of a list of file-like objects directly to packed storage.

    Streaming straight to packed storage skips the intermediate ``loose`` objects, which
    can speed up bulk imports of large archives.

    :param handle_list: list of file-like objects with the byte contents to be stored.
    :return: the generated fully qualified identifiers for the objects within the repository.
    :raises NotImplementedError: if the backend does not support writing directly to packs.
    """
    return self._put_objects_from_filelike_packed(handle_list)

def _put_objects_from_filelike_packed(self, handle_list):
    """Store the byte contents of a list of file-like objects directly to packed storage.

    Backends that support packed storage should override this method. The base
    implementation raises, so backends without pack support fail with a clear error
    instead of an ``AttributeError``.

    :param handle_list: list of file-like objects with the byte contents to be stored.
    :raises NotImplementedError: always, in the base class.
    """
    raise NotImplementedError(f'{type(self).__name__} does not support streaming objects directly to packed storage.')

def put_object_from_file(self, filepath: Union[str, pathlib.Path]) -> str:
"""Store a new object with contents of the file located at `filepath` on this file system.
Expand Down
9 changes: 9 additions & 0 deletions src/aiida/repository/backend/disk_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ def _put_object_from_filelike(self, handle: t.BinaryIO) -> str:
with self._container as container:
return container.add_streamed_object(handle)

def _put_objects_from_filelike_packed(self, handle_list) -> str:
    """Store the byte contents of a list of file-like objects directly to a pack file.

    Writing straight to packed storage bypasses the intermediate ``loose`` objects,
    which can speed up bulk imports.

    :param handle_list: list of file-like (byte-stream) objects to be stored.
    :return: the identifiers generated by the container for the stored objects.
    """
    # NOTE(review): disk-objectstore's `add_streamed_objects_to_pack` returns a *list*
    # of hashkeys, so the `-> str` annotation looks wrong — confirm and fix upstream.
    with self._container as container:
        return container.add_streamed_objects_to_pack(
            stream_list=handle_list,
        )

def has_objects(self, keys: t.List[str]) -> t.List[bool]:
with self._container as container:
return container.has_objects(keys)
Expand Down
30 changes: 29 additions & 1 deletion src/aiida/tools/archive/imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def import_archive(
group: Optional[orm.Group] = None,
test_run: bool = False,
backend: Optional[StorageBackend] = None,
packed: Optional[bool] = False
) -> Optional[int]:
"""Import an archive into the AiiDA backend.
Expand Down Expand Up @@ -201,7 +202,12 @@ def import_archive(

# now the transaction has been successfully populated, but not committed, we add the repository files
# if the commit fails, this is not so much an issue, since the files can be removed on repo maintenance
_add_files_to_repo(backend_from, backend, new_repo_keys)
if packed:
IMPORT_LOGGER.report('Adding repository files to `packed`')
_add_files_to_repo_packed(backend_from, backend, new_repo_keys)
else:
IMPORT_LOGGER.report('Adding repository files to `loose`')
_add_files_to_repo(backend_from, backend, new_repo_keys)

IMPORT_LOGGER.report('Committing transaction to database...')

Expand Down Expand Up @@ -1188,3 +1194,25 @@ def _add_files_to_repo(backend_from: StorageBackend, backend_to: StorageBackend,
f'Archive repository key is different to backend key: {key!r} != {backend_key!r}'
)
progress.update()


def _add_files_to_repo_packed(backend_from: StorageBackend, backend_to: StorageBackend, new_keys: Set[str]) -> None:
    """Add the new files to the repository, streaming them directly to packed storage.

    Unlike :func:`_add_files_to_repo`, which writes each object as a ``loose`` file,
    this writes all objects to a pack in one call, which can be faster for archives
    with many repository files.

    :param backend_from: the backend (archive) to copy repository objects from.
    :param backend_to: the backend to copy repository objects to.
    :param new_keys: keys of the objects missing from the target repository.
    """
    if not new_keys:
        return

    from io import BytesIO

    repository_from = backend_from.get_repository()
    repository_to = backend_to.get_repository()

    # Fix a list ordering so the returned backend keys can be matched to the inputs.
    keys = list(new_keys)

    with get_progress_reporter()(desc='Adding archive files to repository', total=len(keys)) as progress:
        # TODO(review): materialising every object in memory does not scale to very
        # large archives; consider passing open handles or copying in batches.
        stream_list = [BytesIO(repository_from.get_object_content(key)) for key in keys]
        backend_keys = repository_to.put_objects_from_filelike_packed(stream_list)
        progress.update(len(keys))

    # Same invariant `_add_files_to_repo` enforces: the target must generate the
    # same content-addressed keys as the archive. Warn rather than abort, since the
    # transaction is committed separately.
    for key, backend_key in zip(keys, backend_keys):
        if backend_key != key:
            IMPORT_LOGGER.warning(f'Archive repository key is different to backend key: {key!r} != {backend_key!r}')

0 comments on commit 5a2aea5

Please sign in to comment.