Skip to content

Commit

Permalink
Add to Job and ClusterUtilization APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
epwalsh committed Jun 13, 2024
1 parent eb3745f commit 050c70b
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 4 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ use patch releases for compatibility fixes instead.

## Unreleased

### Added

- Added `Job.is_preemptible` property.
- Added `Job.is_running` property.
- Added `Job.is_queued` property.
- Added `ClusterUtilization.jobs` field.

## [v1.27.2](https://github.com/allenai/beaker-py/releases/tag/v1.27.2) - 2024-05-31

### Added
Expand Down
2 changes: 2 additions & 0 deletions beaker/data_model/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Optional, Tuple

from .base import BaseModel, StrEnum, field_validator
from .job import Job
from .node import NodeResources, NodeUtilization

__all__ = ["ClusterStatus", "Cluster", "ClusterUtilization", "ClusterSpec", "ClusterPatch"]
Expand Down Expand Up @@ -77,6 +78,7 @@ class ClusterUtilization(BaseModel):
queued_jobs: int
running_preemptible_jobs: int
nodes: Tuple[NodeUtilization, ...]
jobs: Tuple[Job, ...]

@property
def id(self) -> str:
Expand Down
12 changes: 12 additions & 0 deletions beaker/data_model/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,25 @@ def is_done(self) -> bool:
"""
return self.status.current == CurrentJobStatus.finalized

@property
def is_running(self) -> bool:
return self.status.current in (CurrentJobStatus.running, CurrentJobStatus.idle)

@property
def is_queued(self) -> bool:
return self.status.current == CurrentJobStatus.created

@property
def was_preempted(self) -> bool:
return self.status.canceled is not None and self.status.canceled_code in {
CanceledCode.system_preemption,
CanceledCode.user_preemption,
}

@property
def is_preemptible(self) -> bool:
return self.preemptible or (self.priority == Priority.preemptible)

@property
def priority(self) -> Optional[Priority]:
"""
Expand Down
12 changes: 8 additions & 4 deletions beaker/services/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ def utilization(self, cluster: Union[str, Cluster]) -> ClusterUtilization:
running_jobs = 0
queued_jobs = 0
running_preemptible_jobs = 0
jobs: List[Job] = []
node_to_util: Dict[str, Dict[str, Union[int, float]]] = {
node.id: {
"running_jobs": 0,
Expand All @@ -219,22 +220,24 @@ def utilization(self, cluster: Union[str, Cluster]) -> ClusterUtilization:
}

for job in self.beaker.job.list(cluster=cluster, finalized=False):
if job.status.current in (CurrentJobStatus.running, CurrentJobStatus.idle):
if job.is_running:
if job.node not in node_to_util:
continue
running_jobs += 1
if job.priority == Priority.preemptible or job.preemptible:
if job.is_preemptible:
running_preemptible_jobs += 1
elif job.status.current == CurrentJobStatus.created:
elif job.is_queued:
queued_jobs += 1

jobs.append(job)

if job.node is not None:
if job.node not in node_to_util:
continue # unlikely

node_util = node_to_util[job.node]
node_util["running_jobs"] += 1
if job.priority == Priority.preemptible or job.preemptible:
if job.is_preemptible:
node_util["running_preemptible_jobs"] += 1
if job.limits is not None:
if job.limits.gpus is not None:
Expand Down Expand Up @@ -285,6 +288,7 @@ def utilization(self, cluster: Union[str, Cluster]) -> ClusterUtilization:
queued_jobs=queued_jobs,
running_preemptible_jobs=running_preemptible_jobs,
nodes=tuple(node_utilizations),
jobs=tuple(jobs),
)

def filter_available(
Expand Down

0 comments on commit 050c70b

Please sign in to comment.