Skip to content

Commit

Permalink
Additional job check (#37)
Browse files Browse the repository at this point in the history
* Add additional waiting

* fix mock of kube api

* add util method to check child task

* fix exception

* Remove uncalled assert

* Add type annotation

* fix

---------

Co-authored-by: yutaro-oguri <[email protected]>
  • Loading branch information
maronuu and yutaro-oguri authored Dec 5, 2023
1 parent ce84cd3 commit 588e87f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
19 changes: 16 additions & 3 deletions kannon/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ def build(self, root_task: gokart.TaskOnKart) -> None:
running_task_ids.remove(task.make_unique_id())
continue
if task.make_unique_id() in running_task_ids:
# check if task is still running on child job
self._check_child_task_status(task)
logger.info(f"Task {self._gen_task_info(task)} is still running on child job.")
task_queue.append(task)
task_queue.append(task) # re-enqueue task to check if it is done
continue

# TODO: enable user to specify duration to sleep for each task
Expand Down Expand Up @@ -136,8 +138,7 @@ def _exec_bullet_task(self, task: TaskOnBullet) -> None:
)
create_job(self.api_instance, job, self.namespace)
logger.info(f"Created child job {job_name} with task {self._gen_task_info(task)}")
task_unique_id = task.make_unique_id()
self.task_id_to_job_name[task_unique_id] = job_name
self.task_id_to_job_name[task.make_unique_id()] = job_name

def _create_child_job_object(self, job_name: str, task_pkl_path: str) -> client.V1Job:
# TODO: use python -c to avoid dependency on execute_task.py
Expand Down Expand Up @@ -174,6 +175,18 @@ def _gen_task_info(task: gokart.TaskOnKart) -> str:
def _gen_pkl_path(task: gokart.TaskOnKart) -> str:
    """Build the pickle file path for ``task``.

    The path joins the task's ``workspace_directory``, the ``kannon``
    sub-directory, and a file name derived from the task's unique id.
    """
    workspace = task.workspace_directory
    pkl_file_name = f'task_obj_{task.make_unique_id()}.pkl'
    return os.path.join(workspace, 'kannon', pkl_file_name)

def _check_child_task_status(self, task: TaskOnBullet) -> None:
    """Raise if the child job recorded for ``task`` has failed.

    The job name is looked up in ``self.task_id_to_job_name`` by the task's
    unique id, and its status is fetched with ``get_job_status``.

    Raises:
        ValueError: ``task`` has no entry in ``task_id_to_job_name``.
        RuntimeError: the job status equals ``JobStatus.FAILED``.
    """
    known_jobs = self.task_id_to_job_name
    if task.make_unique_id() not in known_jobs:
        missing_msg = f"Task {self._gen_task_info(task)} is not found in `task_id_to_job_name`"
        raise ValueError(missing_msg)

    job_name = known_jobs[task.make_unique_id()]
    job_status = get_job_status(self.api_instance, job_name, self.namespace)
    if job_status != JobStatus.FAILED:
        # Any status other than FAILED is accepted without further action.
        return
    raise RuntimeError(f"Task {self._gen_task_info(task)} on job {job_name} has failed.")

def _is_executable(self, task: gokart.TaskOnKart) -> bool:
children = flatten(task.requires())

Expand Down
12 changes: 12 additions & 0 deletions test/integration_test/test_master_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import gokart
import luigi
from kubernetes import client
from luigi.task import flatten

from kannon import Kannon, TaskOnBullet

Expand Down Expand Up @@ -55,8 +56,19 @@ def _exec_gokart_task(self, task: MockTaskOnKart) -> None:
task.run()

def _exec_bullet_task(self, task: MockTaskOnBullet) -> None:
    """Record a placeholder job name for ``task`` and run it directly."""
    placeholder_job = "dummy_job_name"
    self.task_id_to_job_name[task.make_unique_id()] = placeholder_job
    task.run()

def _check_child_task_status(self, task: MockTaskOnBullet) -> None:
    """No-op status check: does nothing and returns ``None``."""
    return

def _is_executable(self, task: MockTaskOnKart) -> bool:
    """Return ``True`` only if every flattened requirement of ``task`` is complete.

    A task with no requirements is reported as executable.
    """
    requirements = flatten(task.requires())
    # all() stops at the first incomplete requirement, like the early return it replaces.
    return all(dep.complete() for dep in requirements)


class TestConsumeTaskQueue(unittest.TestCase):

Expand Down

0 comments on commit 588e87f

Please sign in to comment.