diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7317db866..a15b0a0dc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added
 
 - `load_stac`/`metadata_from_stac`: add support for extracting actual temporal dimension metadata ([#567](https://github.com/Open-EO/openeo-python-client/issues/567))
+- `MultiBackendJobManager`: add `cancel_running_job_after` option to automatically cancel jobs that are running for too long ([#590](https://github.com/Open-EO/openeo-python-client/issues/590))
 
 ### Changed
 
diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py
index 1c1bbb392..acdf90827 100644
--- a/openeo/extra/job_management.py
+++ b/openeo/extra/job_management.py
@@ -114,6 +114,7 @@ def __init__(
         self,
         poll_sleep: int = 60,
         root_dir: Optional[Union[str, Path]] = ".",
+        *,
         cancel_running_job_after: Optional[int] = None,
     ):
         """Create a MultiBackendJobManager.
@@ -133,8 +134,11 @@ def __init__(
            - get_job_metadata_path
 
        :param cancel_running_job_after [seconds]:
-            A temporal limit for long running jobs to get automatically canceled.
-            The preset is None, which disables the feature.
+            Optional temporal limit (in seconds) after which running jobs should be canceled
+            by the job manager.
+
+        .. versionchanged:: 0.32.0
+            Added `cancel_running_job_after` parameter.
        """
        self.backends: Dict[str, _Backend] = {}
        self.poll_sleep = poll_sleep
@@ -143,7 +147,7 @@ def __init__(
        # An explicit None or "" should also default to "."
        self._root_dir = Path(root_dir or ".")
 
-        self.cancel_running_job_after = (
+        self._cancel_running_job_after = (
            datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None
        )
 
@@ -242,7 +246,6 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
            ("memory", None),
            ("duration", None),
            ("backend_name", None),
-            ("backend_name", None),
        ]
        new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
        df = df.assign(**new_columns)
@@ -483,18 +486,15 @@ def on_job_cancel(self, job: BatchJob, row):
    def _cancel_prolonged_job(self, job: BatchJob, row):
        """Cancel the job if it has been running for too long."""
        job_running_start_time = rfc3339.parse_datetime(row["running_start_time"], with_timezone=True)
-        current_time = rfc3339.parse_datetime(rfc3339.utcnow(), with_timezone=True)
-
-        if current_time > job_running_start_time + self.cancel_running_job_after:
+        elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - job_running_start_time
+        if elapsed > self._cancel_running_job_after:
            try:
                _log.info(
-                    f"Cancelling job {job.job_id} as it has been running for more than {self.cancel_running_job_after}"
+                    f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})"
                )
-
                job.stop()
-
            except OpenEoApiError as e:
-                _log.error(f"Error Cancelling long-running job {job.job_id}: {e}")
+                _log.error(f"Failed to cancel long-running job {job.job_id}: {e}")
 
    def get_job_dir(self, job_id: str) -> Path:
        """Path to directory where job metadata, results and error logs are be saved."""
@@ -515,21 +515,25 @@ def ensure_job_dir_exists(self, job_id: str) -> Path:
            job_dir.mkdir(parents=True)
 
    def _track_statuses(self, df: pd.DataFrame):
-        """tracks status (and stats) of running jobs (in place). Optinally cancels jobs when running too long"""
+        """
+        Tracks status (and stats) of running jobs (in place).
+        Optionally cancels jobs when running too long.
+        """
        active = df.loc[(df.status == "created") | (df.status == "queued") | (df.status == "running")]
        for i in active.index:
            job_id = df.loc[i, "id"]
            backend_name = df.loc[i, "backend_name"]
+            previous_status = df.loc[i, "status"]
 
            try:
                con = self._get_connection(backend_name)
                the_job = con.job(job_id)
                job_metadata = the_job.describe()
-
-                previous_status = df.loc[i, "status"]
                new_status = job_metadata["status"]
 
-                _log.info(f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r}")
+                _log.info(
+                    f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r} (previously {previous_status!r})"
+                )
 
                if new_status == "finished":
                    self.on_job_done(the_job, df.loc[i])
@@ -543,7 +547,7 @@ def _track_statuses(self, df: pd.DataFrame):
                if new_status == "canceled":
                    self.on_job_cancel(the_job, df.loc[i])
 
-                if self.cancel_running_job_after and new_status == "running":
+                if self._cancel_running_job_after and new_status == "running":
                    self._cancel_prolonged_job(the_job, df.loc[i])
 
                df.loc[i, "status"] = new_status