Include cancelling long running jobs #596

Closed
wants to merge 22 commits
Changes from 6 commits

107 changes: 76 additions & 31 deletions openeo/extra/job_management.py
@@ -1,5 +1,5 @@
import contextlib
import datetime
from datetime import datetime, timedelta, timezone
import json
import logging
import time
@@ -14,7 +14,8 @@

from openeo import BatchJob, Connection
from openeo.rest import OpenEoApiError
from openeo.util import deep_get
from openeo.util import deep_get, rfc3339


_log = logging.getLogger(__name__)

@@ -30,6 +31,7 @@ class _Backend(NamedTuple):

MAX_RETRIES = 5


class MultiBackendJobManager:
"""
Tracker for multiple jobs on multiple backends.
@@ -76,7 +78,10 @@ def start_job(
"""

def __init__(
self, poll_sleep: int = 60, root_dir: Optional[Union[str, Path]] = "."
self,
poll_sleep: int = 60,
root_dir: Optional[Union[str, Path]] = ".",
max_running_duration: Optional[int] = None,
):
"""Create a MultiBackendJobManager.

@@ -93,6 +98,10 @@ def __init__(
- get_job_dir
- get_error_log_path
- get_job_metadata_path

        :param max_running_duration:
            Maximum duration (in seconds) that a job is allowed to run before it is
            automatically canceled (e.g. 12 hours = 43200 seconds).
            Can be set to ``None`` to disable this limit.
"""
self.backends: Dict[str, _Backend] = {}
self.poll_sleep = poll_sleep
@@ -101,6 +110,11 @@ def __init__(
# An explicit None or "" should also default to "."
self._root_dir = Path(root_dir or ".")

self.max_running_duration = (
timedelta(seconds=max_running_duration) if max_running_duration is not None else None
)
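
A minimal usage sketch of the new parameter, based on the constructor signature in this diff (the backend name, URL, 12-hour value and parallel_jobs value are illustrative assumptions, not values taken from this PR):

    import openeo
    from openeo.extra.job_management import MultiBackendJobManager

    manager = MultiBackendJobManager(
        poll_sleep=60,
        max_running_duration=12 * 60 * 60,  # seconds; jobs running longer than this get canceled
    )
    # "foo" and the URL are placeholders
    manager.add_backend("foo", connection=openeo.connect("https://openeo.example"), parallel_jobs=2)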


def add_backend(
self,
name: str,
@@ -125,9 +139,7 @@ def add_backend(
c = connection
connection = lambda: c
assert callable(connection)
self.backends[name] = _Backend(
get_connection=connection, parallel_jobs=parallel_jobs
)
self.backends[name] = _Backend(get_connection=connection, parallel_jobs=parallel_jobs)

def _get_connection(self, backend_name: str, resilient: bool = True) -> Connection:
"""Get a connection for the backend and optionally make it resilient (adds retry behavior)
@@ -184,19 +196,21 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
:param df: The dataframe to normalize.
:return: a new dataframe that is normalized.
"""

# check for some required columns.
required_with_default = [
("status", "not_started"),
("id", None),
("start_time", None),
("start_time", None),
HansVRP marked this conversation as resolved.
Show resolved Hide resolved
("running_time", None),
# TODO: columns "cpu", "memory", "duration" are not referenced directly
# within MultiBackendJobManager making it confusing to claim they are required.
# However, they are through assumptions about job "usage" metadata in `_update_statuses`.
# However, they are through assumptions about job "usage" metadata in `_track_statuses`.
("cpu", None),
("memory", None),
("duration", None),
("backend_name", None),
("backend_name", None)
HansVRP marked this conversation as resolved.
Show resolved Hide resolved
]
new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
df = df.assign(**new_columns)
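
For reference, a hedged sketch of the kind of dataframe a caller could pass to run_jobs(): _normalize_df() adds the tracking columns listed above only when they are missing, so the input can carry just the user's own job parameters (the "year" column is a made-up example):

    import pandas as pd

    # "status", "id", "start_time", "running_time", "cpu", "memory", "duration" and
    # "backend_name" are added with default values by _normalize_df() if absent.
    df = pd.DataFrame({"year": [2019, 2020, 2021]})
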
@@ -277,30 +291,26 @@ def run_jobs(
& (df.status != "skipped")
& (df.status != "start_failed")
& (df.status != "error")
& (df.status != "canceled")
].size
> 0
):

with ignore_connection_errors(context="get statuses"):
self._update_statuses(df)
self._track_statuses(df)
status_histogram = df.groupby("status").size().to_dict()
_log.info(f"Status histogram: {status_histogram}")
job_db.persist(df)

if len(df[df.status == "not_started"]) > 0:
# Check number of jobs running at each backend
running = df[
(df.status == "created")
| (df.status == "queued")
| (df.status == "running")
]
running = df[(df.status == "created") | (df.status == "queued") | (df.status == "running")]
per_backend = running.groupby("backend_name").size().to_dict()
_log.info(f"Running per backend: {per_backend}")
for backend_name in self.backends:
backend_load = per_backend.get(backend_name, 0)
if backend_load < self.backends[backend_name].parallel_jobs:
to_add = (
self.backends[backend_name].parallel_jobs - backend_load
)
to_add = self.backends[backend_name].parallel_jobs - backend_load
to_launch = df[df.status == "not_started"].iloc[0:to_add]
for i in to_launch.index:
self._launch_job(start_job, df, i, backend_name)
@@ -348,7 +358,7 @@ def _launch_job(self, start_job, df, i, backend_name):
_log.warning(f"Failed to start job for {row.to_dict()}", exc_info=True)
df.loc[i, "status"] = "start_failed"
else:
df.loc[i, "start_time"] = datetime.datetime.now().isoformat()
df.loc[i, "start_time"] = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
if job:
df.loc[i, "id"] = job.job_id
with ignore_connection_errors(context="get status"):
@@ -386,6 +396,7 @@ def on_job_done(self, job: BatchJob, row):
with open(metadata_path, "w") as f:
json.dump(job_metadata, f, ensure_ascii=False)


def on_job_error(self, job: BatchJob, row):
"""
Handles jobs that stopped with errors. Can be overridden to provide custom behaviour.
@@ -404,6 +415,32 @@ def on_job_error(self, job: BatchJob, row):
self.ensure_job_dir_exists(job.job_id)
error_log_path.write_text(json.dumps(error_logs, indent=2))

    def on_job_cancel(self, job: BatchJob, row):
        """
        Handles jobs that were canceled. Can be overridden to provide custom behaviour.

        Default implementation does not do anything.

        :param job: The job that was canceled.
        :param row: DataFrame row containing the job's metadata.
        """
        # TODO: param `row` is never accessed in this method. Remove it? Is this intended for future use?
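
A hypothetical example of overriding this hook in a subclass (the subclass name and the logging behaviour are illustrative, not part of this PR):

    import logging

    from openeo import BatchJob
    from openeo.extra.job_management import MultiBackendJobManager

    _log = logging.getLogger(__name__)


    class MyJobManager(MultiBackendJobManager):
        def on_job_cancel(self, job: BatchJob, row):
            # Custom reaction to a canceled job: just log it.
            _log.warning(f"Batch job {job.job_id} was canceled by the job manager.")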


def _cancel_prolonged_job(self, job: BatchJob, row):
"""Cancel the job if it has been running for too long."""
        job_running_timestamp = rfc3339.parse_datetime(row["running_time"], with_timezone=True)
        current_time = datetime.now(timezone.utc)

        if current_time > job_running_timestamp + self.max_running_duration:
            try:
                job.stop()
                _log.info(
                    f"Cancelling job {job.job_id} as it has been running for more than {self.max_running_duration}"
                )
            except OpenEoApiError as e:
                _log.error(f"Error cancelling long-running job {job.job_id}: {e}")

def get_job_dir(self, job_id: str) -> Path:
"""Path to directory where job metadata, results and error logs are be saved."""
return self._root_dir / f"job_{job_id}"
@@ -422,13 +459,10 @@ def ensure_job_dir_exists(self, job_id: str) -> Path:
if not job_dir.exists():
job_dir.mkdir(parents=True)

def _update_statuses(self, df: pd.DataFrame):
"""Update status (and stats) of running jobs (in place)."""
active = df.loc[
(df.status == "created")
| (df.status == "queued")
| (df.status == "running")
]

def _track_statuses(self, df: pd.DataFrame):
"""tracks status (and stats) of running jobs (in place). Optinally cancels jobs when running too long"""
active = df.loc[(df.status == "created") | (df.status == "queued") | (df.status == "running")]
for i in active.index:
job_id = df.loc[i, "id"]
backend_name = df.loc[i, "backend_name"]
@@ -437,15 +471,25 @@ def _update_statuses(self, df: pd.DataFrame):
con = self._get_connection(backend_name)
the_job = con.job(job_id)
job_metadata = the_job.describe()
_log.info(
f"Status of job {job_id!r} (on backend {backend_name}) is {job_metadata['status']!r}"
)
if job_metadata["status"] == "finished":
self.on_job_done(the_job, df.loc[i])
_log.info(f"Status of job {job_id!r} (on backend {backend_name}) is {job_metadata['status']!r}")

                if df.loc[i, "status"] in {"created", "queued", "started"} and job_metadata["status"] == "running":
df.loc[i, "running_time"] = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')

if self.max_running_duration and job_metadata["status"] == "running":
self._cancel_prolonged_job(the_job, df.loc[i])

if df.loc[i, "status"] != "error" and job_metadata["status"] == "error":
self.on_job_error(the_job, df.loc[i])

if job_metadata["status"] == "finished":
self.on_job_done(the_job, df.loc[i])

if job_metadata["status"] == "canceled":
self.on_job_cancel(the_job, df.loc[i])

df.loc[i, "status"] = job_metadata["status"]

# TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df`
for key in job_metadata.get("usage", {}).keys():
df.loc[i, key] = _format_usage_stat(job_metadata, key)
@@ -501,6 +545,7 @@ def read(self) -> pd.DataFrame:
):
df["geometry"] = df["geometry"].apply(shapely.wkt.loads)
return df

def persist(self, df: pd.DataFrame):
self.path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(self.path, index=False)