Skip to content

Commit

Permalink
Add compress/extract endpoints and add recursive ls (#100)
Browse files Browse the repository at this point in the history
* Add new endpoint to the client

* Small fixes

* Add version for recursive

* Add unittests
  • Loading branch information
ekouts authored Apr 29, 2024
1 parent efb8044 commit 06c7be2
Show file tree
Hide file tree
Showing 5 changed files with 630 additions and 5 deletions.
153 changes: 150 additions & 3 deletions firecrest/AsyncClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
import time

from contextlib import nullcontext
from typing import Any, ContextManager, Optional, overload, Sequence, Tuple, List
from typing import Any, ContextManager, Optional, overload, Sequence, List
from requests.compat import json # type: ignore
from packaging.version import Version, parse

import firecrest.FirecrestException as fe
import firecrest.types as t
from firecrest.AsyncExternalStorage import AsyncExternalUpload, AsyncExternalDownload
from firecrest.utilities import time_block


if sys.version_info >= (3, 8):
Expand Down Expand Up @@ -633,19 +632,26 @@ async def filesystems(self, system_name: Optional[str] = None) -> dict[str, List

# Utilities
async def list_files(
self, machine: str, target_path: str, show_hidden: bool = False
self, machine: str, target_path: str, show_hidden: bool = False,
recursive: bool = False
) -> List[t.LsFile]:
"""Returns a list of files in a directory.
:param machine: the machine name where the filesystem belongs to
:param target_path: the absolute target path
:param show_hidden: show hidden files
:param recursive: recursively list directories encountered
:calls: GET `/utilities/ls`
.. warning:: The argument ``recursive`` is available only for FirecREST>=1.16.0
"""
params: dict[str, Any] = {"targetPath": f"{target_path}"}
if show_hidden is True:
params["showhidden"] = show_hidden

if recursive is True:
params["recursive"] = recursive

resp = await self._get_request(
endpoint="/utilities/ls",
additional_headers={"X-Machine-Name": machine},
Expand Down Expand Up @@ -758,6 +764,52 @@ async def copy(self, machine: str, source_path: str, target_path: str) -> str:
self._json_response([resp], 201)
return target_path

async def compress(self, machine: str, source_path: str, target_path: str) -> str:
    """Compress ``source_path`` into a gzip archive written at ``target_path``.

    The archive can be given any name, although a ``.tar.gz`` extension
    is customary. On success the method returns ``target_path`` unchanged.

    :param machine: the machine name where the filesystem belongs to
    :param source_path: the absolute source path
    :param target_path: the absolute target path
    :calls: POST `/utilities/compress`

    .. warning:: This is available only for FirecREST>=1.16.0
    """
    headers = {"X-Machine-Name": machine}
    payload = {"targetPath": target_path, "sourcePath": source_path}
    response = await self._post_request(
        endpoint="/utilities/compress",
        additional_headers=headers,
        data=payload,
    )
    # Hand the reply to the shared response check together with 201
    # (presumably the expected HTTP status -- confirm in `_json_response`).
    self._json_response([response], 201)
    return target_path

async def extract(self, machine: str, source_path: str, target_path: str, extension: str = "auto") -> str:
    """Extract files.

    If you don't select the extension, FirecREST will try to guess the right command based on the extension of the sourcePath.
    Supported extensions are `.zip`, `.tar`, `.tgz`, `.gz` and `.bz2`.
    When successful, the method returns ``target_path`` unchanged.

    :param machine: the machine name where the filesystem belongs to
    :param source_path: the absolute path of the file to be extracted
    :param target_path: the absolute target path where the `source_path` is extracted
    :param extension: possible values are `auto`, `.zip`, `.tar`, `.tgz`, `.gz` and `.bz2`
    :calls: POST `/utilities/extract`

    .. warning:: This is available only for FirecREST>=1.16.0
    """
    resp = await self._post_request(
        endpoint="/utilities/extract",
        additional_headers={"X-Machine-Name": machine},
        data={
            "targetPath": target_path,
            "sourcePath": source_path,
            # Always sent, including the default "auto".
            "extension": extension,
        },
    )
    # Hand the reply to the shared response check together with 201
    # (presumably the expected HTTP status -- confirm in `_json_response`).
    self._json_response([resp], 201)
    return target_path

async def file_type(self, machine: str, target_path: str) -> str:
"""Uses the `file` linux application to determine the type of a file.
Expand Down Expand Up @@ -1364,6 +1416,7 @@ async def _internal_transfer(
stage_out_job_id,
account,
ret_response,
extension=None,
):
data = {"targetPath": target_path}
if source_path:
Expand All @@ -1381,6 +1434,9 @@ async def _internal_transfer(
if account:
data["account"] = account

if extension:
data["extension"] = extension

resp = await self._post_request(
endpoint=endpoint, additional_headers={"X-Machine-Name": machine}, data=data
)
Expand Down Expand Up @@ -1473,6 +1529,97 @@ async def submit_copy_job(
t = ComputeTask(self, json_response["task_id"], resp)
return await t.poll_task("200")

async def submit_compress_job(
    self,
    machine: str,
    source_path: str,
    target_path: str,
    job_name: Optional[str] = None,
    time: Optional[str] = None,
    stage_out_job_id: Optional[str] = None,
    account: Optional[str] = None,
) -> t.JobSubmit:
    """Submit a job that compresses files using gzip compression.

    The output file can be given any name, although a ``.tar.gz`` extension is customary.
    The job can be used for stage-out by providing the SLURM Id of a production job.

    :param machine: the machine name where the scheduler belongs to
    :param source_path: the absolute source path
    :param target_path: the absolute target path
    :param job_name: job name
    :param time: limit on the total run time of the job. Acceptable time formats 'minutes', 'minutes:seconds', 'hours:minutes:seconds', 'days-hours', 'days-hours:minutes' and 'days-hours:minutes:seconds'.
    :param stage_out_job_id: transfer data after job with ID {stage_out_job_id} is completed
    :param account: name of the bank account to be used in SLURM. If not set, system default is taken.
    :calls: POST `/storage/xfer-internal/compress`

            GET `/tasks/{taskid}`

    .. warning:: This is available only for FirecREST>=1.16.0
    """
    # List handed to `_internal_transfer` as its `ret_response` argument
    # and then passed on to the task object created below.
    responses: List[requests.Response] = []
    submission = await self._internal_transfer(
        "/storage/xfer-internal/compress",
        machine,
        source_path,
        target_path,
        job_name,
        time,
        stage_out_job_id,
        account,
        responses,
    )
    task_id = submission["task_id"]
    logger.info(f"Job submission task: {task_id}")
    task = ComputeTask(self, task_id, responses)
    return await task.poll_task("200")

async def submit_extract_job(
    self,
    machine: str,
    source_path: str,
    target_path: str,
    extension: str = "auto",
    job_name: Optional[str] = None,
    time: Optional[str] = None,
    stage_out_job_id: Optional[str] = None,
    account: Optional[str] = None,
) -> t.JobSubmit:
    """Extract files.

    If you don't select the extension, FirecREST will try to guess the right command based on the extension of the sourcePath.
    Supported extensions are `.zip`, `.tar`, `.tgz`, `.gz` and `.bz2`.
    Possible to stage-out jobs providing the SLURM Id of a production job.

    :param machine: the machine name where the scheduler belongs to
    :param source_path: the absolute source path
    :param target_path: the absolute target path
    :param extension: file extension, possible values are `auto`, `.zip`, `.tar`, `.tgz`, `.gz` and `.bz2`
    :param job_name: job name
    :param time: limit on the total run time of the job. Acceptable time formats 'minutes', 'minutes:seconds', 'hours:minutes:seconds', 'days-hours', 'days-hours:minutes' and 'days-hours:minutes:seconds'.
    :param stage_out_job_id: transfer data after job with ID {stage_out_job_id} is completed
    :param account: name of the bank account to be used in SLURM. If not set, system default is taken.
    :calls: POST `/storage/xfer-internal/extract`

            GET `/tasks/{taskid}`

    .. warning:: This is available only for FirecREST>=1.16.0
    """
    # List handed to `_internal_transfer` as its `ret_response` argument
    # and then passed on to the task object created below.
    resp: List[requests.Response] = []
    endpoint = "/storage/xfer-internal/extract"
    json_response = await self._internal_transfer(
        endpoint,
        machine,
        source_path,
        target_path,
        job_name,
        time,
        stage_out_job_id,
        account,
        resp,
        # Forward the caller's choice: `_internal_transfer` only adds
        # "extension" to the request data when it receives a value, so
        # omitting it here silently discarded the argument.
        extension=extension,
    )
    logger.info(f"Job submission task: {json_response['task_id']}")
    task = ComputeTask(self, json_response["task_id"], resp)
    return await task.poll_task("200")

async def submit_rsync_job(
self,
machine: str,
Expand Down
Loading

0 comments on commit 06c7be2

Please sign in to comment.