Skip to content

Commit

Permalink
Extend compress and extract functions to handle large directories (#109)
Browse files Browse the repository at this point in the history
* wip

* wip

* Add timers in async client

* Stash

* Don't merge reqs by default

* Bring back the old merge_request

* Fix internal dictionaries keys

* Move decorator to wrapper GET function

* Fix tasks endpoint in reference

* Extend compress func in basic client

* Extend compress func in basic client

* Extend compress and extract func in async client

* Fix typo

* Use exponential backoff for polling

* Add docs for `fail_on_timeout`

* Update docs text and make timeout string a global var

* Add comment in the code to explain the `fail_on_timeout` logic
  • Loading branch information
ekouts authored Jun 19, 2024
1 parent 4537886 commit 0cfdc3e
Show file tree
Hide file tree
Showing 4 changed files with 339 additions and 43 deletions.
172 changes: 155 additions & 17 deletions firecrest/AsyncClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import asyncio
import httpx
from io import BytesIO
import itertools
import jwt
import logging
import os
Expand All @@ -27,7 +28,7 @@
import firecrest.FirecrestException as fe
import firecrest.types as t
from firecrest.AsyncExternalStorage import AsyncExternalUpload, AsyncExternalDownload
from firecrest.utilities import time_block
from firecrest.utilities import time_block, slurm_state_completed


if sys.version_info >= (3, 8):
Expand Down Expand Up @@ -91,6 +92,7 @@ class AsyncFirecrest:
"""

TOO_MANY_REQUESTS_CODE = 429
TIMEOUT_STR = "Command has finished with timeout signal"

def _retry_requests(func):
async def wrapper(*args, **kwargs):
Expand Down Expand Up @@ -826,14 +828,23 @@ async def copy(self, machine: str, source_path: str, target_path: str) -> str:
self._json_response([resp], 201)
return target_path

async def compress(self, machine: str, source_path: str, target_path: str) -> str:
async def compress(
self,
machine: str,
source_path: str,
target_path: str,
fail_on_timeout: bool = True
) -> str:
"""Compress files using gzip compression.
You can name the output file as you like, but typically these files have a .tar.gz extension.
When successful, the method returns a string with the path of the newly created file.
:param machine: the machine name where the filesystem belongs to
:param source_path: the absolute source path
:param target_path: the absolute target path
:param fail_on_timeout: if `True` on timeout, this method will raise an
exception and won't fall back to submitting a long running job
:calls: POST `/utilities/compress`
.. warning:: This is available only for FirecREST>=1.16.0
Expand All @@ -843,10 +854,75 @@ async def compress(self, machine: str, source_path: str, target_path: str) -> st
additional_headers={"X-Machine-Name": machine},
data={"targetPath": target_path, "sourcePath": source_path},
)
self._json_response([resp], 201)
# - If the response is 201, the request was successful so we can
# return the target path
# - If `fail_on_timeout==True` we let `_json_response` take care of
# possible errors by raising an exception
# - If the response is 400 and the error message is the timeout
# message, we will submit a job to compress the file
if (
resp.status_code == 201 or
fail_on_timeout or
resp.status_code != 400 or
resp.json().get('error', '') != self.TIMEOUT_STR
):
self._json_response([resp], 201)
else:
logger.debug(
f"Compression of {source_path} to {target_path} has finished "
f"with timeout signal. Will submit a job to compress the "
f"file."
)
job_info = await self.submit_compress_job(
machine,
source_path,
target_path
)
jobid = job_info['jobid']
active_jobs = await self.poll_active(
machine,
[jobid]
)
intervals = (2**i for i in itertools.count(start=0))
while (
active_jobs and
not slurm_state_completed(active_jobs[0]['state'])
):
await asyncio.sleep(next(intervals))
active_jobs = await self.poll_active(
machine,
[jobid]
)

if (
active_jobs and
active_jobs[0]['state'] != 'COMPLETED'
):
raise Exception(
f"compression job (jobid={jobid}) finished with "
f"state {active_jobs[0]['state']}"
)

err_output = await self.head(
machine,
job_info['job_file_err']
)
if (err_output != ''):
raise Exception(
f"compression job (jobid={jobid}) has failed: "
f"{err_output}"
)

return target_path

