Merge pull request #33 from CAAI/dev/cleanup_rhprocess
UI and Manager update
ChristianHinge authored Mar 21, 2024
2 parents eb07c7c + afbda6f commit fdf1312
Showing 8 changed files with 198 additions and 49 deletions.
47 changes: 22 additions & 25 deletions nodes/manager/manager.py
@@ -9,6 +9,7 @@
from rhnode.common import QueueRequest, NodeMetaData
from dotenv import load_dotenv
from rhnode.version import __version__
import time

# load env variables from .env file if it exists
load_dotenv()
@@ -38,16 +39,23 @@ def add_job(
or required_memory > self.memory_max
):
raise ValueError("Job requirements exceed available resources.")

heapq.heappush(
self.job_queue,
(-priority, job_id, required_gpu_mem, required_threads, required_memory),
(
-priority,
time.time(),
job_id,
required_gpu_mem,
required_threads,
required_memory,
),
)
self.process_queue()

def process_queue(self):
while self.job_queue:
(
_,
_,
job_id,
required_gpu_mem,
@@ -155,22 +163,20 @@ def get_active_jobs_info(self):
return active_jobs_info

def get_queued_jobs_info(self):
return {
"queued_jobs": [
{
"priority": job[0] * -1,
"job_id": job[1],
"required_gpu_mem": job[2],
"required_threads": job[3],
"required_memory": job[4],
}
for job in self.job_queue
]
}
return [
{
"priority": job[0] * -1,
"job_id": job[1],
"required_gpu_mem": job[2],
"required_threads": job[3],
"required_memory": job[4],
}
for job in self.job_queue
]

def get_load(self):
active_jobs_info = self.get_active_jobs_info()
queued_jobs_info = self.get_queued_jobs_info()["queued_jobs"]
queued_jobs_info = self.get_queued_jobs_info()
sum_gpu_mem = (
sum(self.gpu_devices_mem_max)
if isinstance(self.gpu_devices_mem_max, list)
@@ -331,16 +337,7 @@ async def redirect_to_manager(request: Request):
@self.get("/manager")
async def resource_queue(request: Request):
active_jobs = self.queue.get_active_jobs_info()
queued_jobs = [
{
"priority": job[0] * -1,
"job_id": job[1],
"required_gpu_mem": job[2],
"required_threads": job[3],
"required_memory": job[4],
}
for job in self.queue.job_queue
]
queued_jobs = self.queue.get_queued_jobs_info()
available_resources = self.queue.get_resource_info()

for active_job in active_jobs:
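The manager change above adds a submission timestamp as the second element of each heap entry, so jobs with equal priority now leave the queue in arrival order instead of being tie-broken by job ID. A minimal standalone sketch of that ordering (illustrative job names, not the project's queue class):

import heapq
import time

queue = []

def add_job(priority, job_id):
    # Negate the priority so the min-heap pops the highest priority first;
    # the timestamp breaks ties between equal priorities in arrival (FIFO) order.
    heapq.heappush(queue, (-priority, time.time(), job_id))

add_job(1, "first")
add_job(2, "urgent")
add_job(1, "second")

while queue:
    _, _, job_id = heapq.heappop(queue)
    print(job_id)  # prints: urgent, first, second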
2 changes: 1 addition & 1 deletion rhnode/__init__.py
@@ -1,6 +1,6 @@
try:
from .rhnode import RHNode
from .rhjob import RHJob
from .rhjob import RHJob, MultiJobRunner

except ImportError:
pass
56 changes: 46 additions & 10 deletions rhnode/resources/templates/index.html
@@ -3,42 +3,78 @@
{% block body %}

<body>
<h3 class = "mt-5">Processes</h1>
<h3 class = "mt-5">Active jobs</h1>
<table class="table">
<thead>
<tr>
<th>ID</th>
<th>Status</th>
<th>Priority</th>
<th>Created</th>
<th>Runtime</th>
</tr>
</thead>
<tbody>
{% for item in tasks %}
{% set valid = True %}
{% if item.status == 'Preparing' %}
<tr class="table-primary">
{% elif item.status == 'Queued' %}
<tr class="table-info">
{% elif item.status == 'Running' %}
<tr class="table-success">
{% elif item.status == 'Finished' %}
<tr class="table-default">
{% elif item.status == 'Error' %}
<tr class="table-danger">
{% elif item.status == 'Cancelling' %}
<tr class="table-warning">
{% elif item.status == 'Cancelled' %}
<tr class="table-danger">
<tr class="table-warning">">
{% else %}
<tr">
{% set valid = False %}
{% endif %}
<td><a href = "{{item.href}}">{{ item.task_id }}</a></td>
{% if valid %}
<td><a href = "{{item.href}}">{{ item.task_id }}</a></td>
<td>{{ item.status }}</td>
<td>{{ item.priority }}</td>
<td>{{ item.date }}</td>
<td>{{ item.runtime }}</td>
</tr>
{% endif %}
{% endfor %}
</tbody>
</table>

<h3 class = "mt-5">Past jobs</h1>
<table class="table">
<thead>
<tr>
<th>ID</th>
<th>Status</th>
<th>Priority</th>
<th>Created</th>
<th>Runtime</th>
</tr>
</thead>
<tbody>
{% for item in tasks %}
{% set valid = True %}
{% if item.status == 'Finished' %}
<tr class="table-default">
{% elif item.status == 'Error' %}
<tr class="table-danger">
{% elif item.status == 'Cancelled' %}
<tr class="table-danger">
{% else %}
{% set valid = False %}
{% endif %}
{% if valid %}
<td><a href = "{{item.href}}">{{ item.task_id }}</a></td>
<td>{{ item.status }}</td>
<td>{{ item.priority }}</td>
<td>{{ item.date }}</td>
<td>{{ item.runtime }}</td>
</tr>
{% endif %}
{% endfor %}
</tbody>
</table>



{% endblock %}
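The reworked template renders the same tasks list twice, guarded by a valid flag in each table: "Active jobs" accepts every recognised status and colours the row accordingly, while "Past jobs" keeps only Finished, Error and Cancelled rows; anything else is skipped. A small standalone sketch of that status filtering with jinja2 (the inline template and sample data are illustrative, not the project's index.html):

from jinja2 import Template

past_jobs = Template(
    "{% for item in tasks %}"
    "{% if item.status in ['Finished', 'Error', 'Cancelled'] %}"
    "{{ item.task_id }}: {{ item.status }} ({{ item.runtime }})\n"
    "{% endif %}"
    "{% endfor %}"
)

tasks = [
    {"task_id": "abc123", "status": "Running", "runtime": "0:00:42"},
    {"task_id": "def456", "status": "Finished", "runtime": "0:03:10"},
    {"task_id": "ghi789", "status": "Cancelled", "runtime": "0:00:05"},
]

# Only the Finished and Cancelled rows survive the filter, mirroring the "Past jobs" table.
print(past_jobs.render(tasks=tasks))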
19 changes: 18 additions & 1 deletion rhnode/resources/templates/task.html
@@ -3,11 +3,11 @@
{% block body %}
<h3 class = "mt-5">ID: {{queue_id}}</h1>
<h4 class = "mt-5">{{queue_status}}</h1>

<form method="POST" action="/{{default_context.node_name}}/jobs/{{queue_id}}/stop">
<button type="submit" class="btn btn-danger">End job</button>
</form>
{% if outputs is not none %}
<h3>Outputs</h3>
<table class="table">
<thead>
<tr>
@@ -31,4 +31,21 @@ <h4 class = "mt-5">{{queue_status}}</h1>
</tbody>
</table>
{% endif %}
<h3>Info</h3>
<table class="table">
<thead>
<tr>
<th>field</th>
<th>value</th>
</tr>
</thead>
<tbody>
{% for item in info %}
<tr>
<td>{{ item.name }}</td>
<td>{{ item.val }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endblock %}
66 changes: 65 additions & 1 deletion rhnode/rhjob.py
@@ -8,6 +8,68 @@
from requests.exceptions import HTTPError


class MultiJobRunner:
"""
A class to distribute massive amounts of jobs on different RHnode clusters.
"""

def __init__(
self, jobs, on_job_finish_handle=None, queue_length=16, skip_on_error=True
):
self.skip_on_error = skip_on_error
self.jobs = jobs
self.on_job_finish_handle = on_job_finish_handle
self.started_jobs = {}
self.queue_length = queue_length
self.n_total_jobs = len(jobs)

def _finish_job(self, job):
try:
outputs = job.wait_for_finish()
if self.on_job_finish_handle is not None:
inputs = job.input_data
self.on_job_finish_handle(inputs, outputs)
except (JobFailedError, JobCancelledError) as error:
if self.skip_on_error:
print(
"job with inputs",
str(job.input_data),
"encountered an error or was cancelled, ignoring",
)
else:
raise

def _check_and_update_active_jobs(self):
IDs_to_remove = []
for i, (ID, job) in enumerate(self.started_jobs.items()):
if JobStatus(job._get_status()) in [
JobStatus.Finished,
JobStatus.Error,
JobStatus.Cancelled,
]:
self._finish_job(self.started_jobs[ID])
IDs_to_remove.append(ID)

for ID in IDs_to_remove:
remaining_jobs = len(self.jobs) + len(self.started_jobs)
print(
"Finished job:", ID, f"completed:{remaining_jobs}/{self.n_total_jobs}"
)
del self.started_jobs[ID]

while len(self.jobs) > 0 and len(self.started_jobs) <= self.queue_length:
new_job = self.jobs.pop(0)
new_job.start()
self.started_jobs[new_job.ID] = new_job
time.sleep(0.5)

def start(self):
print(f"Start MultiRunner with {self.n_total_jobs} jobs")
while len(self.jobs) > 0 or len(self.started_jobs) > 0:
self._check_and_update_active_jobs()
time.sleep(10)


class RHJob:
def __init__(
self,
@@ -296,6 +358,7 @@ def wait_for_finish(self):
time.sleep(10)
elif JobStatus(status) == JobStatus.Running:
time.sleep(4)
##TODO what if status is Cancelling or other unhandled? Is server spammed with get_status requests?

for key, value in output.items():
if isinstance(value, str) and "/download/" in value:
@@ -311,11 +374,12 @@
.split("=")[1]
.replace('"', "")
)
self._maybe_make_output_directory(output_path)
fname = Path(os.path.join(output_path, fname)).absolute()
else:
fname = Path(self.output_data[key]).absolute()

self._maybe_make_output_directory(str(fname.parent.absolute()))

with open(fname, "wb") as f:
f.write(response.content)
output[key] = fname
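The MultiJobRunner added above keeps roughly queue_length jobs in flight: it polls the started jobs every few seconds, collects each result through wait_for_finish (optionally handing inputs and outputs to on_job_finish_handle), and tops the pool back up from the pending list. A standalone sketch of that bounded-dispatch loop with a stub job class (FakeJob and run_bounded are invented for illustration; in rhnode the jobs would be RHJob instances and the loop is MultiJobRunner.start):

import random
import time

class FakeJob:
    """Stub with just the pieces the dispatch loop needs: start(), a status check, and a result."""

    def __init__(self, index):
        self.ID = f"job-{index}"
        self.input_data = {"index": index}
        self._done_at = None

    def start(self):
        # Pretend the job takes a short, random amount of wall-clock time.
        self._done_at = time.time() + random.uniform(0.1, 0.5)

    def is_finished(self):
        return self._done_at is not None and time.time() >= self._done_at

    def wait_for_finish(self):
        return {"doubled": self.input_data["index"] * 2}

def run_bounded(jobs, queue_length=4, on_finish=None):
    started = {}
    while jobs or started:
        # Harvest finished jobs and hand their outputs to the callback.
        for ID in [i for i, job in started.items() if job.is_finished()]:
            outputs = started[ID].wait_for_finish()
            if on_finish is not None:
                on_finish(started[ID].input_data, outputs)
            del started[ID]
        # Top the pool back up so at most queue_length jobs run at once.
        while jobs and len(started) < queue_length:
            job = jobs.pop(0)
            job.start()
            started[job.ID] = job
        time.sleep(0.1)

run_bounded([FakeJob(i) for i in range(10)],
            on_finish=lambda inputs, outputs: print(inputs, "->", outputs))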
1 change: 1 addition & 0 deletions rhnode/rhnode.py
@@ -21,6 +21,7 @@
from .version import __version__

MANAGER_URL = "http://manager:8000/manager"
MANAGER_URL = "http://localhost:8000/manager"


class RHNode(ABC, FastAPI):
23 changes: 23 additions & 0 deletions rhnode/rhprocess.py
@@ -12,6 +12,7 @@
from contextlib import contextmanager
import time
from pydantic import ValidationError
from datetime import datetime


class RHProcess:
@@ -40,6 +41,8 @@ def __init__(
self.error = None
self.time_created = time.time()
self.time_last_accessed = None
self.time_started = None
self.time_finished = None
self.output = None
self.input = inputs_no_files

@@ -157,6 +160,7 @@ async def _maybe_wait_for_resources(self, job):

await asyncio.sleep(3)
try:
# print("Job,"+job.id)
yield gpu_id
finally:
if queue_id:
@@ -206,6 +210,20 @@ def is_ready_to_run(self):
except ValidationError:
return False

def get_runtime_str(self):
if self.time_started is None:
return None
else:
# convert timestamps to datetime object
dt1 = datetime.fromtimestamp(self.time_created)
if self.time_finished is None:
dt2 = datetime.fromtimestamp(time.time())
else:
dt2 = datetime.fromtimestamp(self.time_started)

delta = dt2 - dt1
return str(delta)[:-7]

## JOB RUNNING
async def run(self, job):
assert self.status == JobStatus.Preparing
@@ -230,6 +248,8 @@ async def run(self, job):
self.priority = job.priority

async with self._maybe_wait_for_resources(job) as cuda_device:
print("Starting job...", self.ID)
self.time_started = time.time()
## Cancel signal might come in waiting for cuda queue
if self.status == JobStatus.Cancelled:
return
@@ -249,6 +269,7 @@ async def run(self, job):
args=(self.input.copy(), job.copy(), result_queue),
)
p.start()
print("Awaiting job to finish...", self.ID)
while p.is_alive():
if self.status == JobStatus.Cancelling:
p.terminate()
@@ -260,6 +281,8 @@ async def run(self, job):
await asyncio.sleep(3)

response = result_queue.get()
print("Job finished...", self.ID, "response type:", response[0])
self.time_finished = time.time()
if response[0] == "error":
error_message = "".join(response[1])
error_type = response[2]
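The new get_runtime_str above builds its display string by converting two stored timestamps to datetime objects, subtracting them, and trimming the fractional seconds off the resulting timedelta with str(delta)[:-7]. A quick illustration of that formatting (the slice assumes the delta actually carries a microsecond part, which str() renders as a 7-character ".NNNNNN" suffix):

import time
from datetime import datetime

start = time.time()
time.sleep(1.2)
end = time.time()

delta = datetime.fromtimestamp(end) - datetime.fromtimestamp(start)
print(str(delta))       # e.g. 0:00:01.200312
print(str(delta)[:-7])  # 0:00:01 -- the slice drops the .200312 microseconds suffix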