Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feature/inherit_confi…
Browse files Browse the repository at this point in the history
…g_support
  • Loading branch information
yutaro-oguri committed Dec 5, 2023
2 parents 7764cf9 + 588e87f commit 150a632
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 20 deletions.
42 changes: 27 additions & 15 deletions kannon/master.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

import logging
import os
from collections import deque
from copy import deepcopy
from time import sleep
from typing import Deque, Dict, List, Optional, Set

import gokart
from gokart.target import make_target
Expand All @@ -26,10 +27,11 @@ def __init__(
# kannon resources
job_prefix: str,
path_child_script: str = "./run_child.py",
env_to_inherit: Optional[List[str]] = None,
master_pod_name: Optional[str] = None,
master_pod_uid: Optional[str] = None,
dynamic_config_path: Optional[str] = None,
env_to_inherit: list[str] | None = None,
master_pod_name: str | None = None,
master_pod_uid: str | None = None,
dynamic_config_path: str | None = None,
max_child_jobs: int | None = None,
) -> None:
# validation
if not os.path.exists(path_child_script):
Expand All @@ -46,8 +48,11 @@ def __init__(
self.master_pod_name = master_pod_name
self.master_pod_uid = master_pod_uid
self.dynamic_config_path = dynamic_config_path
if max_child_jobs is not None and max_child_jobs <= 0:
raise ValueError(f"max_child_jobs must be positive integer, but got {max_child_jobs}")
self.max_child_jobs = max_child_jobs

self.task_id_to_job_name: Dict[str, str] = dict()
self.task_id_to_job_name: dict[str, str] = dict()

def build(self, root_task: gokart.TaskOnKart) -> None:
# check all config file paths exists
Expand Down Expand Up @@ -77,15 +82,18 @@ def build(self, root_task: gokart.TaskOnKart) -> None:
task_queue = self._create_task_queue(root_task)

# consume task queue
running_task_ids: set[str] = set()
logger.info("Consuming task queue...")
while task_queue:
task = task_queue.popleft()
if task.complete():
logger.info(f"Task {self._gen_task_info(task)} is already completed.")
if task.make_unique_id() in running_task_ids:
running_task_ids.remove(task.make_unique_id())
continue
if task.make_unique_id() in self.task_id_to_job_name:
if task.make_unique_id() in running_task_ids:
# check if task is still running on child job
assert self._check_child_task_status(task), f"Child task {self._gen_task_info(task)} failed."
self._check_child_task_status(task)
logger.info(f"Task {self._gen_task_info(task)} is still running on child job.")
task_queue.append(task) # re-enqueue task to check if it is done
continue
Expand All @@ -99,8 +107,13 @@ def build(self, root_task: gokart.TaskOnKart) -> None:
continue
# execute task
if isinstance(task, TaskOnBullet):
if self.max_child_jobs is not None and len(running_task_ids) >= self.max_child_jobs:
task_queue.append(task) # re-enqueue task to check later
logger.info(f"Reach max_child_jobs, waiting to run task {self._gen_task_info(task)} on child job...")
continue
logger.info(f"Trying to run task {self._gen_task_info(task)} on child job...")
self._exec_bullet_task(task, remote_config_path)
running_task_ids.add(task.make_unique_id()) # mark as already launched task
task_queue.append(task) # re-enqueue task to check if it is done
elif isinstance(task, gokart.TaskOnKart):
logger.info(f"Executing task {self._gen_task_info(task)} on master job...")
Expand All @@ -111,9 +124,9 @@ def build(self, root_task: gokart.TaskOnKart) -> None:

logger.info("All tasks completed!")

def _create_task_queue(self, root_task: gokart.TaskOnKart) -> Deque[gokart.TaskOnKart]:
task_queue: Deque[gokart.TaskOnKart] = deque()
visited_task_ids: Set[str] = set()
def _create_task_queue(self, root_task: gokart.TaskOnKart) -> deque[gokart.TaskOnKart]:
task_queue: deque[gokart.TaskOnKart] = deque()
visited_task_ids: set[str] = set()

def _rec_enqueue_task(task: gokart.TaskOnKart) -> None:
"""Traversal task tree in post-order to push tasks into task queue."""
Expand Down Expand Up @@ -141,7 +154,7 @@ def _exec_gokart_task(self, task: gokart.TaskOnKart) -> None:
except Exception:
raise RuntimeError(f"Task {self._gen_task_info(task)} on job master has failed.")

def _exec_bullet_task(self, task: TaskOnBullet, remote_config_path: Optional[str]) -> None:
def _exec_bullet_task(self, task: TaskOnBullet, remote_config_path: str | None) -> None:
# Save task instance as pickle object
pkl_path = self._gen_pkl_path(task)
make_target(pkl_path).dump(task)
Expand All @@ -160,7 +173,7 @@ def _create_child_job_object(
self,
job_name: str,
task_pkl_path: str,
remote_config_path: Optional[str] = None,
remote_config_path: str | None = None,
) -> client.V1Job:
# TODO: use python -c to avoid dependency to execute_task.py
cmd = [
Expand Down Expand Up @@ -213,7 +226,7 @@ def _gen_task_info(task: gokart.TaskOnKart) -> str:
def _gen_pkl_path(task: gokart.TaskOnKart) -> str:
return os.path.join(task.workspace_directory, 'kannon', f'task_obj_{task.make_unique_id()}.pkl')

def _check_child_task_status(self, task: TaskOnBullet) -> bool:
def _check_child_task_status(self, task: TaskOnBullet) -> None:
if task.make_unique_id() not in self.task_id_to_job_name:
raise ValueError(f"Task {self._gen_task_info(task)} is not found in `task_id_to_job_name`")
job_name = self.task_id_to_job_name[task.make_unique_id()]
Expand All @@ -224,7 +237,6 @@ def _check_child_task_status(self, task: TaskOnBullet) -> bool:
)
if job_status == JobStatus.FAILED:
raise RuntimeError(f"Task {self._gen_task_info(task)} on job {job_name} has failed.")
return True

def _is_executable(self, task: gokart.TaskOnKart) -> bool:
children = flatten(task.requires())
Expand Down
74 changes: 70 additions & 4 deletions test/integration_test/test_master_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,25 @@ def complete(self) -> bool:

class MockKannon(Kannon):

def __init__(self) -> None:
def __init__(self, *, max_child_jobs: int | None = None) -> None:
super().__init__(
api_instance=None,
template_job=client.V1Job(metadata=client.V1ObjectMeta()),
job_prefix="",
path_child_script=__file__, # just pass any existing file as dummy
env_to_inherit=None,
max_child_jobs=max_child_jobs,
)

def _exec_gokart_task(self, task: MockTaskOnKart) -> None:
task.run()

def _exec_bullet_task(self, task: MockTaskOnBullet, remote_config_path: Optional[str] = None) -> None:
def _exec_bullet_task(self, task: MockTaskOnBullet) -> None:
self.task_id_to_job_name[task.make_unique_id()] = "dummy_job_name"
task.run()

def _check_child_task_status(self, task: MockTaskOnBullet) -> bool:
return True
def _check_child_task_status(self, task: MockTaskOnBullet) -> None:
return None

def _is_executable(self, task: MockTaskOnKart) -> bool:
children = flatten(task.requires())
Expand Down Expand Up @@ -181,6 +182,71 @@ def requires(self) -> list[Child]:
'INFO:kannon.master:All tasks completed!',
])

