Include cancelling long running jobs #596

Closed
wants to merge 22 commits

Changes from 7 commits
112 changes: 81 additions & 31 deletions openeo/extra/job_management.py
@@ -14,11 +14,10 @@

from openeo import BatchJob, Connection
from openeo.rest import OpenEoApiError
from openeo.util import deep_get, rfc3339

_log = logging.getLogger(__name__)


class _Backend(NamedTuple):
"""Container for backend info/settings"""

@@ -30,6 +29,7 @@ class _Backend(NamedTuple):

MAX_RETRIES = 5


class MultiBackendJobManager:
"""
Tracker for multiple jobs on multiple backends.
@@ -76,7 +76,10 @@ def start_job(
"""

def __init__(
self,
poll_sleep: int = 60,
root_dir: Optional[Union[str, Path]] = ".",
cancel_running_job_after: Optional[int] = None,
):
"""Create a MultiBackendJobManager.

@@ -93,6 +96,10 @@ def __init__(
- get_job_dir
- get_error_log_path
- get_job_metadata_path

:param cancel_running_job_after:
Maximum time (in seconds) that a job is allowed to run before it gets automatically canceled.
The default is None, which disables this feature.
"""
self.backends: Dict[str, _Backend] = {}
self.poll_sleep = poll_sleep
@@ -101,6 +108,11 @@ def __init__(
# An explicit None or "" should also default to "."
self._root_dir = Path(root_dir or ".")

self.cancel_running_job_after = (
datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None
)


def add_backend(
self,
name: str,
@@ -125,9 +137,7 @@ def add_backend(
c = connection
connection = lambda: c
assert callable(connection)
self.backends[name] = _Backend(get_connection=connection, parallel_jobs=parallel_jobs)

def _get_connection(self, backend_name: str, resilient: bool = True) -> Connection:
"""Get a connection for the backend and optionally make it resilient (adds retry behavior)
@@ -189,14 +199,15 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
required_with_default = [
("status", "not_started"),
("id", None),
("start_time", None),
("start_time", None),
HansVRP marked this conversation as resolved.
Show resolved Hide resolved
("running_start_time", None),
# TODO: columns "cpu", "memory", "duration" are not referenced directly
# within MultiBackendJobManager making it confusing to claim they are required.
# However, they are through assumptions about job "usage" metadata in `_track_statuses`.
("cpu", None),
("memory", None),
("duration", None),
("backend_name", None),
("backend_name", None)
HansVRP marked this conversation as resolved.
Show resolved Hide resolved
]
new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
df = df.assign(**new_columns)
@@ -277,30 +288,26 @@ def run_jobs(
& (df.status != "skipped")
& (df.status != "start_failed")
& (df.status != "error")
& (df.status != "canceled")
].size
> 0
):

with ignore_connection_errors(context="get statuses"):
self._track_statuses(df)
status_histogram = df.groupby("status").size().to_dict()
_log.info(f"Status histogram: {status_histogram}")
job_db.persist(df)

if len(df[df.status == "not_started"]) > 0:
# Check number of jobs running at each backend
running = df[(df.status == "created") | (df.status == "queued") | (df.status == "running")]
per_backend = running.groupby("backend_name").size().to_dict()
_log.info(f"Running per backend: {per_backend}")
for backend_name in self.backends:
backend_load = per_backend.get(backend_name, 0)
if backend_load < self.backends[backend_name].parallel_jobs:
to_add = self.backends[backend_name].parallel_jobs - backend_load
to_launch = df[df.status == "not_started"].iloc[0:to_add]
for i in to_launch.index:
self._launch_job(start_job, df, i, backend_name)
@@ -348,7 +355,7 @@ def _launch_job(self, start_job, df, i, backend_name):
_log.warning(f"Failed to start job for {row.to_dict()}", exc_info=True)
df.loc[i, "status"] = "start_failed"
else:
df.loc[i, "start_time"] = datetime.datetime.now().isoformat()
df.loc[i, "start_time"] = rfc3339.utcnow()
if job:
df.loc[i, "id"] = job.job_id
with ignore_connection_errors(context="get status"):
@@ -386,6 +393,7 @@ def on_job_done(self, job: BatchJob, row):
with open(metadata_path, "w") as f:
json.dump(job_metadata, f, ensure_ascii=False)


def on_job_error(self, job: BatchJob, row):
"""
Handles jobs that stopped with errors. Can be overridden to provide custom behaviour.
@@ -404,6 +412,34 @@ def on_job_error(self, job: BatchJob, row):
self.ensure_job_dir_exists(job.job_id)
error_log_path.write_text(json.dumps(error_logs, indent=2))

def on_job_cancel(self, job: BatchJob, row):
"""
Handles jobs that were canceled. Can be overridden to provide custom behaviour.

Default implementation does not do anything.

:param job: The job that was canceled.
:param row: DataFrame row containing the job's metadata.
"""
pass

def _cancel_prolonged_job(self, job: BatchJob, row):
"""Cancel the job if it has been running longer than the allowed duration."""
job_running_start_time = rfc3339.parse_datetime(row["running_start_time"], with_timezone=True)
current_time = rfc3339.parse_datetime(rfc3339.utcnow(), with_timezone=True)

if current_time > job_running_start_time + self.cancel_running_job_after:
try:
_log.info(
f"Cancelling job {job.job_id} as it has been running for more than {self.cancel_running_job_after}"
)
job.stop()
except OpenEoApiError as e:
_log.error(f"Failed to cancel long-running job {job.job_id}: {e}")

def get_job_dir(self, job_id: str) -> Path:
"""Path to directory where job metadata, results and error logs are be saved."""
return self._root_dir / f"job_{job_id}"
@@ -422,13 +458,10 @@ def ensure_job_dir_exists(self, job_id: str) -> Path:
if not job_dir.exists():
job_dir.mkdir(parents=True)


def _track_statuses(self, df: pd.DataFrame):
"""Tracks status (and stats) of running jobs (in place). Optionally cancels jobs that have been running too long."""
active = df.loc[(df.status == "created") | (df.status == "queued") | (df.status == "running")]
for i in active.index:
job_id = df.loc[i, "id"]
backend_name = df.loc[i, "backend_name"]
@@ -437,15 +470,29 @@ def _update_statuses(self, df: pd.DataFrame):
con = self._get_connection(backend_name)
the_job = con.job(job_id)
job_metadata = the_job.describe()

previous_status = df.loc[i, "status"]
new_status = job_metadata["status"]

_log.info(f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r}")

if previous_status in {"created", "queued", "started"} and new_status == "running":
df.loc[i, "running_start_time"] = rfc3339.utcnow()

if self.cancel_running_job_after and new_status == "running":
self._cancel_prolonged_job(the_job, df.loc[i])

if previous_status != "error" and new_status == "error":
self.on_job_error(the_job, df.loc[i])

if new_status == "finished":
self.on_job_done(the_job, df.loc[i])

if new_status == "canceled":
self.on_job_cancel(the_job, df.loc[i])

df.loc[i, "status"] = new_status

# TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df`
for key in job_metadata.get("usage", {}).keys():
df.loc[i, key] = _format_usage_stat(job_metadata, key)
@@ -501,6 +548,7 @@ def read(self) -> pd.DataFrame:
):
df["geometry"] = df["geometry"].apply(shapely.wkt.loads)
return df

def persist(self, df: pd.DataFrame):
self.path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(self.path, index=False)
@@ -516,3 +564,5 @@ def read(self) -> pd.DataFrame:
def persist(self, df: pd.DataFrame):
self.path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(self.path, index=False)
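
For reference (not part of the diff), a minimal usage sketch of the new cancellation option introduced here. It assumes the existing MultiBackendJobManager workflow (add_backend, run_jobs with a start_job callback and a CSV output file); the backend URL, collection id and DataFrame columns below are illustrative placeholders, not taken from this PR.

import pandas as pd

import openeo
from openeo.extra.job_management import MultiBackendJobManager


class MyJobManager(MultiBackendJobManager):
    def on_job_cancel(self, job, row):
        # Hook added in this PR: react to jobs that were canceled, e.g. flag them for resubmission.
        print(f"Job {job.job_id} was canceled after exceeding the runtime limit")


def start_job(row, connection, **kwargs):
    # Build a batch job for one DataFrame row (placeholder process graph).
    return connection.load_collection("SENTINEL2_L2A").create_job(title=row["name"])


# Cancel any job that stays in "running" for more than 2 hours (7200 seconds).
manager = MyJobManager(poll_sleep=60, cancel_running_job_after=2 * 60 * 60)
manager.add_backend("example", connection=openeo.connect("https://openeo.example"), parallel_jobs=2)

df = pd.DataFrame({"name": ["job-1", "job-2"]})
manager.run_jobs(df=df, start_job=start_job, output_file="jobs.csv")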

