From 0cfdc3e6acc803620b46b03ad3ce22a14625623d Mon Sep 17 00:00:00 2001
From: Eirini Koutsaniti
Date: Wed, 19 Jun 2024 10:15:43 +0300
Subject: [PATCH] Extend compress and extract functions to handle large
 directories (#109)

* wip

* wip

* Add timers in async client

* Stash

* Don't merge reqs by default

* Bring back the old merge_request

* Fix internal dictionaries keys

* Move decorator to wrapper GET function

* Fix tasks endpoint in reference

* Extend compress func in basic client

* Extend compress func in basic client

* Extend compress and extract func in async client

* Fix typo

* Use exponential backoff for polling

* Add docs for `fail_on_timeout`

* Update docs text and make timeout string a global var

* Add comment in the code to explain the `fail_on_timeout` logic
---
 firecrest/AsyncClient.py  | 172 +++++++++++++++++++++++++++++++----
 firecrest/BasicClient.py  | 187 +++++++++++++++++++++++++++++++++-----
 firecrest/cli/__init__.py |   4 +-
 firecrest/utilities.py    |  19 ++++
 4 files changed, 339 insertions(+), 43 deletions(-)

diff --git a/firecrest/AsyncClient.py b/firecrest/AsyncClient.py
index f5c2615..2928c0a 100644
--- a/firecrest/AsyncClient.py
+++ b/firecrest/AsyncClient.py
@@ -9,6 +9,7 @@
 import asyncio
 import httpx
 from io import BytesIO
+import itertools
 import jwt
 import logging
 import os
@@ -27,7 +28,7 @@
 import firecrest.FirecrestException as fe
 import firecrest.types as t
 from firecrest.AsyncExternalStorage import AsyncExternalUpload, AsyncExternalDownload
-from firecrest.utilities import time_block
+from firecrest.utilities import time_block, slurm_state_completed
 
 
 if sys.version_info >= (3, 8):
@@ -91,6 +92,7 @@ class AsyncFirecrest:
     """
 
    TOO_MANY_REQUESTS_CODE = 429
+    TIMEOUT_STR = "Command has finished with timeout signal"
 
     def _retry_requests(func):
         async def wrapper(*args, **kwargs):
@@ -826,7 +828,13 @@ async def copy(self, machine: str, source_path: str, target_path: str) -> str:
         self._json_response([resp], 201)
         return target_path
 
-    async def compress(self, machine: str, source_path: str, target_path: str) -> str:
+    async def compress(
+        self,
+        machine: str,
+        source_path: str,
+        target_path: str,
+        fail_on_timeout: bool = True
+    ) -> str:
         """Compress files using gzip compression.
         You can name the output file as you like, but typically these files have a .tar.gz extension.
         When successful, the method returns a string with the path of the newly created file.
@@ -834,6 +842,9 @@ async def compress(self, machine: str, source_path: str, target_path: str) -> st
         :param machine: the machine name where the filesystem belongs to
         :param source_path: the absolute source path
         :param target_path: the absolute target path
+        :param fail_on_timeout: if `True` on timeout, this method will raise an
+            exception and won't fall back to submitting a long running job
         :calls: POST `/utilities/compress`
 
         .. warning:: This is available only for FirecREST>=1.16.0
@@ -843,10 +854,75 @@ async def compress(self, machine: str, source_path: str, target_path: str) -> st
         """
         resp = await self._post_request(
             endpoint="/utilities/compress",
             additional_headers={"X-Machine-Name": machine},
             data={"targetPath": target_path, "sourcePath": source_path},
         )
-        self._json_response([resp], 201)
+        # - If the response is 201, the request was successful so we can
+        #   return the target path
+        # - If `fail_on_timeout==True` we let `_json_response` take care of
+        #   possible errors by raising an exception
+        # - If the response is 400 and the error message is the timeout
+        #   message, we will submit a job to compress the file
+        if (
+            resp.status_code == 201 or
+            fail_on_timeout or
+            resp.status_code != 400 or
+            resp.json().get('error', '') != self.TIMEOUT_STR
+        ):
+            self._json_response([resp], 201)
+        else:
+            logger.debug(
+                f"Compression of {source_path} to {target_path} has finished "
+                f"with timeout signal. Will submit a job to compress the "
+                f"file."
+            )
+            job_info = await self.submit_compress_job(
+                machine,
+                source_path,
+                target_path
+            )
+            jobid = job_info['jobid']
+            active_jobs = await self.poll_active(
+                machine,
+                [jobid]
+            )
+            intervals = (2**i for i in itertools.count(start=0))
+            while (
+                active_jobs and
+                not slurm_state_completed(active_jobs[0]['state'])
+            ):
+                await asyncio.sleep(next(intervals))
+                active_jobs = await self.poll_active(
+                    machine,
+                    [jobid]
+                )
+
+            if (
+                active_jobs and
+                active_jobs[0]['state'] != 'COMPLETED'
+            ):
+                raise Exception(
+                    f"compression job (jobid={jobid}) finished with "
+                    f"state {active_jobs[0]['state']}"
+                )
+
+            err_output = await self.head(
+                machine,
+                job_info['job_file_err']
+            )
+            if (err_output != ''):
+                raise Exception(
+                    f"compression job (jobid={jobid}) has failed: "
+                    f"{err_output}"
+                )
+
         return target_path
 
-    async def extract(self, machine: str, source_path: str, target_path: str, extension: str = "auto") -> str:
+    async def extract(
+        self,
+        machine: str,
+        source_path: str,
+        target_path: str,
+        extension: str = "auto",
+        fail_on_timeout: bool = True
+    ) -> str:
         """Extract files.
         If you don't select the extension, FirecREST will try to guess the right command based on the extension of the sourcePath.
         Supported extensions are `.zip`, `.tar`, `.tgz`, `.gz` and `.bz2`.
@@ -856,6 +932,8 @@ async def extract(self, machine: str, source_path: str, target_path: str, extens
         :param source_path: the absolute path of the file to be extracted
         :param target_path: the absolute target path where the `source_path` is extracted
         :param file_extension: possible values are `auto`, `.zip`, `.tar`, `.tgz`, `.gz` and `.bz2`
+        :param fail_on_timeout: if `True` on timeout, this method will raise an
+            exception and won't fall back to submitting a long running job
         :calls: POST `/utilities/extract`
 
         .. warning:: This is available only for FirecREST>=1.16.0
@@ -869,6 +947,66 @@ async def extract(self, machine: str, source_path: str, target_path: str, extens
                 "extension": extension
             },
         )
+        # - If the response is 201, the request was successful so we can
+        #   return the target path
+        # - If `fail_on_timeout==True` we let `_json_response` take care of
+        #   possible errors by raising an exception
+        # - If the response is 400 and the error message is the timeout
+        #   message, we will submit a job to extract the file
+        if (
+            resp.status_code == 201 or
+            fail_on_timeout or
+            resp.status_code != 400 or
+            resp.json().get('error', '') != self.TIMEOUT_STR
+        ):
+            self._json_response([resp], 201)
+        else:
+            logger.debug(
+                f"Extraction of {source_path} to {target_path} has finished "
+                f"with timeout signal. Will submit a job to extract the "
+                f"file."
+            )
+
+            job_info = await self.submit_extract_job(
+                machine,
+                source_path,
+                target_path,
+                extension
+            )
+            jobid = job_info['jobid']
+            active_jobs = await self.poll_active(
+                machine,
+                [jobid]
+            )
+            intervals = (2**i for i in itertools.count(start=0))
+            while (
+                active_jobs and
+                not slurm_state_completed(active_jobs[0]['state'])
+            ):
+                await asyncio.sleep(next(intervals))
+                active_jobs = await self.poll_active(
+                    machine,
+                    [jobid]
+                )
+
+            if (
+                active_jobs and
+                active_jobs[0]['state'] != 'COMPLETED'
+            ):
+                raise Exception(
+                    f"extract job (jobid={jobid}) finished with "
+                    f"state {active_jobs[0]['state']}"
+                )
+
+            err_output = await self.head(
+                machine,
+                job_info['job_file_err']
+            )
+            if (err_output != ''):
+                raise Exception(
+                    f"extract job (jobid={jobid}) has failed: "
+                    f"{err_output}"
+                )
+
-        self._json_response([resp], 201)
         return target_path
@@ -1169,7 +1307,7 @@ async def submit(
         :param env_vars: dictionary (varName, value) defining environment variables to be exported for the job
         :calls: POST `/compute/jobs/upload` or POST `/compute/jobs/path`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
         """
         if [
             script_str is None,
@@ -1276,7 +1414,7 @@ async def poll(
         :param page_number: page number (if set to `None` the default value is 0)
         :calls: GET `/compute/acct`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
         """
         jobids = [str(j) for j in jobs] if jobs else []
         params = {}
@@ -1330,7 +1468,7 @@ async def poll_active(
         :param page_number: page number (if set to `None` the default value is 0)
         :calls: GET `/compute/jobs`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
         """
         jobs = jobs if jobs else []
         jobids = {str(j) for j in jobs}
@@ -1372,7 +1510,7 @@ async def nodes(
         :param nodes: specific compute nodes to query
         :calls: GET `/compute/nodes`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
 
         .. warning:: This is available only for FirecREST>=1.16.0
         """
@@ -1402,7 +1540,7 @@ async def partitions(
         :param partitions: specific partitions to query
         :calls: GET `/compute/partitions`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
 
         .. warning:: This is available only for FirecREST>=1.16.0
         """
@@ -1430,7 +1568,7 @@ async def reservations(
         :param machine: the machine name where the scheduler belongs to
         :param reservations: specific reservations to query
         :calls: GET `/compute/reservations`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
 
         .. warning:: This is available only for FirecREST>=1.16.0
         """
         params = {}
@@ -1455,7 +1593,7 @@ async def cancel(self, machine: str, job_id: str | int) -> str:
         :param job_id: the ID of the job
         :calls: DELETE `/compute/jobs/{job_id}`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
         """
         resp = await self._delete_request(
             endpoint=f"/compute/jobs/{job_id}",
@@ -1529,7 +1667,7 @@ async def submit_move_job(
         :param account: name of the bank account to be used in SLURM. If not set, system default is taken.
         :calls: POST `/storage/xfer-internal/mv`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
         """
         resp: List[requests.Response] = []
         endpoint = "/storage/xfer-internal/mv"
@@ -1572,7 +1710,7 @@ async def submit_copy_job(
         :param account: name of the bank account to be used in SLURM. If not set, system default is taken.
         :calls: POST `/storage/xfer-internal/cp`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
         """
         resp: List[requests.Response] = []
         endpoint = "/storage/xfer-internal/cp"
@@ -1614,7 +1752,7 @@ async def submit_compress_job(
         :param account: name of the bank account to be used in SLURM. If not set, system default is taken.
         :calls: POST `/storage/xfer-internal/compress`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
 
         .. warning:: This is available only for FirecREST>=1.16.0
         """
@@ -1661,7 +1799,7 @@ async def submit_extract_job(
         :param account: name of the bank account to be used in SLURM. If not set, system default is taken.
         :calls: POST `/storage/xfer-internal/extract`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
 
         .. warning:: This is available only for FirecREST>=1.16.0
         """
@@ -1706,7 +1844,7 @@ async def submit_rsync_job(
         :param account: name of the bank account to be used in SLURM. If not set, system default is taken.
         :calls: POST `/storage/xfer-internal/rsync`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
         """
         resp: List[requests.Response] = []
         endpoint = "/storage/xfer-internal/rsync"
@@ -1747,7 +1885,7 @@ async def submit_delete_job(
         :param account: name of the bank account to be used in SLURM. If not set, system default is taken.
         :calls: POST `/storage/xfer-internal/rm`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
         """
         resp: List[requests.Response] = []
         endpoint = "/storage/xfer-internal/rm"
diff --git a/firecrest/BasicClient.py b/firecrest/BasicClient.py
index 39d5e32..18b9ae4 100644
--- a/firecrest/BasicClient.py
+++ b/firecrest/BasicClient.py
@@ -25,7 +25,7 @@
 import firecrest.FirecrestException as fe
 import firecrest.types as t
 from firecrest.ExternalStorage import ExternalUpload, ExternalDownload
-from firecrest.utilities import time_block
+from firecrest.utilities import time_block, slurm_state_completed
 
 if sys.version_info >= (3, 8):
     from typing import Literal
@@ -60,6 +60,7 @@ class Firecrest:
     """
 
     TOO_MANY_REQUESTS_CODE = 429
+    TIMEOUT_STR = "Command has finished with timeout signal"
 
     def _retry_requests(func):
         def wrapper(*args, **kwargs):
@@ -280,12 +281,12 @@ def _json_response(
             raise exc
 
         if status_code == 401:
-            logger.critical(f"Status of the response is 401")
+            logger.critical("Status of the response is 401")
             exc = fe.UnauthorizedException(responses)
             logger.critical(exc)
             raise exc
         elif status_code == 404:
-            logger.critical(f"Status of the response is 404")
+            logger.critical("Status of the response is 404")
             exc = fe.NotFound(responses)
             logger.critical(exc)
             raise exc
@@ -594,14 +595,24 @@ def copy(self, machine: str, source_path: str, target_path: str) -> str:
         self._json_response([resp], 201)
         return target_path
 
-    def compress(self, machine: str, source_path: str, target_path: str) -> str:
+    def compress(
+        self,
+        machine: str,
+        source_path: str,
+        target_path: str,
+        fail_on_timeout: bool = True
+    ) -> str:
         """Compress files using gzip compression.
-        You can name the output file as you like, but typically these files have a .tar.gz extension.
-        When successful, the method returns a string with the path of the newly created file.
+        You can name the output file as you like, but typically these files
+        have a .tar.gz extension. When successful, the method returns a string
+        with the path of the newly created file.
 
         :param machine: the machine name where the filesystem belongs to
         :param source_path: the absolute source path
         :param target_path: the absolute target path
+        :param fail_on_timeout: if `True` on timeout, this method will raise an
+            exception and won't fall back to submitting a long running job
         :calls: POST `/utilities/compress`
 
         .. warning:: This is available only for FirecREST>=1.16.0
         """
@@ -611,19 +622,88 @@ def compress(self, machine: str, source_path: str, target_path: str) -> str:
             additional_headers={"X-Machine-Name": machine},
             data={"targetPath": target_path, "sourcePath": source_path},
         )
-        self._json_response([resp], 201)
+        # - If the response is 201, the request was successful so we can
+        #   return the target path
+        # - If `fail_on_timeout==True` we let `_json_response` take care of
+        #   possible errors by raising an exception
+        # - If the response is 400 and the error message is the timeout
+        #   message, we will submit a job to compress the file
+        if (
+            resp.status_code == 201 or
+            fail_on_timeout or
+            resp.status_code != 400 or
+            resp.json().get('error', '') != self.TIMEOUT_STR
+        ):
+            self._json_response([resp], 201)
+        else:
+            logger.debug(
+                f"Compression of {source_path} to {target_path} has finished "
+                f"with timeout signal. Will submit a job to compress the "
+                f"file."
+            )
+            job_info = self.submit_compress_job(
+                machine,
+                source_path,
+                target_path
+            )
+            jobid = job_info['jobid']
+            active_jobs = self.poll_active(
+                machine,
+                [jobid]
+            )
+            intervals = (2**i for i in itertools.count(start=0))
+            while (
+                active_jobs and
+                not slurm_state_completed(active_jobs[0]['state'])
+            ):
+                time.sleep(next(intervals))
+                active_jobs = self.poll_active(
+                    machine,
+                    [jobid]
+                )
+
+            if (
+                active_jobs and
+                active_jobs[0]['state'] != 'COMPLETED'
+            ):
+                raise Exception(
+                    f"compression job (jobid={jobid}) finished with "
+                    f"state {active_jobs[0]['state']}"
+                )
+
+            err_output = self.head(
+                machine,
+                job_info['job_file_err']
+            )
+            if (err_output != ''):
+                raise Exception(
+                    f"compression job (jobid={jobid}) has failed: "
+                    f"{err_output}"
+                )
+
         return target_path
 
-    def extract(self, machine: str, source_path: str, target_path: str, extension: str = "auto") -> str:
+    def extract(
+        self,
+        machine: str,
+        source_path: str,
+        target_path: str,
+        extension: str = "auto",
+        fail_on_timeout: bool = True
+    ) -> str:
         """Extract files.
-        If you don't select the extension, FirecREST will try to guess the right command based on the extension of the sourcePath.
+        If you don't select the extension, FirecREST will try to guess the
+        right command based on the extension of the sourcePath.
         Supported extensions are `.zip`, `.tar`, `.tgz`, `.gz` and `.bz2`.
-        When successful, the method returns a string with the path of the newly created file.
+        When successful, the method returns a string with the path of the
+        newly created file.
 
         :param machine: the machine name where the filesystem belongs to
         :param source_path: the absolute path of the file to be extracted
         :param target_path: the absolute target path where the `source_path` is extracted
         :param extension: file extension, possible values are `auto`, `.zip`, `.tar`, `.tgz`, `.gz` and `.bz2`
+        :param fail_on_timeout: if `True` on timeout, this method will raise an
+            exception and won't fall back to submitting a long running job
         :calls: POST `/utilities/extract`
 
         .. warning:: This is available only for FirecREST>=1.16.0
         """
@@ -637,7 +717,66 @@ def extract(self, machine: str, source_path: str, target_path: str, extension: s
                 "extension": extension
             },
         )
-        self._json_response([resp], 201)
+        # - If the response is 201, the request was successful so we can
+        #   return the target path
+        # - If `fail_on_timeout==True` we let `_json_response` take care of
+        #   possible errors by raising an exception
+        # - If the response is 400 and the error message is the timeout
+        #   message, we will submit a job to extract the file
+        if (
+            resp.status_code == 201 or
+            fail_on_timeout or
+            resp.status_code != 400 or
+            resp.json().get('error', '') != self.TIMEOUT_STR
+        ):
+            self._json_response([resp], 201)
+        else:
+            logger.debug(
+                f"Extraction of {source_path} to {target_path} has finished "
+                f"with timeout signal. Will submit a job to extract the "
+                f"file."
+            )
+
+            job_info = self.submit_extract_job(
+                machine,
+                source_path,
+                target_path,
+                extension
+            )
+            jobid = job_info['jobid']
+            active_jobs = self.poll_active(
+                machine,
+                [jobid]
+            )
+            intervals = (2**i for i in itertools.count(start=0))
+            while (
+                active_jobs and
+                not slurm_state_completed(active_jobs[0]['state'])
+            ):
+                time.sleep(next(intervals))
+                active_jobs = self.poll_active(
+                    machine,
+                    [jobid]
+                )
+
+            if (
+                active_jobs and
+                active_jobs[0]['state'] != 'COMPLETED'
+            ):
+                raise Exception(
+                    f"extract job (jobid={jobid}) finished with "
+                    f"state {active_jobs[0]['state']}"
+                )
+
+            err_output = self.head(
+                machine,
+                job_info['job_file_err']
+            )
+            if (err_output != ''):
+                raise Exception(
+                    f"extract job (jobid={jobid}) has failed: "
+                    f"{err_output}"
+                )
+
         return target_path
 
     def file_type(self, machine: str, target_path: str) -> str:
@@ -1010,7 +1149,7 @@ def submit(
         :param env_vars: dictionary (varName, value) defining environment variables to be exported for the job
         :calls: POST `/compute/jobs/upload` or POST `/compute/jobs/path`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
         """
         if [
             script_str is None,
@@ -1097,7 +1236,7 @@ def poll(
         :param page_number: page number (if set to `None` the default value is 0)
         :calls: GET `/compute/acct`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
         """
         self._current_method_requests = []
         if isinstance(jobs, str):
@@ -1133,7 +1272,7 @@ def poll_active(
         :param page_number: page number (if set to `None` the default value is 0)
         :calls: GET `/compute/jobs`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
         """
         self._current_method_requests = []
         if isinstance(jobs, str):
@@ -1162,7 +1301,7 @@ def nodes(
         :param nodes: specific compute nodes to query
         :calls: GET `/compute/nodes`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
 
         .. warning:: This is available only for FirecREST>=1.16.0
         """
@@ -1194,7 +1333,7 @@ def partitions(
         :param partitions: specific partitions to query
         :calls: GET `/compute/partitions`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
 
         .. warning:: This is available only for FirecREST>=1.16.0
         """
@@ -1224,7 +1363,7 @@ def reservations(
         :param machine: the machine name where the scheduler belongs to
         :param reservations: specific reservations to query
         :calls: GET `/compute/reservations`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
 
         .. warning:: This is available only for FirecREST>=1.16.0
         """
         params = {}
@@ -1251,7 +1390,7 @@ def cancel(self, machine: str, job_id: str | int) -> str:
         :param job_id: the ID of the job
         :calls: DELETE `/compute/jobs/{job_id}`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
         """
         self._current_method_requests = []
         resp = self._delete_request(
@@ -1327,7 +1466,7 @@ def submit_move_job(
         :param account: name of the bank account to be used in SLURM. If not set, system default is taken.
         :calls: POST `/storage/xfer-internal/mv`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
         """
         self._current_method_requests = []
         endpoint = "/storage/xfer-internal/mv"
@@ -1370,7 +1509,7 @@ def submit_copy_job(
         :param account: name of the bank account to be used in SLURM. If not set, system default is taken.
         :calls: POST `/storage/xfer-internal/cp`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
         """
         self._current_method_requests = []
         endpoint = "/storage/xfer-internal/cp"
@@ -1413,7 +1552,7 @@ def submit_rsync_job(
         :param account: name of the bank account to be used in SLURM. If not set, system default is taken.
         :calls: POST `/storage/xfer-internal/rsync`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
         """
         self._current_method_requests = []
         endpoint = "/storage/xfer-internal/rsync"
@@ -1454,7 +1593,7 @@ def submit_delete_job(
         :param account: name of the bank account to be used in SLURM. If not set, system default is taken.
         :calls: POST `/storage/xfer-internal/rm`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
         """
         self._current_method_requests = []
         endpoint = "/storage/xfer-internal/rm"
@@ -1530,7 +1669,7 @@ def submit_compress_job(
         :param account: name of the bank account to be used in SLURM. If not set, system default is taken.
         :calls: POST `/storage/xfer-internal/compress`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
 
         .. warning:: This is available only for FirecREST>=1.16.0
         """
@@ -1577,7 +1716,7 @@ def submit_extract_job(
         :param account: name of the bank account to be used in SLURM. If not set, system default is taken.
         :calls: POST `/storage/xfer-internal/extract`
 
-                GET `/tasks/{taskid}`
+                GET `/tasks`
 
         .. warning:: This is available only for FirecREST>=1.16.0
         """
diff --git a/firecrest/cli/__init__.py b/firecrest/cli/__init__.py
index cd71662..243caa9 100644
--- a/firecrest/cli/__init__.py
+++ b/firecrest/cli/__init__.py
@@ -480,7 +480,7 @@ def compress(
     You can name the output file as you like, but typically these files have a .tar.gz extension.
     """
    try:
-        client.compress(system, source, destination)
+        client.compress(system, source, destination, fail_on_timeout=False)
     except Exception as e:
         examine_exeption(e)
         raise typer.Exit(code=1)
@@ -509,7 +509,7 @@ def extract(
     Supported extensions are `.zip`, `.tar`, `.tgz`, `.gz` and `.bz2`.
     """
     try:
-        client.extract(system, source, destination)
+        client.extract(system, source, destination, extension, fail_on_timeout=False)
     except Exception as e:
         examine_exeption(e)
         raise typer.Exit(code=1)
diff --git a/firecrest/utilities.py b/firecrest/utilities.py
index e363904..5bc1cba 100644
--- a/firecrest/utilities.py
+++ b/firecrest/utilities.py
@@ -1,6 +1,7 @@
 import time
 from contextlib import contextmanager
 
+
 @contextmanager
 def time_block(label, logger):
     start_time = time.time()
@@ -9,3 +10,21 @@ def time_block(label, logger):
     finally:
         end_time = time.time()
         logger.debug(f"{label} took {end_time - start_time:.6f} seconds")
+
+
+def slurm_state_completed(state):
+    completion_states = {
+        'BOOT_FAIL',
+        'CANCELLED',
+        'COMPLETED',
+        'DEADLINE',
+        'FAILED',
+        'NODE_FAIL',
+        'OUT_OF_MEMORY',
+        'PREEMPTED',
+        'TIMEOUT',
+    }
+    if state:
+        return all(s in completion_states for s in state.split(','))
+
+    return False
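
Usage sketch: the call below shows how the new `fail_on_timeout` flag is meant to be used from the synchronous client, so that compressing a large directory falls back to a batch job instead of raising when the utilities API times out. The endpoint URL, credentials, machine name and paths are illustrative placeholders, not values taken from this patch.

    import firecrest as fc

    # Placeholder credentials and endpoint; substitute your deployment's values.
    auth = fc.ClientCredentialsAuth(
        "<client_id>", "<client_secret>", "<token_uri>"
    )
    client = fc.Firecrest(firecrest_url="<firecrest_url>", authorization=auth)

    # With fail_on_timeout=False, a timeout on POST /utilities/compress no
    # longer raises; the client submits a compress job and polls it with
    # exponential backoff until it reaches a completion state.
    client.compress(
        "cluster",
        "/home/user/large-dir",
        "/home/user/large-dir.tar.gz",
        fail_on_timeout=False,
    )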
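
For reference, a minimal sketch of the polling pattern both clients adopt in this patch: exponential backoff (sleep 1, 2, 4, ... seconds) combined with `slurm_state_completed` to decide when to stop. The `poll_state` callable is a hypothetical stand-in for `poll_active`, not part of the library.

    import itertools
    import time

    from firecrest.utilities import slurm_state_completed

    def wait_for_job(poll_state):
        # poll_state() is assumed to return the job's current Slurm state
        # string, e.g. "RUNNING" or "CANCELLED,TIMEOUT".
        intervals = (2**i for i in itertools.count(start=0))
        state = poll_state()
        while not slurm_state_completed(state):
            time.sleep(next(intervals))  # 1s, 2s, 4s, 8s, ...
            state = poll_state()

        return state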