def test_three_task_on_bullet_with_max_child_jobs(self) -> None:
self.maxDiff = None

class Child(MockTaskOnBullet):
param = luigi.IntParameter()

c1 = Child(param=1)
c1.wait_sec = 4
c2 = Child(param=2)
c2.wait_sec = 3
c3 = Child(param=3)
c3.wait_sec = 2

class Parent(MockTaskOnKart):

def requires(self) -> list[Child]:
return [c1, c2, c3]

root_task = Parent()

master = MockKannon(max_child_jobs=2)
with self.assertLogs() as cm:
master.build(root_task)

c1_task_info = master._gen_task_info(c1)
c2_task_info = master._gen_task_info(c2)
c3_task_info = master._gen_task_info(c3)
root_task_info = master._gen_task_info(root_task)
self.assertEqual(
cm.output,
[
'INFO:kannon.master:Creating task queue...',
f'INFO:kannon.master:Task {c1_task_info} is pushed to task queue',
f'INFO:kannon.master:Task {c2_task_info} is pushed to task queue',
f'INFO:kannon.master:Task {c3_task_info} is pushed to task queue',
f'INFO:kannon.master:Task {root_task_info} is pushed to task queue',
'INFO:kannon.master:Total tasks in task queue: 4',
'INFO:kannon.master:Consuming task queue...',
f'INFO:kannon.master:Checking if task {c1_task_info} is executable...',
f'INFO:kannon.master:Trying to run task {c1_task_info} on child job...',
f'INFO:kannon.master:Checking if task {c2_task_info} is executable...',
f'INFO:kannon.master:Trying to run task {c2_task_info} on child job...',
# c3 has to wait
f'INFO:kannon.master:Checking if task {c3_task_info} is executable...',
f'INFO:kannon.master:Reach max_child_jobs, waiting to run task {c3_task_info} on child job...',
f'INFO:kannon.master:Checking if task {root_task_info} is executable...',
f'INFO:kannon.master:Task {c1_task_info} is still running on child job.',
f'INFO:kannon.master:Task {c2_task_info} is still running on child job.',
f'INFO:kannon.master:Checking if task {c3_task_info} is executable...',
f'INFO:kannon.master:Reach max_child_jobs, waiting to run task {c3_task_info} on child job...',
f'INFO:kannon.master:Checking if task {root_task_info} is executable...',
f'INFO:kannon.master:Task {c1_task_info} is already completed.',
f'INFO:kannon.master:Task {c2_task_info} is already completed.',
# now c3 can start
f'INFO:kannon.master:Checking if task {c3_task_info} is executable...',
f'INFO:kannon.master:Trying to run task {c3_task_info} on child job...',
f'INFO:kannon.master:Checking if task {root_task_info} is executable...',
f'INFO:kannon.master:Task {c3_task_info} is still running on child job.',
f'INFO:kannon.master:Checking if task {root_task_info} is executable...',
f'INFO:kannon.master:Executing task {root_task_info} on master job...',
f'INFO:kannon.master:Completed task {root_task_info} on master job.',
f'INFO:kannon.master:Task {c3_task_info} is already completed.',
'INFO:kannon.master:All tasks completed!',
])


if __name__ == '__main__':
unittest.main()
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ isolated_build = true
[testenv]
allowlist_externals = coverage
skip_install = true
commands = coverage run -m unittest discover -s test
commands = coverage run -m unittest discover -v

[testenv:yapf]
allowlist_externals = yapf
Expand Down

0 comments on commit 150a632

Please sign in to comment.