From 297d87f1ffc8cfaf763a6f7bf49276daaa4fc661 Mon Sep 17 00:00:00 2001 From: Julian Geiger Date: Fri, 17 May 2024 12:28:17 +0200 Subject: [PATCH] Hack stream-to-packed --- src/aiida/cmdline/commands/cmd_profile.py | 49 +++++++++++++++++++ src/aiida/repository/backend/abstract.py | 2 +- .../repository/backend/disk_object_store.py | 11 +++++ src/aiida/tools/archive/imports.py | 28 ++++++++++- 4 files changed, 88 insertions(+), 2 deletions(-) diff --git a/src/aiida/cmdline/commands/cmd_profile.py b/src/aiida/cmdline/commands/cmd_profile.py index 0d22b9025b..62685a2613 100644 --- a/src/aiida/cmdline/commands/cmd_profile.py +++ b/src/aiida/cmdline/commands/cmd_profile.py @@ -197,3 +197,52 @@ def profile_delete(force, delete_data, profiles): get_config().delete_profile(profile.name, delete_storage=delete_data) echo.echo_success(f'Profile `{profile.name}` was deleted.') + +@verdi_profile.command('flush') +@arguments.PROFILES(required=True) +@options.FORCE(help='Skip any prompts for confirmation.') +@click.option( + '--dry-run', + is_flag=True, + help='Run the maintenance in dry-run mode to print actions that would be taken without actually executing them.', +) +def profile_flush(force, profiles, dry_run): + """Delete all nodes and groups and run a full storage maintenance. + + The PROFILES argument takes one or multiple profile names; each is confirmed separately unless --force is given. 
+ """ + + from aiida.common.exceptions import LockingProfileError + from aiida.manage.manager import get_manager + from aiida.orm import Group, Node, QueryBuilder + from aiida.plugins import StorageFactory + from aiida.tools import delete_nodes + + manager = get_manager() + storage = manager.get_profile_storage() + + for profile in profiles: + if not force: + echo.echo_warning('This operation cannot be undone, are you sure you want to continue?', nl=False) + + if not force and not click.confirm(''): + echo.echo_report(f'Deleting of `{profile.name}` cancelled.') + continue + + else: + + # Delete nodes: query every Node and pass their pks to delete_nodes. NOTE(review): the query and `storage` do not depend on `profile` -- confirm the named profile is the one being flushed. + qb = QueryBuilder() + qb.append(Node) + nodes = qb.all() + node_ids = [node[0].pk for node in nodes] + delete_nodes(node_ids, dry_run=dry_run) + + # Delete groups. NOTE(review): not guarded by `dry_run` -- confirm this is intended. + groups = Group.collection.all() + for group in groups: + Group.collection.delete(group.pk) + + storage.maintain(full=True, dry_run=False, compress=dry_run) + + # TODO: Users and Computers? NOTE(review): `maintain` above gets dry_run=False and compress=dry_run -- looks swapped, confirm. diff --git a/src/aiida/repository/backend/abstract.py b/src/aiida/repository/backend/abstract.py index 7d0c554269..96abcea382 100644 --- a/src/aiida/repository/backend/abstract.py +++ b/src/aiida/repository/backend/abstract.py @@ -80,7 +80,7 @@ def put_object_from_filelike(self, handle: BinaryIO) -> str: and not self.is_readable_byte_stream(handle) ): raise TypeError(f'handle does not seem to be a byte stream: {type(handle)}.') - return self._put_object_from_filelike(handle) + return self._put_object_from_filelike_packed(handle) @abc.abstractmethod def _put_object_from_filelike(self, handle: BinaryIO) -> str: diff --git a/src/aiida/repository/backend/disk_object_store.py b/src/aiida/repository/backend/disk_object_store.py index b8928be575..1ef6cf4b47 100644 --- a/src/aiida/repository/backend/disk_object_store.py +++ b/src/aiida/repository/backend/disk_object_store.py @@ -87,9 +87,20 @@ def _put_object_from_filelike(self, handle: t.BinaryIO) -> str: :return: the generated fully qualified identifier for the object within the 
repository. :raises TypeError: if the handle is not a byte stream. """ + raise SystemExit("using normal from_filelike") with self._container as container: return container.add_streamed_object(handle) + def _put_object_from_filelike_packed(self, handle: t.BinaryIO) -> str: + """Store the byte contents of a file in the repository via the container's ``add_streamed_object_to_pack``. + + :param handle: filelike object with the byte content to be stored. + :return: the generated fully qualified identifier for the object within the repository. + :raises TypeError: if the handle is not a byte stream. + """ + with self._container as container: + return container.add_streamed_object_to_pack(handle) + def has_objects(self, keys: t.List[str]) -> t.List[bool]: with self._container as container: return container.has_objects(keys) diff --git a/src/aiida/tools/archive/imports.py b/src/aiida/tools/archive/imports.py index bdc47519f0..65b1cfd885 100644 --- a/src/aiida/tools/archive/imports.py +++ b/src/aiida/tools/archive/imports.py @@ -195,13 +195,17 @@ def import_archive( ) new_repo_keys = _get_new_object_keys(archive_format.key_format, backend_from, backend, query_params) + # IMPORT_LOGGER.report('HELLO') if test_run: # exit before we write anything to the database or repository raise ImportTestRun('test run complete') # now the transaction has been successfully populated, but not committed, we add the repository files # if the commit fails, this is not so much an issue, since the files can be removed on repo maintenance - _add_files_to_repo(backend_from, backend, new_repo_keys) + # HACK: replaces the call `_add_files_to_repo(backend_from, backend, new_repo_keys)` + IMPORT_LOGGER.report('Using: _add_files_to_repo_packed') + + _add_files_to_repo_packed(backend_from, backend, new_repo_keys) IMPORT_LOGGER.report('Committing transaction to database...') @@ -1183,3 +1187,25 @@ def _add_files_to_repo(backend_from: StorageBackend, backend_to: StorageBackend, f'Archive repository key is different to backend key: {key!r} != {backend_key!r}' ) progress.update() + 
+def _add_files_to_repo_packed(backend_from: StorageBackend, backend_to: StorageBackend, new_keys: Set[str]) -> None:
+    """Copy the objects identified by ``new_keys`` from one backend's repository to another's.
+
+    :param backend_from: storage backend whose repository the objects are read from.
+    :param backend_to: storage backend whose repository the objects are written to.
+    :param new_keys: keys of the objects to copy; nothing is done when this is empty.
+    :raises ImportValidationError: if the key returned by the target differs from the source key.
+    """
+    if not new_keys:
+        return None
+    # Whether the stream is written to a pack is decided inside `put_object_from_filelike`, not here.
+    repository_to = backend_to.get_repository()
+    repository_from = backend_from.get_repository()
+    with get_progress_reporter()(desc='Adding archive files to repository', total=len(new_keys)) as progress:
+        for key, handle in repository_from.iter_object_streams(new_keys):  # type: ignore[arg-type]
+            backend_key = repository_to.put_object_from_filelike(handle)
+            if backend_key != key:
+                raise ImportValidationError(
+                    f'Archive repository key is different to backend key: {key!r} != {backend_key!r}'
+                )
+            progress.update()