diff --git a/nodes/manager/manager.py b/nodes/manager/manager.py index dc2a19e..930e193 100644 --- a/nodes/manager/manager.py +++ b/nodes/manager/manager.py @@ -9,6 +9,7 @@ from rhnode.common import QueueRequest, NodeMetaData from dotenv import load_dotenv from rhnode.version import __version__ +import time # load env variables from .env file if it exists load_dotenv() @@ -38,16 +39,23 @@ def add_job( or required_memory > self.memory_max ): raise ValueError("Job requirements exceed available resources.") - heapq.heappush( self.job_queue, - (-priority, job_id, required_gpu_mem, required_threads, required_memory), + ( + -priority, + time.time(), + job_id, + required_gpu_mem, + required_threads, + required_memory, + ), ) self.process_queue() def process_queue(self): while self.job_queue: ( + _, _, job_id, required_gpu_mem, @@ -155,22 +163,20 @@ def get_active_jobs_info(self): return active_jobs_info def get_queued_jobs_info(self): - return { - "queued_jobs": [ - { - "priority": job[0] * -1, - "job_id": job[1], - "required_gpu_mem": job[2], - "required_threads": job[3], - "required_memory": job[4], - } - for job in self.job_queue - ] - } + return [ + { + "priority": job[0] * -1, + "job_id": job[1], + "required_gpu_mem": job[2], + "required_threads": job[3], + "required_memory": job[4], + } + for job in self.job_queue + ] def get_load(self): active_jobs_info = self.get_active_jobs_info() - queued_jobs_info = self.get_queued_jobs_info()["queued_jobs"] + queued_jobs_info = self.get_queued_jobs_info() sum_gpu_mem = ( sum(self.gpu_devices_mem_max) if isinstance(self.gpu_devices_mem_max, list) @@ -331,16 +337,7 @@ async def redirect_to_manager(request: Request): @self.get("/manager") async def resource_queue(request: Request): active_jobs = self.queue.get_active_jobs_info() - queued_jobs = [ - { - "priority": job[0] * -1, - "job_id": job[1], - "required_gpu_mem": job[2], - "required_threads": job[3], - "required_memory": job[4], - } - for job in self.queue.job_queue - ] + queued_jobs = self.queue.get_queued_jobs_info() available_resources = self.queue.get_resource_info() for active_job in active_jobs: diff --git a/rhnode/__init__.py b/rhnode/__init__.py index 77bba81..1b5b76b 100644 --- a/rhnode/__init__.py +++ b/rhnode/__init__.py @@ -1,6 +1,6 @@ try: from .rhnode import RHNode - from .rhjob import RHJob + from .rhjob import RHJob, MultiJobRunner except ImportError: pass diff --git a/rhnode/resources/templates/index.html b/rhnode/resources/templates/index.html index 1a1b127..b4bde08 100644 --- a/rhnode/resources/templates/index.html +++ b/rhnode/resources/templates/index.html @@ -3,7 +3,7 @@ {% block body %} -

Processes

+

Active jobs

@@ -11,34 +11,70 @@

Processes

+ {% for item in tasks %} + {% set valid = True %} {% if item.status == 'Preparing' %} {% elif item.status == 'Queued' %} {% elif item.status == 'Running' %} - {% elif item.status == 'Finished' %} - - {% elif item.status == 'Error' %} - {% elif item.status == 'Cancelling' %} - - {% elif item.status == 'Cancelled' %} - + "> {% else %} - + {% set valid = False %} {% endif %} - + {% if valid %} + + + {% endif %} {% endfor %}
Status Priority CreatedRuntime
{{ item.task_id }}{{ item.task_id }} {{ item.status }} {{ item.priority }} {{ item.date }}{{ item.runtime }}
+

Past jobs

+ + + + + + + + + + + + {% for item in tasks %} + {% set valid = True %} + {% if item.status == 'Finished' %} + + {% elif item.status == 'Error' %} + + {% elif item.status == 'Cancelled' %} + + {% else %} + {% set valid = False %} + {% endif %} + {% if valid %} + + + + + + + {% endif %} + {% endfor %} + +
IDStatusPriorityCreatedRuntime
{{ item.task_id }}{{ item.status }}{{ item.priority }}{{ item.date }}{{ item.runtime }}
+ + + {% endblock %} \ No newline at end of file diff --git a/rhnode/resources/templates/task.html b/rhnode/resources/templates/task.html index fd07a8f..16c98e2 100644 --- a/rhnode/resources/templates/task.html +++ b/rhnode/resources/templates/task.html @@ -3,11 +3,11 @@ {% block body %}

