From 1f5ded117af802635b92b0f559d965a5736dad05 Mon Sep 17 00:00:00 2001
From: "Bruno P. Kinoshita"
Date: Mon, 16 Dec 2024 13:47:19 +0100
Subject: [PATCH] Log retrievals, orphans, zombies, db fixes rework and improvements

* Autosubmit config version compatibility fix (rebasing) Reverting a conflict in the rebase to the other option II (tests are only failing in the CI/CD) Reverting a conflict in the rebase to the other option updated typing added a missing return Changed err_code function to return the err_code Rebase fix Squashed zombies/orphan and database fixes. PIPELINE Fix doc test Fix pipeline Added comments and typing (II) Added comments and typing Test pipeline (now with output) Test pipeline Update as version Fix tests, added pytest-xdist !475 vertical regression test [Single job] Test working, retrials working Changes to stat, db and retrials (WIP) Added database test, fixed some issues reduced time.sleeps Mock exists() so it works in the pipeline Fix events queue being reset in a platform failure clean import added more tests Changed the timeout removed debug message Added tests for signals cleanup code Added cleanup signal changes to signals and waitpid A new orphan/zombies code approach WIP: Looking good, some more fixes necessary and ready to be pushed WIP: Zombies/orphan processes logs fix
* Pipeline failing, debug (I)
* Fix Stat_0 file being deleted when it shouldn't
* Fixed issues with the db and logs not being recovered in different run scenarios
* Update VERSION
* added process_jobs_to_submit, fixes a runtime issue
* Added a few TODOs that require changing critical stuff
* Fix runtime issue, recovery now shows a better output
* Improved package test
* Improved package test
* Closing file descriptors on reconnect.
* Fixes an issue with logs with this sequence of commands: `create + run + create + recovery + run`
* Integration test added, fixed a few bugs
* Fixed DB test
* Now the db test also checks the stat and job_names in the filesystem
* feedback
* Changed test name, Added generic functions in the test to expand it with other run options
* Added a few TODOs
* disabled the exit code check as it is not working on the pipeline and not important right now
* Adding more chunks to the wrapper success
* More detailed test
* Add CI and dependabot GitHub actions (#2021)
* add CI and dependabot gh actions
* disable linting temporarily
(cherry picked from commit 8a1b2dfaad7f57adedbf9da86c0da9c0f3a2fb27)
* Improved test output in case of failure
* more info
* more info
* Test adding +x
* added chmod after sending
* chmod added
---------
Co-authored-by: dbeltran
Co-authored-by: Luiggi Tenorio
---------
Co-authored-by: dbeltran
Co-authored-by: Luiggi Tenorio
---------
Co-authored-by: dbeltran
Co-authored-by: Luiggi Tenorio
---
 VERSION                                   |   2 +-
 autosubmit/__init__.py                    |   7 +-
 autosubmit/autosubmit.py                  | 128 ++----
 autosubmit/helpers/utils.py               |  17 -
 .../experiment_history_db_manager.py      |  41 +-
 autosubmit/history/experiment_history.py  |  38 +-
 autosubmit/job/job.py                     | 171 ++++++--
 autosubmit/job/job_common.py              |   4 +-
 autosubmit/job/job_list.py                | 105 +++--
 autosubmit/job/job_packager.py            |  57 +--
 autosubmit/job/job_packages.py            |  55 ++-
 autosubmit/platforms/ecplatform.py        |  41 +-
 autosubmit/platforms/locplatform.py       |  76 +++-
 autosubmit/platforms/paramiko_platform.py |  26 +-
 autosubmit/platforms/pjmplatform.py       |   7 +-
 autosubmit/platforms/platform.py          | 192 ++++++---
 autosubmit/platforms/sgeplatform.py       |  13 +-
 autosubmit/platforms/slurmplatform.py     |  57 ++-
 .../platforms/wrappers/wrapper_builder.py
| 21 +- .../platforms/wrappers/wrapper_factory.py | 52 +++ bin/autosubmit | 7 +- test/unit/test_database_regression.py | 264 ------------ test/unit/test_job.py | 21 + test/unit/test_job_package.py | 17 +- test/unit/test_job_pytest.py | 66 +++ test/unit/test_log_recovery.py | 160 +++++++ test/unit/test_packages.py | 50 +++ test/unit/test_pjm_platform_pytest.py | 88 ++++ test/unit/test_run_command_intregation.py | 402 ++++++++++++++++++ test/unit/test_slurm_platform_pytest.py | 88 ++++ 30 files changed, 1572 insertions(+), 701 deletions(-) delete mode 100644 test/unit/test_database_regression.py create mode 100644 test/unit/test_log_recovery.py create mode 100644 test/unit/test_packages.py create mode 100644 test/unit/test_pjm_platform_pytest.py create mode 100644 test/unit/test_run_command_intregation.py create mode 100644 test/unit/test_slurm_platform_pytest.py diff --git a/VERSION b/VERSION index 152e4522c..b05079e54 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.1.11 +4.1.12 diff --git a/autosubmit/__init__.py b/autosubmit/__init__.py index 3281913d2..0942a1ae7 100644 --- a/autosubmit/__init__.py +++ b/autosubmit/__init__.py @@ -22,7 +22,7 @@ def delete_lock_file(base_path: str = Log.file_path, lock_file: str = 'autosubmi Path(base_path, lock_file).unlink(missing_ok=True) -def exit_from_error(e: BaseException) -> None: +def exit_from_error(e: BaseException) -> int: """Called by ``Autosubmit`` when an exception is raised during a command execution. Prints the exception in ``DEBUG`` level. @@ -40,6 +40,7 @@ def exit_from_error(e: BaseException) -> None: :type e: BaseException :return: None """ + err_code = 1 trace = traceback.format_exc() try: Log.debug(trace) @@ -60,11 +61,13 @@ def exit_from_error(e: BaseException) -> None: if e.trace: Log.debug("Trace: {0}", str(e.trace)) Log.critical("{1} [eCode={0}]", e.code, e.message) + err_code = e.code if not is_portalocker_error and not is_autosubmit_error: msg = "Unexpected error: {0}.\n Please report it to Autosubmit Developers through Git" args = [str(e)] Log.critical(msg.format(*args)) + err_code = 7000 Log.info("More info at https://autosubmit.readthedocs.io/en/master/troubleshooting/error-codes.html") - _exit(1) + return err_code diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 740628161..b21e32f0d 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -85,7 +85,7 @@ import autosubmit.history.utils as HUtils import autosubmit.helpers.autosubmit_helper as AutosubmitHelper import autosubmit.statistics.utils as StatisticsUtils -from autosubmit.helpers.utils import proccess_id, terminate_child_process, check_jobs_file_exists +from autosubmit.helpers.utils import proccess_id, check_jobs_file_exists from contextlib import suppress @@ -1743,46 +1743,6 @@ def generate_scripts_andor_wrappers(as_conf, job_list, jobs_filtered, packages_p for job in job_list.get_job_list(): job.status = Status.WAITING - - @staticmethod - def terminate_child_process(expid, platform = None): - # get pid of the main process - pid = os.getpid() - # In case some one used 4.1.6 or 4.1.5 - process_ids = proccess_id(expid,"run", single_instance = False, platform = platform) - if process_ids: - for process_id in [ process_id for process_id in process_ids if process_id != pid]: - # force kill - os.kill(process_id, signal.SIGKILL) - process_ids = proccess_id(expid,"log", single_instance = False, platform = platform) - # 4.1.7 + - if process_ids: - for process_id in [ process_id for process_id in process_ids if process_id != pid]: - # 
force kill - os.kill(process_id, signal.SIGKILL) - - - @staticmethod - def terminate(all_threads): - # Closing threads on Ctrl+C - Log.info( - "Looking for active threads before closing Autosubmit. Ending the program before these threads finish may result in unexpected behavior. This procedure will last until all threads have finished or the program has waited for more than 30 seconds.") - timeout = 0 - active_threads = True - while active_threads and timeout <= 60: - active_threads = False - for thread in all_threads: - if "JOB_" in thread.name: - if thread.is_alive(): - active_threads = True - Log.info("{0} is still retrieving outputs, time remaining is {1} seconds.".format( - thread.name, 60 - timeout)) - break - if active_threads: - sleep(10) - timeout += 10 - - @staticmethod def manage_wrapper_job(as_conf, job_list, platform, wrapper_id, save=False): check_wrapper_jobs_sleeptime = as_conf.get_wrapper_check_time() @@ -1983,7 +1943,7 @@ def process_historical_data_iteration(job_list,job_changes_tracker, expid): return exp_history @staticmethod def prepare_run(expid, notransitive=False, start_time=None, start_after=None, - run_only_members=None, recover = False, check_scripts= False): + run_only_members=None, recover = False, check_scripts= False, submitter=None): """ Prepare the run of the experiment. :param expid: a string with the experiment id. @@ -1992,6 +1952,7 @@ def prepare_run(expid, notransitive=False, start_time=None, start_after=None, :param start_after: a string with the experiment id to start after. :param run_only_members: a string with the members to run. :param recover: a boolean to indicate if the experiment is recovering from a failure. + :param submitter: the actual loaded platforms if any :return: a tuple """ host = platform.node() @@ -2026,8 +1987,9 @@ def prepare_run(expid, notransitive=False, start_time=None, start_after=None, # Loads the communication lib, always paramiko. # Paramiko is the only way to communicate with the remote machines. Previously we had also Saga. - submitter = Autosubmit._get_submitter(as_conf) - submitter.load_platforms(as_conf) + if not submitter: + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) # Tries to load the job_list from disk, discarding any changes in running time ( if recovery ). # Could also load a backup from previous iteration. 
# The submit ready functions will cancel all job submitted if one submitted in that iteration had issues, so it should be safe to recover from a backup without losing job ids @@ -2140,7 +2102,7 @@ def prepare_run(expid, notransitive=False, start_time=None, start_after=None, Autosubmit.restore_platforms(platforms_to_test,as_conf=as_conf) return job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence, False else: - return job_list, submitter , None, None, as_conf , platforms_to_test, packages_persistence, True + return job_list, submitter, None, None, as_conf, platforms_to_test, packages_persistence, True @staticmethod def get_iteration_info(as_conf,job_list): """ @@ -2161,7 +2123,12 @@ def get_iteration_info(as_conf,job_list): Log.debug("Sleep: {0}", safetysleeptime) Log.debug("Number of retrials: {0}", default_retrials) return total_jobs, safetysleeptime, default_retrials, check_wrapper_jobs_sleeptime - + + @staticmethod + def check_logs_status(job_list, as_conf, new_run): + for job in job_list.get_completed_failed_without_logs(): + job_list.update_log_status(job, as_conf, new_run) + @staticmethod def run_experiment(expid, notransitive=False, start_time=None, start_after=None, run_only_members=None, profile=False): """ @@ -2220,14 +2187,16 @@ def run_experiment(expid, notransitive=False, start_time=None, start_after=None, max_recovery_retrials = as_conf.experiment_data.get("CONFIG",{}).get("RECOVERY_RETRIALS",3650) # (72h - 122h ) recovery_retrials = 0 + Autosubmit.check_logs_status(job_list, as_conf, new_run=True) while job_list.get_active(): + for platform in platforms_to_test: # Send keep_alive signal + platform.work_event.set() for job in [job for job in job_list.get_job_list() if job.status == Status.READY]: job.update_parameters(as_conf, {}) did_run = True try: if Autosubmit.exit: - terminate_child_process(expid) - Autosubmit.terminate(threading.enumerate()) + Autosubmit.check_logs_status(job_list, as_conf, new_run=False) if job_list.get_failed(): return 1 return 0 @@ -2297,6 +2266,7 @@ def run_experiment(expid, notransitive=False, start_time=None, start_after=None, "Couldn't recover the Historical database, AS will continue without it, GUI may be affected") job_changes_tracker = {} if Autosubmit.exit: + Autosubmit.check_logs_status(job_list, as_conf, new_run=False) job_list.save() as_conf.save() time.sleep(safetysleeptime) @@ -2332,7 +2302,7 @@ def run_experiment(expid, notransitive=False, start_time=None, start_after=None, start_time, start_after, run_only_members, - recover=True) + recover=True, submitter = submitter) except AutosubmitError as e: recovery = False Log.result("Recover of job_list has fail {0}".format(e.message)) @@ -2383,33 +2353,27 @@ def run_experiment(expid, notransitive=False, start_time=None, start_after=None, except BaseException: raise # If this happens, there is a bug in the code or an exception not-well caught Log.result("No more jobs to run.") - if not did_run and len(job_list.get_completed_without_logs()) > 0: - #connect to platforms + # search hint - finished run + job_list.save() + if not did_run and len(job_list.get_completed_failed_without_logs()) > 0: # Revise if there is any log unrecovered from previous run Log.info(f"Connecting to the platforms, to recover missing logs") submitter = Autosubmit._get_submitter(as_conf) submitter.load_platforms(as_conf) if submitter.platforms is None: raise AutosubmitCritical("No platforms configured!!!", 7014) - platforms = [value for value in submitter.platforms.values()] - 
Autosubmit.restore_platforms(platforms, as_conf=as_conf, expid=expid) - # Wait for all remaining threads of I/O, close remaining connections - # search hint - finished run + platforms_to_test = [value for value in submitter.platforms.values()] + Autosubmit.restore_platforms(platforms_to_test, as_conf=as_conf, expid=expid) Log.info("Waiting for all logs to be updated") - # get all threads - threads = threading.enumerate() - # print name - timeout = as_conf.experiment_data.get("CONFIG",{}).get("LAST_LOGS_TIMEOUT", 180) - for remaining in range(timeout, 0, -1): - if len(job_list.get_completed_without_logs()) == 0: - break - for job in job_list.get_completed_without_logs(): - job.platform = submitter.platforms[job.platform_name.upper()] - job_list.update_log_status(job, as_conf) - sleep(1) - if remaining % 10 == 0: - Log.info(f"Timeout: {remaining}") - - # Updating job data header with current information when experiment ends + for p in platforms_to_test: + if p.log_recovery_process: + p.cleanup_event.set() # Send cleanup event + p.log_recovery_process.join() + Autosubmit.check_logs_status(job_list, as_conf, new_run=False) + job_list.save() + if len(job_list.get_completed_failed_without_logs()) == 0: + Log.result(f"Autosubmit recovered all job logs.") + else: + Log.warning(f"Autosubmit couldn't recover the following job logs: {[job.name for job in job_list.get_completed_failed_without_logs()]}") try: exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) @@ -2420,9 +2384,8 @@ def run_experiment(expid, notransitive=False, start_time=None, start_after=None, Autosubmit.database_fix(expid) except Exception as e: pass - terminate_child_process(expid) - for platform in platforms_to_test: - platform.closeConnection() + for p in platforms_to_test: + p.closeConnection() if len(job_list.get_failed()) > 0: Log.info("Some jobs have failed and reached maximum retrials") else: @@ -2434,13 +2397,10 @@ def run_experiment(expid, notransitive=False, start_time=None, start_after=None, except Exception: Log.warning("Database is locked") except BaseLockException: - terminate_child_process(expid) raise except AutosubmitCritical: - terminate_child_process(expid) raise except BaseException: - terminate_child_process(expid) raise finally: if profile: @@ -2596,7 +2556,6 @@ def submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence raise except BaseException as e: raise - raise AutosubmitCritical("This seems like a bug in the code, please contact AS developers", 7070, str(e)) @staticmethod def monitor(expid, file_format, lst, filter_chunks, filter_status, filter_section, hide, txt_only=False, @@ -3004,9 +2963,8 @@ def recovery(expid, noplot, save, all_jobs, hide, group_by=None, expand=list(), "Experiment can't be recovered due being {0} active jobs in your experiment, If you want to recover the experiment, please use the flag -f and all active jobs will be cancelled".format( len(current_active_jobs)), 7000) Log.debug("Job list restored from {0} files", pkl_dir) - except BaseException as e: - raise AutosubmitCritical( - "Couldn't restore the job_list or packages, check if the filesystem is having issues", 7040, str(e)) + except Exception: + raise Log.info('Recovering experiment {0}'.format(expid)) try: for job in job_list.get_job_list(): @@ -3040,13 +2998,8 @@ def recovery(expid, noplot, save, all_jobs, hide, group_by=None, expand=list(), job.status = Status.COMPLETED Log.info( "CHANGED job '{0}' status to 
COMPLETED".format(job.name)) - # Log.status("CHANGED job '{0}' status to COMPLETED".format(job.name)) - - if not no_recover_logs: - try: - job.platform.get_logs_files(expid, job.remote_logs) - except Exception as e: - pass + job.recover_last_ready_date() + job.recover_last_log_name() elif job.status != Status.SUSPENDED: job.status = Status.WAITING job._fail_count = 0 @@ -6115,4 +6068,3 @@ def retrieve_expids(): if status in Status.VALUE_TO_KEY.values(): job.status = Status.KEY_TO_VALUE[status] job_list.save() - terminate_child_process(expid) diff --git a/autosubmit/helpers/utils.py b/autosubmit/helpers/utils.py index 6d13cb53b..ce5d53da7 100644 --- a/autosubmit/helpers/utils.py +++ b/autosubmit/helpers/utils.py @@ -2,7 +2,6 @@ import os import pwd import re -import signal import subprocess from itertools import zip_longest @@ -36,21 +35,6 @@ def check_jobs_file_exists(as_conf, current_section_name=None): if missing_files: raise AutosubmitCritical(f"Templates not found:\n{missing_files}", 7011) -def terminate_child_process(expid, platform=None): - # get pid of the main process - pid = os.getpid() - # In case someone used 4.1.6 or 4.1.5 - process_ids = proccess_id(expid, "run", single_instance=False, platform=platform) - if process_ids: - for process_id in [process_id for process_id in process_ids if process_id != pid]: - # force kill - os.kill(process_id, signal.SIGKILL) - process_ids = proccess_id(expid, "log", single_instance=False, platform=platform) - # 4.1.7 + - if process_ids: - for process_id in [process_id for process_id in process_ids if process_id != pid]: - # force kill - os.kill(process_id, signal.SIGKILL) def proccess_id(expid=None, command="run", single_instance=True, platform=None): # Retrieve the process id of the autosubmit process @@ -161,7 +145,6 @@ def restore_platforms(platform_to_test, mail_notify=False, as_conf=None, expid=N else: raise AutosubmitCritical("Issues while checking the connectivity of platforms.", 7010, issues + "\n" + ssh_config_issues) - # Source: https://github.com/cylc/cylc-flow/blob/a722b265ad0bd68bc5366a8a90b1dbc76b9cd282/cylc/flow/tui/util.py#L226 class NaturalSort: """An object to use as a sort key for sorting strings as a human would. diff --git a/autosubmit/history/database_managers/experiment_history_db_manager.py b/autosubmit/history/database_managers/experiment_history_db_manager.py index 1e8127bef..9d205f933 100644 --- a/autosubmit/history/database_managers/experiment_history_db_manager.py +++ b/autosubmit/history/database_managers/experiment_history_db_manager.py @@ -17,12 +17,13 @@ # along with Autosubmit. If not, see . import os import textwrap +from typing import Any + import autosubmit.history.utils as HUtils from . import database_models as Models from autosubmit.history.data_classes.job_data import JobData from autosubmit.history.data_classes.experiment_run import ExperimentRun from .database_manager import DatabaseManager, DEFAULT_JOBDATA_DIR - CURRENT_DB_VERSION = 18 DB_EXPERIMENT_HEADER_SCHEMA_CHANGES = 14 DB_VERSION_SCHEMA_CHANGES = 12 @@ -216,11 +217,20 @@ def update_job_data_dc_by_id(self, job_data_dc): self._update_job_data_by_id(job_data_dc) return self.get_job_data_dc_unique_latest_by_job_name(job_data_dc.job_name) - def update_job_data_dc_by_job_id_name(self, job_data_dc): - """ Update JobData data class. Returns latest last=1 row from job_data by job_name. """ + def update_job_data_dc_by_job_id_name(self, job_data_dc: Any) -> Any: + """ + Update JobData data class. Returns the latest row from job_data by job_name. 
+ + Args: + job_data_dc (JobData): The JobData data class instance containing job_id and job_name. + + Returns: + Any: The latest row from job_data corresponding to the given job_id and job_name. + """ self._update_job_data_by_id(job_data_dc) + # Return the latest row from job_data by job_id and job_name return self.get_job_data_by_job_id_name(job_data_dc.job_id, job_data_dc.job_name) - + def update_list_job_data_dc_by_each_id(self, job_data_dcs): """ Return length of updated list. """ for job_data_dc in job_data_dcs: @@ -316,10 +326,16 @@ def update_many_job_data_change_status(self, changes): statement = ''' UPDATE job_data SET modified=?, status=?, rowstatus=? WHERE id=? ''' self.execute_many_statement_with_arguments_on_dbfile(self.historicaldb_file_path, statement, changes) - def _update_job_data_by_id(self, job_data_dc): + def _update_job_data_by_id(self, job_data_dc: Any) -> None: """ - Update job_data table with data class JobData. + Update job_data table with data class JobData. Update last, submit, start, finish, modified, job_id, status, energy, extra_data, nnodes, ncpus, rowstatus, out, err by id. + + Args: + job_data_dc (JobData): The JobData data class instance containing job data to be updated. + + Returns: + None """ statement = ''' UPDATE job_data SET last=?, submit=?, start=?, finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=?, @@ -351,8 +367,17 @@ def _get_job_data_last_by_run_id_and_finished(self, run_id): job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments) return [Models.JobDataRow(*row) for row in job_data_rows] - def get_job_data_by_job_id_name(self, job_id, job_name): - """ Get List of Models.JobDataRow for job_id """ + def get_job_data_by_job_id_name(self, job_id: int, job_name: str) -> JobData: + """ + Get the latest JobData for a given job_id and job_name. + + Args: + job_id (int): The job ID. + job_name (str): The job name. + + Returns: + JobData: The latest JobData instance. + """ statement = self.get_built_select_statement("job_data", "job_id=? AND job_name=? ORDER BY counter") arguments = (int(job_id), str(job_name),) job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments) diff --git a/autosubmit/history/experiment_history.py b/autosubmit/history/experiment_history.py index e657d3729..1c30013a3 100644 --- a/autosubmit/history/experiment_history.py +++ b/autosubmit/history/experiment_history.py @@ -93,9 +93,23 @@ def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallc return None - def write_start_time(self, job_name, start=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", - member="", section="", chunk=0, platform="NA", job_id=0, wrapper_queue=None, wrapper_code=None, - children=""): + def write_start_time(self, job_name: str, start: int = 0, status: str = "UNKNOWN", qos: str = "debug", job_id: int = 0, wrapper_queue: str = None, wrapper_code: str = None, children: str = "") -> JobData: + """ + Updates the start time and other details of a job in the database. + + Args: + job_name (str): The name of the job. + start (int, optional): The start time of the job. Default to 0. + status (str, optional): The status of the job. Defaults to "UNKNOWN". + qos (str, optional): The quality of service. Default to "debug". + job_id (int, optional): The job ID. Default to 0. + wrapper_queue (Optional[str], optional): The wrapper queue. Defaults to None. + wrapper_code (Optional[str], optional): The wrapper code. 
Defaults to None. + children (str, optional): The children. Default to an empty string. + + Returns: + JobData: The result of updating the job data, or None if an exception occurs. + """ try: job_data_dc_last = self.manager.get_job_data_by_job_id_name(job_id, job_name) if not job_data_dc_last: @@ -111,9 +125,21 @@ def write_start_time(self, job_name, start=0, status="UNKNOWN", ncpus=0, wallclo self._log.log(str(exp), traceback.format_exc()) Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') - def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", - member="", section="", chunk=0, platform="NA", job_id=0, out_file=None, err_file=None, - wrapper_queue=None, wrapper_code=None, children=""): + def write_finish_time(self, job_name: str, finish: int = 0, status: str = "UNKNOWN", job_id: int = 0, out_file: str = None, err_file: str = None) -> JobData: + """ + Updates the finish time and other details of a job in the database. + + Args: + job_name (str): The name of the job. + finish (int, optional): The finish time of the job. Default to 0. + status (str, optional): The status of the job. Defaults to "UNKNOWN". + job_id (int, optional): The job ID. Default to 0. + out_file (Optional[str], optional): The output file path. Defaults to None. + err_file (Optional[str], optional): The error file path. Defaults to None. + + Returns: + JobData: The result of updating the job data, or None if an exception occurs. + """ try: job_data_dc_last = self.manager.get_job_data_by_job_id_name(job_id, job_name) if not job_data_dc_last: diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 507f3fb75..b6543d08a 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -25,7 +25,6 @@ from pathlib import Path from autosubmit.job import job_utils -from contextlib import suppress import copy import datetime import json @@ -38,7 +37,7 @@ from functools import reduce from threading import Thread from time import sleep -from typing import List, Union +from typing import List, Union, Dict, Any from autosubmit.helpers.parameters import autosubmit_parameter, autosubmit_parameters from autosubmit.history.experiment_history import ExperimentHistory @@ -47,10 +46,10 @@ from autosubmit.job.job_common import Status, Type, increase_wallclock_by_chunk from autosubmit.job.job_utils import get_job_package_code, get_split_size_unit, get_split_size from autosubmit.platforms.paramiko_submitter import ParamikoSubmitter +from autosubmit.platforms.platform import Platform from autosubmitconfigparser.config.basicconfig import BasicConfig from autosubmitconfigparser.config.configcommon import AutosubmitConfig -from autosubmitconfigparser.config.yamlparser import YAMLParserFactory -from log.log import Log, AutosubmitCritical, AutosubmitError +from log.log import Log, AutosubmitCritical Log.get_logger("Autosubmit") @@ -130,7 +129,8 @@ def wrapper(*args, **kwargs): } ) class Job(object): - """Class to handle all the tasks with Jobs at HPC. + """ + Class to handle all the tasks with Jobs at HPC. A job is created by default with a name, a jobid, a status and a type. It can have children and parents. The inheritance reflects the dependency between jobs. 
@@ -222,6 +222,7 @@ def __init__(self, name, job_id, status, priority): self.parameters = None self._tmp_path = os.path.join( BasicConfig.LOCAL_ROOT_DIR, self.expid, BasicConfig.LOCAL_TMP_DIR) + self._log_path = Path(f"{self._tmp_path}/LOG_{self.expid}") self.write_start = False self._platform = None self.check = 'true' @@ -274,7 +275,6 @@ def _init_runtime_parameters(self): self._memory_per_task = '' self.log_retrieved = False self.start_time_timestamp = time.time() - self.end_time_placeholder = time.time() self.processors_per_node = "" self.stat_file = self.script_name[:-4] + "_STAT_0" @@ -879,7 +879,7 @@ def processors_per_node(self, value): """Number of processors per node that the job can use.""" self._processors_per_node = value - def set_ready_date(self): + def set_ready_date(self) -> None: """ Sets the ready start date for the job """ @@ -979,14 +979,16 @@ def has_parents(self): """ return self.parents.__len__() - def _get_from_stat(self, index, fail_count =-1): + def _get_from_stat(self, index: int, fail_count: int =-1) -> int: """ - Returns value from given row index position in STAT file associated to job + Returns value from given row index position in STAT file associated to job. - :param index: row position to retrieve - :type index: int - :return: value in index position - :rtype: int + Args: + index (int): Row position to retrieve. + fail_count (int, optional): Fail count to determine the STAT file name. Default to self.stat_file for non-wrapped jobs. + + Returns: + int: Value in the index position, or 0 if the file or index does not exist. """ if fail_count == -1: logname = os.path.join(self._tmp_path, self.stat_file) @@ -1128,12 +1130,25 @@ def retrieve_external_retrials_logfiles(self, platform): log_retrieved = False self.log_retrieved = log_retrieved - def retrieve_internal_retrials_logfiles(self, platform): + def retrieve_internal_retrials_logfiles(self, platform: Platform) -> int: + """ + Retrieves internal retrials log files for the given platform. + This function is used when the job is inside a vertical wrapper. + + Args: + platform (Platform): The platform object to interact with. + + Returns: + int: The last retrial index where logs were successfully retrieved. + """ log_retrieved = False last_retrial = 0 try: for i in range(0, int(self.retrials + 1)): + # Update local logs to give a name to the recovered log self.update_local_logs(count=i, update_submit_time=False) + + # Backup the remote log name in case that the log couldn't be recovered. backup_log = copy.copy(self.remote_logs) self.remote_logs = self.get_new_remotelog_name(i) if self.check_remote_log_exists(platform): @@ -1147,19 +1162,39 @@ def retrieve_internal_retrials_logfiles(self, platform): break except: pass + self.log_retrieved = log_retrieved if self.log_retrieved: self.platform.processed_wrapper_logs.add(self.wrapper_name) return last_retrial - def write_stats(self, last_retrial): - # write stats + def update_stat_file(self): + if self.wrapper_type != "vertical": + self.stat_file = f"{self.script_name[:-4]}_STAT_{self.fail_count}" + else: + self.stat_file = f"{self.script_name[:-4]}_STAT_0" + + def write_stats(self, last_retrial: int) -> None: + """ + + Gathers the stat file, writes statistics into the job_data.db, and updates the total_stat file. + Considers whether the job is a vertical wrapper and the number of retrials to gather. + + Args: + last_retrial (int): The last retrial count. 
+ + Returns: + None + + """ + # Write stats for vertical wrappers if self.wrapper_type == "vertical": # Disable AS retrials for vertical wrappers to use internal ones for i in range(0, int(last_retrial + 1)): self.platform.get_stat_file(self, count=i) self.write_vertical_time(i) self.inc_fail_count() + # Update the logs with Autosubmit Job ID Brand try: for local_log in self.local_logs: @@ -1168,12 +1203,12 @@ def write_stats(self, last_retrial): except BaseException as e: Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(str(e), self.name)) else: + # Update local logs without updating the submit time self.update_local_logs(update_submit_time=False) self.platform.get_stat_file(self) self.write_submit_time() self.write_start_time() self.write_end_time(self.status == Status.COMPLETED) - Log.result(f"{self.fail_count} retrials of job:{self.name} and {self.id} has been inserted in the db") # Update the logs with Autosubmit Job ID Brand try: for local_log in self.local_logs: @@ -1182,12 +1217,16 @@ def write_stats(self, last_retrial): except BaseException as e: Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(str(e), self.name)) - def retrieve_logfiles(self, platform, raise_error=False): + def retrieve_logfiles(self, platform: Any, raise_error: bool = False) -> Dict[str, int]: """ - Retrieves log files from remote host - :param platform: HPCPlatform object - :param raise_error: boolean, if True, raises an error if the log files are not retrieved - :return: dict with finish timestamps per job + Retrieves log files from remote host. + + Args: + platform (Platform): HPCPlatform object. + raise_error (bool): If True, raises an error if the log files are not retrieved. + + Returns: + Dict[str, int]: Dictionary with finish timestamps per job. """ backup_logname = copy.copy(self.local_logs) if self.wrapper_type == "vertical": @@ -1330,17 +1369,17 @@ def update_children_status(self): child.status = Status.FAILED children += list(child.children) - def check_completion(self, default_status=Status.FAILED,over_wallclock=False): + def check_completion(self, default_status=Status.FAILED, over_wallclock=False): """ Check the presence of *COMPLETED* file. Change status to COMPLETED if *COMPLETED* file exists and to FAILED otherwise. :param over_wallclock: - :param default_status: status to set if job is not completed. By default, is FAILED + :param default_status: status to set if job is not completed. By default, it is FAILED :type default_status: Status """ completed_file = os.path.join(self._tmp_path, self.name + '_COMPLETED') completed_file_location = os.path.join(self._tmp_path, f"LOG_{self.expid}", self.name + '_COMPLETED') - + # I'm not fan of this but, it is the only way of doing it without a rework. 
if os.path.exists(completed_file) or os.path.exists(completed_file_location): if not over_wallclock: self.status = Status.COMPLETED @@ -1904,11 +1943,7 @@ def update_job_parameters(self,as_conf, parameters): self.shape = as_conf.jobs_data[self.section].get("SHAPE", "") self.script = as_conf.jobs_data[self.section].get("SCRIPT", "") self.x11 = False if str(as_conf.jobs_data[self.section].get("X11", False)).lower() == "false" else True - if self.wrapper_type != "vertical" and self.packed: - self.stat_file = f"{self.script_name[:-4]}_STAT_{self.fail_count}" - else: - self.stat_file = f"{self.script_name[:-4]}_STAT_0" - + self.update_stat_file() if self.checkpoint: # To activate placeholder sustitution per in the template parameters["AS_CHECKPOINT"] = self.checkpoint parameters['JOBNAME'] = self.name @@ -2311,9 +2346,7 @@ def write_start_time(self, count=-1, vertical_wrapper=False): f.write(date2str(datetime.datetime.fromtimestamp(self.start_time_timestamp), 'S')) # Writing database exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) - exp_history.write_start_time(self.name, start=self.start_time_timestamp, status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors, - wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, - platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), + exp_history.write_start_time(self.name, start=self.start_time_timestamp, status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), qos=self.queue, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str) return True @@ -2332,6 +2365,7 @@ def write_end_time(self, completed, count=-1): :param count: number of retrials :type count: int """ + end_time = self.check_end_time(count) path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') f = open(path, 'a') @@ -2356,10 +2390,7 @@ def write_end_time(self, completed, count=-1): out, err = self.local_logs # Launch first as simple non-threaded function exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) - job_data_dc = exp_history.write_finish_time(self.name, finish=finish_time, status=final_status, ncpus=self.processors, - wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, - platform=self.platform_name, job_id=self.id, out_file=out, err_file=err, wrapper_queue=self._wrapper_queue, - wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str) + job_data_dc = exp_history.write_finish_time(self.name, finish=finish_time, status=final_status, job_id=self.id, out_file=out, err_file=err) # Launch second as threaded function only for slurm if job_data_dc and type(self.platform) is not str and self.platform.type == "slurm": @@ -2388,9 +2419,7 @@ def write_total_stat_by_retries(self, total_stats, first_retrial = False): # Launch first as simple non-threaded function exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) - exp_history.write_start_time(self.name, start=total_stats[0], status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors, - 
wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, - platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), + exp_history.write_start_time(self.name, start=total_stats[0], status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), qos=self.queue, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str) if not first_retrial: exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) @@ -2399,10 +2428,7 @@ def write_total_stat_by_retries(self, total_stats, first_retrial = False): platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str) exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) - job_data_dc = exp_history.write_finish_time(self.name, finish=total_stats[1], status=total_stats[2], ncpus=self.processors, - wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, - platform=self.platform_name, job_id=self.id, out_file=out, err_file=err, wrapper_queue=self._wrapper_queue, - wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str) + job_data_dc = exp_history.write_finish_time(self.name, finish=total_stats[1], status=total_stats[2], job_id=self.id, out_file=out, err_file=err) # Launch second as threaded function only for slurm if job_data_dc and type(self.platform) is not str and self.platform.type == "slurm": thread_write_finish = Thread(target=ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR).write_platform_data_after_finish, args=(job_data_dc, self.platform)) @@ -2475,6 +2501,51 @@ def synchronize_logs(self, platform, remote_logs, local_logs, last = True): self.local_logs = local_logs self.remote_logs = copy.deepcopy(local_logs) + def _recover_last_log_name_from_filesystem(self) -> bool: + """ + Recovers the log name for the job from the filesystem. + :return: True if the log name was already recovered, False otherwise + :rtype: bool + """ + log_name = sorted(list(self._log_path.glob(f"{self.name}*")), key=lambda x: x.stat().st_mtime) + log_name = log_name[-1] if log_name else None + if log_name: + file_timestamp = int(datetime.datetime.fromtimestamp(log_name.stat().st_mtime).strftime("%Y%m%d%H%M%S")) + if self.ready_date and file_timestamp >= int(self.ready_date): + self.local_logs = (log_name.with_suffix(".out").name, log_name.with_suffix(".err").name) + self.remote_logs = copy.deepcopy(self.local_logs) + return True + self.local_logs = (f"{self.name}.out.{self._fail_count}", f"{self.name}.err.{self._fail_count}") + self.remote_logs = copy.deepcopy(self.local_logs) + return False + + def recover_last_log_name(self): + """ + Recovers the last log name for the job + """ + if not self.updated_log: + self.updated_log = self._recover_last_log_name_from_filesystem() + # TODO: After PostgreSQL migration, implement _recover_last_log_from_db() to retrieve the last log from the database. 
+ + def recover_last_ready_date(self) -> None: + """ + Recovers the last ready date for this job + """ + if not self.ready_date: + stat_file = Path(f"{self._tmp_path}/{self.name}_TOTAL_STATS") + if stat_file.exists(): + output_by_lines = stat_file.read_text().splitlines() + if output_by_lines: + line_info = output_by_lines[-1].split(" ") + if line_info and line_info[0].isdigit(): + self.ready_date = line_info[0] + else: + self.ready_date = datetime.datetime.fromtimestamp(stat_file.stat().st_mtime).strftime('%Y%m%d%H%M%S') + Log.debug(f"Failed to recover ready date for the job {self.name}") + else: # Default to last mod time + self.ready_date = datetime.datetime.fromtimestamp(stat_file.stat().st_mtime).strftime('%Y%m%d%H%M%S') + Log.debug(f"Failed to recover ready date for the job {self.name}") + class WrapperJob(Job): """ @@ -2599,6 +2670,13 @@ def check_inner_jobs_completed(self, jobs): job.new_status = Status.COMPLETED job.updated_log = False job.update_status(self.as_config) + # for job in self.job_list: + # if job not in completed_jobs and job in self.inner_jobs_running: + # job.new_status = Status.FAILED + # job.packed = False + # else: + # job.new_status = Status.WAITING + # job.packed = False for job in completed_jobs: self.running_jobs_start.pop(job, None) @@ -2704,8 +2782,7 @@ def _check_running_jobs(self): self._platform.send_file(multiple_checker_inner_jobs, False) command = f"cd {self._platform.get_files_path()}; {os.path.join(self._platform.get_files_path(), 'inner_jobs_checker.sh')}" else: - command = os.path.join( - self._platform.get_files_path(), "inner_jobs_checker.sh") + command = f"cd {self._platform.get_files_path()}; ./inner_jobs_checker.sh; cd {os.getcwd()}" # wait = 2 retries = 5 @@ -2793,7 +2870,7 @@ def cancel_failed_wrapper_job(self): self._platform.send_command( self._platform.cancel_cmd + " " + str(self.id)) except: - Log.debug(f'Job with {self.id} was finished before canceling it') + Log.info(f'Job with {self.id} was finished before canceling it') self._check_running_jobs() for job in self.inner_jobs_running: job.status = Status.FAILED diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index 92dc1b1bd..5897ec0fa 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -127,7 +127,7 @@ def as_header(scheduler_header, executable): set -xuve job_name_ptrn='%CURRENT_LOGDIR%/%JOBNAME%' - echo $(date +%s) > ${job_name_ptrn}_STAT + echo $(date +%s) > ${job_name_ptrn}_STAT_%FAIL_COUNT% ################### # AS CHECKPOINT FUNCTION @@ -154,7 +154,7 @@ def as_tailer(): # Autosubmit tailer ################### set -xuve - echo $(date +%s) >> ${job_name_ptrn}_STAT + echo $(date +%s) >> ${job_name_ptrn}_STAT_%FAIL_COUNT% touch ${job_name_ptrn}_COMPLETED exit 0 diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index b32b044ed..58f3fbf47 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -24,7 +24,7 @@ from contextlib import suppress from shutil import move from threading import Thread -from typing import List, Dict, Tuple +from typing import List, Dict, Tuple, Any from pathlib import Path import math @@ -37,7 +37,7 @@ import networkx as nx from bscearth.utils.date import date2str, parse_date from networkx import DiGraph -from time import localtime, strftime, mktime, time +from time import localtime, mktime, time import autosubmit.database.db_structure as DbStructure from autosubmit.helpers.data_transfer import JobRow @@ -48,6 +48,7 @@ from autosubmit.job.job_packages import 
JobPackageThread from autosubmit.job.job_utils import Dependency, _get_submitter from autosubmit.job.job_utils import transitive_reduction +from autosubmit.platforms.platform import Platform from autosubmitconfigparser.config.basicconfig import BasicConfig from autosubmitconfigparser.config.configcommon import AutosubmitConfig from log.log import AutosubmitCritical, AutosubmitError, Log @@ -1220,6 +1221,7 @@ def get_filters_to_apply(self, job, dependency): if filters_to_apply.get("CHUNKS_TO","none") == "none" and filters_to_apply.get("MEMBERS_TO","none") == "none" and filters_to_apply.get("DATES_TO","none") == "none" and filters_to_apply.get("SPLITS_TO","none") == "none": filters_to_apply = {} filters_to_apply.pop("FROM_STEP", None) + filters_to_apply.pop("ANY_FINAL_STATUS_IS_VALID", None) # If the selected filter is "natural" for all filters_to, trigger the natural dependency calculation all_natural = True @@ -1780,35 +1782,21 @@ def get_completed(self, platform=None, wrapper=False): else: return completed_jobs - def get_completed_without_logs(self, platform=None): + def get_completed_failed_without_logs(self, platform: Any = None) -> List[Any]: """ - Returns a list of completed jobs without updated logs + Returns a list of completed or failed jobs without updated logs. - :param platform: job platform - :type platform: HPCPlatform - :return: completed jobs - :rtype: list - """ - - completed_jobs = [job for job in self._job_list if (platform is None or job.platform.name == platform.name) and - job.status == Status.COMPLETED and job.updated_log is False ] + Args: + platform Platform: Job platform, defaults to None. - return completed_jobs - - def get_completed_without_logs(self, platform=None): - """ - Returns a list of completed jobs without updated logs - - :param platform: job platform - :type platform: HPCPlatform - :return: completed jobs - :rtype: list + Returns: + List[Job]: List of completed and failed jobs without updated logs. """ - completed_jobs = [job for job in self._job_list if (platform is None or job.platform.name == platform.name) and - job.status == Status.COMPLETED and job.updated_log is False ] + completed_failed_jobs = [job for job in self._job_list if (platform is None or job.platform.name == platform.name) and + (job.status == Status.COMPLETED or job.status == Status.FAILED) and job.updated_log is False ] - return completed_jobs + return completed_failed_jobs def get_uncompleted(self, platform=None, wrapper=False): """ @@ -2678,33 +2666,46 @@ def _count_parents_status(job: Job, target_status: str) -> Tuple[List[Job], List non_completed_parents_current.append(parent[0]) return non_completed_parents_current, completed_parents - def update_log_status(self, job, as_conf): + def update_log_status(self, job, as_conf, new_run=False): """ Updates the log err and log out. 
""" - if not hasattr(job,"updated_log") or not job.updated_log: # hasattr for backward compatibility (job.updated_logs is only for newer jobs, as the loaded ones may not have this set yet) - # order path_to_logs by name and get the two last element - log_file = False - if hasattr(job, "x11") and job.x11: - job.updated_log = True - return - if job.wrapper_type == "vertical" and job.fail_count > 0: - for log_recovered in self.path_to_logs.glob(f"{job.name}.*._{job.fail_count}.out"): - if job.local_logs[0][-4] in log_recovered.name: - log_file = True - break - else: - for log_recovered in self.path_to_logs.glob(f"{job.name}.*.out"): - if job.local_logs[0] == log_recovered.name: - log_file = True - break + if not hasattr(job, "updated_log"): # hasattr for backward compatibility (job.updated_logs is only for newer jobs, as the loaded ones may not have this set yet) + job.updated_log = False + elif job.updated_log: + return + if hasattr(job, "x11") and job.x11: # X11 has it log writted in the run.out file. No need to check for log files as there are none + job.updated_log = True + return + log_recovered = self.check_if_log_is_recovered(job) + if log_recovered: + job.local_logs = (log_recovered.name, log_recovered.name[:-4] + ".err") # we only want the last one + job.updated_log = True + elif new_run and not job.updated_log and str(as_conf.platforms_data.get(job.platform.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false": + job.platform.add_job_to_log_recover(job) + return log_recovered + + def check_if_log_is_recovered(self, job: Job) -> Path: + """ + Check if the log is recovered. + + Conditions: + - File must exist. + - File timestamp should be greater than the job ready_date, otherwise it is from a previous run. + + Args: + job (Job): The job object to check the log for. + + Returns: + Path: The path to the recovered log file if found, otherwise None. 
+ """ - if log_file: - if not hasattr(job, "ready_start_date") or not job.ready_start_date or job.local_logs[0] >= job.ready_start_date: # hasattr for backward compatibility - job.local_logs = (log_recovered.name, log_recovered.name[:-4] + ".err") - job.updated_log = True - if not job.updated_log and str(as_conf.platforms_data.get(job.platform.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false": - job.platform.add_job_to_log_recover(job) + if not hasattr(job, "updated_log") or not job.updated_log: + for log_recovered in self.path_to_logs.glob(f"{job.name}.*.out"): + file_timestamp = int(datetime.datetime.fromtimestamp(log_recovered.stat().st_mtime).strftime("%Y%m%d%H%M%S")) + if job.ready_date and file_timestamp >= int(job.ready_date): + return log_recovered + return None def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter=None, first_time=False): # type: (AutosubmitConfig, bool, bool, object, bool) -> bool @@ -2725,7 +2726,6 @@ def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter if self.update_from_file(store_change): save = store_change Log.debug('Updating FAILED jobs') - write_log_status = False if not first_time: for job in self.get_failed(): job.packed = False @@ -2790,7 +2790,6 @@ def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter for job in self.check_special_status(): job.status = Status.READY # Run start time in format (YYYYMMDDHH:MM:SS) from current time - job.ready_start_date = strftime("%Y%m%d%H%M%S") job.id = None job.packed = False job.wrapper_type = None @@ -2802,8 +2801,6 @@ def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter # Log name has this format: # a02o_20000101_fc0_2_SIM.20240212115021.err # $jobname.$(YYYYMMDDHHMMSS).err or .out - if not first_time: - self.update_log_status(job, as_conf) if job.synchronize is not None and len(str(job.synchronize)) > 0: tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) != len(job.parents): @@ -2892,7 +2889,6 @@ def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter job.status = Status.READY job.packed = False # Run start time in format (YYYYMMDDHH:MM:SS) from current time - job.ready_start_date = strftime("%Y%m%d%H%M%S") job.packed = False job.hold = False save = True @@ -2985,6 +2981,9 @@ def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter job.status = Status.SKIPPED save = True # save = True + # Needed so the main process can know if the job was downloaded + for job in self.get_ready(): + job.set_ready_date() self.update_two_step_jobs() Log.debug('Update finished') return save diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 50ffc81c1..b2315a20a 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -17,6 +17,8 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . import collections + +from autosubmit.job.job import Job from log.log import Log, AutosubmitCritical from autosubmit.job.job_common import Status, Type from bscearth.utils.date import sum_str_hours @@ -28,7 +30,6 @@ from typing import List from contextlib import suppress - class JobPackager(object): """ Main class that manages Job wrapping. 
@@ -545,6 +546,10 @@ def build_packages(self): section_jobs_to_submit = dict() for job in [job for job in jobs_ready]: + for event in job.platform.worker_events: # keep alive log retrieval workers. + if not event.is_set(): + event.set() + if job.section not in section_jobs_to_submit: # This is to fix TOTAL_JOBS when is set at job_level # Only for non-wrapped jobs job.update_parameters(self._as_config, {}) # Ensure to have the correct processors for the wrapper building code @@ -604,7 +609,7 @@ def build_packages(self): elif self.wrapper_type[self.current_wrapper_section] in ['vertical-horizontal', 'horizontal-vertical']: built_packages_tmp.append(self._build_hybrid_package(jobs, wrapper_limits, section, wrapper_info=current_info)) else: - built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits) + built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits, wrapper_info=current_info) if len(built_packages_tmp) > 0: Log.result(f"Built {len(built_packages_tmp)} wrappers for {wrapper_name}") packages_to_submit, max_jobs_to_submit = self.check_packages_respect_wrapper_policy(built_packages_tmp, packages_to_submit, max_jobs_to_submit, wrapper_limits, any_simple_packages) @@ -833,8 +838,13 @@ def build_vertical_package(self, job, wrapper_info): stack = [(job, 1)] while stack: job, level = stack.pop() - if level % 10 == 0 and level > 0: + # Less verbose + if level % 50 == 0 and level > 0: Log.info(f"Wrapper package creation is still ongoing. So far {level} jobs have been wrapped.") + for event in job.platform.worker_events: # keep alive log retrieval workers. + if not event.is_set(): + event.set() + if len(self.jobs_list) >= self.wrapper_limits["max_v"] or len(self.jobs_list) >= \ self.wrapper_limits["max_by_section"][job.section] or len(self.jobs_list) >= self.wrapper_limits[ "max"]: @@ -843,7 +853,8 @@ def build_vertical_package(self, job, wrapper_info): if child is not None and len(str(child)) > 0: child.update_parameters(wrapper_info[-1], {}) self.total_wallclock = sum_str_hours(self.total_wallclock, child.wallclock) - if self.total_wallclock <= self.max_wallclock: + # Local jobs could not have a wallclock defined + if self.total_wallclock <= self.max_wallclock or not self.max_wallclock: child.packed = True child.level = level self.jobs_list.append(child) @@ -945,32 +956,25 @@ def __init__(self, dict_jobs, ready_job, jobs_list, total_wallclock, max_jobs, w self.index = 0 - def get_wrappable_child(self, job): + def get_wrappable_child(self, job: Job) -> Job: """ - Goes through the jobs with the same date and member than the input job, and return the first that satisfies self._is_wrappable() + Goes through the jobs with the same date and member as the input job, and returns the first that satisfies self._is_wrappable(). - :param job: job to be evaluated. \n - :type job: Job Object \n - :return: job that is wrappable. \n - :rtype: Job Object + Args: + job (Job): Job to be evaluated. + + Returns: + Optional[Any]: Job that is wrappable, or None if no such job is found. 
""" sorted_jobs = self.sorted_jobs - + child = None for index in range(self.index, len(sorted_jobs)): - child = sorted_jobs[index] - if self._is_wrappable(child): + child_ = sorted_jobs[index] + if child_.name != job.name and self._is_wrappable(child_): + child = child_ self.index = index + 1 - return child - continue - return None - # Not passing tests but better wrappers result to check - # for child in job.children: - # if child.name != job.name: - # if self._is_wrappable(child): - # self.index = self.index + 1 - # return child - # continue - # return None + break + return child def _is_wrappable(self, job): """ @@ -1026,8 +1030,11 @@ def build_horizontal_package(self, horizontal_vertical=False,wrapper_info=[]): for section in jobs_by_section: current_package_by_section[section] = 0 for job in jobs_by_section[section]: - if jobs_processed % 10 == 0 and jobs_processed > 0: + if jobs_processed % 50 == 0 and jobs_processed > 0: Log.info(f"Wrapper package creation is still ongoing. So far {jobs_processed} jobs have been wrapped.") + for event in job.platform.worker_events: # keep alive log retrieval workers. + if not event.is_set(): + event.set() job.update_parameters(wrapper_info[-1], {}) if str(job.processors).isdigit() and str(job.nodes).isdigit() and int(job.nodes) > 0 and int(job.processors) <= 1: job.processors = 0 diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 17bffecc5..49dd3a5cc 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -31,7 +31,7 @@ from autosubmit.job.job import Job from bscearth.utils.date import sum_str_hours from threading import Thread, Lock -from typing import List +from typing import List, Dict import multiprocessing import tarfile import datetime @@ -219,7 +219,13 @@ def _send_files(self): def _do_submission(self,job_scripts=None, hold=False): """ Submit package to the platform. """ - + def process_jobs_to_submit(self, job_id: str, hold: bool = False) -> None: + for i, job in enumerate(self.jobs): + job.hold = hold + job.id = str(job_id) + job.status = Status.SUBMITTED + if hasattr(self, "name"): # TODO change this check for a property that checks if it is a wrapper or not, the same change has to be done in other parts of the code + job.wrapper_name = self.name class JobPackageSimple(JobPackageBase): """ @@ -230,6 +236,7 @@ def __init__(self, jobs): super(JobPackageSimple, self).__init__(jobs) self._job_scripts = {} self.export = jobs[0].export + # self.name = "simple_package" TODO this should be possible, but it crashes accross the code. Add a property that defines what is a package with wrappers def _create_scripts(self, configuration): for job in self.jobs: @@ -245,12 +252,20 @@ def _send_files(self): full_path = os.path.join(self._tmp_path,filename ) + "_" + job.name[5:] self.platform.send_file(os.path.join(self._tmp_path, full_path)) - def _do_submission(self, job_scripts="", hold=False): + def _do_submission(self, job_scripts: Dict[str, str] = "", hold: bool = False) -> None: + """ + Submits jobs to the platform, cleans previous run logs and stats files and updates job status. + + Args: + job_scripts (Dict[str, str]): Dictionary of job scripts, defaults to an empty string. + hold (bool): If True, the job won't immediately start, defaults to False. + """ if len(job_scripts) == 0: job_scripts = self._job_scripts for job in self.jobs: + # This sets the log names but also the submission time for non-vertical wrapped jobs. 
job.update_local_logs() - #CLEANS PREVIOUS RUN ON LOCAL + # Clean previous run logs on local log_completed = os.path.join(self._tmp_path, job.name + '_COMPLETED') log_stat = os.path.join(self._tmp_path, job.name + '_STAT') if os.path.exists(log_completed): @@ -259,12 +274,15 @@ def _do_submission(self, job_scripts="", hold=False): os.remove(log_stat) self.platform.remove_stat_file(job) self.platform.remove_completed_file(job.name) + + # Submit job to the platform job.id = self.platform.submit_job(job, job_scripts[job.name], hold=hold, export = self.export) if job.id is None or not job.id: continue Log.info("{0} submitted", job.name) job.status = Status.SUBMITTED job.wrapper_name = job.name + job.id = str(job.id) @@ -343,7 +361,14 @@ def _send_files(self): self.platform.send_file(self._job_inputs[job.name]) self.platform.send_file(self._common_script) - def _do_submission(self, job_scripts=None, hold=False): + def _do_submission(self, job_scripts: Dict[str, str] = None, hold: bool = False) -> None: + """ + Submits jobs to the platform, cleans previous run logs, and updates job status. + + Args: + job_scripts (Optional[Dict[str, str]]): Dictionary of job scripts, defaults to None. + hold (bool): If True, holds the job submission, defaults to False. + """ for job in self.jobs: job.update_local_logs() self.platform.remove_stat_file(job) @@ -358,6 +383,7 @@ def _do_submission(self, job_scripts=None, hold=False): Log.info("{0} submitted", self.jobs[i].name) self.jobs[i].id = str(package_id) + '[{0}]'.format(i) self.jobs[i].status = Status.SUBMITTED + # Identify to which wrapper this job belongs once it is in the recovery queue self.jobs[i].wrapper_name = self.name @@ -571,7 +597,14 @@ def _send_files(self): self.platform.send_file(self._common_script) - def _do_submission(self, job_scripts=None, hold=False): + def _do_submission(self, job_scripts: Dict[str, str] = None, hold: bool = False) -> None: + """ + Submits jobs to the platform, cleans previous run logs, and updates job status. + + Args: + job_scripts [Dict[str, str]]: Dictionary of job scripts, defaults to None. + hold (bool): If True, the job won't start inmediatly, defaults to False. + """ if callable(getattr(self.platform, 'remove_multiple_files')): filenames = str() for job in self.jobs: @@ -660,7 +693,14 @@ def _send_files(self): self.platform.send_file(self._job_scripts[job.name]) self.platform.send_file(self._common_script) - def _do_submission(self, job_scripts=None, hold=False): + def _do_submission(self, job_scripts: Dict[str, str] = None, hold: bool = False) -> None: + """ + Submits jobs to the platform, cleans previous run logs, and updates job status. + + Args: + job_scripts (Optional[Dict[str, str]]): Dictionary of job scripts, defaults to None. + hold (bool): If True, the job won't start immediately, defaults to False. 
+ """ for job in self.jobs: job.update_local_logs() self.platform.remove_stat_file(job) @@ -851,4 +891,3 @@ def _common_script_content(self): jobs_scripts=self._jobs_scripts, dependency=self._job_dependency, jobs_resources=self._jobs_resources, expid=self._expid, rootdir=self.platform.root_dir, directives=self._custom_directives,threads=self._threads,method=self.method.lower(),partition=self.partition,wrapper_data=self,num_processors_value=self._num_processors) - diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index dd5954a6f..45dbc88ca 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - # Copyright 2017-2020 Earth Sciences Department, BSC-CNS # This file is part of Autosubmit. @@ -18,6 +17,8 @@ # along with Autosubmit. If not, see . import os import subprocess +from typing import Any + from autosubmit.platforms.paramiko_platform import ParamikoPlatform, ParamikoPlatformException from log.log import Log,AutosubmitError from autosubmit.platforms.headers.ec_header import EcHeader @@ -169,12 +170,8 @@ def connect(self, as_conf, reconnect=False): self.connected = False except Exception: self.connected = False - if not self.log_retrieval_process_active and ( - as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', - "false")).lower() == "false"): - self.log_retrieval_process_active = True - if as_conf and as_conf.misc_data.get("AS_COMMAND","").lower() == "run": - self.recover_job_logs() + self.spawn_log_retrieval_process(as_conf) + def restore_connection(self,as_conf): """ @@ -194,34 +191,14 @@ def restore_connection(self,as_conf): except Exception: self.connected = False - def test_connection(self,as_conf): + def test_connection(self, as_conf: Any) -> None: """ - In this case, it does nothing because connection is established for each command + Tests the connection using the provided configuration. - :return: True - :rtype: bool + Args: + as_conf (AutosubmitConfig): The configuration to use for testing the connection. 
""" - self.main_process_id = os.getpid() - output = subprocess.check_output(self._checkvalidcert_cmd, shell=True).decode(locale.getlocale()[1]) - if not output: - output = "" - try: - if output.lower().find("yes") != -1: - self.connected = True - if not self.log_retrieval_process_active and ( - as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', - "false")).lower() == "false"): - self.log_retrieval_process_active = True - if as_conf and as_conf.misc_data.get("AS_COMMAND", "").lower() == "run": - self.recover_job_logs() - return "OK" - else: - self.connected = False - return "Invalid certificate" - except Exception: - self.connected = False - return "Invalid certificate" - + self.connect(as_conf) def check_remote_permissions(self): try: diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index 61f96832a..94e6c7aa2 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -21,13 +21,10 @@ from pathlib import Path from xml.dom.minidom import parseString import subprocess - from matplotlib.patches import PathPatch - from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.headers.local_header import LocalHeader from autosubmit.platforms.wrappers.wrapper_factory import LocalWrapperFactory - from autosubmitconfigparser.config.basicconfig import BasicConfig from time import sleep from log.log import Log, AutosubmitError @@ -106,17 +103,19 @@ def jobs_in_queue(self): return [int(element.firstChild.nodeValue) for element in jobs_xml] def get_submit_cmd(self, job_script, job, hold=False, export=""): - if job: + if job: # Not intuitive at all, but if it is not a job, it is a wrapper wallclock = self.parse_time(job.wallclock) seconds = int(wallclock.days * 86400 + wallclock.seconds * 60) else: - seconds = 24 * 3600 + # TODO for another branch this, it is to add a timeout to the wrapped jobs even if the wallclock is 0, default to 2 days + seconds = 60*60*24*2 if export == "none" or export == "None" or export is None or export == "": export = "" else: export += " ; " - command = self.get_call(job_script, job, export=export,timeout=seconds) + command = self.get_call(job_script, job, export=export, timeout=seconds) return f"cd {self.remote_log_dir} ; {command}" + def get_checkjob_cmd(self, job_id): return self.get_pscall(job_id) @@ -153,8 +152,18 @@ def send_command(self, command, ignore_log=False, x11 = False): return True - def send_file(self, filename, check=True): - command = f'{self.put_cmd} {os.path.join(self.tmp_path, Path(filename).name)} {os.path.join(self.tmp_path, "LOG_" + self.expid, Path(filename).name)}' + def send_file(self, filename: str, check: bool = True) -> bool: + """ + Sends a file to a specified location using a command. + + Args: + filename (str): The name of the file to send. + check (bool): Unused in this platform + + Returns: + bool: True if the file was sent successfully. 
+ """ + command = f'{self.put_cmd} {os.path.join(self.tmp_path, Path(filename).name)} {os.path.join(self.tmp_path, "LOG_" + self.expid, Path(filename).name)}; chmod 770 {os.path.join(self.tmp_path, "LOG_" + self.expid, Path(filename).name)}' try: subprocess.check_call(command, shell=True) except subprocess.CalledProcessError: @@ -164,7 +173,17 @@ def send_file(self, filename, check=True): raise return True - def remove_multiple_files(self, filenames): + def remove_multiple_files(self, filenames: str) -> str: + """ + Creates a shell script to remove multiple files in the remote and sets the appropriate permissions. + + Args: + filenames (str): A string containing the filenames to be removed. + + Returns: + str: An empty string. + """ + # This function is a copy of the slurm one log_dir = os.path.join(self.tmp_path, 'LOG_{0}'.format(self.expid)) multiple_delete_previous_run = os.path.join( log_dir, "multiple_delete_previous_run.sh") @@ -198,20 +217,20 @@ def check_remote_permissions(self): return True # Moves .err .out - def check_file_exists(self, src, wrapper_failed=False, sleeptime=1, max_retries=1): + def check_file_exists(self, src: str, wrapper_failed: bool = False, sleeptime: int = 1, max_retries: int = 1) -> bool: """ - Checks if a file exists in the platform - :param src: source name - :type src: str - :param wrapper_failed: checks inner jobs files - :type wrapper_failed: bool - :param sleeptime: time to sleep - :type sleeptime: int - :param max_retries: maximum number of retries - :type max_retries: int - :return: True if the file exists, False otherwise - :rtype: bool + Checks if a file exists in the platform. + + Args: + src (str): Source name. + wrapper_failed (bool): Checks inner jobs files. Defaults to False. + sleeptime (int): Time to sleep between retries. Defaults to 1. + max_retries (int): Maximum number of retries. Defaults to 1. + + Returns: + bool: True if the file exists, False otherwise. """ + # This function has a short sleep as the files are locally sleeptime = 1 for i in range(max_retries): if os.path.isfile(os.path.join(self.get_files_path(), src)): @@ -281,7 +300,18 @@ def get_logs_files(self, exp_id, remote_logs): """ return - def check_completed_files(self, sections=None): + def check_completed_files(self, sections: str = None) -> str: + """ + Checks for completed files in the remote log directory. + This function is used to check inner_jobs of a wrapper. + + Args: + sections[str]: Space-separated string of sections to check for completed files. Defaults to None. + + Returns: + str: The output if the command is successful, None otherwise. + """ + # Clone of the slurm one. 
command = "find %s " % self.remote_log_dir if sections: for i, section in enumerate(sections.split()): @@ -294,4 +324,4 @@ def check_completed_files(self, sections=None): if self.send_command(command, True): return self._ssh_output else: - return None \ No newline at end of file + return None diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 8b19730a8..eba3f9f95 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -95,6 +95,7 @@ def wrapper(self): return self._wrapper def reset(self): + self.closeConnection() self.connected = False self._ssh = None self._ssh_config = None @@ -527,13 +528,17 @@ def submit_job(self, job, script_name, hold=False, export="none"): Log.debug(f"Submitting job with the command: {cmd}") if cmd is None: return None - if self.send_command(cmd,x11=x11): + if self.send_command(cmd, x11=x11): x11 = False if job is None else job.x11 - job_id = self.get_submitted_job_id(self.get_ssh_output(),x11=x11) - Log.debug("Job ID: {0}", job_id) + job_id = self.get_submitted_job_id(self.get_ssh_output(), x11 ) + if job: + Log.result(f"Job: {job.name} submitted with job_id: {job_id}") + else: + Log.result(f"Job submitted with job_id: {job_id}") return int(job_id) else: return None + def get_job_energy_cmd(self, job_id): return self.get_ssh_output() @@ -580,6 +585,7 @@ def parse_estimated_time(self, output): :rtype: """ raise NotImplementedError + def check_job(self, job, default_status=Status.COMPLETED, retries=5, submit_hold_check=False, is_wrapper=False): """ Checks job running status @@ -597,6 +603,9 @@ def check_job(self, job, default_status=Status.COMPLETED, retries=5, submit_hold :rtype: autosubmit.job.job_common.Status """ + for event in job.platform.worker_events: # keep alive log retrieval workers. + if not event.is_set(): + event.set() job_id = job.id job_status = Status.UNKNOWN if type(job_id) is not int and type(job_id) is not str: @@ -622,11 +631,12 @@ def check_job(self, job, default_status=Status.COMPLETED, retries=5, submit_hold # URi: define status list in HPC Queue Class if job_status in self.job_status['COMPLETED'] or retries == 0: # The Local platform has only 0 or 1, so it neccesary to look for the completed file. 
- # Not sure why it is called over_wallclock but is the only way to return a value - if self.type == "local": # wrapper has a different check completion + if self.type == "local": if not job.is_wrapper: + # Not sure why it is called over_wallclock but is the only way to return a value job_status = job.check_completion(over_wallclock=True) else: + # wrapper has a different file name if Path(f"{self.remote_log_dir}/WRAPPER_FAILED").exists(): job_status = Status.FAILED else: @@ -667,8 +677,8 @@ def check_job(self, job, default_status=Status.COMPLETED, retries=5, submit_hold if job_status in [Status.FAILED, Status.COMPLETED, Status.UNKNOWN]: job.updated_log = False - # backup for end time in case that the stat file is not found - job.end_time_placeholder = int(time.time()) + # backup for end time in case that the second row of the stat file is not found due a failure + job.end_time_timestamp = int(time.time()) if job_status in [Status.RUNNING, Status.COMPLETED] and job.new_status in [Status.QUEUING, Status.SUBMITTED]: # backup for start time in case that the stat file is not found job.start_time_timestamp = int(time.time()) @@ -1237,7 +1247,7 @@ def get_call(self, job_script, job, export="none",timeout=-1): :return: command to execute script :rtype: str """ - if job: # If job is None, it is a wrapper + if job: # If job is None, it is a wrapper. ( 0 clarity there, to be improved in a rework TODO ) executable = '' if job.type == Type.BASH: executable = 'bash' diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index 7291377b7..b769cf792 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -147,12 +147,7 @@ def process_batch_ready_jobs(self,valid_packages_to_submit,failed_packages,error sleep(10) for package in valid_packages_to_submit: - for job in package.jobs: - job.hold = hold - job.id = str(jobs_id[i]) - job.status = Status.SUBMITTED - job.wrapper_name = package.name - + package.process_jobs_to_submit(jobs_id[i], hold) i += 1 save = True except AutosubmitError as e: diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 9af2ca484..61dd23760 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -1,44 +1,66 @@ import atexit import multiprocessing import queue # only for the exception -import psutil +from copy import copy +from os import _exit import setproctitle import locale import os import traceback from autosubmit.job.job_common import Status -from typing import List, Union, Callable +from typing import List, Union, Callable, Set, Any from autosubmit.helpers.parameters import autosubmit_parameter +from autosubmitconfigparser.config.configcommon import AutosubmitConfig from log.log import AutosubmitCritical, AutosubmitError, Log from multiprocessing import Process, Event from multiprocessing.queues import Queue import time -class UniqueQueue( - Queue): # The reason of this class is to avoid duplicates in the queue during the same run. That can happen if the log retrieval process didn't process it yet. +class UniqueQueue(Queue): + """ + A queue that avoids retrieves the same job and retrial during the same run. + """ + + def __init__(self, maxsize: int = -1, block: bool = True, timeout: float = None): + """ + Initializes the UniqueQueue. - def __init__(self, maxsize=-1, block=True, timeout=None): + Args: + maxsize (int): Maximum size of the queue. Defaults to -1 (infinite size). + block (bool): Whether to block when the queue is full. Defaults to True. 
+            timeout (float): Timeout for blocking operations. Defaults to None.
+        """
         self.block = block
         self.timeout = timeout
         self.all_items = set()  # Won't be popped, so even if it is being processed by the log retrieval process, it won't be added again.
         super().__init__(maxsize, ctx=multiprocessing.get_context())

-    def put(self, job, block=True, timeout=None):
-        if job.wrapper_type == "vertical":
+    def put(self, job: Any, block: bool = True, timeout: float = None) -> None:
+        """
+        Puts a job into the queue if it is not a duplicate.
+
+        Args:
+            job (Any): The job to be added to the queue.
+            block (bool): Whether to block when the queue is full. Defaults to True.
+            timeout (float): Timeout for blocking operations. Defaults to None.
+        """
+        if job.wrapper_type == "vertical":  # We gather all retrials at once
             unique_name = job.name
         else:
-            unique_name = job.name+str(job.fail_count)
+            unique_name = job.name+str(job.fail_count)  # We gather retrial per retrial
         if unique_name not in self.all_items:
             self.all_items.add(unique_name)
-            super().put(job, block, timeout)
+            super().put(copy(job), block, timeout)  # Without copy, the process seems to modify the job for other retrials. My guess is that the object is not serialized until it is retrieved from the queue.


 class Platform(object):
     """
     Class to manage the connections to the different platforms.
     """
+    # This is a list of the keep-alive events, used to send the signal outside the main loop of Autosubmit
     worker_events = list()
+    # Shared lock between the main process and a log retrieval process
     lock = multiprocessing.Lock()

     def __init__(self, expid, name, config, auth_password=None):
@@ -106,6 +128,8 @@ def __init__(self, expid, name, config, auth_password=None):
             self.pw = auth_password
         else:
             self.pw = None
+
+        # Log retrieval process variables
         self.recovery_queue = UniqueQueue()
         self.log_retrieval_process_active = False
         self.main_process_id = None
@@ -118,6 +142,7 @@ def __init__(self, expid, name, config, auth_password=None):

     @classmethod
     def update_workers(cls, event_worker):
+        # This is visible on all instances simultaneously. It is used to send the keep-alive signal.
         cls.worker_events.append(event_worker)

     @property
@@ -619,11 +644,15 @@ def get_completed_files(self, job_name, retries=0, recovery=False, wrapper_faile
         else:
             return False

-    def remove_stat_file(self, job):
+    def remove_stat_file(self, job: Any) -> bool:
         """
-        Removes *STAT* files from remote
-        param job: job to check
-        type job: Job
+        Removes STAT files from remote.
+
+        Args:
+            job (Job): Job to check.
+
+        Returns:
+            bool: True if the file was removed, False otherwise.
         """
         if self.delete_file(job.stat_file):
             Log.debug(f"{job.stat_file} has been removed")
@@ -822,7 +851,11 @@ def submit_Script(self, hold=False):
         raise NotImplementedError

     def add_job_to_log_recover(self, job):
-        self.recovery_queue.put(job)
+        if job.id and int(job.id) != 0:
+            self.recovery_queue.put(job)
+        else:
+            Log.warning(f"Job {job.name} and retrial number: {job.fail_count} has no job id. Autosubmit will not record this retrial.")
+            job.updated_log = True

     def connect(self, as_conf, reconnect=False):
         raise NotImplementedError
@@ -830,93 +863,131 @@ def connect(self, as_conf, reconnect=False):
     def restore_connection(self, as_conf):
         raise NotImplementedError

-    def spawn_log_retrieval_process(self, as_conf):
+    def spawn_log_retrieval_process(self, as_conf: Any) -> None:
         """
-        This function, spawn a process to recover the logs of the jobs that have been submitted in this platform.
+ Spawns a process to recover the logs of the jobs that have been completed on this platform. + + Args: + as_conf (AutosubmitConfig): Configuration object for the platform. """ if not self.log_retrieval_process_active and ( as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false"): if as_conf and as_conf.misc_data.get("AS_COMMAND", "").lower() == "run": self.log_retrieval_process_active = True + + # Adds the keep_alive signal here to be accessible by all the classes Platform.update_workers(self.work_event) self.log_recovery_process = Process(target=self.recover_platform_job_logs, args=(), name=f"{self.name}_log_recovery") self.log_recovery_process.daemon = True self.log_recovery_process.start() + + # Prevents zombies os.waitpid(self.log_recovery_process.pid, os.WNOHANG) Log.result(f"Process {self.log_recovery_process.name} started with pid {self.log_recovery_process.pid}") + # Cleanup will be automatically prompt on control + c or a normal exit atexit.register(self.send_cleanup_signal) + atexit.register(self.closeConnection) - def send_cleanup_signal(self): + def send_cleanup_signal(self) -> None: + """ + Sends a cleanup signal to the log recovery process if it is alive. + This function is executed by the atexit module + """ if self.log_recovery_process and self.log_recovery_process.is_alive(): self.work_event.clear() self.cleanup_event.set() self.log_recovery_process.join() - def wait_for_work(self, sleep_time=60): + def wait_for_work(self, sleep_time: int = 60) -> bool: """ - This function waits for the work_event to be set or the cleanup_event to be set. + Waits for the work_event to be set or the cleanup_event to be set. + + Args: + sleep_time (int): Minimum time to wait in seconds. Defaults to 60. + + Returns: + bool: True if there is work to process, False otherwise. """ process_log = False for remaining in range(sleep_time, 0, -1): # Min time to wait unless clean-up signal is set time.sleep(1) if self.work_event.is_set() or not self.recovery_queue.empty(): process_log = True - if self.cleanup_event.is_set(): + if self.cleanup_event.is_set(): # Since is the last stuff to process, do it asap. process_log = True break - if not process_log: # If no work, wait until the keep_alive_timeout is reached or any signal is set to end the process. + + if not process_log: # If still no work, active wait until the keep_alive_timeout is reached or any signal is set to end the process. timeout = self.keep_alive_timeout - sleep_time - while timeout > 0 or not self.recovery_queue.empty() or self.cleanup_event.is_set() or self.work_event.is_set(): + while timeout > 0: + if self.recovery_queue.empty() or self.cleanup_event.is_set() or self.work_event.is_set(): + break time.sleep(1) timeout -= 1 - if not self.recovery_queue.empty() or self.cleanup_event.is_set() or self.work_event.is_set(): - process_log = True + self.work_event.clear() return process_log - def recover_job_log(self, identifier, jobs_pending_to_process): + def recover_job_log(self, identifier: str, jobs_pending_to_process: Set[Any]) -> Set[Any]: + """ + Recovers log files for jobs from the recovery queue and retry failed jobs. + + Args: + identifier (str): Identifier for logging purposes. + jobs_pending_to_process (Set[Any]): Set of jobs that had issues during log retrieval. + + Returns: + Set[Any]: Updated set of jobs pending to process. 
+ """ job = None - try: - while not self.recovery_queue.empty(): + + while not self.recovery_queue.empty(): + try: + job = self.recovery_queue.get( + timeout=1) # Should be non-empty, but added a timeout for other possible errors. + job.children = set() # Children can't be serialized, so we set it to an empty set for this process. + job.platform = self # Change the original platform to this process platform. + job._log_recovery_retries = 0 # Reset the log recovery retries. try: - job = self.recovery_queue.get( - timeout=1) # Should be non-empty, but added a timeout for other possible errors. - job.children = set() # Children can't be serialized, so we set it to an empty set for this process. - job.platform = self # change the original platform to this process platform. - job._log_recovery_retries = 0 # reset the log recovery retries. - Log.debug(f"{identifier} Recovering log files for job {job.name} and retrial:{job.fail_count}") job.retrieve_logfiles(self, raise_error=True) - if job.status == Status.FAILED: - Log.result(f"{identifier} Sucessfully recovered log files for job {job.name} and retrial:{job.fail_count}") - except queue.Empty: - pass - # This second while is to keep retring the failed jobs. - while len(jobs_pending_to_process) > 0: # jobs that had any issue during the log retrieval - job = jobs_pending_to_process.pop() - job._log_recovery_retries += 1 - Log.debug( - f"{identifier} (Retrial number: {job._log_recovery_retries}) Recovering log files for job {job.name}") - job.retrieve_logfiles(self, raise_error=True) - Log.result(f"{identifier} (Retrial) Successfully recovered log files for job {job.name}") - except Exception as e: - Log.info(f"{identifier} Error while recovering logs: {str(e)}") - try: - if job and job._log_recovery_retries < 5: # If log retrieval failed, add it to the pending jobs to process. Avoids to keep trying the same job forever. + Log.result( + f"{identifier} Successfully recovered log for job '{job.name}' and retry '{job.fail_count}'.") + except: jobs_pending_to_process.add(job) - self.connected = False - Log.info(f"{identifier} Attempting to restore connection") - self.restore_connection(None) # Always restore the connection on a failure. - Log.result(f"{identifier} Sucessfully reconnected.") - except: + job._log_recovery_retries += 1 + Log.warning(f"{identifier} (Retrial) Failed to recover log for job '{job.name}' and retry:'{job.fail_count}'.") + except queue.Empty: pass + + if len(jobs_pending_to_process) > 0: # Restore the connection if there was an issue with one or more jobs. + self.restore_connection(None) + + # This second while is to keep retring the failed jobs. + # With the unique queue, the main process won't send the job again, so we have to store it here. + while len(jobs_pending_to_process) > 0: # jobs that had any issue during the log retrieval + job = jobs_pending_to_process.pop() + job._log_recovery_retries += 1 + try: + job.retrieve_logfiles(self, raise_error=True) + job._log_recovery_retries += 1 + except: + if job._log_recovery_retries < 5: + jobs_pending_to_process.add(job) + Log.warning( + f"{identifier} (Retrial) Failed to recover log for job '{job.name}' and retry '{job.fail_count}'.") + Log.result( + f"{identifier} (Retrial) Successfully recovered log for job '{job.name}' and retry '{job.fail_count}'.") + if len(jobs_pending_to_process) > 0: + self.restore_connection(None) # Restore the connection if there was an issue with one or more jobs. 
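# Illustrative sketch, not part of the patch: the log-recovery design above relies on two
# multiprocessing.Event objects. The main process periodically calls set() on every entry of
# worker_events (see build_packages and check_job earlier in this patch), wait_for_work()
# clears work_event on each cycle, and cleanup_event asks the child to drain the queue and
# exit. A minimal, self-contained version of that handshake, with simplified stand-in names:

import time
from multiprocessing import Event, Process


def log_recovery_child(work_event, cleanup_event, keep_alive_cycles=3):
    remaining = keep_alive_cycles
    while remaining > 0:
        time.sleep(1)
        if cleanup_event.is_set():      # main process is exiting: stop after this cycle
            break
        if work_event.is_set():         # keep-alive received: reset the countdown
            work_event.clear()
            remaining = keep_alive_cycles
        else:                           # no keep-alive: assume the main loop has ended
            remaining -= 1


if __name__ == "__main__":
    work, cleanup = Event(), Event()
    child = Process(target=log_recovery_child, args=(work, cleanup), daemon=True)
    child.start()
    for _ in range(2):
        work.set()                      # "I am still submitting/checking jobs"
        time.sleep(1)
    cleanup.set()                       # normal shutdown, mirrors send_cleanup_signal()
    child.join()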
+ return jobs_pending_to_process - def recover_platform_job_logs(self): + def recover_platform_job_logs(self) -> None: """ - This function, recovers the logs of the jobs that have been submitted. - The exit of this process is controlled by the work_event and cleanup_events of the main process. + Recovers the logs of the jobs that have been submitted. + When this is executed as a process, the exit is controlled by the work_event and cleanup_events of the main process. """ setproctitle.setproctitle(f"autosubmit log {self.expid} recovery {self.name.lower()}") identifier = f"{self.name.lower()}(log_recovery):" @@ -927,10 +998,13 @@ def recover_platform_job_logs(self): Log.get_logger("Autosubmit") # Log needs to be initialised in the new process Log.result(f"{identifier} Sucessfully connected.") log_recovery_timeout = self.config.get("LOG_RECOVERY_TIMEOUT", 60) + # Keep alive signal timeout is 5 minutes, but the sleeptime is 60 seconds. self.keep_alive_timeout = max(log_recovery_timeout*5, 60*5) while self.wait_for_work(sleep_time=max(log_recovery_timeout, 60)): jobs_pending_to_process = self.recover_job_log(identifier, jobs_pending_to_process) - if self.cleanup_event.is_set(): # Check if main process is waiting for this child to end. + if self.cleanup_event.is_set(): # Check if the main process is waiting for this child to end. self.recover_job_log(identifier, jobs_pending_to_process) break - Log.info(f"{identifier} Exiting.") \ No newline at end of file + self.closeConnection() + Log.info(f"{identifier} Exiting.") + _exit(0) # Exit userspace after manually closing ssh sockets, recommended for child processes, the queue() and shared signals should be in charge of the main process. diff --git a/autosubmit/platforms/sgeplatform.py b/autosubmit/platforms/sgeplatform.py index f44fb6629..ccae7a659 100644 --- a/autosubmit/platforms/sgeplatform.py +++ b/autosubmit/platforms/sgeplatform.py @@ -122,12 +122,9 @@ def connect(self, as_conf, reconnect=False): :rtype: bool """ self.connected = True - if not self.log_retrieval_process_active and ( - as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', - "false")).lower() == "false"): - self.log_retrieval_process_active = True - if as_conf and as_conf.misc_data.get("AS_COMMAND","").lower() == "run": - self.recover_job_logs() + self.spawn_log_retrieval_process(as_conf) # This platform may be deprecated, so ignore the change + + def restore_connection(self,as_conf): """ In this case, it does nothing because connection is established for each command @@ -144,7 +141,7 @@ def test_connection(self,as_conf): :return: True :rtype: bool """ - self.main_process_id = os.getpid() self.connected = True - self.connected(as_conf,True) + self.connected(as_conf,True) # This platform may be deprecated, so ignore the change + diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 1884a6739..3a01ff186 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -98,7 +98,6 @@ def process_batch_ready_jobs(self,valid_packages_to_submit,failed_packages,error :return: """ try: - valid_packages_to_submit = [ package for package in valid_packages_to_submit if package.x11 != True] if len(valid_packages_to_submit) > 0: duplicated_jobs_already_checked = False @@ -152,21 +151,21 @@ def process_batch_ready_jobs(self,valid_packages_to_submit,failed_packages,error if jobs_id is None or len(jobs_id) <= 0: raise AutosubmitError( "Submission failed, this can be due a 
failure on the platform", 6015,"Jobs_id {0}".format(jobs_id)) - i = 0 if hold: sleep(10) - + jobid_index = 0 for package in valid_packages_to_submit: + current_package_id = str(jobs_id[jobid_index]) if hold: retries = 5 - package.jobs[0].id = str(jobs_id[i]) + package.jobs[0].id = current_package_id try: can_continue = True while can_continue and retries > 0: - cmd = package.jobs[0].platform.get_queue_status_cmd(jobs_id[i]) + cmd = package.jobs[0].platform.get_queue_status_cmd(current_package_id) package.jobs[0].platform.send_command(cmd) queue_status = package.jobs[0].platform._ssh_output - reason = package.jobs[0].platform.parse_queue_reason(queue_status, jobs_id[i]) + reason = package.jobs[0].platform.parse_queue_reason(queue_status, current_package_id) if reason == '(JobHeldAdmin)': can_continue = False elif reason == '(JobHeldUser)': @@ -176,21 +175,16 @@ def process_batch_ready_jobs(self,valid_packages_to_submit,failed_packages,error sleep(5) retries = retries - 1 if not can_continue: - package.jobs[0].platform.send_command(package.jobs[0].platform.cancel_cmd + " {0}".format(jobs_id[i])) - i = i + 1 + package.jobs[0].platform.send_command(package.jobs[0].platform.cancel_cmd + " {0}".format(current_package_id)) + jobid_index += 1 continue if not self.hold_job(package.jobs[0]): - i = i + 1 + jobid_index += 1 continue except Exception as e: - failed_packages.append(jobs_id) + failed_packages.append(current_package_id) continue - for job in package.jobs: - job.hold = hold - job.id = str(jobs_id[i]) - job.status = Status.SUBMITTED - job.wrapper_name = package.name - + package.process_jobs_to_submit(current_package_id, hold) # Check if there are duplicated jobnames if not duplicated_jobs_already_checked: job_name = package.name if hasattr(package, "name") else package.jobs[0].name @@ -204,7 +198,7 @@ def process_batch_ready_jobs(self,valid_packages_to_submit,failed_packages,error self.send_command(self.cancel_job(id_)) # This can be faster if we cancel all jobs at once but there is no cancel_all_jobs call right now so todo in future Log.debug(f'Job {id_} with the assigned name: {job_name} has been cancelled') Log.debug(f'Job {package.jobs[0].id} with the assigned name: {job_name} has been submitted') - i += 1 + jobid_index += 1 if len(failed_packages) > 0: for job_id in failed_packages: platform.send_command(platform.cancel_cmd + " {0}".format(job_id)) @@ -214,6 +208,8 @@ def process_batch_ready_jobs(self,valid_packages_to_submit,failed_packages,error raise except AutosubmitCritical as e: raise + except AttributeError: + raise except Exception as e: raise AutosubmitError("{0} submission failed".format(self.name), 6015, str(e)) return save,valid_packages_to_submit @@ -256,9 +252,12 @@ def submit_job(self, job, script_name, hold=False, export="none"): cmd = self.get_submit_cmd(script_name, job, hold=hold, export=export) if cmd is None: return None - if self.send_command(cmd,x11=x11): - job_id = self.get_submitted_job_id(self.get_ssh_output(),x11=x11) - Log.debug("Job ID: {0}", job_id) + if self.send_command(cmd, x11=x11): + job_id = self.get_submitted_job_id(self.get_ssh_output(), x11=x11) + if job: + Log.result(f"Job: {job.name} submitted with job_id: {job_id}") + else: + Log.result(f"Job submitted with job_id: {job_id}") return int(job_id) else: return None @@ -653,7 +652,20 @@ def wrapper_header(self,**kwargs): def allocated_nodes(): return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))""" - def check_file_exists(self, filename, 
wrapper_failed=False, sleeptime=5, max_retries=3):
+    def check_file_exists(self, filename: str, wrapper_failed: bool = False, sleeptime: int = 5, max_retries: int = 3) -> bool:
+        """
+        Checks if a file exists on the remote platform.
+
+        Args:
+            filename (str): The name of the file to check.
+            wrapper_failed (bool): Whether the wrapper has failed. Defaults to False.
+            sleeptime (int): Time to sleep between retries in seconds. Defaults to 5.
+            max_retries (int): Maximum number of retries. Defaults to 3.
+
+        Returns:
+            bool: True if the file exists, False otherwise.
+        """
+        # TODO: in a future refactor, revise the sleep time and retries of this function; previously it waited a long time
         file_exist = False
         retries = 0
         while not file_exist and retries < max_retries:
@@ -671,8 +683,7 @@ def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_ret
             retries = retries + 1
         except BaseException as e:  # Unrecoverable error
             if str(e).lower().find("garbage") != -1:
-                sleep(sleeptime)
-                sleeptime = sleeptime + 5
+                sleep(2)
                 retries = retries + 1
             else:
                 file_exist = False  # won't exist
diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py
index 0135e2976..204c760d4 100644
--- a/autosubmit/platforms/wrappers/wrapper_builder.py
+++ b/autosubmit/platforms/wrappers/wrapper_builder.py
@@ -21,6 +21,8 @@
 import string
 import textwrap
+from typing import List
+

 class WrapperDirector:
     """
@@ -443,7 +445,18 @@ def _indent(self, text, amount, ch=' '):
         return ''.join(padding + line for line in text.splitlines(True))

 class PythonVerticalWrapperBuilder(PythonWrapperBuilder):
-    def build_sequential_threads_launcher(self, jobs_list, thread, footer=True): #fastlook
+    def build_sequential_threads_launcher(self, jobs_list: List[str], thread: str, footer: bool = True) -> str:
+        """
+        Builds part of the vertical wrapper cmd launcher script.
+        This script writes the start and finish time of each inner_job.
+
+        Args:
+            jobs_list (List[str]): List of job scripts.
+            thread (str): inner_job to be executed.
+            footer (bool): If True, includes the footer in the script. Defaults to True.
+
+        Returns:
+            str: Part of the final vertical wrapper script.
+ """ sequential_threads_launcher = textwrap.dedent(""" failed_wrapper = os.path.join(os.getcwd(),wrapper_id) retrials = {2} @@ -462,7 +475,7 @@ def build_sequential_threads_launcher(self, jobs_list, thread, footer=True): #fa start = int(time.time()) current.join({3}) total_steps = total_steps + 1 - """).format(jobs_list, thread,self.retrials,str(self.wallclock_by_level),'\n'.ljust(13)) + """).format(jobs_list, thread, self.retrials, str(self.wallclock_by_level),'\n'.ljust(13)) if footer: sequential_threads_launcher += self._indent(textwrap.dedent(""" @@ -531,7 +544,7 @@ def run(self): out_path = os.path.join(os.getcwd(), out) err_path = os.path.join(os.getcwd(), err) template_path = os.path.join(os.getcwd(), self.template) - command = f"timeout {0} {{template_path}} > {{out_path}} 2> {{err_path}}" + command = f"chmod +x {{template_path}}; timeout {0} {{template_path}} > {{out_path}} 2> {{err_path}}" print(command) getstatusoutput(command) @@ -998,4 +1011,4 @@ def build_srun_launcher(self, jobs_list, footer=True): def build_main(self): nodelist = self.build_nodes_list() srun_launcher = self.build_srun_launcher("scripts_list") - return nodelist, srun_launcher \ No newline at end of file + return nodelist, srun_launcher diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index 1bd97c831..deba83344 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -141,6 +141,58 @@ def threads_directive(self, threads): raise NotImplemented(self.exception) +class LocalWrapperFactory(WrapperFactory): + + def vertical_wrapper(self, **kwargs): + return PythonVerticalWrapperBuilder(**kwargs) + + def horizontal_wrapper(self, **kwargs): + + if kwargs["method"] == 'srun': + return SrunHorizontalWrapperBuilder(**kwargs) + else: + return PythonHorizontalWrapperBuilder(**kwargs) + + def hybrid_wrapper_horizontal_vertical(self, **kwargs): + return PythonHorizontalVerticalWrapperBuilder(**kwargs) + + def hybrid_wrapper_vertical_horizontal(self, **kwargs): + if kwargs["method"] == 'srun': + return SrunVerticalHorizontalWrapperBuilder(**kwargs) + else: + return PythonVerticalHorizontalWrapperBuilder(**kwargs) + + def reservation_directive(self, reservation): + return '#' + + def dependency_directive(self, dependency): + return '#' + + def queue_directive(self, queue): + return '#' + + def processors_directive(self, processors): + return '#' + + def nodes_directive(self, nodes): + return '#' + + def tasks_directive(self, tasks): + return '#' + + def partition_directive(self, partition): + return '#' + + def exclusive_directive(self, exclusive): + return '#' + + def threads_directive(self, threads): + return '#' + + def header_directives(self, **kwargs): + return "" + + class LocalWrapperFactory(WrapperFactory): def vertical_wrapper(self, **kwargs): diff --git a/bin/autosubmit b/bin/autosubmit index 6f53abf1f..6fcb569d6 100755 --- a/bin/autosubmit +++ b/bin/autosubmit @@ -36,10 +36,11 @@ def main(): return_value = Autosubmit.parse_args() delete_lock_file() return_value = return_value if type(return_value) is int else 0 - os._exit(return_value) except BaseException as e: - exit_from_error(e) + return_value = exit_from_error(e) + return return_value if __name__ == "__main__": - main() + exit_code = main() + sys.exit(exit_code) # Sys.exit ensures a proper cleanup of the program, while os._exit() does not. 
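The bin/autosubmit change above matters because the platforms now register cleanup work with atexit (send_cleanup_signal and closeConnection in platform.py): sys.exit() raises SystemExit and lets those handlers run, while os._exit() terminates the interpreter immediately and skips them. A minimal illustration of the difference, not part of the patch; cleanup() is a stand-in for the registered handlers:

import atexit
import os
import sys

def cleanup():  # stand-in for send_cleanup_signal / closeConnection
    print("cleanup handler executed")

atexit.register(cleanup)

if os.environ.get("HARD_EXIT"):
    os._exit(0)   # immediate termination: atexit handlers and buffer flushing are skipped
else:
    sys.exit(0)   # raises SystemExit: atexit handlers still run on interpreter shutdown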
diff --git a/test/unit/test_database_regression.py b/test/unit/test_database_regression.py deleted file mode 100644 index 44079537c..000000000 --- a/test/unit/test_database_regression.py +++ /dev/null @@ -1,264 +0,0 @@ -import shutil - -import pytest -from pathlib import Path -from autosubmit.autosubmit import Autosubmit -from log.log import Log -import os -import pwd -from autosubmit.platforms.locplatform import LocalPlatform - -from test.unit.utils.common import create_database, init_expid -import sqlite3 - - -def _get_script_files_path() -> Path: - return Path(__file__).resolve().parent / 'files' - - -# Maybe this should be a regression test - -@pytest.fixture -def db_tmpdir(tmpdir_factory): - folder = tmpdir_factory.mktemp(f'db_tests') - os.mkdir(folder.join('scratch')) - os.mkdir(folder.join('db_tmp_dir')) - file_stat = os.stat(f"{folder.strpath}") - file_owner_id = file_stat.st_uid - file_owner = pwd.getpwuid(file_owner_id).pw_name - folder.owner = file_owner - - # Write an autosubmitrc file in the temporary directory - autosubmitrc = folder.join('autosubmitrc') - autosubmitrc.write(f''' -[database] -path = {folder} -filename = tests.db - -[local] -path = {folder} - -[globallogs] -path = {folder} - -[structures] -path = {folder} - -[historicdb] -path = {folder} - -[historiclog] -path = {folder} - -[defaultstats] -path = {folder} - -''') - os.environ['AUTOSUBMIT_CONFIGURATION'] = str(folder.join('autosubmitrc')) - create_database(str(folder.join('autosubmitrc'))) - assert "tests.db" in [Path(f).name for f in folder.listdir()] - init_expid(str(folder.join('autosubmitrc')), platform='local', create=False, test_type='test') - assert "t000" in [Path(f).name for f in folder.listdir()] - return folder - - -@pytest.fixture -def prepare_db(db_tmpdir): - # touch as_misc - # remove files under t000/conf - conf_folder = Path(f"{db_tmpdir.strpath}/t000/conf") - shutil.rmtree(conf_folder) - os.makedirs(conf_folder) - platforms_path = Path(f"{db_tmpdir.strpath}/t000/conf/platforms.yml") - main_path = Path(f"{db_tmpdir.strpath}/t000/conf/main.yml") - # Add each platform to test - with platforms_path.open('w') as f: - f.write(f""" -PLATFORMS: - dummy: - type: dummy - """) - - with main_path.open('w') as f: - f.write(f""" -EXPERIMENT: - # List of start dates - DATELIST: '20000101' - # List of members. - MEMBERS: fc0 - # Unit of the chunk size. Can be hour, day, month, or year. - CHUNKSIZEUNIT: month - # Size of each chunk. - CHUNKSIZE: '4' - # Number of chunks of the experiment. - NUMCHUNKS: '3' - CHUNKINI: '' - # Calendar used for the experiment. Can be standard or noleap. - CALENDAR: standard - -CONFIG: - # Current version of Autosubmit. - AUTOSUBMIT_VERSION: "" - # Total number of jobs in the workflow. - TOTALJOBS: 20 - # Maximum number of jobs permitted in the waiting status. - MAXWAITINGJOBS: 20 - SAFETYSLEEPTIME: 1 -DEFAULT: - # Job experiment ID. - EXPID: "t000" - # Default HPC platform name. - HPCARCH: "local" - #hint: use %PROJDIR% to point to the project folder (where the project is cloned) - # Custom configuration location. -project: - # Type of the project. - PROJECT_TYPE: None - # Folder to hold the project sources. 
- PROJECT_DESTINATION: local_project -""") - expid_dir = Path(f"{db_tmpdir.strpath}/scratch/whatever/{db_tmpdir.owner}/t000") - dummy_dir = Path(f"{db_tmpdir.strpath}/scratch/whatever/{db_tmpdir.owner}/t000/dummy_dir") - real_data = Path(f"{db_tmpdir.strpath}/scratch/whatever/{db_tmpdir.owner}/t000/real_data") - # We write some dummy data inside the scratch_dir - os.makedirs(expid_dir, exist_ok=True) - os.makedirs(dummy_dir, exist_ok=True) - os.makedirs(real_data, exist_ok=True) - - with open(dummy_dir.joinpath('dummy_file'), 'w') as f: - f.write('dummy data') - # create some dummy absolute symlinks in expid_dir to test migrate function - (real_data / 'dummy_symlink').symlink_to(dummy_dir / 'dummy_file') - return db_tmpdir - - -@pytest.mark.parametrize("jobs_data, expected_count, final_status", [ - # Success - (""" - JOBS: - job: - SCRIPT: | - echo "Hello World" - sleep 1 - PLATFORM: local - DEPENDENCIES: job-1 - RUNNING: chunk - wallclock: 00:01 - retrials: 2 - """, 1, "COMPLETED"), - # Success wrapper - (""" - JOBS: - job: - SCRIPT: | - echo "Hello World" - sleep 1 - DEPENDENCIES: job-1 - PLATFORM: local - RUNNING: chunk - wallclock: 00:01 - retrials: 2 - wrappers: - wrapper: - JOBS_IN_WRAPPER: job - TYPE: vertical - """, 1, "COMPLETED"), - # Failure - (""" - JOBS: - job: - SCRIPT: | - echo "Hello World" - sleep 1 - exit 1 - DEPENDENCIES: job-1 - PLATFORM: local - RUNNING: chunk - wallclock: 00:01 - retrials: 2 - """, 3, "FAILED"), - # Failure wrappers - (""" - JOBS: - job: - SCRIPT: | - echo "Hello World" - sleep 1 - exit 1 - PLATFORM: local - DEPENDENCIES: job-1 - RUNNING: chunk - wallclock: 00:10 - retrials: 2 - wrappers: - wrapper: - JOBS_IN_WRAPPER: job - TYPE: vertical - """, 3, "FAILED"), -], ids=["Success", "Success with wrapper", "Failure", "Failure with wrapper"]) -def test_db(db_tmpdir, prepare_db, jobs_data, expected_count, final_status, mocker): - # write jobs_data - jobs_path = Path(f"{db_tmpdir.strpath}/t000/conf/jobs.yml") - with jobs_path.open('w') as f: - f.write(jobs_data) - - # Create - init_expid(os.environ["AUTOSUBMIT_CONFIGURATION"], platform='local', expid='t000', create=True, test_type='test') - - # This is set in _init_log which is not called - as_misc = Path(f"{db_tmpdir.strpath}/t000/conf/as_misc.yml") - with as_misc.open('w') as f: - f.write(f""" - AS_MISC: True - ASMISC: - COMMAND: run - AS_COMMAND: run - """) - - # Run the experiment - with mocker.patch('autosubmit.platforms.platform.max', return_value=20): - Autosubmit.run_experiment(expid='t000') - - # Test database exists. 
- job_data = Path(f"{db_tmpdir.strpath}/job_data_t000.db") - autosubmit_db = Path(f"{db_tmpdir.strpath}/tests.db") - assert job_data.exists() - assert autosubmit_db.exists() - - # Check job_data info - conn = sqlite3.connect(job_data) - conn.row_factory = sqlite3.Row - c = conn.cursor() - c.execute("SELECT * FROM job_data") - rows = c.fetchall() - # Convert rows to a list of dictionaries - rows_as_dicts = [dict(row) for row in rows] - # Tune the print so it is more readable, so it is easier to debug in case of failure - column_names = rows_as_dicts[0].keys() if rows_as_dicts else [] - column_widths = [max(len(str(row[col])) for row in rows_as_dicts + [dict(zip(column_names, column_names))]) for col - in column_names] - print(f"Experiment folder: {db_tmpdir.strpath}") - header = " | ".join(f"{name:<{width}}" for name, width in zip(column_names, column_widths)) - print(f"\n{header}") - print("-" * len(header)) - # Print the rows - for row_dict in rows_as_dicts: # always print, for debug proposes - print(" | ".join(f"{str(row_dict[col]):<{width}}" for col, width in zip(column_names, column_widths))) - for row_dict in rows_as_dicts: - # Check that all fields contain data, except extra_data, children, and platform_output - # Check that submit, start and finish are > 0 - assert int(row_dict["submit"]) > 0 and int(row_dict["finish"]) != 1970010101 - assert int(row_dict["start"]) > 0 and int(row_dict["finish"]) != 1970010101 - assert int(row_dict["finish"]) > 0 and int(row_dict["finish"]) != 1970010101 - assert row_dict["status"] == final_status - for key in [key for key in row_dict.keys() if - key not in ["status", "finish", "submit", "start", "extra_data", "children", "platform_output"]]: - assert str(row_dict[key]) != str("") - # Check that the job_data table has the expected number of entries - c.execute("SELECT job_name, COUNT(*) as count FROM job_data GROUP BY job_name") - count_rows = c.fetchall() - for row in count_rows: - assert int(row["count"]) == expected_count - # Close the cursor and connection - c.close() - conn.close() diff --git a/test/unit/test_job.py b/test/unit/test_job.py index 9980e7f68..c56d8cd54 100644 --- a/test/unit/test_job.py +++ b/test/unit/test_job.py @@ -4,6 +4,9 @@ import sys import tempfile from pathlib import Path + +import pytest + from autosubmit.job.job_list_persistence import JobListPersistencePkl import datetime @@ -1441,3 +1444,21 @@ def read(self): DEFAULT_PLATFORMS_CONF = '' DEFAULT_JOBS_CONF = '' STRUCTURES_DIR = '/dummy/structures/dir' + + +def test_update_stat_file(): + job = Job("dummyname", 1, Status.WAITING, 0) + job.fail_count = 0 + job.script_name = "dummyname.cmd" + job.wrapper_type = None + job.update_stat_file() + assert job.stat_file == "dummyname_STAT_0" + job.fail_count = 1 + job.update_stat_file() + assert job.stat_file == "dummyname_STAT_1" + job.wrapper_type = "vertical" + job.update_stat_file() + assert job.stat_file == "dummyname_STAT_0" + job.fail_count = 0 + job.update_stat_file() + assert job.stat_file == "dummyname_STAT_0" diff --git a/test/unit/test_job_package.py b/test/unit/test_job_package.py index 6aa372354..4e7b7072f 100644 --- a/test/unit/test_job_package.py +++ b/test/unit/test_job_package.py @@ -1,24 +1,22 @@ from unittest import TestCase -import os from pathlib import Path import inspect import tempfile -from mock import MagicMock, ANY +from mock import MagicMock from mock import patch from autosubmit.job.job import Job from autosubmit.job.job_common import Status from autosubmit.job.job_list import JobList from 
autosubmit.job.job_list_persistence import JobListPersistenceDb -from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, jobs_in_wrapper_str +from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical from autosubmitconfigparser.config.configcommon import AutosubmitConfig from autosubmitconfigparser.config.yamlparser import YAMLParserFactory import pytest from autosubmit.job.job_packages import jobs_in_wrapper_str - class FakeBasicConfig: def __init__(self): pass @@ -65,7 +63,6 @@ def setUpWrappers(self,options): self.platform.total_jobs = 100 self.as_conf.experiment_data["WRAPPERS"]["WRAPPERS"] = options self._wrapper_factory.as_conf = self.as_conf - self.jobs[0].wallclock = "00:00" self.jobs[0]._threads = "1" self.jobs[0].tasks = "1" @@ -87,9 +84,6 @@ def setUpWrappers(self,options): self.jobs[1].processors = "9" self.jobs[1]._processors = "9" self.jobs[1]._platform = self.platform - - - self.wrapper_type = options.get('TYPE', 'vertical') self.wrapper_policy = options.get('POLICY', 'flexible') self.wrapper_method = options.get('METHOD', 'ASThread') @@ -98,9 +92,6 @@ def setUpWrappers(self,options): self.job_package_wrapper = JobPackageVertical(self.jobs,configuration=self.as_conf,wrapper_info=[self.wrapper_type,self.wrapper_policy,self.wrapper_method,self.jobs_in_wrapper,self.extensible_wallclock]) self.job_list._ordered_jobs_by_date_member["WRAPPERS"] = dict() - - - def setUp(self): self.platform = MagicMock() self.platform.queue = "debug" @@ -155,8 +146,6 @@ def test_default_parameters(self): self.assertEqual(self.job_package_wrapper.tasks, "40") self.assertEqual(self.job_package_wrapper.custom_directives, ['#SBATCH --mem=1000']) - - def test_job_package_default_init(self): with self.assertRaises(Exception): JobPackageSimple([]) @@ -226,4 +215,4 @@ def test_jobs_in_wrapper_str(mock_as_conf): # Arrange current_wrapper = "current_wrapper" result = jobs_in_wrapper_str(mock_as_conf, current_wrapper) - assert result == "job1&job2&job3" \ No newline at end of file + assert result == "job1&job2&job3" diff --git a/test/unit/test_job_pytest.py b/test/unit/test_job_pytest.py index 421429fbc..71e2db21e 100644 --- a/test/unit/test_job_pytest.py +++ b/test/unit/test_job_pytest.py @@ -1,7 +1,9 @@ +from datetime import datetime, timedelta import pytest from autosubmit.job.job import Job from autosubmit.platforms.psplatform import PsPlatform +from pathlib import Path @pytest.mark.parametrize('experiment_data, expected_data', [( @@ -50,3 +52,67 @@ def test_update_parameters_current_variables(autosubmit_config, experiment_data, job.update_parameters(as_conf, {}) for key, value in expected_data.items(): assert job.parameters[key] == value + + +@pytest.mark.parametrize('test_with_file, file_is_empty, last_line_empty', [ + (False, False, False), + (True, True, False), + (True, False, False), + (True, False, True) +], ids=["no file", "file is empty", "file is correct", "file last line is empty"]) +def test_recover_last_ready_date(tmpdir, test_with_file, file_is_empty, last_line_empty): + job = Job('dummy', '1', 0, 1) + job._tmp_path = Path(tmpdir) + stat_file = job._tmp_path.joinpath(f'{job.name}_TOTAL_STATS') + ready_time = datetime.now() + timedelta(minutes=5) + ready_date = int(ready_time.strftime("%Y%m%d%H%M%S")) + expected_date = None + if test_with_file: + if file_is_empty: + stat_file.touch() + expected_date = datetime.fromtimestamp(stat_file.stat().st_mtime).strftime('%Y%m%d%H%M%S') + else: + if last_line_empty: + with stat_file.open('w') as f: + f.write(" ") 
+ expected_date = datetime.fromtimestamp(stat_file.stat().st_mtime).strftime('%Y%m%d%H%M%S') + else: + with stat_file.open('w') as f: + f.write(f"{ready_date} {ready_date} {ready_date} COMPLETED") + expected_date = str(ready_date) + job.ready_date = None + job.recover_last_ready_date() + assert job.ready_date == expected_date + + +@pytest.mark.parametrize('test_with_logfiles, file_timestamp_greater_than_ready_date', [ + (False, False), + (True, True), + (True, False), +], ids=["no file", "log timestamp >= ready_date", "log timestamp < ready_date"]) +def test_recover_last_log_name(tmpdir, test_with_logfiles, file_timestamp_greater_than_ready_date): + job = Job('dummy', '1', 0, 1) + job._log_path = Path(tmpdir) + expected_local_logs = (f"{job.name}.out.0", f"{job.name}.err.0") + if test_with_logfiles: + if file_timestamp_greater_than_ready_date: + ready_time = datetime.now() - timedelta(minutes=5) + job.ready_date = str(ready_time.strftime("%Y%m%d%H%M%S")) + log_name = job._log_path.joinpath(f'{job.name}_{job.ready_date}') + expected_update_log = True + expected_local_logs = (log_name.with_suffix('.out').name, log_name.with_suffix('.err').name) + else: + expected_update_log = False + ready_time = datetime.now() + timedelta(minutes=5) + job.ready_date = str(ready_time.strftime("%Y%m%d%H%M%S")) + log_name = job._log_path.joinpath(f'{job.name}_{job.ready_date}') + log_name.with_suffix('.out').touch() + log_name.with_suffix('.err').touch() + else: + expected_update_log = False + + job.updated_log = False + job.recover_last_log_name() + assert job.updated_log == expected_update_log + assert job.local_logs[0] == str(expected_local_logs[0]) + assert job.local_logs[1] == str(expected_local_logs[1]) diff --git a/test/unit/test_log_recovery.py b/test/unit/test_log_recovery.py new file mode 100644 index 000000000..86c62984b --- /dev/null +++ b/test/unit/test_log_recovery.py @@ -0,0 +1,160 @@ +import time +import pytest +from pathlib import Path +import os +import pwd +from autosubmit.job.job_common import Status +from autosubmitconfigparser.config.configcommon import AutosubmitConfig +from autosubmit.job.job import Job + + +def _get_script_files_path() -> Path: + return Path(__file__).resolve().parent / 'files' + + +@pytest.fixture +def current_tmpdir(tmpdir_factory): + folder = tmpdir_factory.mktemp(f'tests') + os.mkdir(folder.join('scratch')) + file_stat = os.stat(f"{folder.strpath}") + file_owner_id = file_stat.st_uid + file_owner = pwd.getpwuid(file_owner_id).pw_name + folder.owner = file_owner + return folder + + +@pytest.fixture +def prepare_test(current_tmpdir): + # touch as_misc + platforms_path = Path(f"{current_tmpdir.strpath}/platforms_t000.yml") + jobs_path = Path(f"{current_tmpdir.strpath}/jobs_t000.yml") + project = "whatever" + scratch_dir = f"{current_tmpdir.strpath}/scratch" + Path(f"{scratch_dir}/{project}/{current_tmpdir.owner}").mkdir(parents=True, exist_ok=True) + Path(f"{scratch_dir}/LOG_t000").mkdir(parents=True, exist_ok=True) + Path(f"{scratch_dir}/LOG_t000/t000.cmd.out.0").touch() + Path(f"{scratch_dir}/LOG_t000/t000.cmd.err.0").touch() + + # Add each platform to test + with platforms_path.open('w') as f: + f.write(f""" +PLATFORMS: + pytest-ps: + type: ps + host: 127.0.0.1 + user: {current_tmpdir.owner} + project: {project} + scratch_dir: {scratch_dir} + """) + # add a job of each platform type + with jobs_path.open('w') as f: + f.write(f""" +JOBS: + base: + SCRIPT: | + echo "Hello World" + echo sleep 5 + QUEUE: hpc + PLATFORM: pytest-ps + RUNNING: once + wallclock: 00:01 
+EXPERIMENT: + # List of start dates + DATELIST: '20000101' + # List of members. + MEMBERS: fc0 + # Unit of the chunk size. Can be hour, day, month, or year. + CHUNKSIZEUNIT: month + # Size of each chunk. + CHUNKSIZE: '4' + # Number of chunks of the experiment. + NUMCHUNKS: '2' + CHUNKINI: '' + # Calendar used for the experiment. Can be standard or noleap. + CALENDAR: standard + """) + return current_tmpdir + + +@pytest.fixture +def local(prepare_test): + # Init Local platform + from autosubmit.platforms.locplatform import LocalPlatform + config = { + 'LOCAL_ROOT_DIR': f"{prepare_test}/scratch", + 'LOCAL_TMP_DIR': f"{prepare_test}/scratch", + } + local = LocalPlatform(expid='t000', name='local', config=config) + return local + + +@pytest.fixture +def as_conf(prepare_test, mocker): + mocker.patch('pathlib.Path.exists', return_value=True) + as_conf = AutosubmitConfig("test") + as_conf.experiment_data = as_conf.load_config_file(as_conf.experiment_data, + Path(prepare_test.join('platforms_t000.yml'))) + as_conf.misc_data = {"AS_COMMAND": "run"} + return as_conf + + +def test_log_recovery_no_keep_alive(prepare_test, local, mocker, as_conf): + mocker.patch('autosubmit.platforms.platform.max', return_value=1) + local.spawn_log_retrieval_process(as_conf) + assert local.log_recovery_process.is_alive() + time.sleep(2) + assert local.log_recovery_process.is_alive() is False + local.cleanup_event.set() + + +def test_log_recovery_keep_alive(prepare_test, local, mocker, as_conf): + mocker.patch('autosubmit.platforms.platform.max', return_value=1) + local.keep_alive_timeout = 0 + local.spawn_log_retrieval_process(as_conf) + assert local.log_recovery_process.is_alive() + local.work_event.set() + time.sleep(1) + assert local.log_recovery_process.is_alive() + local.work_event.set() + time.sleep(1) + assert local.log_recovery_process.is_alive() + time.sleep(1) + assert local.log_recovery_process.is_alive() is False + local.cleanup_event.set() + + +def test_log_recovery_keep_alive_cleanup(prepare_test, local, mocker, as_conf): + mocker.patch('autosubmit.platforms.platform.max', return_value=1) + local.keep_alive_timeout = 0 + local.spawn_log_retrieval_process(as_conf) + assert local.log_recovery_process.is_alive() + local.work_event.set() + time.sleep(1) + assert local.log_recovery_process.is_alive() + local.work_event.set() + local.cleanup_event.set() + time.sleep(1) + assert local.log_recovery_process.is_alive() is False + local.cleanup_event.set() + + +def test_log_recovery_recover_log(prepare_test, local, mocker, as_conf): + print(prepare_test.strpath) + mocker.patch('autosubmit.platforms.platform.max', return_value=0) + local.keep_alive_timeout = 20 + mocker.patch('autosubmit.job.job.Job.write_stats') # Tested in test_run_command_intregation.py + local.spawn_log_retrieval_process(as_conf) + local.work_event.set() + job = Job('t000', '0000', Status.COMPLETED, 0) + job.name = 'test_job' + job.platform = local + job.platform_name = 'local' + job.local_logs = ("t000.cmd.out.moved", "t000.cmd.err.moved") + job._init_runtime_parameters() + local.work_event.set() + local.add_job_to_log_recover(job) + local.cleanup_event.set() + local.log_recovery_process.join(30) # should exit earlier. 
+ assert local.log_recovery_process.is_alive() is False + assert Path(f"{prepare_test.strpath}/scratch/LOG_t000/t000.cmd.out.moved").exists() + assert Path(f"{prepare_test.strpath}/scratch/LOG_t000/t000.cmd.err.moved").exists() diff --git a/test/unit/test_packages.py b/test/unit/test_packages.py new file mode 100644 index 000000000..8b3852511 --- /dev/null +++ b/test/unit/test_packages.py @@ -0,0 +1,50 @@ +import mock +import pytest +from autosubmit.job.job_common import Status +from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal +from autosubmit.job.job import Job + + +@pytest.fixture +def create_packages(mocker, autosubmit_config): + exp_data = { + "WRAPPERS": { + "WRAPPERS": { + "JOBS_IN_WRAPPER": "dummysection" + } + } + } + as_conf = autosubmit_config("a000", exp_data) + jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0), Job("dummy-2", 2, Status.SUBMITTED, 0), Job("dummy-3", 3, Status.SUBMITTED, 0)] + for job in jobs: + job._platform = mocker.MagicMock() + job._platform.name = "dummy" + job.platform_name = "dummy" + job.processors = 2 + job.section = "dummysection" + job._init_runtime_parameters() + job.wallclock = "00:01" + packages = [ + JobPackageSimple([jobs[0]]), + JobPackageVertical(jobs, configuration=as_conf), + JobPackageHorizontal(jobs, configuration=as_conf), + ] + for package in packages: + if not isinstance(package, JobPackageSimple): + package._name = "wrapped" + return packages + + +def test_process_jobs_to_submit(create_packages): + packages = create_packages + jobs_id = [1, 2, 3] + for i, package in enumerate(packages): # Equivalent to valid_packages_to_submit but without the ghost jobs check etc. + package.process_jobs_to_submit(jobs_id[i], False) + for job in package.jobs: # All jobs inside a package must have the same id. 
+ assert job.hold is False + assert job.id == str(jobs_id[i]) + assert job.status == Status.SUBMITTED + if not isinstance(package, JobPackageSimple): + assert job.wrapper_name == "wrapped" + else: + assert job.wrapper_name is None diff --git a/test/unit/test_pjm_platform_pytest.py b/test/unit/test_pjm_platform_pytest.py new file mode 100644 index 000000000..4ba25be88 --- /dev/null +++ b/test/unit/test_pjm_platform_pytest.py @@ -0,0 +1,88 @@ +import pytest + +from autosubmit.job.job import Job +from autosubmit.job.job_common import Status +from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal +from autosubmit.platforms.pjmplatform import PJMPlatform + + +@pytest.fixture +def as_conf(autosubmit_config, tmpdir): + exp_data = { + "WRAPPERS": { + "WRAPPERS": { + "JOBS_IN_WRAPPER": "dummysection" + } + }, + "PLATFORMS": { + "pytest-slurm": { + "type": "slurm", + "host": "localhost", + "user": "user", + "project": "project", + "scratch_dir": "/scratch", + "QUEUE": "queue", + "ADD_PROJECT_TO_HOST": False, + "MAX_WALLCLOCK": "00:01", + "TEMP_DIR": "", + "MAX_PROCESSORS": 99999, + }, + }, + "LOCAL_ROOT_DIR": str(tmpdir), + "LOCAL_TMP_DIR": str(tmpdir), + "LOCAL_PROJ_DIR": str(tmpdir), + "LOCAL_ASLOG_DIR": str(tmpdir), + } + as_conf = autosubmit_config("dummy-expid", exp_data) + return as_conf + + +@pytest.fixture +def pjm_platform(as_conf): + platform = PJMPlatform(expid="dummy-expid", name='pytest-slurm', config=as_conf.experiment_data) + return platform + + +@pytest.fixture +def create_packages(as_conf, pjm_platform): + simple_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0)] + vertical_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0), Job("dummy-2", 2, Status.SUBMITTED, 0), Job("dummy-3", 3, Status.SUBMITTED, 0)] + horizontal_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0), Job("dummy-2", 2, Status.SUBMITTED, 0), Job("dummy-3", 3, Status.SUBMITTED, 0)] + for job in simple_jobs + vertical_jobs + horizontal_jobs: + job._platform = pjm_platform + job._platform.name = pjm_platform.name + job.platform_name = pjm_platform.name + job.processors = 2 + job.section = "dummysection" + job._init_runtime_parameters() + job.wallclock = "00:01" + packages = [ + JobPackageSimple(simple_jobs), + JobPackageVertical(vertical_jobs, configuration=as_conf), + JobPackageHorizontal(horizontal_jobs, configuration=as_conf), + ] + for package in packages: + if not isinstance(package, JobPackageSimple): + package._name = "wrapped" + return packages + + +def test_process_batch_ready_jobs_valid_packages_to_submit(mocker, pjm_platform, as_conf, create_packages): + valid_packages_to_submit = create_packages + failed_packages = [] + pjm_platform.get_jobid_by_jobname = mocker.MagicMock() + pjm_platform.send_command = mocker.MagicMock() + pjm_platform.submit_Script = mocker.MagicMock() + jobs_id = [1, 2, 3] + pjm_platform.submit_Script.return_value = jobs_id + pjm_platform.process_batch_ready_jobs(valid_packages_to_submit, failed_packages) + for i, package in enumerate(valid_packages_to_submit): + for job in package.jobs: + assert job.hold is False + assert job.id == str(jobs_id[i]) + assert job.status == Status.SUBMITTED + if not isinstance(package, JobPackageSimple): + assert job.wrapper_name == "wrapped" + else: + assert job.wrapper_name is None + assert failed_packages == [] diff --git a/test/unit/test_run_command_intregation.py b/test/unit/test_run_command_intregation.py new file mode 100644 index 000000000..c2c4dcf41 --- /dev/null +++ b/test/unit/test_run_command_intregation.py @@ -0,0 
+1,402 @@ +import shutil +import pytest +from pathlib import Path +from autosubmitconfigparser.config.configcommon import AutosubmitConfig +from autosubmit.autosubmit import Autosubmit +import os +import pwd +from test.unit.utils.common import create_database, init_expid +import sqlite3 + + +def _get_script_files_path() -> Path: + return Path(__file__).resolve().parent / 'files' + + +# TODO expand the tests to test Slurm, PSPlatform, Ecplatform whenever possible + +@pytest.fixture +def run_tmpdir(tmpdir_factory): + folder = tmpdir_factory.mktemp('run_tests') + os.mkdir(folder.join('scratch')) + os.mkdir(folder.join('run_tmp_dir')) + file_stat = os.stat(f"{folder.strpath}") + file_owner_id = file_stat.st_uid + file_owner = pwd.getpwuid(file_owner_id).pw_name + folder.owner = file_owner + + # Write an autosubmitrc file in the temporary directory + autosubmitrc = folder.join('autosubmitrc') + autosubmitrc.write(f''' +[database] +path = {folder} +filename = tests.db + +[local] +path = {folder} + +[globallogs] +path = {folder} + +[structures] +path = {folder} + +[historicdb] +path = {folder} + +[historiclog] +path = {folder} + +[defaultstats] +path = {folder} + +''') + os.environ['AUTOSUBMIT_CONFIGURATION'] = str(folder.join('autosubmitrc')) + create_database(str(folder.join('autosubmitrc'))) + assert "tests.db" in [Path(f).name for f in folder.listdir()] + init_expid(str(folder.join('autosubmitrc')), platform='local', create=False, test_type='test') + assert "t000" in [Path(f).name for f in folder.listdir()] + return folder + + +@pytest.fixture +def prepare_run(run_tmpdir): + # touch as_misc + # remove files under t000/conf + conf_folder = Path(f"{run_tmpdir.strpath}/t000/conf") + shutil.rmtree(conf_folder) + os.makedirs(conf_folder) + platforms_path = Path(f"{run_tmpdir.strpath}/t000/conf/platforms.yml") + main_path = Path(f"{run_tmpdir.strpath}/t000/conf/AAAmain.yml") + # Add each platform to test + with platforms_path.open('w') as f: + f.write(f""" +PLATFORMS: + dummy: + type: dummy + """) + + with main_path.open('w') as f: + f.write(""" +EXPERIMENT: + # List of start dates + DATELIST: '20000101' + # List of members. + MEMBERS: fc0 + # Unit of the chunk size. Can be hour, day, month, or year. + CHUNKSIZEUNIT: month + # Size of each chunk. + CHUNKSIZE: '2' + # Number of chunks of the experiment. + NUMCHUNKS: '3' + CHUNKINI: '' + # Calendar used for the experiment. Can be standard or noleap. + CALENDAR: standard + +CONFIG: + # Current version of Autosubmit. + AUTOSUBMIT_VERSION: "" + # Total number of jobs in the workflow. + TOTALJOBS: 3 + # Maximum number of jobs permitted in the waiting status. + MAXWAITINGJOBS: 3 + SAFETYSLEEPTIME: 0 +DEFAULT: + # Job experiment ID. + EXPID: "t000" + # Default HPC platform name. + HPCARCH: "local" + #hint: use %PROJDIR% to point to the project folder (where the project is cloned) + # Custom configuration location. +project: + # Type of the project. + PROJECT_TYPE: None + # Folder to hold the project sources. 
+ PROJECT_DESTINATION: local_project +""") + expid_dir = Path(f"{run_tmpdir.strpath}/scratch/whatever/{run_tmpdir.owner}/t000") + dummy_dir = Path(f"{run_tmpdir.strpath}/scratch/whatever/{run_tmpdir.owner}/t000/dummy_dir") + real_data = Path(f"{run_tmpdir.strpath}/scratch/whatever/{run_tmpdir.owner}/t000/real_data") + # We write some dummy data inside the scratch_dir + os.makedirs(expid_dir, exist_ok=True) + os.makedirs(dummy_dir, exist_ok=True) + os.makedirs(real_data, exist_ok=True) + + with open(dummy_dir.joinpath('dummy_file'), 'w') as f: + f.write('dummy data') + # create some dummy absolute symlinks in expid_dir to test migrate function + (real_data / 'dummy_symlink').symlink_to(dummy_dir / 'dummy_file') + return run_tmpdir + + +def check_db_fields(run_tmpdir, expected_entries, final_status) -> dict: + """ + Check that the database contains the expected number of entries, and that all fields contain data after a completed run. + """ + db_check_list = {} + # Test database exists. + job_data = Path(f"{run_tmpdir.strpath}/job_data_t000.db") + autosubmit_db = Path(f"{run_tmpdir.strpath}/tests.db") + db_check_list["JOB_DATA_EXIST"] = job_data.exists() + db_check_list["AUTOSUBMIT_DB_EXIST"] = autosubmit_db.exists() + + # Check job_data info + conn = sqlite3.connect(job_data) + conn.row_factory = sqlite3.Row + c = conn.cursor() + c.execute("SELECT * FROM job_data") + rows = c.fetchall() + db_check_list["JOB_DATA_ENTRIES"] = len(rows) == expected_entries + # Convert rows to a list of dictionaries + rows_as_dicts = [dict(row) for row in rows] + # Tune the print so it is more readable, so it is easier to debug in case of failure + db_check_list["JOB_DATA_FIELDS"] = {} + counter_by_name = {} + for row_dict in rows_as_dicts: + # Check that all fields contain data, except extra_data, children, and platform_output + # Check that submit, start and finish are > 0 + if row_dict["job_name"] not in counter_by_name: + counter_by_name[row_dict["job_name"]] = 0 + if row_dict["job_name"] not in db_check_list["JOB_DATA_FIELDS"]: + db_check_list["JOB_DATA_FIELDS"][row_dict["job_name"]] = {} + db_check_list["JOB_DATA_FIELDS"][row_dict["job_name"]][str(counter_by_name[row_dict["job_name"]])] = {} + db_check_list["JOB_DATA_FIELDS"][row_dict["job_name"]][str(counter_by_name[row_dict["job_name"]])]["submit"] = row_dict["submit"] > 0 and row_dict["submit"] != 1970010101 + db_check_list["JOB_DATA_FIELDS"][row_dict["job_name"]][str(counter_by_name[row_dict["job_name"]])]["start"] = row_dict["start"] > 0 and row_dict["start"] != 1970010101 + db_check_list["JOB_DATA_FIELDS"][row_dict["job_name"]][str(counter_by_name[row_dict["job_name"]])]["finish"] = row_dict["finish"] > 0 and row_dict["finish"] != 1970010101 + db_check_list["JOB_DATA_FIELDS"][row_dict["job_name"]][str(counter_by_name[row_dict["job_name"]])]["status"] = row_dict["status"] == final_status + empty_fields = [] + for key in [key for key in row_dict.keys() if + key not in ["status", "finish", "submit", "start", "extra_data", "children", "platform_output"]]: + if str(row_dict[key]) == str(""): + empty_fields.append(key) + db_check_list["JOB_DATA_FIELDS"][row_dict["job_name"]][str(counter_by_name[row_dict["job_name"]])]["empty_fields"] = " ".join(empty_fields) + counter_by_name[row_dict["job_name"]] += 1 + print_db_results(db_check_list, rows_as_dicts, run_tmpdir) + c.close() + conn.close() + return db_check_list + + +def print_db_results(db_check_list, rows_as_dicts, run_tmpdir): + """ + Print the database check results. 
+    """
+    column_names = rows_as_dicts[0].keys() if rows_as_dicts else []
+    column_widths = [max(len(str(row[col])) for row in rows_as_dicts + [dict(zip(column_names, column_names))]) for col
+                     in column_names]
+    print(f"Experiment folder: {run_tmpdir.strpath}")
+    header = " | ".join(f"{name:<{width}}" for name, width in zip(column_names, column_widths))
+    print(f"\n{header}")
+    print("-" * len(header))
+    # Print the rows
+    for row_dict in rows_as_dicts:  # always print, for debug purposes
+        print(" | ".join(f"{str(row_dict[col]):<{width}}" for col, width in zip(column_names, column_widths)))
+    # Print the results
+    print("\nDatabase check results:")
+    print(f"JOB_DATA_EXIST: {db_check_list['JOB_DATA_EXIST']}")
+    print(f"AUTOSUBMIT_DB_EXIST: {db_check_list['AUTOSUBMIT_DB_EXIST']}")
+    print(f"JOB_DATA_ENTRIES_ARE_CORRECT: {db_check_list['JOB_DATA_ENTRIES']}")
+
+    for job_name in db_check_list["JOB_DATA_FIELDS"]:
+        for job_counter in db_check_list["JOB_DATA_FIELDS"][job_name]:
+            all_ok = True
+            for field in db_check_list["JOB_DATA_FIELDS"][job_name][job_counter]:
+                if field == "empty_fields":
+                    if len(db_check_list['JOB_DATA_FIELDS'][job_name][job_counter][field]) > 0:
+                        all_ok = False
+                        print(f"{field} assert FAILED")
+                else:
+                    if not db_check_list['JOB_DATA_FIELDS'][job_name][job_counter][field]:
+                        all_ok = False
+                        print(f"{field} assert FAILED")
+            if int(job_counter) > 0:
+                print(f"Job entry: {job_name} retrial: {job_counter} assert {str(all_ok).upper()}")
+            else:
+                print(f"Job entry: {job_name} assert {str(all_ok).upper()}")
+
+
+def assert_db_fields(db_check_list):
+    """
+    Assert that the database fields are correct.
+    """
+    assert db_check_list["JOB_DATA_EXIST"]
+    assert db_check_list["AUTOSUBMIT_DB_EXIST"]
+    assert db_check_list["JOB_DATA_ENTRIES"]
+    for job_name in db_check_list["JOB_DATA_FIELDS"]:
+        for job_counter in db_check_list["JOB_DATA_FIELDS"][job_name]:
+            for field in db_check_list["JOB_DATA_FIELDS"][job_name][job_counter]:
+                if field == "empty_fields":
+                    assert len(db_check_list['JOB_DATA_FIELDS'][job_name][job_counter][field]) == 0
+                else:
+                    assert db_check_list['JOB_DATA_FIELDS'][job_name][job_counter][field]
+
+
+def assert_exit_code(final_status, exit_code):
+    """
+    Check that the exit code is correct.
+    """
+    if final_status == "FAILED":
+        assert exit_code > 0
+    else:
+        assert exit_code == 0
+
+
+def check_files_recovered(run_tmpdir, log_dir, expected_files) -> dict:
+    """
+    Check that all files are recovered after a run.
+    """
+    # Check that logs were recovered and that all stat files exist.
+    as_conf = AutosubmitConfig("t000")
+    as_conf.reload()
+    retrials = as_conf.experiment_data['JOBS']['JOB'].get('RETRIALS', 0)
+    files_check_list = {}
+    for f in log_dir.glob('*'):
+        files_check_list[f.name] = not any(str(f).endswith(f".{i}.err") or str(f).endswith(f".{i}.out") for i in range(retrials + 1))
+    stat_files = [str(f).split("_")[-1] for f in log_dir.glob('*') if "STAT" in str(f)]
+    for i in range(retrials + 1):
+        files_check_list[f"STAT_{i}"] = str(i) in stat_files
+
+    print("\nFiles check results:")
+    all_ok = True
+    for file in files_check_list:
+        if not files_check_list[file]:
+            all_ok = False
+            print(f"{file} does not exist: {files_check_list[file]}")
+    if all_ok:
+        print("All downloaded log files are renamed correctly.")
+    else:
+        print("Some log files are not renamed correctly.")
+    files_err_out_found = [f for f in log_dir.glob('*') if (str(f).endswith(".err") or str(f).endswith(".out") or "retrial" in str(f).lower()) and "ASThread" not in str(f)]
+    files_check_list["EXPECTED_FILES"] = len(files_err_out_found) == expected_files
+    if not files_check_list["EXPECTED_FILES"]:
+        print(f"Expected number of log files: {expected_files}. Found: {len(files_err_out_found)}")
+        files_err_out_found_str = ", ".join([f.name for f in files_err_out_found])
+        print(f"Log files found: {files_err_out_found_str}")
+        print("Log file contents:")
+        for f in files_err_out_found:
+            print(f"File: {f.name}\n{f.read_text()}")
+        print("All files, permissions, and owners:")
+        for f in log_dir.glob('*'):
+            file_stat = os.stat(f)
+            file_owner_id = file_stat.st_uid
+            file_owner = pwd.getpwuid(file_owner_id).pw_name
+            print(f"File: {f.name} owner: {file_owner} permissions: {oct(file_stat.st_mode)}")
+    else:
+        print(f"All log files are gathered: {expected_files}")
+    return files_check_list
+
+
+def assert_files_recovered(files_check_list):
+    """
+    Assert that the files are recovered correctly.
+    """
+    for check_name in files_check_list:
+        assert files_check_list[check_name]
+
+
+def init_run(run_tmpdir, jobs_data):
+    """
+    Initialize the run, writing the jobs.yml file and creating the experiment.
+ """ + # write jobs_data + jobs_path = Path(f"{run_tmpdir.strpath}/t000/conf/jobs.yml") + log_dir = Path(f"{run_tmpdir.strpath}/t000/tmp/LOG_t000") + with jobs_path.open('w') as f: + f.write(jobs_data) + + # Create + init_expid(os.environ["AUTOSUBMIT_CONFIGURATION"], platform='local', expid='t000', create=True, test_type='test') + + # This is set in _init_log which is not called + as_misc = Path(f"{run_tmpdir.strpath}/t000/conf/as_misc.yml") + with as_misc.open('w') as f: + f.write(""" + AS_MISC: True + ASMISC: + COMMAND: run + AS_COMMAND: run + """) + return log_dir + + +@pytest.mark.parametrize("jobs_data, expected_db_entries, final_status", [ + # Success + (""" + EXPERIMENT: + NUMCHUNKS: '3' + JOBS: + job: + SCRIPT: | + echo "Hello World with id=Success" + PLATFORM: local + RUNNING: chunk + wallclock: 00:01 + """, 3, "COMPLETED"), # Number of jobs + # Success wrapper + (""" + EXPERIMENT: + NUMCHUNKS: '2' + JOBS: + job: + SCRIPT: | + echo "Hello World with id=Success + wrappers" + DEPENDENCIES: job-1 + PLATFORM: local + RUNNING: chunk + wallclock: 00:01 + job2: + SCRIPT: | + echo "Hello World with id=Success + wrappers" + DEPENDENCIES: job2-1 + PLATFORM: local + RUNNING: chunk + wallclock: 00:01 + wrappers: + wrapper: + JOBS_IN_WRAPPER: job + TYPE: vertical + wrapper2: + JOBS_IN_WRAPPER: job2 + TYPE: vertical + """, 4, "COMPLETED"), # Number of jobs + # Failure + (""" + JOBS: + job: + SCRIPT: | + decho "Hello World with id=FAILED" + PLATFORM: local + RUNNING: chunk + wallclock: 00:01 + retrials: 2 # In local, it started to fail at 18 retrials. + """, (2+1)*3, "FAILED"), # Retries set (N + 1) * number of jobs to run + # Failure wrappers + (""" + JOBS: + job: + SCRIPT: | + decho "Hello World with id=FAILED + wrappers" + PLATFORM: local + DEPENDENCIES: job-1 + RUNNING: chunk + wallclock: 00:10 + retrials: 2 + wrappers: + wrapper: + JOBS_IN_WRAPPER: job + TYPE: vertical + """, (2+1)*1, "FAILED"), # Retries set (N + 1) * job chunk 1 ( the rest shouldn't run ) +], ids=["Success", "Success with wrapper", "Failure", "Failure with wrapper"]) +def test_run_uninterrupted(run_tmpdir, prepare_run, jobs_data, expected_db_entries, final_status): + log_dir = init_run(run_tmpdir, jobs_data) + # Run the experiment + exit_code = Autosubmit.run_experiment(expid='t000') + + # Check and display results + db_check_list = check_db_fields(run_tmpdir, expected_db_entries, final_status) + files_check_list = check_files_recovered(run_tmpdir, log_dir, expected_files=expected_db_entries*2) + + # Assert + assert_db_fields(db_check_list) + assert_files_recovered(files_check_list) + # TODO: GITLAB pipeline is not returning 0 or 1 for check_exit_code(final_status, exit_code) + # assert_exit_code(final_status, exit_code) diff --git a/test/unit/test_slurm_platform_pytest.py b/test/unit/test_slurm_platform_pytest.py new file mode 100644 index 000000000..251316a9c --- /dev/null +++ b/test/unit/test_slurm_platform_pytest.py @@ -0,0 +1,88 @@ +import pytest + +from autosubmit.job.job import Job +from autosubmit.job.job_common import Status +from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal +from autosubmit.platforms.slurmplatform import SlurmPlatform + + +@pytest.fixture +def as_conf(autosubmit_config, tmpdir): + exp_data = { + "WRAPPERS": { + "WRAPPERS": { + "JOBS_IN_WRAPPER": "dummysection" + } + }, + "PLATFORMS": { + "pytest-slurm": { + "type": "slurm", + "host": "localhost", + "user": "user", + "project": "project", + "scratch_dir": "/scratch", + "QUEUE": "queue", + 
"ADD_PROJECT_TO_HOST": False, + "MAX_WALLCLOCK": "00:01", + "TEMP_DIR": "", + "MAX_PROCESSORS": 99999, + }, + }, + "LOCAL_ROOT_DIR": str(tmpdir), + "LOCAL_TMP_DIR": str(tmpdir), + "LOCAL_PROJ_DIR": str(tmpdir), + "LOCAL_ASLOG_DIR": str(tmpdir), + } + as_conf = autosubmit_config("dummy-expid", exp_data) + return as_conf + + +@pytest.fixture +def slurm_platform(as_conf): + platform = SlurmPlatform(expid="dummy-expid", name='pytest-slurm', config=as_conf.experiment_data) + return platform + + +@pytest.fixture +def create_packages(as_conf, slurm_platform): + simple_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0)] + vertical_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0), Job("dummy-2", 2, Status.SUBMITTED, 0), Job("dummy-3", 3, Status.SUBMITTED, 0)] + horizontal_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0), Job("dummy-2", 2, Status.SUBMITTED, 0), Job("dummy-3", 3, Status.SUBMITTED, 0)] + for job in simple_jobs + vertical_jobs + horizontal_jobs: + job._platform = slurm_platform + job._platform.name = slurm_platform.name + job.platform_name = slurm_platform.name + job.processors = 2 + job.section = "dummysection" + job._init_runtime_parameters() + job.wallclock = "00:01" + packages = [ + JobPackageSimple(simple_jobs), + JobPackageVertical(vertical_jobs, configuration=as_conf), + JobPackageHorizontal(horizontal_jobs, configuration=as_conf), + ] + for package in packages: + if not isinstance(package, JobPackageSimple): + package._name = "wrapped" + return packages + + +def test_process_batch_ready_jobs_valid_packages_to_submit(mocker, slurm_platform, as_conf, create_packages): + valid_packages_to_submit = create_packages + failed_packages = [] + slurm_platform.get_jobid_by_jobname = mocker.MagicMock() + slurm_platform.send_command = mocker.MagicMock() + slurm_platform.submit_Script = mocker.MagicMock() + jobs_id = [1, 2, 3] + slurm_platform.submit_Script.return_value = jobs_id + slurm_platform.process_batch_ready_jobs(valid_packages_to_submit, failed_packages) + for i, package in enumerate(valid_packages_to_submit): + for job in package.jobs: + assert job.hold is False + assert job.id == str(jobs_id[i]) + assert job.status == Status.SUBMITTED + if not isinstance(package, JobPackageSimple): + assert job.wrapper_name == "wrapped" + else: + assert job.wrapper_name is None + assert failed_packages == []