async def extract(self, machine: str, source_path: str, target_path: str, extension: str = "auto") -> str:
async def extract(
self,
machine: str,
source_path: str,
target_path: str,
extension: str = "auto",
fail_on_timeout: bool = True
) -> str:
"""Extract files.
If you don't select the extension, FirecREST will try to guess the right command based on the extension of the sourcePath.
Supported extensions are `.zip`, `.tar`, `.tgz`, `.gz` and `.bz2`.
Expand All @@ -856,6 +932,8 @@ async def extract(self, machine: str, source_path: str, target_path: str, extens
:param source_path: the absolute path of the file to be extracted
:param target_path: the absolute target path where the `source_path` is extracted
:param file_extension: possible values are `auto`, `.zip`, `.tar`, `.tgz`, `.gz` and `.bz2`
:param fail_on_timeout: if `True` on timeout, this method will raise an
exception and won't fall back to submitting a long running job
:calls: POST `/utilities/extract`
.. warning:: This is available only for FirecREST>=1.16.0
Expand All @@ -869,6 +947,66 @@ async def extract(self, machine: str, source_path: str, target_path: str, extens
"extension": extension
},
)
# - If the response is 201, the request was successful so we can
# return the target path
# - If `fail_on_timeout==True` we let `_json_response` take care of
# possible errors by raising an exception
# - If the response is 400 and the error message is the timeout
# message, we will submit a job to extract the file
if (
resp.status_code == 201 or
fail_on_timeout or
resp.status_code != 400 or
resp.json().get('error', '') != self.TIMEOUT_STR
):
self._json_response([resp], 201)
else:
logger.debug(
f"Extraction of {source_path} to {target_path} has finished "
f"with timeout signal. Will submit a job to extract the "
f"file."
)

job_info = await self.submit_extract_job(
machine,
source_path,
target_path,
extension
)
jobid = job_info['jobid']
active_jobs = await self.poll_active(
machine,
[jobid]
)
intervals = (2**i for i in itertools.count(start=0))
while (
active_jobs and
not slurm_state_completed(active_jobs[0]['state'])
):
await asyncio.sleep(next(intervals))
active_jobs = await self.poll_active(
machine,
[jobid]
)

if (
active_jobs and
active_jobs[0]['state'] != 'COMPLETED'
):
raise Exception(
f"extract job (jobid={jobid}) finished with"
f"state {active_jobs[0]['state']}"
)

err_output = await self.head(
machine,
job_info['job_file_err']
)
if (err_output != ''):
raise Exception(
f"extract job has failed: {err_output}"
)

self._json_response([resp], 201)
return target_path

Expand Down Expand Up @@ -1169,7 +1307,7 @@ async def submit(
:param env_vars: dictionary (varName, value) defining environment variables to be exported for the job
:calls: POST `/compute/jobs/upload` or POST `/compute/jobs/path`
GET `/tasks/{taskid}`
GET `/tasks`
"""
if [
script_str is None,
Expand Down Expand Up @@ -1276,7 +1414,7 @@ async def poll(
:param page_number: page number (if set to `None` the default value is 0)
:calls: GET `/compute/acct`
GET `/tasks/{taskid}`
GET `/tasks`
"""
jobids = [str(j) for j in jobs] if jobs else []
params = {}
Expand Down Expand Up @@ -1330,7 +1468,7 @@ async def poll_active(
:param page_number: page number (if set to `None` the default value is 0)
:calls: GET `/compute/jobs`
GET `/tasks/{taskid}`
GET `/tasks`
"""
jobs = jobs if jobs else []
jobids = {str(j) for j in jobs}
Expand Down Expand Up @@ -1372,7 +1510,7 @@ async def nodes(
:param nodes: specific compute nodes to query
:calls: GET `/compute/nodes`
GET `/tasks/{taskid}`
GET `/tasks`
.. warning:: This is available only for FirecREST>=1.16.0
"""
Expand Down Expand Up @@ -1402,7 +1540,7 @@ async def partitions(
:param partitions: specific partitions nodes to query
:calls: GET `/compute/partitions`
GET `/tasks/{taskid}`
GET `/tasks`
.. warning:: This is available only for FirecREST>=1.16.0
"""
Expand Down Expand Up @@ -1430,7 +1568,7 @@ async def reservations(
:param machine: the machine name where the scheduler belongs to
:param reservations: specific reservations to query
:calls: GET `/compute/reservations`
GET `/tasks/{taskid}`
GET `/tasks`
.. warning:: This is available only for FirecREST>=1.16.0
"""
params = {}
Expand All @@ -1455,7 +1593,7 @@ async def cancel(self, machine: str, job_id: str | int) -> str:
:param job_id: the ID of the job
:calls: DELETE `/compute/jobs/{job_id}`
GET `/tasks/{taskid}`
GET `/tasks`
"""
resp = await self._delete_request(
endpoint=f"/compute/jobs/{job_id}",
Expand Down Expand Up @@ -1529,7 +1667,7 @@ async def submit_move_job(
:param account: name of the bank account to be used in SLURM. If not set, system default is taken.
:calls: POST `/storage/xfer-internal/mv`
GET `/tasks/{taskid}`
GET `/tasks`
"""
resp: List[requests.Response] = []
endpoint = "/storage/xfer-internal/mv"
Expand Down Expand Up @@ -1572,7 +1710,7 @@ async def submit_copy_job(
:param account: name of the bank account to be used in SLURM. If not set, system default is taken.
:calls: POST `/storage/xfer-internal/cp`
GET `/tasks/{taskid}`
GET `/tasks`
"""
resp: List[requests.Response] = []
endpoint = "/storage/xfer-internal/cp"
Expand Down Expand Up @@ -1614,7 +1752,7 @@ async def submit_compress_job(
:param account: name of the bank account to be used in SLURM. If not set, system default is taken.
:calls: POST `/storage/xfer-internal/compress`
GET `/tasks/{taskid}`
GET `/tasks`
.. warning:: This is available only for FirecREST>=1.16.0
"""
Expand Down Expand Up @@ -1661,7 +1799,7 @@ async def submit_extract_job(
:param account: name of the bank account to be used in SLURM. If not set, system default is taken.
:calls: POST `/storage/xfer-internal/extract`
GET `/tasks/{taskid}`
GET `/tasks`
.. warning:: This is available only for FirecREST>=1.16.0
"""
Expand Down Expand Up @@ -1706,7 +1844,7 @@ async def submit_rsync_job(
:param account: name of the bank account to be used in SLURM. If not set, system default is taken.
:calls: POST `/storage/xfer-internal/rsync`
GET `/tasks/{taskid}`
GET `/tasks`
"""
resp: List[requests.Response] = []
endpoint = "/storage/xfer-internal/rsync"
Expand Down Expand Up @@ -1747,7 +1885,7 @@ async def submit_delete_job(
:param account: name of the bank account to be used in SLURM. If not set, system default is taken.
:calls: POST `/storage/xfer-internal/rm`
GET `/tasks/{taskid}`
GET `/tasks`
"""
resp: List[requests.Response] = []
endpoint = "/storage/xfer-internal/rm"
Expand Down
Loading

0 comments on commit 0cfdc3e

Please sign in to comment.