ID: {{queue_id}}

{{queue_status}}

-
{% if outputs is not none %} +

Outputs

@@ -31,4 +31,21 @@

{{queue_status}}

{% endif %} +

Info

+ + + + + + + + + {% for item in info %} + + + + + {% endfor %} + +
fieldvalue
{{ item.name }}{{ item.val }}
{% endblock %} \ No newline at end of file diff --git a/rhnode/rhjob.py b/rhnode/rhjob.py index 54cc170..fbe04e9 100644 --- a/rhnode/rhjob.py +++ b/rhnode/rhjob.py @@ -8,6 +8,68 @@ from requests.exceptions import HTTPError +class MultiJobRunner: + """ + A class to distribute massive amounts of jobs on different RHnode clusters. + """ + + def __init__( + self, jobs, on_job_finish_handle=None, queue_length=16, skip_on_error=True + ): + self.skip_on_error = skip_on_error + self.jobs = jobs + self.on_job_finish_handle = on_job_finish_handle + self.started_jobs = {} + self.queue_length = queue_length + self.n_total_jobs = len(jobs) + + def _finish_job(self, job): + try: + outputs = job.wait_for_finish() + if self.on_job_finish_handle is not None: + inputs = job.input_data + self.on_job_finish_handle(inputs, outputs) + except (JobFailedError, JobCancelledError) as error: + if self.skip_on_error: + print( + "job with inputs", + str(job.input_data), + "encountered an error or was cancelled, ignoring", + ) + else: + raise + + def _check_and_update_active_jobs(self): + IDs_to_remove = [] + for i, (ID, job) in enumerate(self.started_jobs.items()): + if JobStatus(job._get_status()) in [ + JobStatus.Finished, + JobStatus.Error, + JobStatus.Cancelled, + ]: + self._finish_job(self.started_jobs[ID]) + IDs_to_remove.append(ID) + + for ID in IDs_to_remove: + remaining_jobs = len(self.jobs) + len(self.started_jobs) + print( + "Finished job:", ID, f"completed:{remaining_jobs}/{self.n_total_jobs}" + ) + del self.started_jobs[ID] + + while len(self.jobs) > 0 and len(self.started_jobs) <= self.queue_length: + new_job = self.jobs.pop(0) + new_job.start() + self.started_jobs[new_job.ID] = new_job + time.sleep(0.5) + + def start(self): + print(f"Start MultiRunner with {self.n_total_jobs} jobs") + while len(self.jobs) > 0 or len(self.started_jobs) > 0: + self._check_and_update_active_jobs() + time.sleep(10) + + class RHJob: def __init__( self, @@ -296,6 +358,7 @@ def wait_for_finish(self): time.sleep(10) elif JobStatus(status) == JobStatus.Running: time.sleep(4) + ##TODO what if status is Cancelling or other unhandled? Is server spammed with get_status requests? for key, value in output.items(): if isinstance(value, str) and "/download/" in value: @@ -311,11 +374,12 @@ def wait_for_finish(self): .split("=")[1] .replace('"', "") ) - self._maybe_make_output_directory(output_path) fname = Path(os.path.join(output_path, fname)).absolute() else: fname = Path(self.output_data[key]).absolute() + self._maybe_make_output_directory(str(fname.parent.absolute())) + with open(fname, "wb") as f: f.write(response.content) output[key] = fname diff --git a/rhnode/rhnode.py b/rhnode/rhnode.py index bbf81a5..3d04de7 100644 --- a/rhnode/rhnode.py +++ b/rhnode/rhnode.py @@ -21,6 +21,7 @@ from .version import __version__ MANAGER_URL = "http://manager:8000/manager" +MANAGER_URL = "http://localhost:8000/manager" class RHNode(ABC, FastAPI): diff --git a/rhnode/rhprocess.py b/rhnode/rhprocess.py index 2a2a968..840a7bb 100644 --- a/rhnode/rhprocess.py +++ b/rhnode/rhprocess.py @@ -12,6 +12,7 @@ from contextlib import contextmanager import time from pydantic import ValidationError +from datetime import datetime class RHProcess: @@ -40,6 +41,8 @@ def __init__( self.error = None self.time_created = time.time() self.time_last_accessed = None + self.time_started = None + self.time_finished = None self.output = None self.input = inputs_no_files @@ -157,6 +160,7 @@ async def _maybe_wait_for_resources(self, job): await asyncio.sleep(3) try: + # print("Job,"+job.id) yield gpu_id finally: if queue_id: @@ -206,6 +210,20 @@ def is_ready_to_run(self): except ValidationError: return False + def get_runtime_str(self): + if self.time_started is None: + return None + else: + # convert timestamps to datetime object + dt1 = datetime.fromtimestamp(self.time_created) + if self.time_finished is None: + dt2 = datetime.fromtimestamp(time.time()) + else: + dt2 = datetime.fromtimestamp(self.time_started) + + delta = dt2 - dt1 + return str(delta)[:-7] + ## JOB RUNNING async def run(self, job): assert self.status == JobStatus.Preparing @@ -230,6 +248,8 @@ async def run(self, job): self.priority = job.priority async with self._maybe_wait_for_resources(job) as cuda_device: + print("Starting job...", self.ID) + self.time_started = time.time() ## Cancel signal might come in waiting for cuda queue if self.status == JobStatus.Cancelled: return @@ -249,6 +269,7 @@ async def run(self, job): args=(self.input.copy(), job.copy(), result_queue), ) p.start() + print("Awaiting job to finish...", self.ID) while p.is_alive(): if self.status == JobStatus.Cancelling: p.terminate() @@ -260,6 +281,8 @@ async def run(self, job): await asyncio.sleep(3) response = result_queue.get() + print("Job finished...", self.ID, "response type:", response[0]) + self.time_finished = time.time() if response[0] == "error": error_message = "".join(response[1]) error_type = response[2] diff --git a/rhnode/routing/frontend.py b/rhnode/routing/frontend.py index 6cd3d0f..500852e 100644 --- a/rhnode/routing/frontend.py +++ b/rhnode/routing/frontend.py @@ -45,25 +45,34 @@ async def show_task_status(request: Request): template = env.get_template("index.html") formats = [] - sorted_jobs = sorted(rhnode.jobs.items(), key=lambda kv: -kv[1].time_created) + sorted_jobs = sorted(rhnode.jobs.items(), key=lambda kv: kv[1].time_created) for job_id, job in sorted_jobs: datetime_str = str(datetime.datetime.fromtimestamp(job.time_created)) - formats.append( - { - "task_id": job_id, - "status": job.status.name, - "href": rhnode.url_path_for("_show", job_id=job_id), - "date": datetime_str, - "priority": job.priority, - } - ) + runtime = job.get_runtime_str() + if runtime is None: + runtime = "" + formats.append(_get_job_info_dict(job, job_id)) html_content = template.render( default_context=_get_default_context(), tasks=formats ) return html_content + def _get_job_info_dict(job, job_id): + datetime_str = str(datetime.datetime.fromtimestamp(job.time_created)) + runtime = job.get_runtime_str() + if runtime is None: + runtime = "" + return { + "task_id": job_id, + "status": job.status.name, + "href": rhnode.url_path_for("_show", job_id=job_id), + "date": datetime_str[:-7], + "priority": job.priority, + "runtime": runtime, + } + @rhnode.get(rhnode._create_url("/jobs/{job_id}"), response_class=HTMLResponse) async def _show(job_id: str) -> HTMLResponse: job = rhnode.jobs[job_id] @@ -74,13 +83,15 @@ async def _show(job_id: str) -> HTMLResponse: template = env.get_template("task.html") + info = _get_job_info_dict(job, job_id) + info = [{"name": k, "val": v} for k, v in info.items()] html_content = template.render( default_context=_get_default_context(), outputs=output, queue_status=job.status, queue_id=job_id, + info=info, ) - # Return the rendered webpage return html_content