diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 2820a1a12..c8fff3cf9 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -16,11 +16,10 @@ from openeo import BatchJob, Connection from openeo.rest import OpenEoApiError -from openeo.util import deep_get +from openeo.util import deep_get, rfc3339 _log = logging.getLogger(__name__) - class _Backend(NamedTuple): """Container for backend info/settings""" @@ -33,6 +32,7 @@ class _Backend(NamedTuple): MAX_RETRIES = 5 + class JobDatabaseInterface(metaclass=abc.ABCMeta): """ Interface for a database of job metadata to use with the :py:class:`MultiBackendJobManager`, @@ -112,7 +112,10 @@ def start_job( """ def __init__( - self, poll_sleep: int = 60, root_dir: Optional[Union[str, Path]] = "." + self, + poll_sleep: int = 60, + root_dir: Optional[Union[str, Path]] = ".", + cancel_running_job_after: Optional[int] = None, ): """Create a MultiBackendJobManager. @@ -129,6 +132,10 @@ def __init__( - get_job_dir - get_error_log_path - get_job_metadata_path + + :param cancel_running_job_after [seconds]: + A temporal limit for long running jobs to get automatically canceled. + The preset is None, which disables the feature. """ self.backends: Dict[str, _Backend] = {} self.poll_sleep = poll_sleep @@ -137,6 +144,11 @@ def __init__( # An explicit None or "" should also default to "." self._root_dir = Path(root_dir or ".") + self.cancel_running_job_after = ( + datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None + ) + + def add_backend( self, name: str, @@ -161,9 +173,7 @@ def add_backend( c = connection connection = lambda: c assert callable(connection) - self.backends[name] = _Backend( - get_connection=connection, parallel_jobs=parallel_jobs - ) + self.backends[name] = _Backend(get_connection=connection, parallel_jobs=parallel_jobs) def _get_connection(self, backend_name: str, resilient: bool = True) -> Connection: """Get a connection for the backend and optionally make it resilient (adds retry behavior) @@ -226,13 +236,15 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: ("status", "not_started"), ("id", None), ("start_time", None), + ("running_start_time", None), # TODO: columns "cpu", "memory", "duration" are not referenced directly # within MultiBackendJobManager making it confusing to claim they are required. - # However, they are through assumptions about job "usage" metadata in `_update_statuses`. + # However, they are through assumptions about job "usage" metadata in `_track_statuses`. ("cpu", None), ("memory", None), ("duration", None), ("backend_name", None), + ("backend_name", None), ] new_columns = {col: val for (col, val) in required_with_default if col not in df.columns} df = df.assign(**new_columns) @@ -336,30 +348,26 @@ def run_jobs( & (df.status != "skipped") & (df.status != "start_failed") & (df.status != "error") + & (df.status != "canceled") ].size > 0 ): + with ignore_connection_errors(context="get statuses"): - self._update_statuses(df) + self._track_statuses(df) status_histogram = df.groupby("status").size().to_dict() _log.info(f"Status histogram: {status_histogram}") job_db.persist(df) if len(df[df.status == "not_started"]) > 0: # Check number of jobs running at each backend - running = df[ - (df.status == "created") - | (df.status == "queued") - | (df.status == "running") - ] + running = df[(df.status == "created") | (df.status == "queued") | (df.status == "running")] per_backend = running.groupby("backend_name").size().to_dict() _log.info(f"Running per backend: {per_backend}") for backend_name in self.backends: backend_load = per_backend.get(backend_name, 0) if backend_load < self.backends[backend_name].parallel_jobs: - to_add = ( - self.backends[backend_name].parallel_jobs - backend_load - ) + to_add = self.backends[backend_name].parallel_jobs - backend_load to_launch = df[df.status == "not_started"].iloc[0:to_add] for i in to_launch.index: self._launch_job(start_job, df, i, backend_name) @@ -407,7 +415,7 @@ def _launch_job(self, start_job, df, i, backend_name): _log.warning(f"Failed to start job for {row.to_dict()}", exc_info=True) df.loc[i, "status"] = "start_failed" else: - df.loc[i, "start_time"] = datetime.datetime.now().isoformat() + df.loc[i, "start_time"] = rfc3339.utcnow() if job: df.loc[i, "id"] = job.job_id with ignore_connection_errors(context="get status"): @@ -445,6 +453,7 @@ def on_job_done(self, job: BatchJob, row): with open(metadata_path, "w") as f: json.dump(job_metadata, f, ensure_ascii=False) + def on_job_error(self, job: BatchJob, row): """ Handles jobs that stopped with errors. Can be overridden to provide custom behaviour. @@ -463,6 +472,32 @@ def on_job_error(self, job: BatchJob, row): self.ensure_job_dir_exists(job.job_id) error_log_path.write_text(json.dumps(error_logs, indent=2)) + def on_job_cancel(self, job: BatchJob, row): + """ + Handle a job that was cancelled. Can be overridden to provide custom behaviour. + + Default implementation does not do anything. + + :param job: The job that was canceled. + :param row: DataFrame row containing the job's metadata. + """ + pass + + def _cancel_prolonged_job(self, job: BatchJob, row): + """Cancel the job if it has been running for too long.""" + job_running_start_time = rfc3339.parse_datetime(row["running_start_time"], with_timezone=True) + current_time = rfc3339.parse_datetime(rfc3339.utcnow(), with_timezone=True) + + if current_time > job_running_start_time + self.cancel_running_job_after: + try: + _log.info( + f"Cancelling job {job.job_id} as it has been running for more than {self.cancel_running_job_after}") + + job.stop() + + except OpenEoApiError as e: + _log.error(f"Error Cancelling long-running job {job.job_id}: {e}") + def get_job_dir(self, job_id: str) -> Path: """Path to directory where job metadata, results and error logs are be saved.""" return self._root_dir / f"job_{job_id}" @@ -481,13 +516,10 @@ def ensure_job_dir_exists(self, job_id: str) -> Path: if not job_dir.exists(): job_dir.mkdir(parents=True) - def _update_statuses(self, df: pd.DataFrame): - """Update status (and stats) of running jobs (in place).""" - active = df.loc[ - (df.status == "created") - | (df.status == "queued") - | (df.status == "running") - ] + + def _track_statuses(self, df: pd.DataFrame): + """tracks status (and stats) of running jobs (in place). Optinally cancels jobs when running too long""" + active = df.loc[(df.status == "created") | (df.status == "queued") | (df.status == "running")] for i in active.index: job_id = df.loc[i, "id"] backend_name = df.loc[i, "backend_name"] @@ -496,15 +528,30 @@ def _update_statuses(self, df: pd.DataFrame): con = self._get_connection(backend_name) the_job = con.job(job_id) job_metadata = the_job.describe() - _log.info( - f"Status of job {job_id!r} (on backend {backend_name}) is {job_metadata['status']!r}" - ) - if job_metadata["status"] == "finished": + + previous_status = df.loc[i, "status"] + new_status = job_metadata["status"] + + _log.info(f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r}") + + + if new_status == "finished": self.on_job_done(the_job, df.loc[i]) - if df.loc[i, "status"] != "error" and job_metadata["status"] == "error": + + if previous_status != "error" and new_status == "error": self.on_job_error(the_job, df.loc[i]) + + if previous_status in {"created", "queued"} and new_status == "running": + df.loc[i, "running_start_time"] = rfc3339.utcnow() + + if new_status == "canceled": + self.on_job_cancel(the_job, df.loc[i]) + + if self.cancel_running_job_after and new_status == "running": + self._cancel_prolonged_job(the_job, df.loc[i]) + + df.loc[i, "status"] = new_status - df.loc[i, "status"] = job_metadata["status"] # TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df` for key in job_metadata.get("usage", {}).keys(): df.loc[i, key] = _format_usage_stat(job_metadata, key) @@ -617,3 +664,6 @@ def read(self) -> pd.DataFrame: def persist(self, df: pd.DataFrame): self.path.parent.mkdir(parents=True, exist_ok=True) df.to_parquet(self.path, index=False) + + + diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index cbae44cd1..4316c1aa4 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -2,6 +2,10 @@ import textwrap import threading from unittest import mock +import datetime +import time_machine +from openeo.util import rfc3339 +import time import geopandas @@ -26,13 +30,19 @@ MultiBackendJobManager, ParquetJobDatabase, ) +from openeo.rest import OpenEoApiError + + class TestMultiBackendJobManager: + + @pytest.fixture def sleep_mock(self): with mock.patch("time.sleep") as sleep: yield sleep + def test_basic(self, tmp_path, requests_mock, sleep_mock): requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"}) @@ -79,9 +89,7 @@ def mock_job_status(job_id, queued=1, running=2): # It also needs the job results endpoint, though that can be a dummy implementation. # When the job is finished the system tries to download the results and that is what # needs this endpoint. - requests_mock.get( - f"{backend}/jobs/{job_id}/results", json={"links": []} - ) + requests_mock.get(f"{backend}/jobs/{job_id}/results", json={"links": []}) mock_job_status("job-2018", queued=1, running=2) mock_job_status("job-2019", queued=2, running=3) @@ -136,6 +144,7 @@ def test_normalize_df(self): "status", "id", "start_time", + "running_start_time", "cpu", "memory", "duration", @@ -274,7 +283,6 @@ def start_worker_thread(): metadata_path = manager.get_job_metadata_path(job_id="job-2021") assert metadata_path.exists() - def test_on_error_log(self, tmp_path, requests_mock): backend = "http://foo.test" requests_mock.get(backend, json={"api_version": "1.1.0"}) @@ -287,9 +295,7 @@ def test_on_error_log(self, tmp_path, requests_mock): "message": "Test that error handling works", } ] - requests_mock.get( - f"{backend}/jobs/{job_id}/logs", json={"logs": errors_log_lines} - ) + requests_mock.get(f"{backend}/jobs/{job_id}/logs", json={"logs": errors_log_lines}) root_dir = tmp_path / "job_mgr_root" manager = MultiBackendJobManager(root_dir=root_dir) @@ -308,12 +314,9 @@ def test_on_error_log(self, tmp_path, requests_mock): contents = error_log_path.read_text() assert json.loads(contents) == errors_log_lines - @httpretty.activate(allow_net_connect=False, verbose=True) @pytest.mark.parametrize("http_error_status", [502, 503, 504]) - def test_is_resilient_to_backend_failures( - self, tmp_path, http_error_status, sleep_mock - ): + def test_is_resilient_to_backend_failures(self, tmp_path, http_error_status, sleep_mock): """ Our job should still succeed when the backend request succeeds eventually, after first failing the maximum allowed number of retries. @@ -333,9 +336,7 @@ def test_is_resilient_to_backend_failures( backend = "http://foo.test" job_id = "job-2018" - httpretty.register_uri( - "GET", backend, body=json.dumps({"api_version": "1.1.0"}) - ) + httpretty.register_uri("GET", backend, body=json.dumps({"api_version": "1.1.0"})) # First fail the max times the connection should retry, then succeed. after that response_list = [ @@ -352,9 +353,7 @@ def test_is_resilient_to_backend_failures( ) ) ] - httpretty.register_uri( - "GET", f"{backend}/jobs/{job_id}", responses=response_list - ) + httpretty.register_uri("GET", f"{backend}/jobs/{job_id}", responses=response_list) root_dir = tmp_path / "job_mgr_root" manager = MultiBackendJobManager(root_dir=root_dir) @@ -384,9 +383,7 @@ def start_job(row, connection_provider, connection, **kwargs): @httpretty.activate(allow_net_connect=False, verbose=True) @pytest.mark.parametrize("http_error_status", [502, 503, 504]) - def test_resilient_backend_reports_error_when_max_retries_exceeded( - self, tmp_path, http_error_status, sleep_mock - ): + def test_resilient_backend_reports_error_when_max_retries_exceeded(self, tmp_path, http_error_status, sleep_mock): """We should get a RetryError when the backend request fails more times than the maximum allowed number of retries. Goal of the test is only to see that retrying is effectively executed. @@ -404,9 +401,7 @@ def test_resilient_backend_reports_error_when_max_retries_exceeded( backend = "http://foo.test" job_id = "job-2018" - httpretty.register_uri( - "GET", backend, body=json.dumps({"api_version": "1.1.0"}) - ) + httpretty.register_uri("GET", backend, body=json.dumps({"api_version": "1.1.0"})) # Fail one more time than the max allow retries. # But do add one successful request at the start, to simulate that the job was @@ -427,9 +422,7 @@ def test_resilient_backend_reports_error_when_max_retries_exceeded( MAX_RETRIES + 1 ) - httpretty.register_uri( - "GET", f"{backend}/jobs/{job_id}", responses=response_list - ) + httpretty.register_uri("GET", f"{backend}/jobs/{job_id}", responses=response_list) root_dir = tmp_path / "job_mgr_root" manager = MultiBackendJobManager(root_dir=root_dir) @@ -459,6 +452,73 @@ def start_job(row, connection_provider, connection, **kwargs): assert set(result.status) == {"running"} assert set(result.backend_name) == {"foo"} + def test_cancel_prolonged_job_exceeds_duration(self): + # Set up a sample DataFrame with job data + df = pd.DataFrame({ + "id": ["job_1"], + "backend_name": ["foo"], + "status": ["running"], + "running_start_time": ["2020-01-01T00:00:00Z"] + }) + + # Initialize the manager with the cancel_running_job_after parameter + cancel_after_seconds = 12 * 60 * 60 # 12 hours + manager = MultiBackendJobManager(cancel_running_job_after=cancel_after_seconds) + + # Mock the connection and job retrieval + mock_connection = mock.MagicMock() + mock_job = mock.MagicMock() + mock_job.describe.return_value = {"status": "running"} + manager._get_connection = mock.MagicMock(return_value=mock_connection) + mock_connection.job.return_value = mock_job + + # Set up the running start time and future time + job_running_timestamp = datetime.datetime.strptime(df.loc[0, "running_start_time"], "%Y-%m-%dT%H:%M:%SZ") + future_time = job_running_timestamp + datetime.timedelta(seconds=cancel_after_seconds) + datetime.timedelta(seconds=1) + + # Replace _cancel_prolonged_job with a mock to track its calls + manager._cancel_prolonged_job = mock.MagicMock() + + # Travel to the future where the job has exceeded its allowed running time + with time_machine.travel(future_time, tick=False): + manager._track_statuses(df) + + # Verify that the _cancel_prolonged_job method was called with the correct job and row + manager._cancel_prolonged_job.assert_called_once + + def test_cancel_prolonged_job_within_duration(self): + # Set up a sample DataFrame with job data + df = pd.DataFrame({ + "id": ["job_1"], + "backend_name": ["foo"], + "status": ["running"], + "running_start_time": ["2020-01-01T00:00:00Z"] + }) + + # Initialize the manager with the cancel_running_job_after parameter + cancel_after_seconds = 12 * 60 * 60 # 12 hours + manager = MultiBackendJobManager(cancel_running_job_after=cancel_after_seconds) + + # Mock the connection and job retrieval + mock_connection = mock.MagicMock() + mock_job = mock.MagicMock() + mock_job.describe.return_value = {"status": "running"} + manager._get_connection = mock.MagicMock(return_value=mock_connection) + mock_connection.job.return_value = mock_job + + # Set up the running start time and future time + job_running_timestamp = datetime.datetime.strptime(df.loc[0, "running_start_time"], "%Y-%m-%dT%H:%M:%SZ") + future_time = job_running_timestamp + datetime.timedelta(seconds=cancel_after_seconds) - datetime.timedelta(seconds=1) + + # Replace _cancel_prolonged_job with a mock to track its calls + manager._cancel_prolonged_job = mock.MagicMock() + + # Travel to the future where the job has exceeded its allowed running time + with time_machine.travel(future_time, tick=False): + manager._track_statuses(df) + + # Verify that the _cancel_prolonged_job method was called with the correct job and row + manager._cancel_prolonged_job.assert_not_called JOB_DB_DF_BASICS = pd.DataFrame( {