Add option to set worker time limit in allocations #605

Merged · 5 commits · Jul 7, 2023
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -86,7 +86,7 @@ jobs:
- name: Check Python formatting
run: |
export DIRECTORIES="scripts tests benchmarks crates/pyhq/python"
python -m black --check $DIRECTORIES
python -m black --preview -l 120 --check $DIRECTORIES
python -m flake8 $DIRECTORIES

- name: Build docs
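The only CI change is the black invocation: `-l 120` raises the line-length limit from black's default of 88 columns to 120, and `--preview` opts into black's preview style. The benchmark-script hunks in the rest of this pull request are the mechanical reflow produced by that wider limit. A minimal before/after sketch (the function is copied from `benchmarks/main.py` below; the second definition is renamed only so the snippet remains valid Python):

```python
from typing import Optional


# Before: under black's default 88-column limit, the long signature gets wrapped.
def compare_hq_version(
    baseline: str, modified: Optional[str] = None, zero_worker: Optional[bool] = False
):
    ...


# After `black --preview -l 120`: the same signature fits on a single line, which is
# exactly the change repeated across the benchmark scripts in this diff.
def compare_hq_version_reflowed(baseline: str, modified: Optional[str] = None, zero_worker: Optional[bool] = False):
    ...
```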
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -23,6 +23,17 @@ worker stops. You can use this feature e.g. to initialize some data or load soft
--worker-start-cmd "/project/xxx/init-node.sh" \
--worker-stop-cmd "/project/xxx/cleanup-node.sh"
```
* You can now set a time limit for workers spawned in allocations with the `--worker-time-limit` flag. You can use this
flag to make workers stop sooner, e.g. to give more headroom for a `--worker-stop-cmd` command to execute before the
allocation is terminated. If you do not pass this flag, the worker time limit will default to the time limit of the
allocation.

Example:
```console
$ hq alloc add pbs --time-limit 1h --worker-time-limit 58m --worker-stop-cmd "/project/xxxx/slow-command.sh"
```
In this case, the allocation will run for one hour, but the HQ worker will be stopped after 58 minutes (unless it is
stopped sooner because of idle timeout). The worker stop command will thus have at least two minutes to execute.
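The default behaviour mentioned above can be illustrated with a sketch (based only on the sentence above, not an extra changelog entry): if `--worker-time-limit` is omitted, the worker inherits the full allocation time limit, so the stop command only gets whatever time remains when the allocation is killed.
```console
$ hq alloc add pbs --time-limit 1h --worker-stop-cmd "/project/xxxx/slow-command.sh"
```
Here the worker time limit is also one hour, so `slow-command.sh` may be cut off when the allocation ends.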

## Changes

12 changes: 3 additions & 9 deletions benchmarks/main.py
@@ -28,9 +28,7 @@


@app.command()
def compare_hq_version(
baseline: str, modified: Optional[str] = None, zero_worker: Optional[bool] = False
):
def compare_hq_version(baseline: str, modified: Optional[str] = None, zero_worker: Optional[bool] = False):
"""
Compares the performance of two HQ versions.
If `modified` is not set, the current git workspace version will be used.
@@ -71,13 +69,9 @@ def sleep():
task_counts = (10, 100, 1000)
descriptions = []

def add_product(
workloads: List[Workload], environments: List[EnvironmentDescriptor]
):
def add_product(workloads: List[Workload], environments: List[EnvironmentDescriptor]):
for env, workload in itertools.product(environments, workloads):
descriptions.append(
BenchmarkDescriptor(env_descriptor=env, workload=workload)
)
descriptions.append(BenchmarkDescriptor(env_descriptor=env, workload=workload))

add_product([SleepSnake(tc) for tc in task_counts], [SnakeEnvironmentDescriptor()])
add_product(
4 changes: 1 addition & 3 deletions benchmarks/postprocess.py
@@ -29,9 +29,7 @@ def cluster_serve(

@cluster.command("generate")
def cluster_generate(
directory: Path = typer.Argument(
..., exists=True, file_okay=False, help="Directory containing `cluster.json`"
),
directory: Path = typer.Argument(..., exists=True, file_okay=False, help="Directory containing `cluster.json`"),
output: Path = Path("out.html"),
):
"""Generate a HTML file with monitoring and profiling report for the given directory"""
10 changes: 2 additions & 8 deletions benchmarks/src/benchmark/identifier.py
@@ -62,9 +62,7 @@ def create_identifiers(
for descriptor in descriptors:
identifiers.extend(
BenchmarkInstance(
identifier=create_identifier(
workdir, descriptor, default_timeout_s, index=index
),
identifier=create_identifier(workdir, descriptor, default_timeout_s, index=index),
descriptor=descriptor,
)
for index in range(descriptor.repeat_count)
@@ -112,11 +110,7 @@ def create_benchmark_key(
environment_params: Dict[str, Any],
index: int,
) -> str:
return (
f"{workload}-{format_value(workload_params)}-{environment}-"
f"{format_value(environment_params)}-"
f"{index}"
)
return f"{workload}-{format_value(workload_params)}-{environment}-{format_value(environment_params)}-{index}"


def format_value(value):
26 changes: 7 additions & 19 deletions benchmarks/src/benchmark/runner.py
@@ -25,12 +25,8 @@ def __init__(self, database: Database, workdir: Path, exit_on_error=True):
self.workdir.mkdir(parents=True, exist_ok=True)
self.exit_on_error = exit_on_error

def materialize_and_skip(
self, descriptors: List[BenchmarkDescriptor]
) -> List[BenchmarkInstance]:
instances = create_identifiers(
descriptors, workdir=self.workdir, default_timeout_s=DEFAULT_TIMEOUT_S
)
def materialize_and_skip(self, descriptors: List[BenchmarkDescriptor]) -> List[BenchmarkInstance]:
instances = create_identifiers(descriptors, workdir=self.workdir, default_timeout_s=DEFAULT_TIMEOUT_S)
return self._skip_completed(instances)

def compute_materialized(
@@ -40,9 +36,7 @@ def compute_materialized(
identifier = instance.identifier

logging.info(f"Executing benchmark {identifier}")
ctx = BenchmarkContext(
workdir=Path(identifier.workdir), timeout_s=identifier.timeout
)
ctx = BenchmarkContext(workdir=Path(identifier.workdir), timeout_s=identifier.timeout)
executor = self._create_executor(instance.descriptor)

try:
@@ -56,18 +50,14 @@

yield (instance, result)

def compute(
self, descriptors: List[BenchmarkDescriptor]
) -> Iterable[Tuple[BenchmarkInstance, BenchmarkResult]]:
def compute(self, descriptors: List[BenchmarkDescriptor]) -> Iterable[Tuple[BenchmarkInstance, BenchmarkResult]]:
instances = self.materialize_and_skip(descriptors)
yield from self.compute_materialized(instances)

def save(self):
self.database.save()

def _skip_completed(
self, infos: List[BenchmarkInstance]
) -> List[BenchmarkInstance]:
def _skip_completed(self, infos: List[BenchmarkInstance]) -> List[BenchmarkInstance]:
not_completed = []
visited = set()
skipped = 0
@@ -94,10 +84,8 @@ def _handle_result(self, identifier: BenchmarkIdentifier, result: BenchmarkResul
if isinstance(result, Failure):
logging.error(f"Benchmark {key} has failed: {result.traceback}")
if self.exit_on_error:
raise Exception(
f"""Benchmark {identifier} has failed: {result}
You can find details in {identifier.workdir}"""
)
raise Exception(f"""Benchmark {identifier} has failed: {result}
You can find details in {identifier.workdir}""")
elif isinstance(result, Timeout):
logging.info(f"Benchmark {key} has timeouted after {result.timeout}s")
elif isinstance(result, Success):
12 changes: 3 additions & 9 deletions benchmarks/src/benchmark_defs.py
@@ -82,18 +82,12 @@ def _create_hq_benchmarks(
return descriptors


def create_basic_hq_benchmarks(
artifacts: List[BuiltBinary], repeat_count=2
) -> List[BenchmarkDescriptor]:
def create_basic_hq_benchmarks(artifacts: List[BuiltBinary], repeat_count=2) -> List[BenchmarkDescriptor]:
workloads = list(sleep_workloads())
return _create_hq_benchmarks(
artifacts, [HqWorkerConfig()], workloads, repeat_count=repeat_count
)
return _create_hq_benchmarks(artifacts, [HqWorkerConfig()], workloads, repeat_count=repeat_count)


def create_resources_hq_benchmarks(
artifacts: List[BuiltBinary], repeat_count=2
) -> List[BenchmarkDescriptor]:
def create_resources_hq_benchmarks(artifacts: List[BuiltBinary], repeat_count=2) -> List[BenchmarkDescriptor]:
workloads = list(sleep_resource_benchmarks())
return _create_hq_benchmarks(
artifacts,
9 changes: 2 additions & 7 deletions benchmarks/src/clusterutils/cluster_helper.py
@@ -74,10 +74,7 @@ def prepare_workdir(workdir: Path) -> Path:
workdir.mkdir(parents=True, exist_ok=True)
return workdir.absolute()

pool_args = [
dataclasses.replace(args, workdir=prepare_workdir(args.workdir))
for args in processes
]
pool_args = [dataclasses.replace(args, workdir=prepare_workdir(args.workdir)) for args in processes]

logging.debug(f"Starting cluster processes: {pool_args}")

@@ -103,9 +100,7 @@ def start_monitoring(self, nodes: List[str], observe_processes=False):
if pyenv:
init_cmd += [f"source {pyenv}/bin/activate"]
else:
logging.warning(
"No Python virtualenv detected. Monitoring will probably not work."
)
logging.warning("No Python virtualenv detected. Monitoring will probably not work.")

nodes = sorted(set(nodes))
workdir = self.workdir / "monitoring"
16 changes: 4 additions & 12 deletions benchmarks/src/clusterutils/profiler.py
@@ -37,9 +37,7 @@ def __init__(self, frequency: int):

def check_availability(self):
if not is_binary_available("perf"):
raise Exception(
"Flamegraph profiling is not available. Please make sure that `perf` is available."
)
raise Exception("Flamegraph profiling is not available. Please make sure that `perf` is available.")

def profile(self, command: List[str], output_dir: Path) -> ProfiledCommand:
path = output_dir / "perf-records.txt"
@@ -73,9 +71,7 @@ def __init__(self, events: List[str] = None):

def check_availability(self):
if not is_binary_available("perf"):
raise Exception(
"Performance events profiling is not available. Please install `perf`."
)
raise Exception("Performance events profiling is not available. Please install `perf`.")

def profile(self, command: List[str], output_dir: Path) -> ProfiledCommand:
path = output_dir / "perf-events.txt"
@@ -100,9 +96,7 @@ class CachegrindProfiler(Profiler):

def check_availability(self):
if not is_binary_available("valgrind"):
raise Exception(
"Valgrind profiling is not available. Please install `valgrind`."
)
raise Exception("Valgrind profiling is not available. Please install `valgrind`.")

def profile(self, command: List[str], output_dir: Path) -> ProfiledCommand:
path = output_dir / "cachegrind.txt"
@@ -122,9 +116,7 @@ class CallgrindProfiler(Profiler):

def check_availability(self):
if not is_binary_available("valgrind"):
raise Exception(
"Valgrind profiling is not available. Please install `valgrind`."
)
raise Exception("Valgrind profiling is not available. Please install `valgrind`.")

def profile(self, command: List[str], output_dir: Path) -> ProfiledCommand:
path = output_dir / "callgrind.txt"
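The profiler hunks above only join long error messages onto single lines. For orientation, the flamegraph profiler wraps a benchmark command with `perf record`; the sketch below is a hypothetical reconstruction of that wrapping (the exact flags used by the benchmark suite are not visible in this diff, so treat the command line as an assumption):

```python
from pathlib import Path
from typing import List


def wrap_with_perf(command: List[str], output_dir: Path, frequency: int = 99) -> List[str]:
    """Prefix `command` with a `perf record` invocation sampling call stacks at `frequency` Hz.

    Hypothetical illustration only; FlamegraphProfiler.profile() in the repository may use
    different flags and output paths.
    """
    record_file = output_dir / "perf-records.txt"
    return ["perf", "record", "-F", str(frequency), "-g", "-o", str(record_file)] + command
```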
26 changes: 6 additions & 20 deletions benchmarks/src/environment/hq.py
@@ -80,9 +80,7 @@ def parameters(self) -> Dict[str, Any]:
return params


def assign_workers(
workers: List[HqWorkerConfig], nodes: List[str]
) -> Dict[str, List[HqWorkerConfig]]:
def assign_workers(workers: List[HqWorkerConfig], nodes: List[str]) -> Dict[str, List[HqWorkerConfig]]:
round_robin_node = 0
used_round_robin = set()

@@ -99,14 +97,10 @@ def assign_workers(
node = round_robin_node
round_robin_node = (round_robin_node + 1) % len(nodes)
if node in used_round_robin:
logging.warning(
f"There are more workers ({len(workers)}) than worker nodes ({len(nodes)})"
)
logging.warning(f"There are more workers ({len(workers)}) than worker nodes ({len(nodes)})")
used_round_robin.add(node)
if node >= len(nodes):
raise Exception(
f"Selected worker node is {node}, but there are only {len(nodes)} worker node(s)"
)
raise Exception(f"Selected worker node is {node}, but there are only {len(nodes)} worker node(s)")
node_assignments[nodes[node]].append(worker)
return dict(node_assignments)

@@ -126,11 +120,7 @@ def __init__(self, info: HqClusterInfo, workdir: Path):
self.nodes = self.info.cluster.node_list.resolve()
assert self.nodes

worker_nodes = (
self.nodes
if isinstance(self.info.cluster.node_list, Local)
else self.nodes[1:]
)
worker_nodes = self.nodes if isinstance(self.info.cluster.node_list, Local) else self.nodes[1:]
if not worker_nodes:
raise Exception("No worker nodes are available")

@@ -234,9 +224,7 @@ def _wait_for_server_start(self):

def _wait_for_workers(self, count: int):
def get_worker_count():
output = subprocess.check_output(
self._shared_args() + ["--output-mode", "json", "worker", "list"]
)
output = subprocess.check_output(self._shared_args() + ["--output-mode", "json", "worker", "list"])
return len(json.loads(output)) == count

wait_until(lambda: get_worker_count())
@@ -245,9 +233,7 @@ def _log_env_value(self) -> str:
return f"hyperqueue={'DEBUG' if self.info.debug else 'INFO'}"


def apply_profilers(
args: StartProcessArgs, profilers: List[Profiler], output_dir: Path
):
def apply_profilers(args: StartProcessArgs, profilers: List[Profiler], output_dir: Path):
for profiler in profilers:
profiler.check_availability()
result = profiler.profile(args.args, output_dir.absolute())
3 changes: 1 addition & 2 deletions benchmarks/src/environment/snake.py
@@ -65,6 +65,5 @@ def submit(self, cmds: str, cpus_per_task: int):
)
if not ret:
raise Exception(
f"SnakeMake execution failed. You can find more details in "
f"{self.workdir / '.snakemake' / 'log'}"
f"SnakeMake execution failed. You can find more details in {self.workdir / '.snakemake' / 'log'}"
)
4 changes: 1 addition & 3 deletions benchmarks/src/executor/executor.py
@@ -15,7 +15,5 @@ def __post_init__(self):


class BenchmarkExecutor:
def execute(
self, benchmark: BenchmarkDescriptor, ctx: BenchmarkContext
) -> BenchmarkResult:
def execute(self, benchmark: BenchmarkDescriptor, ctx: BenchmarkContext) -> BenchmarkResult:
raise NotImplementedError
14 changes: 4 additions & 10 deletions benchmarks/src/executor/external_executor.py
@@ -27,18 +27,12 @@ class ExternalBenchmarkExecutor(BenchmarkExecutor):
def __init__(self, init_script: Optional[Path] = None):
self.init_script = init_script.resolve()

def execute(
self, benchmark: BenchmarkDescriptor, ctx: BenchmarkContext
) -> BenchmarkResult:
def execute(self, benchmark: BenchmarkDescriptor, ctx: BenchmarkContext) -> BenchmarkResult:
return execute_benchmark_in_external_process(benchmark, ctx, self.init_script)


def serialize_benchmark(
descriptor: BenchmarkDescriptor, ctx: BenchmarkContext
) -> bytes:
serialized_benchmark = SerializedBenchmark(
descriptor=descriptor, ctx=ctx, cwd=Path(os.getcwd()).absolute()
)
def serialize_benchmark(descriptor: BenchmarkDescriptor, ctx: BenchmarkContext) -> bytes:
serialized_benchmark = SerializedBenchmark(descriptor=descriptor, ctx=ctx, cwd=Path(os.getcwd()).absolute())
return pickle.dumps(serialized_benchmark)


@@ -117,7 +111,7 @@ def create_error_message(returncode: int) -> str:
with open(stdout_file) as stdout:
with open(stderr_file) as stderr:
return (
f"Benchmark launch in external process ended with exception.\n"
"Benchmark launch in external process ended with exception.\n"
f"Stdout: {stdout.read()}\n"
f"Stderr: {stderr.read()}\n"
f"Exit code: {returncode}"
8 changes: 2 additions & 6 deletions benchmarks/src/executor/local_executor.py
@@ -10,15 +10,11 @@
class LocalBenchmarkExecutor(BenchmarkExecutor):
"""Executes benchmarks in the current process"""

def execute(
self, benchmark: BenchmarkDescriptor, ctx: BenchmarkContext
) -> BenchmarkResult:
def execute(self, benchmark: BenchmarkDescriptor, ctx: BenchmarkContext) -> BenchmarkResult:
return execute_benchmark(benchmark, ctx)


def execute_benchmark(
descriptor: BenchmarkDescriptor, ctx: BenchmarkContext
) -> BenchmarkResult:
def execute_benchmark(descriptor: BenchmarkDescriptor, ctx: BenchmarkContext) -> BenchmarkResult:
env = descriptor.env_descriptor.create_environment(ctx.workdir)
workload = descriptor.workload

12 changes: 3 additions & 9 deletions benchmarks/src/monitoring/record.py
@@ -44,9 +44,7 @@ def serialize(self, file):
def record_resources() -> ResourceRecord:
cpus = psutil.cpu_percent(percpu=True)
mem = psutil.virtual_memory().percent
connections = sum(
1 if c[5] == "ESTABLISHED" else 0 for c in psutil.net_connections()
)
connections = sum(1 if c[5] == "ESTABLISHED" else 0 for c in psutil.net_connections())
bytes = psutil.net_io_counters()
io = psutil.disk_io_counters()

@@ -67,17 +65,13 @@ def record_processes(processes: List[psutil.Process]) -> Dict[str, ProcessRecord
try:
memory_info = process.memory_info()
cpu_utilization = process.cpu_percent()
data[str(process.pid)] = ProcessRecord(
rss=memory_info.rss, vm=memory_info.vms, cpu=cpu_utilization
)
data[str(process.pid)] = ProcessRecord(rss=memory_info.rss, vm=memory_info.vms, cpu=cpu_utilization)
except BaseException as e:
logging.error(e)
return data


def generate_record(
timestamp: int, processes: List[psutil.Process]
) -> MonitoringRecord:
def generate_record(timestamp: int, processes: List[psutil.Process]) -> MonitoringRecord:
return MonitoringRecord(
timestamp=timestamp,
resources=record_resources(),
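The monitoring hunks are again pure reformatting. For context, `record_resources()` in this file is a thin wrapper over psutil; a standalone sketch of the same node-level snapshot (an illustration with assumed dictionary keys, not code taken from this PR) looks like this:

```python
import psutil


def snapshot_resources() -> dict:
    """Collect one node-level resource sample, mirroring what record_resources() gathers."""
    cpus = psutil.cpu_percent(percpu=True)  # per-core CPU utilization in percent
    mem = psutil.virtual_memory().percent  # used memory in percent
    # Count established connections; index 5 of each psutil connection entry is its status.
    connections = sum(1 for c in psutil.net_connections() if c[5] == "ESTABLISHED")
    net = psutil.net_io_counters()  # cumulative network bytes sent/received
    io = psutil.disk_io_counters()  # cumulative disk read/write counters

    return {
        "cpus": cpus,
        "memory_percent": mem,
        "established_connections": connections,
        "net_bytes_sent": net.bytes_sent,
        "net_bytes_recv": net.bytes_recv,
        "disk_read_count": io.read_count,
        "disk_write_count": io.write_count,
    }


if __name__ == "__main__":
    print(snapshot_resources())
```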