Include cancelling long running jobs #596

Closed · wants to merge 22 commits
Changes from 2 commits
78 changes: 53 additions & 25 deletions openeo/extra/job_management.py
@@ -16,6 +16,7 @@
from openeo.rest import OpenEoApiError
from openeo.util import deep_get


_log = logging.getLogger(__name__)


@@ -30,6 +31,7 @@ class _Backend(NamedTuple):

MAX_RETRIES = 5


class MultiBackendJobManager:
"""
Tracker for multiple jobs on multiple backends.
@@ -76,7 +78,10 @@ def start_job(
"""

def __init__(
self, poll_sleep: int = 60, root_dir: Optional[Union[str, Path]] = "."
self,
poll_sleep: int = 60,
root_dir: Optional[Union[str, Path]] = ".",
max_running_duration: Optional[int] = 12 * 60 * 60,
):
"""Create a MultiBackendJobManager.

@@ -93,6 +98,10 @@ def __init__(
- get_job_dir
- get_error_log_path
- get_job_metadata_path

:param max_running_duration:
Maximum running time in seconds after which a job is automatically canceled.
Defaults to 12 hours (12 * 60 * 60 seconds).
"""
self.backends: Dict[str, _Backend] = {}
self.poll_sleep = poll_sleep
@@ -101,6 +110,8 @@ def add_backend(
# An explicit None or "" should also default to "."
self._root_dir = Path(root_dir or ".")

self.max_running_duration = datetime.timedelta(seconds=max_running_duration)
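For reviewers who want to try this out, here is a minimal usage sketch of the new parameter (the backend name, URL, and the two-hour limit are illustrative, not part of this PR):

import openeo
from openeo.extra.job_management import MultiBackendJobManager

# Hypothetical setup: poll every minute; cancel any job still running after 2 hours.
manager = MultiBackendJobManager(poll_sleep=60, max_running_duration=2 * 60 * 60)
manager.add_backend("example", connection=openeo.connect("https://openeo.example"), parallel_jobs=2)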

def add_backend(
self,
name: str,
@@ -125,9 +136,7 @@ def add_backend(
c = connection
connection = lambda: c
assert callable(connection)
self.backends[name] = _Backend(get_connection=connection, parallel_jobs=parallel_jobs)

def _get_connection(self, backend_name: str, resilient: bool = True) -> Connection:
"""Get a connection for the backend and optionally make it resilient (adds retry behavior)
@@ -192,11 +201,13 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
("start_time", None),
# TODO: columns "cpu", "memory", "duration" are not referenced directly
# within MultiBackendJobManager making it confusing to claim they are required.
# However, they are used indirectly, through assumptions about job "usage" metadata in `_tracks_statuses`.
("cpu", None),
("memory", None),
("duration", None),
("backend_name", None),
("backend_name", None),
("costs", None)
soxofaan marked this conversation as resolved.
Show resolved Hide resolved

]
new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
df = df.assign(**new_columns)
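As a quick sanity check on the defaulting behaviour (a standalone sketch, not part of the diff): `assign` only adds the columns that are missing, so an existing "status" value is preserved while the new "costs" column gets its None default.

import pandas as pd

df = pd.DataFrame({"id": ["job-1"], "status": ["running"]})
required_with_default = [("status", "not_started"), ("costs", None)]
new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
df = df.assign(**new_columns)
print(df)  # "status" stays "running"; a "costs" column is added with value None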
@@ -277,30 +288,26 @@ def run_jobs(
& (df.status != "skipped")
& (df.status != "start_failed")
& (df.status != "error")
& (df.status != "canceled")
].size
> 0
):

with ignore_connection_errors(context="get statuses"):
self._tracks_statuses(df)
status_histogram = df.groupby("status").size().to_dict()
_log.info(f"Status histogram: {status_histogram}")
job_db.persist(df)

if len(df[df.status == "not_started"]) > 0:
# Check number of jobs running at each backend
running = df[(df.status == "created") | (df.status == "queued") | (df.status == "running")]
per_backend = running.groupby("backend_name").size().to_dict()
_log.info(f"Running per backend: {per_backend}")
for backend_name in self.backends:
backend_load = per_backend.get(backend_name, 0)
if backend_load < self.backends[backend_name].parallel_jobs:
to_add = self.backends[backend_name].parallel_jobs - backend_load
to_launch = df[df.status == "not_started"].iloc[0:to_add]
for i in to_launch.index:
self._launch_job(start_job, df, i, backend_name)
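For context on this scheduling step (an illustrative sketch, not part of the diff): the per-backend load is a plain groupby size over the jobs in an active state.

import pandas as pd

df = pd.DataFrame({
    "status": ["running", "queued", "not_started", "created"],
    "backend_name": ["foo", "foo", None, "bar"],
})
running = df[(df.status == "created") | (df.status == "queued") | (df.status == "running")]
per_backend = running.groupby("backend_name").size().to_dict()
print(per_backend)  # {'bar': 1, 'foo': 2}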
@@ -422,13 +429,10 @@ def ensure_job_dir_exists(self, job_id: str) -> Path:
if not job_dir.exists():
job_dir.mkdir(parents=True)

def _tracks_statuses(self, df: pd.DataFrame):
"""Track the status (and stats) of running jobs (in place), and cancel jobs that have been running too long."""
active = df.loc[(df.status == "created") | (df.status == "queued") | (df.status == "running")]
for i in active.index:
job_id = df.loc[i, "id"]
backend_name = df.loc[i, "backend_name"]
@@ -437,15 +441,38 @@ def _update_statuses(self, df: pd.DataFrame):
con = self._get_connection(backend_name)
the_job = con.job(job_id)
job_metadata = the_job.describe()
_log.info(f"Status of job {job_id!r} (on backend {backend_name}) is {job_metadata['status']!r}")

if job_metadata["status"] == "running":
timezone = datetime.timezone.utc
# Reuse the metadata fetched above instead of calling `describe()` a second time.
job_time_created = datetime.datetime.strptime(job_metadata["created"], "%Y-%m-%dT%H:%M:%SZ")
job_time_created = job_time_created.replace(tzinfo=timezone)

if datetime.datetime.now(timezone) > job_time_created + self.max_running_duration:
try:
_log.info(
f"Cancelling job {job_id} on backend {backend_name} as it has been running for more than {self.max_running_duration}"
)
the_job.stop_job()
self.on_job_error(the_job, df.loc[i])
except OpenEoApiError as e:
_log.error(f"Error cancelling long-running job {job_id!r} on backend {backend_name}: {e}")

if job_metadata["status"] == "finished":
self.on_job_done(the_job, df.loc[i])
# Only record costs when the backend reports them ("costs" is not guaranteed to be present).
if "costs" in job_metadata:
df.loc[i, "costs"] = job_metadata["costs"]

if df.loc[i, "status"] != "error" and job_metadata["status"] == "error":
self.on_job_error(the_job, df.loc[i])

df.loc[i, "status"] = job_metadata["status"]

# TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df`
for key in job_metadata.get("usage", {}).keys():
df.loc[i, key] = _format_usage_stat(job_metadata, key)
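The core of the new cancellation check can be exercised in isolation. A minimal sketch, assuming the backend reports `created` as an RFC 3339 UTC timestamp as above (the helper name `is_running_too_long` is hypothetical):

import datetime

def is_running_too_long(created: str, max_running_duration: datetime.timedelta) -> bool:
    # Parse the backend's "created" timestamp and compare against the allowed running time.
    utc = datetime.timezone.utc
    started = datetime.datetime.strptime(created, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=utc)
    return datetime.datetime.now(utc) > started + max_running_duration

# A job created 13 hours ago exceeds the default 12-hour limit.
thirteen_hours_ago = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=13)
print(is_running_too_long(thirteen_hours_ago.strftime("%Y-%m-%dT%H:%M:%SZ"), datetime.timedelta(hours=12)))  # True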
Expand Down Expand Up @@ -501,6 +528,7 @@ def read(self) -> pd.DataFrame:
):
df["geometry"] = df["geometry"].apply(shapely.wkt.loads)
return df

def persist(self, df: pd.DataFrame):
self.path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(self.path, index=False)