From 9947b31de0653f110c759a2b85f37f94d505b5ba Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Fri, 19 Jul 2024 14:45:07 +0200 Subject: [PATCH 01/20] Include cancelling long running jobs --- openeo/extra/job_management.py | 46 +++++++++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 3eb8a5868..da0bc4763 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -76,7 +76,7 @@ def start_job( """ def __init__( - self, poll_sleep: int = 60, root_dir: Optional[Union[str, Path]] = "." + self, poll_sleep: int = 60, root_dir: Optional[Union[str, Path]] = ".", max_running_duration: Optional[int] = 720 ): """Create a MultiBackendJobManager. @@ -93,6 +93,10 @@ def __init__( - get_job_dir - get_error_log_path - get_job_metadata_path + + :param max_running_duration: + A temporal limit for long running jobs to get automatically cancelled. + The preset duration is 720 minutes or 12 hours """ self.backends: Dict[str, _Backend] = {} self.poll_sleep = poll_sleep @@ -101,6 +105,9 @@ def __init__( # An explicit None or "" should also default to "." self._root_dir = Path(root_dir or ".") + self.max_running_duration = datetime.timedelta(minutes=max_running_duration) + + def add_backend( self, name: str, @@ -202,6 +209,39 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: df = df.assign(**new_columns) return df + + def _check_and_stop_long_running_jobs(self, df: pd.DataFrame): + """Check for long-running jobs and stop them if necessary.""" + running_jobs = df.loc[ + (df.status == "running") + + ] + for i in running_jobs.index: + + try: + job_id = df.loc[i, "id"] + backend_name = df.loc[i, "backend_name"] + con = self._get_connection(backend_name) + the_job = con.job(job_id) + + job_time_created = the_job.describe()['created'] + job_time_created = datetime.datetime.strptime(job_time_created, '%Y-%m-%dT%H:%M:%SZ') + time_difference = (datetime.datetime.now() - job_time_created) + + if (time_difference.total_seconds() > self.max_running_duration.total_seconds()): + + try: + _log.info(f"Cancelling job {job_id} on backend {backend_name} as it has been running for more than {self.max_running_duration} minutes") + + the_job.stop_job() + df.loc[i, "status"] = "cancelled_prolonged_job" + self.on_job_error(the_job, df.loc[i]) + + except OpenEoApiError as e: + _log.error(f"Error Cancelling long-running job {job_id!r} on backend {backend_name}: {e}") + + except: + pass def run_jobs( self, @@ -277,9 +317,13 @@ def run_jobs( & (df.status != "skipped") & (df.status != "start_failed") & (df.status != "error") + & (df.status != "cancelled_prolonged_job") ].size > 0 ): + # Check if any job is running longer than maximally allowed + self._check_and_stop_long_running_jobs(df) + with ignore_connection_errors(context="get statuses"): self._update_statuses(df) status_histogram = df.groupby("status").size().to_dict() From 46505a2509a44fd520dff4eff91eea5569d5c7fb Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Mon, 22 Jul 2024 16:28:24 +0200 Subject: [PATCH 02/20] revisions + add job cost --- openeo/extra/job_management.py | 116 ++++++++++++++------------------- 1 file changed, 50 insertions(+), 66 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index da0bc4763..708d32667 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -16,6 +16,7 @@ from openeo.rest import OpenEoApiError from openeo.util import deep_get + _log = logging.getLogger(__name__) @@ -30,6 +31,7 @@ class _Backend(NamedTuple): MAX_RETRIES = 5 + class MultiBackendJobManager: """ Tracker for multiple jobs on multiple backends. @@ -76,7 +78,10 @@ def start_job( """ def __init__( - self, poll_sleep: int = 60, root_dir: Optional[Union[str, Path]] = ".", max_running_duration: Optional[int] = 720 + self, + poll_sleep: int = 60, + root_dir: Optional[Union[str, Path]] = ".", + max_running_duration: Optional[int] = 12 * 60 * 60, ): """Create a MultiBackendJobManager. @@ -94,9 +99,9 @@ def __init__( - get_error_log_path - get_job_metadata_path - :param max_running_duration: - A temporal limit for long running jobs to get automatically cancelled. - The preset duration is 720 minutes or 12 hours + :param max_running_duration [seconds]: + A temporal limit for long running jobs to get automatically canceled. + The preset duration 12 hours """ self.backends: Dict[str, _Backend] = {} self.poll_sleep = poll_sleep @@ -105,8 +110,7 @@ def __init__( # An explicit None or "" should also default to "." self._root_dir = Path(root_dir or ".") - self.max_running_duration = datetime.timedelta(minutes=max_running_duration) - + self.max_running_duration = datetime.timedelta(seconds=max_running_duration) def add_backend( self, @@ -132,9 +136,7 @@ def add_backend( c = connection connection = lambda: c assert callable(connection) - self.backends[name] = _Backend( - get_connection=connection, parallel_jobs=parallel_jobs - ) + self.backends[name] = _Backend(get_connection=connection, parallel_jobs=parallel_jobs) def _get_connection(self, backend_name: str, resilient: bool = True) -> Connection: """Get a connection for the backend and optionally make it resilient (adds retry behavior) @@ -199,49 +201,18 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: ("start_time", None), # TODO: columns "cpu", "memory", "duration" are not referenced directly # within MultiBackendJobManager making it confusing to claim they are required. - # However, they are through assumptions about job "usage" metadata in `_update_statuses`. + # However, they are through assumptions about job "usage" metadata in `_tracks_statuses`. ("cpu", None), ("memory", None), ("duration", None), - ("backend_name", None), + ("backend_name", None), + ("costs", None) + ] new_columns = {col: val for (col, val) in required_with_default if col not in df.columns} df = df.assign(**new_columns) return df - - def _check_and_stop_long_running_jobs(self, df: pd.DataFrame): - """Check for long-running jobs and stop them if necessary.""" - running_jobs = df.loc[ - (df.status == "running") - - ] - for i in running_jobs.index: - - try: - job_id = df.loc[i, "id"] - backend_name = df.loc[i, "backend_name"] - con = self._get_connection(backend_name) - the_job = con.job(job_id) - - job_time_created = the_job.describe()['created'] - job_time_created = datetime.datetime.strptime(job_time_created, '%Y-%m-%dT%H:%M:%SZ') - time_difference = (datetime.datetime.now() - job_time_created) - - if (time_difference.total_seconds() > self.max_running_duration.total_seconds()): - - try: - _log.info(f"Cancelling job {job_id} on backend {backend_name} as it has been running for more than {self.max_running_duration} minutes") - - the_job.stop_job() - df.loc[i, "status"] = "cancelled_prolonged_job" - self.on_job_error(the_job, df.loc[i]) - - except OpenEoApiError as e: - _log.error(f"Error Cancelling long-running job {job_id!r} on backend {backend_name}: {e}") - - except: - pass def run_jobs( self, @@ -317,34 +288,26 @@ def run_jobs( & (df.status != "skipped") & (df.status != "start_failed") & (df.status != "error") - & (df.status != "cancelled_prolonged_job") + & (df.status != "canceled") ].size > 0 ): - # Check if any job is running longer than maximally allowed - self._check_and_stop_long_running_jobs(df) with ignore_connection_errors(context="get statuses"): - self._update_statuses(df) + self._tracks_statuses(df) status_histogram = df.groupby("status").size().to_dict() _log.info(f"Status histogram: {status_histogram}") job_db.persist(df) if len(df[df.status == "not_started"]) > 0: # Check number of jobs running at each backend - running = df[ - (df.status == "created") - | (df.status == "queued") - | (df.status == "running") - ] + running = df[(df.status == "created") | (df.status == "queued") | (df.status == "running")] per_backend = running.groupby("backend_name").size().to_dict() _log.info(f"Running per backend: {per_backend}") for backend_name in self.backends: backend_load = per_backend.get(backend_name, 0) if backend_load < self.backends[backend_name].parallel_jobs: - to_add = ( - self.backends[backend_name].parallel_jobs - backend_load - ) + to_add = self.backends[backend_name].parallel_jobs - backend_load to_launch = df[df.status == "not_started"].iloc[0:to_add] for i in to_launch.index: self._launch_job(start_job, df, i, backend_name) @@ -466,13 +429,10 @@ def ensure_job_dir_exists(self, job_id: str) -> Path: if not job_dir.exists(): job_dir.mkdir(parents=True) - def _update_statuses(self, df: pd.DataFrame): - """Update status (and stats) of running jobs (in place).""" - active = df.loc[ - (df.status == "created") - | (df.status == "queued") - | (df.status == "running") - ] + + def _tracks_statuses(self, df: pd.DataFrame): + """tracks status (and stats) of running jobs (in place). Cancels jobs when running too long""" + active = df.loc[(df.status == "created") | (df.status == "queued") | (df.status == "running")] for i in active.index: job_id = df.loc[i, "id"] backend_name = df.loc[i, "backend_name"] @@ -481,15 +441,38 @@ def _update_statuses(self, df: pd.DataFrame): con = self._get_connection(backend_name) the_job = con.job(job_id) job_metadata = the_job.describe() - _log.info( - f"Status of job {job_id!r} (on backend {backend_name}) is {job_metadata['status']!r}" - ) + _log.info(f"Status of job {job_id!r} (on backend {backend_name}) is {job_metadata['status']!r}") + + if job_metadata["status"] == "running": + + timezone = datetime.timezone.utc + job_time_created = datetime.datetime.strptime(the_job.describe()["created"], "%Y-%m-%dT%H:%M:%SZ") + job_time_created = job_time_created.replace(tzinfo=timezone) + + if datetime.datetime.now(timezone) > job_time_created + self.max_running_duration: + try: + _log.info( + f"Cancelling job {job_id} on backend {backend_name} as it has been running for more than {self.max_running_duration} minutes" + ) + the_job.stop_job() + self.on_job_error(the_job, df.loc[i]) + + except OpenEoApiError as e: + _log.error(f"Error Cancelling long-running job {job_id!r} on backend {backend_name}: {e}") + + if job_metadata["status"] == "finished": self.on_job_done(the_job, df.loc[i]) + try: + df.loc[i, "costs"] = job_metadata["costs"] + except: + pass + if df.loc[i, "status"] != "error" and job_metadata["status"] == "error": self.on_job_error(the_job, df.loc[i]) df.loc[i, "status"] = job_metadata["status"] + # TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df` for key in job_metadata.get("usage", {}).keys(): df.loc[i, key] = _format_usage_stat(job_metadata, key) @@ -545,6 +528,7 @@ def read(self) -> pd.DataFrame: ): df["geometry"] = df["geometry"].apply(shapely.wkt.loads) return df + def persist(self, df: pd.DataFrame): self.path.parent.mkdir(parents=True, exist_ok=True) df.to_csv(self.path, index=False) From 51ee69c881f508e7f952476bbef4dd492f9aeefe Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Tue, 23 Jul 2024 16:05:39 +0200 Subject: [PATCH 03/20] feature: add standard tracking of created timestamp and running timestamp. Enable cancelling jobs based on running duration --- openeo/extra/job_management.py | 87 ++++++++++++++++++++-------------- 1 file changed, 51 insertions(+), 36 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 708d32667..b26546ddc 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -14,7 +14,7 @@ from openeo import BatchJob, Connection from openeo.rest import OpenEoApiError -from openeo.util import deep_get +from openeo.util import deep_get, rfc3339 _log = logging.getLogger(__name__) @@ -81,7 +81,7 @@ def __init__( self, poll_sleep: int = 60, root_dir: Optional[Union[str, Path]] = ".", - max_running_duration: Optional[int] = 12 * 60 * 60, + max_running_duration: Optional[int] = None, ): """Create a MultiBackendJobManager. @@ -101,7 +101,7 @@ def __init__( :param max_running_duration [seconds]: A temporal limit for long running jobs to get automatically canceled. - The preset duration 12 hours + The preset duration 12 hours. Can be set to None to disable """ self.backends: Dict[str, _Backend] = {} self.poll_sleep = poll_sleep @@ -110,7 +110,10 @@ def __init__( # An explicit None or "" should also default to "." self._root_dir = Path(root_dir or ".") - self.max_running_duration = datetime.timedelta(seconds=max_running_duration) + self.max_running_duration = ( + datetime.timedelta(seconds=max_running_duration) if max_running_duration is not None else None + ) + def add_backend( self, @@ -193,21 +196,21 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: :param df: The dataframe to normalize. :return: a new dataframe that is normalized. """ + pass # check for some required columns. required_with_default = [ ("status", "not_started"), ("id", None), - ("start_time", None), + ("created_timestamp", None), + ("running_timestamp", None), # TODO: columns "cpu", "memory", "duration" are not referenced directly # within MultiBackendJobManager making it confusing to claim they are required. - # However, they are through assumptions about job "usage" metadata in `_tracks_statuses`. + # However, they are through assumptions about job "usage" metadata in `_track_statuses`. ("cpu", None), ("memory", None), ("duration", None), - ("backend_name", None), - ("costs", None) - + ("backend_name", None) ] new_columns = {col: val for (col, val) in required_with_default if col not in df.columns} df = df.assign(**new_columns) @@ -294,7 +297,7 @@ def run_jobs( ): with ignore_connection_errors(context="get statuses"): - self._tracks_statuses(df) + self._track_statuses(df) status_histogram = df.groupby("status").size().to_dict() _log.info(f"Status histogram: {status_histogram}") job_db.persist(df) @@ -355,7 +358,7 @@ def _launch_job(self, start_job, df, i, backend_name): _log.warning(f"Failed to start job for {row.to_dict()}", exc_info=True) df.loc[i, "status"] = "start_failed" else: - df.loc[i, "start_time"] = datetime.datetime.now().isoformat() + df.loc[i, "created_timestamp"] = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ') if job: df.loc[i, "id"] = job.job_id with ignore_connection_errors(context="get status"): @@ -393,6 +396,7 @@ def on_job_done(self, job: BatchJob, row): with open(metadata_path, "w") as f: json.dump(job_metadata, f, ensure_ascii=False) + def on_job_error(self, job: BatchJob, row): """ Handles jobs that stopped with errors. Can be overridden to provide custom behaviour. @@ -411,6 +415,30 @@ def on_job_error(self, job: BatchJob, row): self.ensure_job_dir_exists(job.job_id) error_log_path.write_text(json.dumps(error_logs, indent=2)) + def on_job_cancel(self, job: BatchJob, row): + """ + Handles jobs that that were cancelled. Can be overridden to provide custom behaviour. + + Default implementation does not do anything. + + :param job: The job that has finished. + :param row: DataFrame row containing the job's metadata. + """ + # TODO: param `row` is never accessed in this method. Remove it? Is this intended for future use? + + + def _cancel_prolonged_job(self, job: BatchJob, row): + """Cancel the job if it has been running for too long.""" + job_running_timestamp = rfc3339.parse_datetime(row["running_timestamp"], with_timezone = True) + if datetime.datetime.now(datetime.timezone.utc) > job_running_timestamp + self.max_running_duration: + try: + job.stop() + _log.info( + f"Cancelling job {job.job_id} as it has been running for more than {self.max_running_duration}" + ) + except OpenEoApiError as e: + _log.error(f"Error Cancelling long-running job {job.job_id}: {e}") + def get_job_dir(self, job_id: str) -> Path: """Path to directory where job metadata, results and error logs are be saved.""" return self._root_dir / f"job_{job_id}" @@ -430,8 +458,8 @@ def ensure_job_dir_exists(self, job_id: str) -> Path: job_dir.mkdir(parents=True) - def _tracks_statuses(self, df: pd.DataFrame): - """tracks status (and stats) of running jobs (in place). Cancels jobs when running too long""" + def _track_statuses(self, df: pd.DataFrame): + """tracks status (and stats) of running jobs (in place). Optinally cancels jobs when running too long""" active = df.loc[(df.status == "created") | (df.status == "queued") | (df.status == "running")] for i in active.index: job_id = df.loc[i, "id"] @@ -443,34 +471,21 @@ def _tracks_statuses(self, df: pd.DataFrame): job_metadata = the_job.describe() _log.info(f"Status of job {job_id!r} (on backend {backend_name}) is {job_metadata['status']!r}") - if job_metadata["status"] == "running": - - timezone = datetime.timezone.utc - job_time_created = datetime.datetime.strptime(the_job.describe()["created"], "%Y-%m-%dT%H:%M:%SZ") - job_time_created = job_time_created.replace(tzinfo=timezone) - - if datetime.datetime.now(timezone) > job_time_created + self.max_running_duration: - try: - _log.info( - f"Cancelling job {job_id} on backend {backend_name} as it has been running for more than {self.max_running_duration} minutes" - ) - the_job.stop_job() - self.on_job_error(the_job, df.loc[i]) - - except OpenEoApiError as e: - _log.error(f"Error Cancelling long-running job {job_id!r} on backend {backend_name}: {e}") - + if (df.loc[i, "status"] == "created" or df.loc[i, "status"] == "queued") and job_metadata["status"] == "running": + df.loc[i, "running_timestamp"] = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ') - if job_metadata["status"] == "finished": - self.on_job_done(the_job, df.loc[i]) - try: - df.loc[i, "costs"] = job_metadata["costs"] - except: - pass + if self.max_running_duration and job_metadata["status"] == "running": + self._cancel_prolonged_job(the_job, df.loc[i]) if df.loc[i, "status"] != "error" and job_metadata["status"] == "error": self.on_job_error(the_job, df.loc[i]) + if job_metadata["status"] == "finished": + self.on_job_done(the_job, df.loc[i]) + + if job_metadata["status"] == "canceled": + self.on_job_cancel(the_job, df.loc[i]) + df.loc[i, "status"] = job_metadata["status"] # TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df` From 44202b0423c7a5a1585737213681908bb5051076 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Tue, 23 Jul 2024 17:13:25 +0200 Subject: [PATCH 04/20] fix: maintain output pd column names --- openeo/extra/job_management.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index b26546ddc..cecc9cae8 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -202,8 +202,8 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: required_with_default = [ ("status", "not_started"), ("id", None), - ("created_timestamp", None), - ("running_timestamp", None), + ("start_time", None), + ("running_time", None), # TODO: columns "cpu", "memory", "duration" are not referenced directly # within MultiBackendJobManager making it confusing to claim they are required. # However, they are through assumptions about job "usage" metadata in `_track_statuses`. @@ -358,7 +358,7 @@ def _launch_job(self, start_job, df, i, backend_name): _log.warning(f"Failed to start job for {row.to_dict()}", exc_info=True) df.loc[i, "status"] = "start_failed" else: - df.loc[i, "created_timestamp"] = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ') + df.loc[i, "start_time"] = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ') if job: df.loc[i, "id"] = job.job_id with ignore_connection_errors(context="get status"): @@ -429,7 +429,7 @@ def on_job_cancel(self, job: BatchJob, row): def _cancel_prolonged_job(self, job: BatchJob, row): """Cancel the job if it has been running for too long.""" - job_running_timestamp = rfc3339.parse_datetime(row["running_timestamp"], with_timezone = True) + job_running_timestamp = rfc3339.parse_datetime(row["running_time"], with_timezone = True) if datetime.datetime.now(datetime.timezone.utc) > job_running_timestamp + self.max_running_duration: try: job.stop() @@ -472,7 +472,7 @@ def _track_statuses(self, df: pd.DataFrame): _log.info(f"Status of job {job_id!r} (on backend {backend_name}) is {job_metadata['status']!r}") if (df.loc[i, "status"] == "created" or df.loc[i, "status"] == "queued") and job_metadata["status"] == "running": - df.loc[i, "running_timestamp"] = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ') + df.loc[i, "running_time"] = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ') if self.max_running_duration and job_metadata["status"] == "running": self._cancel_prolonged_job(the_job, df.loc[i]) From 41eb1b928689ff6c5ba1a0ac0caa74efeafc026f Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Wed, 24 Jul 2024 13:22:14 +0200 Subject: [PATCH 05/20] feature: include unit tests --- openeo/extra/job_management.py | 12 ++-- tests/extra/test_job_management.py | 104 +++++++++++++++++++++-------- 2 files changed, 84 insertions(+), 32 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index cecc9cae8..2e30369d6 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -1,5 +1,5 @@ import contextlib -import datetime +from datetime import datetime, timedelta, timezone import json import logging import time @@ -111,7 +111,7 @@ def __init__( self._root_dir = Path(root_dir or ".") self.max_running_duration = ( - datetime.timedelta(seconds=max_running_duration) if max_running_duration is not None else None + timedelta(seconds=max_running_duration) if max_running_duration is not None else None ) @@ -358,7 +358,7 @@ def _launch_job(self, start_job, df, i, backend_name): _log.warning(f"Failed to start job for {row.to_dict()}", exc_info=True) df.loc[i, "status"] = "start_failed" else: - df.loc[i, "start_time"] = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ') + df.loc[i, "start_time"] = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ') if job: df.loc[i, "id"] = job.job_id with ignore_connection_errors(context="get status"): @@ -430,7 +430,9 @@ def on_job_cancel(self, job: BatchJob, row): def _cancel_prolonged_job(self, job: BatchJob, row): """Cancel the job if it has been running for too long.""" job_running_timestamp = rfc3339.parse_datetime(row["running_time"], with_timezone = True) - if datetime.datetime.now(datetime.timezone.utc) > job_running_timestamp + self.max_running_duration: + current_time = datetime.now(timezone.utc) + + if current_time > job_running_timestamp + self.max_running_duration: try: job.stop() _log.info( @@ -472,7 +474,7 @@ def _track_statuses(self, df: pd.DataFrame): _log.info(f"Status of job {job_id!r} (on backend {backend_name}) is {job_metadata['status']!r}") if (df.loc[i, "status"] == "created" or df.loc[i, "status"] == "queued") and job_metadata["status"] == "running": - df.loc[i, "running_time"] = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ') + df.loc[i, "running_time"] = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ') if self.max_running_duration and job_metadata["status"] == "running": self._cancel_prolonged_job(the_job, df.loc[i]) diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 2fca87b41..d3a12b21f 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1,6 +1,9 @@ import json import threading from unittest import mock +import datetime +from openeo.util import rfc3339 + # TODO: can we avoid using httpretty? # We need it for testing the resilience, which uses an HTTPadapter with Retry @@ -22,13 +25,19 @@ _CsvJobDatabase, _ParquetJobDatabase, ) +from openeo.rest import OpenEoApiError + + class TestMultiBackendJobManager: + + @pytest.fixture def sleep_mock(self): with mock.patch("time.sleep") as sleep: yield sleep + def test_basic(self, tmp_path, requests_mock, sleep_mock): requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"}) @@ -75,9 +84,7 @@ def mock_job_status(job_id, queued=1, running=2): # It also needs the job results endpoint, though that can be a dummy implementation. # When the job is finished the system tries to download the results and that is what # needs this endpoint. - requests_mock.get( - f"{backend}/jobs/{job_id}/results", json={"links": []} - ) + requests_mock.get(f"{backend}/jobs/{job_id}/results", json={"links": []}) mock_job_status("job-2018", queued=1, running=2) mock_job_status("job-2019", queued=2, running=3) @@ -132,6 +139,7 @@ def test_normalize_df(self): "status", "id", "start_time", + "running_time", "cpu", "memory", "duration", @@ -270,7 +278,6 @@ def start_worker_thread(): metadata_path = manager.get_job_metadata_path(job_id="job-2021") assert metadata_path.exists() - def test_on_error_log(self, tmp_path, requests_mock): backend = "http://foo.test" requests_mock.get(backend, json={"api_version": "1.1.0"}) @@ -283,9 +290,7 @@ def test_on_error_log(self, tmp_path, requests_mock): "message": "Test that error handling works", } ] - requests_mock.get( - f"{backend}/jobs/{job_id}/logs", json={"logs": errors_log_lines} - ) + requests_mock.get(f"{backend}/jobs/{job_id}/logs", json={"logs": errors_log_lines}) root_dir = tmp_path / "job_mgr_root" manager = MultiBackendJobManager(root_dir=root_dir) @@ -304,12 +309,9 @@ def test_on_error_log(self, tmp_path, requests_mock): contents = error_log_path.read_text() assert json.loads(contents) == errors_log_lines - @httpretty.activate(allow_net_connect=False, verbose=True) @pytest.mark.parametrize("http_error_status", [502, 503, 504]) - def test_is_resilient_to_backend_failures( - self, tmp_path, http_error_status, sleep_mock - ): + def test_is_resilient_to_backend_failures(self, tmp_path, http_error_status, sleep_mock): """ Our job should still succeed when the backend request succeeds eventually, after first failing the maximum allowed number of retries. @@ -329,9 +331,7 @@ def test_is_resilient_to_backend_failures( backend = "http://foo.test" job_id = "job-2018" - httpretty.register_uri( - "GET", backend, body=json.dumps({"api_version": "1.1.0"}) - ) + httpretty.register_uri("GET", backend, body=json.dumps({"api_version": "1.1.0"})) # First fail the max times the connection should retry, then succeed. after that response_list = [ @@ -348,9 +348,7 @@ def test_is_resilient_to_backend_failures( ) ) ] - httpretty.register_uri( - "GET", f"{backend}/jobs/{job_id}", responses=response_list - ) + httpretty.register_uri("GET", f"{backend}/jobs/{job_id}", responses=response_list) root_dir = tmp_path / "job_mgr_root" manager = MultiBackendJobManager(root_dir=root_dir) @@ -380,9 +378,7 @@ def start_job(row, connection_provider, connection, **kwargs): @httpretty.activate(allow_net_connect=False, verbose=True) @pytest.mark.parametrize("http_error_status", [502, 503, 504]) - def test_resilient_backend_reports_error_when_max_retries_exceeded( - self, tmp_path, http_error_status, sleep_mock - ): + def test_resilient_backend_reports_error_when_max_retries_exceeded(self, tmp_path, http_error_status, sleep_mock): """We should get a RetryError when the backend request fails more times than the maximum allowed number of retries. Goal of the test is only to see that retrying is effectively executed. @@ -400,9 +396,7 @@ def test_resilient_backend_reports_error_when_max_retries_exceeded( backend = "http://foo.test" job_id = "job-2018" - httpretty.register_uri( - "GET", backend, body=json.dumps({"api_version": "1.1.0"}) - ) + httpretty.register_uri("GET", backend, body=json.dumps({"api_version": "1.1.0"})) # Fail one more time than the max allow retries. # But do add one successful request at the start, to simulate that the job was @@ -423,9 +417,7 @@ def test_resilient_backend_reports_error_when_max_retries_exceeded( MAX_RETRIES + 1 ) - httpretty.register_uri( - "GET", f"{backend}/jobs/{job_id}", responses=response_list - ) + httpretty.register_uri("GET", f"{backend}/jobs/{job_id}", responses=response_list) root_dir = tmp_path / "job_mgr_root" manager = MultiBackendJobManager(root_dir=root_dir) @@ -455,7 +447,65 @@ def start_job(row, connection_provider, connection, **kwargs): assert set(result.status) == {"running"} assert set(result.backend_name) == {"foo"} - + @mock.patch('openeo.extra.job_management.datetime', autospec=True) + @mock.patch('openeo.extra.job_management.timezone', autospec=True) + def test_cancel_prolonged_job_exceeds_duration(self, MockTimezone, MockDatetime): + # Create mock BatchJob instance + job = mock.MagicMock() # Use MagicMock directly here + job.job_id = "test_job_id" + + row = { + "running_time": "2020-01-01T00:00:00Z" + } + + # Initialize manager with the max_running_duration as seconds + max_running_duration_seconds = 12 * 60 * 60 # 12 hours + manager = MultiBackendJobManager(max_running_duration=max_running_duration_seconds) + + # set up timestamps + job_running_timestamp = rfc3339.parse_datetime(row["running_time"], with_timezone=True) + future_time = job_running_timestamp + datetime.timedelta(seconds=max_running_duration_seconds) + datetime.timedelta(seconds=1) + + # Set up the mock datetime + MockDatetime.now.return_value = future_time + MockTimezone.utc = datetime.timezone.utc + + manager._cancel_prolonged_job(job, row) + + # Verify that the stop method was called + job.stop.assert_called_once() + + + + @mock.patch('openeo.extra.job_management.datetime', autospec=True) + @mock.patch('openeo.extra.job_management.timezone', autospec=True) + def test_cancel_prolonged_job_within_duration(self, MockTimezone, MockDatetime): + # Create mock BatchJob instance + job = mock.MagicMock() # Use MagicMock directly here + job.job_id = "test_job_id" + + row = { + "running_time": "2020-01-01T00:00:00Z" + } + + # Initialize manager with the max_running_duration as seconds + max_running_duration_seconds = 12 * 60 * 60 # 12 hours + manager = MultiBackendJobManager(max_running_duration=max_running_duration_seconds) + + # set up timestamps + job_running_timestamp = rfc3339.parse_datetime(row["running_time"], with_timezone=True) + future_time = job_running_timestamp + datetime.timedelta(seconds=max_running_duration_seconds) - datetime.timedelta(seconds=1) + + # Set up the mock datetime + MockDatetime.now.return_value = future_time + MockTimezone.utc = datetime.timezone.utc + + manager._cancel_prolonged_job(job, row) + + # Verify that the stop method was called + job.stop.assert_not_called() + + class TestCsvJobDatabase: def test_read_wkt(self, tmp_path): wkt_df = pd.DataFrame( From 13bef2a87d20c63251439b271b02e9870577ad95 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Wed, 24 Jul 2024 14:44:45 +0200 Subject: [PATCH 06/20] fix: ignored started condition for runtime logging --- openeo/extra/job_management.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 2e30369d6..b9afd0b30 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -473,7 +473,7 @@ def _track_statuses(self, df: pd.DataFrame): job_metadata = the_job.describe() _log.info(f"Status of job {job_id!r} (on backend {backend_name}) is {job_metadata['status']!r}") - if (df.loc[i, "status"] == "created" or df.loc[i, "status"] == "queued") and job_metadata["status"] == "running": + if (df.loc[i, "status"] == "created" or df.loc[i, "status"] == "queued" or df.loc[i, "status"] == "started") and job_metadata["status"] == "running": df.loc[i, "running_time"] = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ') if self.max_running_duration and job_metadata["status"] == "running": From 3cf49b643b762248d96a8fb3f1de1e1fa165e801 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Mon, 5 Aug 2024 13:44:29 +0200 Subject: [PATCH 07/20] fix: usage time machine / rcf3339 and clean up --- openeo/extra/job_management.py | 51 ++++++++++++++++-------------- tests/extra/test_job_management.py | 49 +++++++++++++--------------- 2 files changed, 50 insertions(+), 50 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index b9afd0b30..7d90eedc5 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -1,5 +1,5 @@ import contextlib -from datetime import datetime, timedelta, timezone +import datetime import json import logging import time @@ -16,10 +16,8 @@ from openeo.rest import OpenEoApiError from openeo.util import deep_get, rfc3339 - _log = logging.getLogger(__name__) - class _Backend(NamedTuple): """Container for backend info/settings""" @@ -81,7 +79,7 @@ def __init__( self, poll_sleep: int = 60, root_dir: Optional[Union[str, Path]] = ".", - max_running_duration: Optional[int] = None, + cancel_running_job_after: Optional[int] = None, ): """Create a MultiBackendJobManager. @@ -99,9 +97,9 @@ def __init__( - get_error_log_path - get_job_metadata_path - :param max_running_duration [seconds]: + :param cancel_running_job_after [seconds]: A temporal limit for long running jobs to get automatically canceled. - The preset duration 12 hours. Can be set to None to disable + The preset is None, which disables the feature. """ self.backends: Dict[str, _Backend] = {} self.poll_sleep = poll_sleep @@ -110,8 +108,8 @@ def __init__( # An explicit None or "" should also default to "." self._root_dir = Path(root_dir or ".") - self.max_running_duration = ( - timedelta(seconds=max_running_duration) if max_running_duration is not None else None + self.cancel_running_job_after = ( + datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None ) @@ -196,14 +194,13 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: :param df: The dataframe to normalize. :return: a new dataframe that is normalized. """ - pass # check for some required columns. required_with_default = [ ("status", "not_started"), ("id", None), ("start_time", None), - ("running_time", None), + ("running_start_time", None), # TODO: columns "cpu", "memory", "duration" are not referenced directly # within MultiBackendJobManager making it confusing to claim they are required. # However, they are through assumptions about job "usage" metadata in `_track_statuses`. @@ -358,7 +355,7 @@ def _launch_job(self, start_job, df, i, backend_name): _log.warning(f"Failed to start job for {row.to_dict()}", exc_info=True) df.loc[i, "status"] = "start_failed" else: - df.loc[i, "start_time"] = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ') + df.loc[i, "start_time"] = rfc3339.utcnow() if job: df.loc[i, "id"] = job.job_id with ignore_connection_errors(context="get status"): @@ -424,20 +421,22 @@ def on_job_cancel(self, job: BatchJob, row): :param job: The job that has finished. :param row: DataFrame row containing the job's metadata. """ - # TODO: param `row` is never accessed in this method. Remove it? Is this intended for future use? - + pass def _cancel_prolonged_job(self, job: BatchJob, row): """Cancel the job if it has been running for too long.""" - job_running_timestamp = rfc3339.parse_datetime(row["running_time"], with_timezone = True) - current_time = datetime.now(timezone.utc) + job_running_start_time = rfc3339.parse_datetime(row["running_start_time"], with_timezone=True) + current_time = rfc3339.parse_datetime(rfc3339.utcnow(), with_timezone=True) + - if current_time > job_running_timestamp + self.max_running_duration: + if current_time > job_running_start_time + self.cancel_running_job_after: try: - job.stop() + print(str(self.cancel_running_job_after)) _log.info( - f"Cancelling job {job.job_id} as it has been running for more than {self.max_running_duration}" - ) + f"Cancelling job {job.job_id} as it has been running for more than {str(self.cancel_running_job_after)}" +) + job.stop() + except OpenEoApiError as e: _log.error(f"Error Cancelling long-running job {job.job_id}: {e}") @@ -471,12 +470,16 @@ def _track_statuses(self, df: pd.DataFrame): con = self._get_connection(backend_name) the_job = con.job(job_id) job_metadata = the_job.describe() - _log.info(f"Status of job {job_id!r} (on backend {backend_name}) is {job_metadata['status']!r}") - if (df.loc[i, "status"] == "created" or df.loc[i, "status"] == "queued" or df.loc[i, "status"] == "started") and job_metadata["status"] == "running": - df.loc[i, "running_time"] = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ') + previous_status = df.loc[i, "status"] + new_status = job_metadata["status"] + + _log.info(f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r}") - if self.max_running_duration and job_metadata["status"] == "running": + if previous_status in {"created", "queued", "started"} and new_status == "running": + df.loc[i, "running_start_time"] = rfc3339.utcnow() + + if self.cancel_running_job_after and job_metadata["status"] == "running": self._cancel_prolonged_job(the_job, df.loc[i]) if df.loc[i, "status"] != "error" and job_metadata["status"] == "error": @@ -561,3 +564,5 @@ def read(self) -> pd.DataFrame: def persist(self, df: pd.DataFrame): self.path.parent.mkdir(parents=True, exist_ok=True) df.to_parquet(self.path, index=False) + + diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index d3a12b21f..48d739cdb 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -2,6 +2,7 @@ import threading from unittest import mock import datetime +import time_machine from openeo.util import rfc3339 @@ -139,7 +140,7 @@ def test_normalize_df(self): "status", "id", "start_time", - "running_time", + "running_start_time", "cpu", "memory", "duration", @@ -447,62 +448,52 @@ def start_job(row, connection_provider, connection, **kwargs): assert set(result.status) == {"running"} assert set(result.backend_name) == {"foo"} - @mock.patch('openeo.extra.job_management.datetime', autospec=True) - @mock.patch('openeo.extra.job_management.timezone', autospec=True) - def test_cancel_prolonged_job_exceeds_duration(self, MockTimezone, MockDatetime): + def test_cancel_prolonged_job_exceeds_duration(self): # Create mock BatchJob instance job = mock.MagicMock() # Use MagicMock directly here job.job_id = "test_job_id" row = { - "running_time": "2020-01-01T00:00:00Z" + "running_start_time": "2020-01-01T00:00:00Z" } # Initialize manager with the max_running_duration as seconds max_running_duration_seconds = 12 * 60 * 60 # 12 hours - manager = MultiBackendJobManager(max_running_duration=max_running_duration_seconds) + manager = MultiBackendJobManager(cancel_running_job_after=max_running_duration_seconds) # set up timestamps - job_running_timestamp = rfc3339.parse_datetime(row["running_time"], with_timezone=True) + job_running_timestamp = rfc3339.parse_datetime(row["running_start_time"], with_timezone=True) future_time = job_running_timestamp + datetime.timedelta(seconds=max_running_duration_seconds) + datetime.timedelta(seconds=1) - # Set up the mock datetime - MockDatetime.now.return_value = future_time - MockTimezone.utc = datetime.timezone.utc - - manager._cancel_prolonged_job(job, row) - + with time_machine.travel(future_time, tick=False): + manager._cancel_prolonged_job(job, row) + # Verify that the stop method was called job.stop.assert_called_once() - @mock.patch('openeo.extra.job_management.datetime', autospec=True) - @mock.patch('openeo.extra.job_management.timezone', autospec=True) - def test_cancel_prolonged_job_within_duration(self, MockTimezone, MockDatetime): + def test_cancel_prolonged_job_within_duration(self): # Create mock BatchJob instance - job = mock.MagicMock() # Use MagicMock directly here + job = mock.MagicMock() job.job_id = "test_job_id" row = { - "running_time": "2020-01-01T00:00:00Z" + "running_start_time": "2020-01-01T00:00:00Z" } # Initialize manager with the max_running_duration as seconds max_running_duration_seconds = 12 * 60 * 60 # 12 hours - manager = MultiBackendJobManager(max_running_duration=max_running_duration_seconds) + manager = MultiBackendJobManager(cancel_running_job_after=max_running_duration_seconds) # set up timestamps - job_running_timestamp = rfc3339.parse_datetime(row["running_time"], with_timezone=True) + job_running_timestamp = rfc3339.parse_datetime(row["running_start_time"], with_timezone=True) future_time = job_running_timestamp + datetime.timedelta(seconds=max_running_duration_seconds) - datetime.timedelta(seconds=1) - - # Set up the mock datetime - MockDatetime.now.return_value = future_time - MockTimezone.utc = datetime.timezone.utc - - manager._cancel_prolonged_job(job, row) - # Verify that the stop method was called + with time_machine.travel(future_time, tick=False): + manager._cancel_prolonged_job(job, row) + + # Verify that the stop method was not called job.stop.assert_not_called() @@ -554,3 +545,7 @@ def test_read_persist(self, tmp_path): path = tmp_path / "jobs.parquet" _ParquetJobDatabase(path).persist(df) assert _ParquetJobDatabase(path).read().equals(df) + + +#%% + From ad0cd430b43a218c04e1457d700630ab34727798 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay <57984106+HansVRP@users.noreply.github.com> Date: Tue, 20 Aug 2024 14:45:38 +0200 Subject: [PATCH 08/20] Update openeo/extra/job_management.py Co-authored-by: Stefaan Lippens --- openeo/extra/job_management.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 7d90eedc5..46bf2cc99 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -414,7 +414,7 @@ def on_job_error(self, job: BatchJob, row): def on_job_cancel(self, job: BatchJob, row): """ - Handles jobs that that were cancelled. Can be overridden to provide custom behaviour. + Handle a job that was cancelled. Can be overridden to provide custom behaviour. Default implementation does not do anything. From d1fd26cb5bcd2894bc928757f48b40cf75db59ec Mon Sep 17 00:00:00 2001 From: Hans Vanrompay <57984106+HansVRP@users.noreply.github.com> Date: Tue, 20 Aug 2024 14:45:55 +0200 Subject: [PATCH 09/20] Update openeo/extra/job_management.py Co-authored-by: Stefaan Lippens --- openeo/extra/job_management.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 46bf2cc99..40f379347 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -418,7 +418,7 @@ def on_job_cancel(self, job: BatchJob, row): Default implementation does not do anything. - :param job: The job that has finished. + :param job: The job that was canceled. :param row: DataFrame row containing the job's metadata. """ pass From 788496a89728d0019c86846a8555672a66931f6b Mon Sep 17 00:00:00 2001 From: Hans Vanrompay <57984106+HansVRP@users.noreply.github.com> Date: Tue, 20 Aug 2024 14:46:06 +0200 Subject: [PATCH 10/20] Update openeo/extra/job_management.py Co-authored-by: Stefaan Lippens --- openeo/extra/job_management.py | 1 - 1 file changed, 1 deletion(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 40f379347..f402829ec 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -431,7 +431,6 @@ def _cancel_prolonged_job(self, job: BatchJob, row): if current_time > job_running_start_time + self.cancel_running_job_after: try: - print(str(self.cancel_running_job_after)) _log.info( f"Cancelling job {job.job_id} as it has been running for more than {str(self.cancel_running_job_after)}" ) From 89b1e3640c4c106c2b385b746823b1f15e92e5de Mon Sep 17 00:00:00 2001 From: Hans Vanrompay <57984106+HansVRP@users.noreply.github.com> Date: Tue, 20 Aug 2024 14:46:23 +0200 Subject: [PATCH 11/20] Update openeo/extra/job_management.py Co-authored-by: Stefaan Lippens --- openeo/extra/job_management.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index f402829ec..74f3e6221 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -207,7 +207,7 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: ("cpu", None), ("memory", None), ("duration", None), - ("backend_name", None) + ("backend_name", None), ] new_columns = {col: val for (col, val) in required_with_default if col not in df.columns} df = df.assign(**new_columns) From 0b353c0710b65b5c893f3a140f615df1c6bfb00a Mon Sep 17 00:00:00 2001 From: Hans Vanrompay <57984106+HansVRP@users.noreply.github.com> Date: Tue, 20 Aug 2024 14:46:33 +0200 Subject: [PATCH 12/20] Update openeo/extra/job_management.py Co-authored-by: Stefaan Lippens --- openeo/extra/job_management.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 74f3e6221..0d73603c5 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -199,7 +199,7 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: required_with_default = [ ("status", "not_started"), ("id", None), - ("start_time", None), + ("start_time", None), ("running_start_time", None), # TODO: columns "cpu", "memory", "duration" are not referenced directly # within MultiBackendJobManager making it confusing to claim they are required. From 6828ca817fb3a07178607a9a8379ef452dccae62 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay <57984106+HansVRP@users.noreply.github.com> Date: Tue, 20 Aug 2024 14:47:41 +0200 Subject: [PATCH 13/20] Update openeo/extra/job_management.py Co-authored-by: Stefaan Lippens --- openeo/extra/job_management.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 0d73603c5..544b4c60a 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -484,7 +484,7 @@ def _track_statuses(self, df: pd.DataFrame): if df.loc[i, "status"] != "error" and job_metadata["status"] == "error": self.on_job_error(the_job, df.loc[i]) - if job_metadata["status"] == "finished": + if new_status == "finished": self.on_job_done(the_job, df.loc[i]) if job_metadata["status"] == "canceled": From de9ce04912a7b806e26d50af3be1bac6f8d084c2 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay <57984106+HansVRP@users.noreply.github.com> Date: Tue, 20 Aug 2024 14:48:02 +0200 Subject: [PATCH 14/20] Update openeo/extra/job_management.py Co-authored-by: Stefaan Lippens --- openeo/extra/job_management.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 544b4c60a..bae342566 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -481,7 +481,7 @@ def _track_statuses(self, df: pd.DataFrame): if self.cancel_running_job_after and job_metadata["status"] == "running": self._cancel_prolonged_job(the_job, df.loc[i]) - if df.loc[i, "status"] != "error" and job_metadata["status"] == "error": + if previous_status != "error" and new_status == "error": self.on_job_error(the_job, df.loc[i]) if new_status == "finished": From b5a10bbf746c715cc6dbcc0b0a60912135b89582 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay <57984106+HansVRP@users.noreply.github.com> Date: Tue, 20 Aug 2024 14:48:15 +0200 Subject: [PATCH 15/20] Update openeo/extra/job_management.py Co-authored-by: Stefaan Lippens --- openeo/extra/job_management.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index bae342566..5c7b08128 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -487,7 +487,7 @@ def _track_statuses(self, df: pd.DataFrame): if new_status == "finished": self.on_job_done(the_job, df.loc[i]) - if job_metadata["status"] == "canceled": + if new_status == "canceled": self.on_job_cancel(the_job, df.loc[i]) df.loc[i, "status"] = job_metadata["status"] From fdbc1c94a6425aa83ced01ab94af4aa39f1bc9a3 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Wed, 21 Aug 2024 11:43:28 +0200 Subject: [PATCH 16/20] code review changes --- openeo/extra/job_management.py | 32 ++++----- tests/extra/test_job_management.py | 111 +++++++++++++++++------------ 2 files changed, 81 insertions(+), 62 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 7d90eedc5..80a36ec64 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -199,15 +199,15 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: required_with_default = [ ("status", "not_started"), ("id", None), - ("start_time", None), - ("running_start_time", None), + ("start_time", None), + ("running_start_time", None), # TODO: columns "cpu", "memory", "duration" are not referenced directly # within MultiBackendJobManager making it confusing to claim they are required. # However, they are through assumptions about job "usage" metadata in `_track_statuses`. ("cpu", None), ("memory", None), ("duration", None), - ("backend_name", None) + ("backend_name", None), ] new_columns = {col: val for (col, val) in required_with_default if col not in df.columns} df = df.assign(**new_columns) @@ -414,11 +414,11 @@ def on_job_error(self, job: BatchJob, row): def on_job_cancel(self, job: BatchJob, row): """ - Handles jobs that that were cancelled. Can be overridden to provide custom behaviour. + Handle a job that was cancelled. Can be overridden to provide custom behaviour. Default implementation does not do anything. - :param job: The job that has finished. + :param job: The job that was canceled. :param row: DataFrame row containing the job's metadata. """ pass @@ -431,9 +431,8 @@ def _cancel_prolonged_job(self, job: BatchJob, row): if current_time > job_running_start_time + self.cancel_running_job_after: try: - print(str(self.cancel_running_job_after)) _log.info( - f"Cancelling job {job.job_id} as it has been running for more than {str(self.cancel_running_job_after)}" + f"Cancelling job {job.job_id} as it has been running for more than {self.cancel_running_job_after}" ) job.stop() @@ -479,19 +478,19 @@ def _track_statuses(self, df: pd.DataFrame): if previous_status in {"created", "queued", "started"} and new_status == "running": df.loc[i, "running_start_time"] = rfc3339.utcnow() - if self.cancel_running_job_after and job_metadata["status"] == "running": - self._cancel_prolonged_job(the_job, df.loc[i]) - - if df.loc[i, "status"] != "error" and job_metadata["status"] == "error": + if previous_status != "error" and new_status == "error": self.on_job_error(the_job, df.loc[i]) - if job_metadata["status"] == "finished": + if new_status == "finished": self.on_job_done(the_job, df.loc[i]) - if job_metadata["status"] == "canceled": + if new_status == "canceled": self.on_job_cancel(the_job, df.loc[i]) - - df.loc[i, "status"] = job_metadata["status"] + + if self.cancel_running_job_after and new_status == "running": + self._cancel_prolonged_job(the_job, df.loc[i]) + + previous_status = new_status # TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df` for key in job_metadata.get("usage", {}).keys(): @@ -564,5 +563,6 @@ def read(self) -> pd.DataFrame: def persist(self, df: pd.DataFrame): self.path.parent.mkdir(parents=True, exist_ok=True) df.to_parquet(self.path, index=False) - + + diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 48d739cdb..c6ff0b04d 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -4,7 +4,7 @@ import datetime import time_machine from openeo.util import rfc3339 - +import time # TODO: can we avoid using httpretty? # We need it for testing the resilience, which uses an HTTPadapter with Retry @@ -449,54 +449,73 @@ def start_job(row, connection_provider, connection, **kwargs): assert set(result.backend_name) == {"foo"} def test_cancel_prolonged_job_exceeds_duration(self): - # Create mock BatchJob instance - job = mock.MagicMock() # Use MagicMock directly here - job.job_id = "test_job_id" - - row = { - "running_start_time": "2020-01-01T00:00:00Z" - } - - # Initialize manager with the max_running_duration as seconds - max_running_duration_seconds = 12 * 60 * 60 # 12 hours - manager = MultiBackendJobManager(cancel_running_job_after=max_running_duration_seconds) - - # set up timestamps - job_running_timestamp = rfc3339.parse_datetime(row["running_start_time"], with_timezone=True) - future_time = job_running_timestamp + datetime.timedelta(seconds=max_running_duration_seconds) + datetime.timedelta(seconds=1) - + # Set up a sample DataFrame with job data + df = pd.DataFrame({ + "id": ["job_1"], + "backend_name": ["foo"], + "status": ["running"], + "running_start_time": ["2020-01-01T00:00:00Z"] + }) + + # Initialize the manager with the cancel_running_job_after parameter + cancel_after_seconds = 12 * 60 * 60 # 12 hours + manager = MultiBackendJobManager(cancel_running_job_after=cancel_after_seconds) + + # Mock the connection and job retrieval + mock_connection = mock.MagicMock() + mock_job = mock.MagicMock() + mock_job.describe.return_value = {"status": "running"} + manager._get_connection = mock.MagicMock(return_value=mock_connection) + mock_connection.job.return_value = mock_job + + # Set up the running start time and future time + job_running_timestamp = datetime.datetime.strptime(df.loc[0, "running_start_time"], "%Y-%m-%dT%H:%M:%SZ") + future_time = job_running_timestamp + datetime.timedelta(seconds=cancel_after_seconds) + datetime.timedelta(seconds=1) + + # Replace _cancel_prolonged_job with a mock to track its calls + manager._cancel_prolonged_job = mock.MagicMock() + + # Travel to the future where the job has exceeded its allowed running time with time_machine.travel(future_time, tick=False): - manager._cancel_prolonged_job(job, row) - - # Verify that the stop method was called - job.stop.assert_called_once() - - + manager._track_statuses(df) - def test_cancel_prolonged_job_within_duration(self): - # Create mock BatchJob instance - job = mock.MagicMock() - job.job_id = "test_job_id" + # Verify that the _cancel_prolonged_job method was called with the correct job and row + manager._cancel_prolonged_job.assert_called_once - row = { - "running_start_time": "2020-01-01T00:00:00Z" - } - - # Initialize manager with the max_running_duration as seconds - max_running_duration_seconds = 12 * 60 * 60 # 12 hours - manager = MultiBackendJobManager(cancel_running_job_after=max_running_duration_seconds) - - # set up timestamps - job_running_timestamp = rfc3339.parse_datetime(row["running_start_time"], with_timezone=True) - future_time = job_running_timestamp + datetime.timedelta(seconds=max_running_duration_seconds) - datetime.timedelta(seconds=1) - - with time_machine.travel(future_time, tick=False): - manager._cancel_prolonged_job(job, row) - - # Verify that the stop method was not called - job.stop.assert_not_called() - - + def test_cancel_prolonged_job_within_duration(self): + # Set up a sample DataFrame with job data + df = pd.DataFrame({ + "id": ["job_1"], + "backend_name": ["foo"], + "status": ["running"], + "running_start_time": ["2020-01-01T00:00:00Z"] + }) + + # Initialize the manager with the cancel_running_job_after parameter + cancel_after_seconds = 12 * 60 * 60 # 12 hours + manager = MultiBackendJobManager(cancel_running_job_after=cancel_after_seconds) + + # Mock the connection and job retrieval + mock_connection = mock.MagicMock() + mock_job = mock.MagicMock() + mock_job.describe.return_value = {"status": "running"} + manager._get_connection = mock.MagicMock(return_value=mock_connection) + mock_connection.job.return_value = mock_job + + # Set up the running start time and future time + job_running_timestamp = datetime.datetime.strptime(df.loc[0, "running_start_time"], "%Y-%m-%dT%H:%M:%SZ") + future_time = job_running_timestamp + datetime.timedelta(seconds=cancel_after_seconds) - datetime.timedelta(seconds=1) + + # Replace _cancel_prolonged_job with a mock to track its calls + manager._cancel_prolonged_job = mock.MagicMock() + + # Travel to the future where the job has exceeded its allowed running time + with time_machine.travel(future_time, tick=False): + manager._track_statuses(df) + + # Verify that the _cancel_prolonged_job method was called with the correct job and row + manager._cancel_prolonged_job.assert_not_called + class TestCsvJobDatabase: def test_read_wkt(self, tmp_path): wkt_df = pd.DataFrame( From 8fa5dfbd6e37685cef417e639683f648bd548239 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Thu, 22 Aug 2024 13:02:20 +0200 Subject: [PATCH 17/20] clean up --- openeo/extra/job_management.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 1e3b0507f..9082ae664 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -475,11 +475,9 @@ def on_job_error(self, job: BatchJob, row): def on_job_cancel(self, job: BatchJob, row): """ Handle a job that was cancelled. Can be overridden to provide custom behaviour. - Handle a job that was cancelled. Can be overridden to provide custom behaviour. Default implementation does not do anything. - :param job: The job that was canceled. :param job: The job that was canceled. :param row: DataFrame row containing the job's metadata. """ From 3d89e4d29838c164046f723e8bc7f8853b7e0ddb Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Thu, 22 Aug 2024 15:26:07 +0200 Subject: [PATCH 18/20] avoid infinite loop --- openeo/extra/job_management.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 9082ae664..c044f08f8 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -348,7 +348,7 @@ def run_jobs( & (df.status != "skipped") & (df.status != "start_failed") & (df.status != "error") - & (df.status != "canceled") + #& (df.status != "canceled") ].size > 0 ): @@ -415,7 +415,7 @@ def _launch_job(self, start_job, df, i, backend_name): _log.warning(f"Failed to start job for {row.to_dict()}", exc_info=True) df.loc[i, "status"] = "start_failed" else: - df.loc[i, "start_time"] = rfc3339.utcnow() + df.loc[i, "start_time"] = datetime.datetime.now().isoformat() if job: df.loc[i, "id"] = job.job_id with ignore_connection_errors(context="get status"): @@ -519,7 +519,7 @@ def ensure_job_dir_exists(self, job_id: str) -> Path: def _track_statuses(self, df: pd.DataFrame): """tracks status (and stats) of running jobs (in place). Optinally cancels jobs when running too long""" - active = df.loc[(df.status == "created") | (df.status == "queued") | (df.status == "running")] + active = df.loc[(df.status == "created") | (df.status == "queued") | (df.status == "started") | (df.status == "running")] for i in active.index: job_id = df.loc[i, "id"] backend_name = df.loc[i, "backend_name"] @@ -534,22 +534,23 @@ def _track_statuses(self, df: pd.DataFrame): _log.info(f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r}") - if previous_status in {"created", "queued", "started"} and new_status == "running": - df.loc[i, "running_start_time"] = rfc3339.utcnow() - - if previous_status != "error" and new_status == "error": - self.on_job_error(the_job, df.loc[i]) - if new_status == "finished": + if job_metadata["status"] == "finished": self.on_job_done(the_job, df.loc[i]) + if previous_status != "error" and job_metadata["status"] == "error": + self.on_job_error(the_job, df.loc[i]) + + if previous_status in {"created", "started", "queued", "started"} and new_status == "running": + df.loc[i, "running_start_time"] = rfc3339.utcnow() + if new_status == "canceled": self.on_job_cancel(the_job, df.loc[i]) if self.cancel_running_job_after and new_status == "running": self._cancel_prolonged_job(the_job, df.loc[i]) - previous_status = new_status + df.loc[i, "status"] = new_status # TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df` for key in job_metadata.get("usage", {}).keys(): From eec293f5d545f54b3fa1af0c0bf463a83c1c9729 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Thu, 22 Aug 2024 15:29:26 +0200 Subject: [PATCH 19/20] uniformly use rfc3339.utcnow() --- openeo/extra/job_management.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index c044f08f8..9f9349522 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -415,7 +415,7 @@ def _launch_job(self, start_job, df, i, backend_name): _log.warning(f"Failed to start job for {row.to_dict()}", exc_info=True) df.loc[i, "status"] = "start_failed" else: - df.loc[i, "start_time"] = datetime.datetime.now().isoformat() + df.loc[i, "start_time"] = rfc3339.utcnow() if job: df.loc[i, "id"] = job.job_id with ignore_connection_errors(context="get status"): From 335f85a425a1e821903f5664154d221a3a276cb3 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Thu, 22 Aug 2024 15:35:14 +0200 Subject: [PATCH 20/20] clean up --- openeo/extra/job_management.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 9f9349522..c8fff3cf9 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -348,7 +348,7 @@ def run_jobs( & (df.status != "skipped") & (df.status != "start_failed") & (df.status != "error") - #& (df.status != "canceled") + & (df.status != "canceled") ].size > 0 ): @@ -519,7 +519,7 @@ def ensure_job_dir_exists(self, job_id: str) -> Path: def _track_statuses(self, df: pd.DataFrame): """tracks status (and stats) of running jobs (in place). Optinally cancels jobs when running too long""" - active = df.loc[(df.status == "created") | (df.status == "queued") | (df.status == "started") | (df.status == "running")] + active = df.loc[(df.status == "created") | (df.status == "queued") | (df.status == "running")] for i in active.index: job_id = df.loc[i, "id"] backend_name = df.loc[i, "backend_name"] @@ -535,13 +535,13 @@ def _track_statuses(self, df: pd.DataFrame): _log.info(f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r}") - if job_metadata["status"] == "finished": + if new_status == "finished": self.on_job_done(the_job, df.loc[i]) - if previous_status != "error" and job_metadata["status"] == "error": + if previous_status != "error" and new_status == "error": self.on_job_error(the_job, df.loc[i]) - if previous_status in {"created", "started", "queued", "started"} and new_status == "running": + if previous_status in {"created", "queued"} and new_status == "running": df.loc[i, "running_start_time"] = rfc3339.utcnow() if new_status == "canceled":