diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index fe7dcab1c..25de58eb6 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -86,7 +86,7 @@ jobs:
       - name: Check Python formatting
        run: |
          export DIRECTORIES="scripts tests benchmarks crates/pyhq/python"
-          python -m black --check $DIRECTORIES
+          python -m black --preview -l 120 --check $DIRECTORIES
          python -m flake8 $DIRECTORIES
      - name: Build docs
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 95c7953f0..29c59a480 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,17 @@ worker stops. You can use this feature e.g. to initialize some data or load soft
   --worker-start-cmd "/project/xxx/init-node.sh" \
   --worker-stop-cmd "/project/xxx/cleanup-node.sh"
   ```
+* You can now set a time limit for workers spawned in allocations with the `--worker-time-limit` flag. You can use this
+flag to make workers stop sooner, e.g. to give a `--worker-stop-cmd` command more headroom to execute
+before the allocation is terminated. If you do not use this parameter, the worker time limit will be set to the time limit
+of the allocation.
+
+  Example:
+  ```console
+  $ hq alloc add pbs --time-limit 1h --worker-time-limit 58m --worker-stop-cmd "/project/xxxx/slow-command.sh"
+  ```
+  In this case, the allocation will run for one hour, but the HQ worker will be stopped after 58 minutes (unless it is
+  stopped sooner because of idle timeout). The worker stop command will thus have at least two minutes to execute.
 
 ## Changes
diff --git a/benchmarks/main.py b/benchmarks/main.py
index 63db2d4df..72d083865 100644
--- a/benchmarks/main.py
+++ b/benchmarks/main.py
@@ -28,9 +28,7 @@
 
 
 @app.command()
-def compare_hq_version(
-    baseline: str, modified: Optional[str] = None, zero_worker: Optional[bool] = False
-):
+def compare_hq_version(baseline: str, modified: Optional[str] = None, zero_worker: Optional[bool] = False):
     """
     Compares the performance of two HQ versions.
     If `modified` is not set, the current git workspace version will be used.
@@ -71,13 +69,9 @@ def sleep(): task_counts = (10, 100, 1000) descriptions = [] - def add_product( - workloads: List[Workload], environments: List[EnvironmentDescriptor] - ): + def add_product(workloads: List[Workload], environments: List[EnvironmentDescriptor]): for env, workload in itertools.product(environments, workloads): - descriptions.append( - BenchmarkDescriptor(env_descriptor=env, workload=workload) - ) + descriptions.append(BenchmarkDescriptor(env_descriptor=env, workload=workload)) add_product([SleepSnake(tc) for tc in task_counts], [SnakeEnvironmentDescriptor()]) add_product( diff --git a/benchmarks/postprocess.py b/benchmarks/postprocess.py index fe15ad956..6fb504cb3 100644 --- a/benchmarks/postprocess.py +++ b/benchmarks/postprocess.py @@ -29,9 +29,7 @@ def cluster_serve( @cluster.command("generate") def cluster_generate( - directory: Path = typer.Argument( - ..., exists=True, file_okay=False, help="Directory containing `cluster.json`" - ), + directory: Path = typer.Argument(..., exists=True, file_okay=False, help="Directory containing `cluster.json`"), output: Path = Path("out.html"), ): """Generate a HTML file with monitoring and profiling report for the given directory""" diff --git a/benchmarks/src/benchmark/identifier.py b/benchmarks/src/benchmark/identifier.py index 345c283af..d464bac42 100644 --- a/benchmarks/src/benchmark/identifier.py +++ b/benchmarks/src/benchmark/identifier.py @@ -62,9 +62,7 @@ def create_identifiers( for descriptor in descriptors: identifiers.extend( BenchmarkInstance( - identifier=create_identifier( - workdir, descriptor, default_timeout_s, index=index - ), + identifier=create_identifier(workdir, descriptor, default_timeout_s, index=index), descriptor=descriptor, ) for index in range(descriptor.repeat_count) @@ -112,11 +110,7 @@ def create_benchmark_key( environment_params: Dict[str, Any], index: int, ) -> str: - return ( - f"{workload}-{format_value(workload_params)}-{environment}-" - f"{format_value(environment_params)}-" - f"{index}" - ) + return f"{workload}-{format_value(workload_params)}-{environment}-{format_value(environment_params)}-{index}" def format_value(value): diff --git a/benchmarks/src/benchmark/runner.py b/benchmarks/src/benchmark/runner.py index 5c3ec9222..2dc7930b9 100644 --- a/benchmarks/src/benchmark/runner.py +++ b/benchmarks/src/benchmark/runner.py @@ -25,12 +25,8 @@ def __init__(self, database: Database, workdir: Path, exit_on_error=True): self.workdir.mkdir(parents=True, exist_ok=True) self.exit_on_error = exit_on_error - def materialize_and_skip( - self, descriptors: List[BenchmarkDescriptor] - ) -> List[BenchmarkInstance]: - instances = create_identifiers( - descriptors, workdir=self.workdir, default_timeout_s=DEFAULT_TIMEOUT_S - ) + def materialize_and_skip(self, descriptors: List[BenchmarkDescriptor]) -> List[BenchmarkInstance]: + instances = create_identifiers(descriptors, workdir=self.workdir, default_timeout_s=DEFAULT_TIMEOUT_S) return self._skip_completed(instances) def compute_materialized( @@ -40,9 +36,7 @@ def compute_materialized( identifier = instance.identifier logging.info(f"Executing benchmark {identifier}") - ctx = BenchmarkContext( - workdir=Path(identifier.workdir), timeout_s=identifier.timeout - ) + ctx = BenchmarkContext(workdir=Path(identifier.workdir), timeout_s=identifier.timeout) executor = self._create_executor(instance.descriptor) try: @@ -56,18 +50,14 @@ def compute_materialized( yield (instance, result) - def compute( - self, descriptors: List[BenchmarkDescriptor] - ) -> 
Iterable[Tuple[BenchmarkInstance, BenchmarkResult]]: + def compute(self, descriptors: List[BenchmarkDescriptor]) -> Iterable[Tuple[BenchmarkInstance, BenchmarkResult]]: instances = self.materialize_and_skip(descriptors) yield from self.compute_materialized(instances) def save(self): self.database.save() - def _skip_completed( - self, infos: List[BenchmarkInstance] - ) -> List[BenchmarkInstance]: + def _skip_completed(self, infos: List[BenchmarkInstance]) -> List[BenchmarkInstance]: not_completed = [] visited = set() skipped = 0 @@ -94,10 +84,8 @@ def _handle_result(self, identifier: BenchmarkIdentifier, result: BenchmarkResul if isinstance(result, Failure): logging.error(f"Benchmark {key} has failed: {result.traceback}") if self.exit_on_error: - raise Exception( - f"""Benchmark {identifier} has failed: {result} -You can find details in {identifier.workdir}""" - ) + raise Exception(f"""Benchmark {identifier} has failed: {result} +You can find details in {identifier.workdir}""") elif isinstance(result, Timeout): logging.info(f"Benchmark {key} has timeouted after {result.timeout}s") elif isinstance(result, Success): diff --git a/benchmarks/src/benchmark_defs.py b/benchmarks/src/benchmark_defs.py index bc20dc518..d6b17f290 100644 --- a/benchmarks/src/benchmark_defs.py +++ b/benchmarks/src/benchmark_defs.py @@ -82,18 +82,12 @@ def _create_hq_benchmarks( return descriptors -def create_basic_hq_benchmarks( - artifacts: List[BuiltBinary], repeat_count=2 -) -> List[BenchmarkDescriptor]: +def create_basic_hq_benchmarks(artifacts: List[BuiltBinary], repeat_count=2) -> List[BenchmarkDescriptor]: workloads = list(sleep_workloads()) - return _create_hq_benchmarks( - artifacts, [HqWorkerConfig()], workloads, repeat_count=repeat_count - ) + return _create_hq_benchmarks(artifacts, [HqWorkerConfig()], workloads, repeat_count=repeat_count) -def create_resources_hq_benchmarks( - artifacts: List[BuiltBinary], repeat_count=2 -) -> List[BenchmarkDescriptor]: +def create_resources_hq_benchmarks(artifacts: List[BuiltBinary], repeat_count=2) -> List[BenchmarkDescriptor]: workloads = list(sleep_resource_benchmarks()) return _create_hq_benchmarks( artifacts, diff --git a/benchmarks/src/clusterutils/cluster_helper.py b/benchmarks/src/clusterutils/cluster_helper.py index 2f1fa230a..c7340e349 100644 --- a/benchmarks/src/clusterutils/cluster_helper.py +++ b/benchmarks/src/clusterutils/cluster_helper.py @@ -74,10 +74,7 @@ def prepare_workdir(workdir: Path) -> Path: workdir.mkdir(parents=True, exist_ok=True) return workdir.absolute() - pool_args = [ - dataclasses.replace(args, workdir=prepare_workdir(args.workdir)) - for args in processes - ] + pool_args = [dataclasses.replace(args, workdir=prepare_workdir(args.workdir)) for args in processes] logging.debug(f"Starting cluster processes: {pool_args}") @@ -103,9 +100,7 @@ def start_monitoring(self, nodes: List[str], observe_processes=False): if pyenv: init_cmd += [f"source {pyenv}/bin/activate"] else: - logging.warning( - "No Python virtualenv detected. Monitoring will probably not work." - ) + logging.warning("No Python virtualenv detected. 
Monitoring will probably not work.") nodes = sorted(set(nodes)) workdir = self.workdir / "monitoring" diff --git a/benchmarks/src/clusterutils/profiler.py b/benchmarks/src/clusterutils/profiler.py index e1a44673a..3ff24267e 100644 --- a/benchmarks/src/clusterutils/profiler.py +++ b/benchmarks/src/clusterutils/profiler.py @@ -37,9 +37,7 @@ def __init__(self, frequency: int): def check_availability(self): if not is_binary_available("perf"): - raise Exception( - "Flamegraph profiling is not available. Please make sure that `perf` is available." - ) + raise Exception("Flamegraph profiling is not available. Please make sure that `perf` is available.") def profile(self, command: List[str], output_dir: Path) -> ProfiledCommand: path = output_dir / "perf-records.txt" @@ -73,9 +71,7 @@ def __init__(self, events: List[str] = None): def check_availability(self): if not is_binary_available("perf"): - raise Exception( - "Performance events profiling is not available. Please install `perf`." - ) + raise Exception("Performance events profiling is not available. Please install `perf`.") def profile(self, command: List[str], output_dir: Path) -> ProfiledCommand: path = output_dir / "perf-events.txt" @@ -100,9 +96,7 @@ class CachegrindProfiler(Profiler): def check_availability(self): if not is_binary_available("valgrind"): - raise Exception( - "Valgrind profiling is not available. Please install `valgrind`." - ) + raise Exception("Valgrind profiling is not available. Please install `valgrind`.") def profile(self, command: List[str], output_dir: Path) -> ProfiledCommand: path = output_dir / "cachegrind.txt" @@ -122,9 +116,7 @@ class CallgrindProfiler(Profiler): def check_availability(self): if not is_binary_available("valgrind"): - raise Exception( - "Valgrind profiling is not available. Please install `valgrind`." - ) + raise Exception("Valgrind profiling is not available. 
Please install `valgrind`.") def profile(self, command: List[str], output_dir: Path) -> ProfiledCommand: path = output_dir / "callgrind.txt" diff --git a/benchmarks/src/environment/hq.py b/benchmarks/src/environment/hq.py index bbab1b8c4..9bab30f65 100644 --- a/benchmarks/src/environment/hq.py +++ b/benchmarks/src/environment/hq.py @@ -80,9 +80,7 @@ def parameters(self) -> Dict[str, Any]: return params -def assign_workers( - workers: List[HqWorkerConfig], nodes: List[str] -) -> Dict[str, List[HqWorkerConfig]]: +def assign_workers(workers: List[HqWorkerConfig], nodes: List[str]) -> Dict[str, List[HqWorkerConfig]]: round_robin_node = 0 used_round_robin = set() @@ -99,14 +97,10 @@ def assign_workers( node = round_robin_node round_robin_node = (round_robin_node + 1) % len(nodes) if node in used_round_robin: - logging.warning( - f"There are more workers ({len(workers)}) than worker nodes ({len(nodes)})" - ) + logging.warning(f"There are more workers ({len(workers)}) than worker nodes ({len(nodes)})") used_round_robin.add(node) if node >= len(nodes): - raise Exception( - f"Selected worker node is {node}, but there are only {len(nodes)} worker node(s)" - ) + raise Exception(f"Selected worker node is {node}, but there are only {len(nodes)} worker node(s)") node_assignments[nodes[node]].append(worker) return dict(node_assignments) @@ -126,11 +120,7 @@ def __init__(self, info: HqClusterInfo, workdir: Path): self.nodes = self.info.cluster.node_list.resolve() assert self.nodes - worker_nodes = ( - self.nodes - if isinstance(self.info.cluster.node_list, Local) - else self.nodes[1:] - ) + worker_nodes = self.nodes if isinstance(self.info.cluster.node_list, Local) else self.nodes[1:] if not worker_nodes: raise Exception("No worker nodes are available") @@ -234,9 +224,7 @@ def _wait_for_server_start(self): def _wait_for_workers(self, count: int): def get_worker_count(): - output = subprocess.check_output( - self._shared_args() + ["--output-mode", "json", "worker", "list"] - ) + output = subprocess.check_output(self._shared_args() + ["--output-mode", "json", "worker", "list"]) return len(json.loads(output)) == count wait_until(lambda: get_worker_count()) @@ -245,9 +233,7 @@ def _log_env_value(self) -> str: return f"hyperqueue={'DEBUG' if self.info.debug else 'INFO'}" -def apply_profilers( - args: StartProcessArgs, profilers: List[Profiler], output_dir: Path -): +def apply_profilers(args: StartProcessArgs, profilers: List[Profiler], output_dir: Path): for profiler in profilers: profiler.check_availability() result = profiler.profile(args.args, output_dir.absolute()) diff --git a/benchmarks/src/environment/snake.py b/benchmarks/src/environment/snake.py index daab89ae9..90e4c8f7d 100644 --- a/benchmarks/src/environment/snake.py +++ b/benchmarks/src/environment/snake.py @@ -65,6 +65,5 @@ def submit(self, cmds: str, cpus_per_task: int): ) if not ret: raise Exception( - f"SnakeMake execution failed. You can find more details in " - f"{self.workdir / '.snakemake' / 'log'}" + f"SnakeMake execution failed. 
You can find more details in {self.workdir / '.snakemake' / 'log'}" ) diff --git a/benchmarks/src/executor/executor.py b/benchmarks/src/executor/executor.py index 0a3873c91..8976c19c1 100644 --- a/benchmarks/src/executor/executor.py +++ b/benchmarks/src/executor/executor.py @@ -15,7 +15,5 @@ def __post_init__(self): class BenchmarkExecutor: - def execute( - self, benchmark: BenchmarkDescriptor, ctx: BenchmarkContext - ) -> BenchmarkResult: + def execute(self, benchmark: BenchmarkDescriptor, ctx: BenchmarkContext) -> BenchmarkResult: raise NotImplementedError diff --git a/benchmarks/src/executor/external_executor.py b/benchmarks/src/executor/external_executor.py index 6bd2d21ac..e08ad653e 100644 --- a/benchmarks/src/executor/external_executor.py +++ b/benchmarks/src/executor/external_executor.py @@ -27,18 +27,12 @@ class ExternalBenchmarkExecutor(BenchmarkExecutor): def __init__(self, init_script: Optional[Path] = None): self.init_script = init_script.resolve() - def execute( - self, benchmark: BenchmarkDescriptor, ctx: BenchmarkContext - ) -> BenchmarkResult: + def execute(self, benchmark: BenchmarkDescriptor, ctx: BenchmarkContext) -> BenchmarkResult: return execute_benchmark_in_external_process(benchmark, ctx, self.init_script) -def serialize_benchmark( - descriptor: BenchmarkDescriptor, ctx: BenchmarkContext -) -> bytes: - serialized_benchmark = SerializedBenchmark( - descriptor=descriptor, ctx=ctx, cwd=Path(os.getcwd()).absolute() - ) +def serialize_benchmark(descriptor: BenchmarkDescriptor, ctx: BenchmarkContext) -> bytes: + serialized_benchmark = SerializedBenchmark(descriptor=descriptor, ctx=ctx, cwd=Path(os.getcwd()).absolute()) return pickle.dumps(serialized_benchmark) @@ -117,7 +111,7 @@ def create_error_message(returncode: int) -> str: with open(stdout_file) as stdout: with open(stderr_file) as stderr: return ( - f"Benchmark launch in external process ended with exception.\n" + "Benchmark launch in external process ended with exception.\n" f"Stdout: {stdout.read()}\n" f"Stderr: {stderr.read()}\n" f"Exit code: {returncode}" diff --git a/benchmarks/src/executor/local_executor.py b/benchmarks/src/executor/local_executor.py index 9c09df7d0..7f6842dc7 100644 --- a/benchmarks/src/executor/local_executor.py +++ b/benchmarks/src/executor/local_executor.py @@ -10,15 +10,11 @@ class LocalBenchmarkExecutor(BenchmarkExecutor): """Executes benchmarks in the current process""" - def execute( - self, benchmark: BenchmarkDescriptor, ctx: BenchmarkContext - ) -> BenchmarkResult: + def execute(self, benchmark: BenchmarkDescriptor, ctx: BenchmarkContext) -> BenchmarkResult: return execute_benchmark(benchmark, ctx) -def execute_benchmark( - descriptor: BenchmarkDescriptor, ctx: BenchmarkContext -) -> BenchmarkResult: +def execute_benchmark(descriptor: BenchmarkDescriptor, ctx: BenchmarkContext) -> BenchmarkResult: env = descriptor.env_descriptor.create_environment(ctx.workdir) workload = descriptor.workload diff --git a/benchmarks/src/monitoring/record.py b/benchmarks/src/monitoring/record.py index 12d31cbc9..510db7789 100644 --- a/benchmarks/src/monitoring/record.py +++ b/benchmarks/src/monitoring/record.py @@ -44,9 +44,7 @@ def serialize(self, file): def record_resources() -> ResourceRecord: cpus = psutil.cpu_percent(percpu=True) mem = psutil.virtual_memory().percent - connections = sum( - 1 if c[5] == "ESTABLISHED" else 0 for c in psutil.net_connections() - ) + connections = sum(1 if c[5] == "ESTABLISHED" else 0 for c in psutil.net_connections()) bytes = psutil.net_io_counters() io = 
psutil.disk_io_counters() @@ -67,17 +65,13 @@ def record_processes(processes: List[psutil.Process]) -> Dict[str, ProcessRecord try: memory_info = process.memory_info() cpu_utilization = process.cpu_percent() - data[str(process.pid)] = ProcessRecord( - rss=memory_info.rss, vm=memory_info.vms, cpu=cpu_utilization - ) + data[str(process.pid)] = ProcessRecord(rss=memory_info.rss, vm=memory_info.vms, cpu=cpu_utilization) except BaseException as e: logging.error(e) return data -def generate_record( - timestamp: int, processes: List[psutil.Process] -) -> MonitoringRecord: +def generate_record(timestamp: int, processes: List[psutil.Process]) -> MonitoringRecord: return MonitoringRecord( timestamp=timestamp, resources=record_resources(), diff --git a/benchmarks/src/postprocessing/common.py b/benchmarks/src/postprocessing/common.py index 9ee6d56e9..f0600840a 100644 --- a/benchmarks/src/postprocessing/common.py +++ b/benchmarks/src/postprocessing/common.py @@ -62,9 +62,7 @@ def get_process_aggregated_stats(report: ClusterReport) -> Dict[Any, ProcessStat if pid in pid_to_key: key = pid_to_key[pid] used_keys.add(key) - process_stats[key]["max_rss"] = max( - process_record.rss, process_stats[key]["max_rss"] - ) + process_stats[key]["max_rss"] = max(process_record.rss, process_stats[key]["max_rss"]) process_stats[key]["avg_cpu"].append(process_record.cpu) for worker in process_stats: @@ -74,10 +72,7 @@ def get_process_aggregated_stats(report: ClusterReport) -> Dict[Any, ProcessStat for key in unused_keys: del process_stats[key] - return { - k: ProcessStats(max_rss=v["max_rss"], avg_cpu=v["avg_cpu"]) - for (k, v) in process_stats.items() - } + return {k: ProcessStats(max_rss=v["max_rss"], avg_cpu=v["avg_cpu"]) for (k, v) in process_stats.items()} def average(data) -> float: diff --git a/benchmarks/src/postprocessing/monitor.py b/benchmarks/src/postprocessing/monitor.py index 93f7b3abd..bc5da8783 100644 --- a/benchmarks/src/postprocessing/monitor.py +++ b/benchmarks/src/postprocessing/monitor.py @@ -91,9 +91,7 @@ def select_colors(items: List) -> List[str]: return colors -def prepare_time_range_figure( - range: TimeRange, width=720, height=250, **kwargs -) -> Figure: +def prepare_time_range_figure(range: TimeRange, width=720, height=250, **kwargs) -> Figure: fig = figure( plot_width=width, plot_height=height, @@ -120,9 +118,7 @@ def render_node_per_cpu_pct_utilization(figure: Figure, df: pd.DataFrame): cpu_series = df[CPU_KEY] cpu_count = len(cpu_series.iloc[0]) - cpus = [ - resample(cpu_series.apply(lambda res: res[i]), time) for i in range(cpu_count) - ] + cpus = [resample(cpu_series.apply(lambda res: res[i]), time) for i in range(cpu_count)] cpu_mean = resample(cpu_series.apply(lambda res: average(res)), time) figure.yaxis[0].formatter = NumeralTickFormatter(format="0 %") @@ -189,9 +185,7 @@ def render_node_network_connections(figure: Figure, df: pd.DataFrame): figure.line(x="x", y="count", legend_label="Network connections", source=data) -def render_bytes_sent_received( - figure: Figure, df: pd.DataFrame, label: str, read_col: str, write_col: str -): +def render_bytes_sent_received(figure: Figure, df: pd.DataFrame, label: str, read_col: str, write_col: str): time = df[DATETIME_KEY] def accumulate(column): @@ -202,9 +196,7 @@ def accumulate(column): read = accumulate(read_col) write = accumulate(write_col) - data = ColumnDataSource( - dict(rx=read, rx_kb=read / 1024, tx=write, tx_kb=write / 1024, x=read.index) - ) + data = ColumnDataSource(dict(rx=read, rx_kb=read / 1024, tx=write, tx_kb=write / 1024, 
x=read.index)) tooltips = [("RX", "@rx_kb KiB"), ("TX", "@tx_kb KiB")] figure.add_tools(HoverTool(tooltips=tooltips)) @@ -213,12 +205,8 @@ def accumulate(column): y_max = max(max(data.data["rx"]), max(data.data["tx"])) figure.y_range = Range1d(0, y_max + 0.01) - figure.line( - x="x", y="rx", color="blue", legend_label="{} RX".format(label), source=data - ) - figure.line( - x="x", y="tx", color="red", legend_label="{} TX".format(label), source=data - ) + figure.line(x="x", y="rx", color="blue", legend_label="{} RX".format(label), source=data) + figure.line(x="x", y="tx", color="red", legend_label="{} TX".format(label), source=data) def render_node_network_activity(figure: Figure, df: pd.DataFrame): @@ -232,9 +220,7 @@ def render_node_network_activity(figure: Figure, df: pd.DataFrame): def render_node_io_activity(figure: Figure, df: pd.DataFrame): - render_bytes_sent_received( - figure, df, label="I/O", read_col=IO_READ_KEY, write_col=IO_WRITE_KEY - ) + render_bytes_sent_received(figure, df, label="I/O", read_col=IO_READ_KEY, write_col=IO_WRITE_KEY) def render_node_utilization(df: pd.DataFrame) -> LayoutDOM: @@ -267,9 +253,7 @@ def get_node_description(report: ClusterReport, hostname: str) -> str: return description -def render_nodes_resource_usage( - report: ClusterReport, resources_df: pd.DataFrame -) -> LayoutDOM: +def render_nodes_resource_usage(report: ClusterReport, resources_df: pd.DataFrame) -> LayoutDOM: items = sorted(resources_df.groupby(HOSTNAME_KEY), key=lambda item: item[0]) rows = [] for hostname, node_data in items: @@ -338,9 +322,7 @@ def create_global_resource_datasource_and_tooltips( return (source, tooltips) -def render_global_resource_usage( - report: ClusterReport, resources_df: pd.DataFrame -) -> LayoutDOM: +def render_global_resource_usage(report: ClusterReport, resources_df: pd.DataFrame) -> LayoutDOM: df = resources_df[[DATETIME_KEY, HOSTNAME_KEY, CPU_KEY, MEM_KEY]].copy() # Average CPUs per record df[CPU_KEY] = df[CPU_KEY].apply(lambda entry: average(entry)) @@ -351,9 +333,7 @@ def render_global_resource_usage( hostnames = sorted(df[HOSTNAME_KEY].unique()) def render(title: str, key: str) -> Column: - (source, tooltips) = create_global_resource_datasource_and_tooltips( - time_index, df, key - ) + (source, tooltips) = create_global_resource_datasource_and_tooltips(time_index, df, key) figure = prepare_time_range_figure(range, tooltips=tooltips) render_global_percent_resource_usage(figure, source, report, hostnames) return Column(children=[Div(text=title), figure]) @@ -535,9 +515,7 @@ def render_item(item: Tuple[str, str]): return Panel(child=create_tabs(items, render_item), title=process.key) - return Panel( - child=create_tabs(processes, render_process_output), title=node.hostname - ) + return Panel(child=create_tabs(processes, render_process_output), title=node.hostname) return create_node_tabs(report, render_node_output) @@ -551,9 +529,7 @@ def render_process(pid: int) -> Optional[Model]: process_data = node_data[node_data[PID_KEY] == pid] max_rss = process_data[RSS_KEY].max() avg_cpu = process_data[AVG_CPU_KEY].mean() - process = [ - p for p in report.cluster.nodes[node.hostname].processes if p.pid == pid - ] + process = [p for p in report.cluster.nodes[node.hostname].processes if p.pid == pid] if not process: logging.warning(f"Process {node.hostname}/{pid} not found in cluster") return @@ -572,9 +548,7 @@ def render_process(pid: int) -> Optional[Model]: mem_figure.add_tools(HoverTool(tooltips=[("RSS (MiB)", "@rss")])) data = ColumnDataSource(dict(rss=mem, 
x=mem.index)) - mem_figure.line( - x="x", y="rss", color="red", legend_label="Memory", source=data - ) + mem_figure.line(x="x", y="rss", color="red", legend_label="Memory", source=data) cpu = resample(process_data[AVG_CPU_KEY], time) cpu_figure = prepare_time_range_figure(range) @@ -584,18 +558,14 @@ def render_process(pid: int) -> Optional[Model]: cpu_figure.add_tools(HoverTool(tooltips=[("Avg. CPU usage", "@cpu")])) data = ColumnDataSource(dict(cpu=cpu / 100.0, x=cpu.index)) - cpu_figure.line( - x="x", y="cpu", color="blue", legend_label="CPU usage (%)", source=data - ) + cpu_figure.line(x="x", y="cpu", color="blue", legend_label="CPU usage (%)", source=data) - summary = PreText( - text=f""" + summary = PreText(text=f""" PID: {pid} Key: {process.key} Max. RSS: {humanize.naturalsize(max_rss, binary=True)} Avg. CPU: {avg_cpu:.02f} % -""".strip() - ) +""".strip()) columns = [summary, mem_figure, cpu_figure] return Panel(child=Column(children=columns), title=process.key) @@ -618,9 +588,7 @@ def create_page(report: ClusterReport): ] if not per_node_df.empty: - structure += [ - ("Node utilization", lambda r: render_nodes_resource_usage(r, per_node_df)) - ] + structure += [("Node utilization", lambda r: render_nodes_resource_usage(r, per_node_df))] per_process_df = create_per_process_resources_df(report.monitoring) if not per_process_df.empty: diff --git a/benchmarks/src/postprocessing/overview.py b/benchmarks/src/postprocessing/overview.py index 6cf598f69..c883cda46 100644 --- a/benchmarks/src/postprocessing/overview.py +++ b/benchmarks/src/postprocessing/overview.py @@ -52,9 +52,7 @@ class BenchmarkEntry: def render(template: str, **kwargs) -> str: - return Template(template).render( - **kwargs, format_bytes=lambda v: humanize.naturalsize(v, binary=True) - ) + return Template(template).render(**kwargs, format_bytes=lambda v: humanize.naturalsize(v, binary=True)) def style(level=0) -> Dict[str, Any]: @@ -62,9 +60,7 @@ def style(level=0) -> Dict[str, Any]: def render_benchmark(entry: BenchmarkEntry): - with open( - os.path.join(os.path.dirname(__file__), "templates/benchmark.html") - ) as file: + with open(os.path.join(os.path.dirname(__file__), "templates/benchmark.html")) as file: template = file.read() node_utilization = { @@ -100,9 +96,7 @@ def render_bench_subtab(child, name): panel = render_bench_subtab(render_profiling_data(report), name) elif key == "Global usage": per_node_df = create_global_resources_df(report.monitoring) - panel = render_bench_subtab( - render_global_resource_usage(report, per_node_df), name - ) + panel = render_bench_subtab(render_global_resource_usage(report, per_node_df), name) elif key == "Node usage": # Keep maximum of each subfigure for axis sync between rows per_node_df = create_global_resources_df(report.monitoring) @@ -111,17 +105,13 @@ def render_bench_subtab(child, name): for fig in figs: figmax = fig.y_range.end if node_maxes.get(fig.title.text) is not None: - node_maxes[fig.title.text] = max( - figmax, node_maxes[fig.title.text] - ) + node_maxes[fig.title.text] = max(figmax, node_maxes[fig.title.text]) else: node_maxes[fig.title.text] = figmax panel = render_bench_subtab(child, name) elif key == "Process usage": per_process_df = create_per_process_resources_df(report.monitoring) - panel = render_bench_subtab( - render_process_resource_usage(report, per_process_df), name - ) + panel = render_bench_subtab(render_process_resource_usage(report, per_process_df), name) widgets[key].append(panel) for w in widgets["Node usage"]: @@ -184,17 +174,13 @@ def 
render_workload( workload_params: str, data: pd.DataFrame, ): - with open( - os.path.join(os.path.dirname(__file__), "templates/workload.html") - ) as file: + with open(os.path.join(os.path.dirname(__file__), "templates/workload.html")) as file: template = file.read() environments = {} for group, group_data in groupby_environment(data): key = group[0] + "(" + group[1] + ") [" + workload + ":" + workload_params + "]" - environments[key] = render_environment( - level + 1, entry_map, group[0], group[1], group_data - ) + environments[key] = render_environment(level + 1, entry_map, group[0], group[1], group_data) return render(template, environments=environments) @@ -229,9 +215,7 @@ def pregenerate_entries(database: Database, directory: Path) -> EntryMap: entry_map = {} with Pool() as pool: args = [(record, directory) for record in database.records] - for (record, _), entry in tqdm.tqdm( - zip(args, pool.imap(generate_entry, args)), total=len(args) - ): + for (record, _), entry in tqdm.tqdm(zip(args, pool.imap(generate_entry, args)), total=len(args)): if entry is not None: entry_map[record.benchmark_metadata["key"]] = entry return entry_map @@ -248,9 +232,7 @@ def generate_summary_html(database: Database, directory: Path) -> Path: def summary_by_benchmark(df: pd.DataFrame, file): - grouped = df.groupby(["workload", "workload-params", "env", "env-params"])[ - "duration" - ] + grouped = df.groupby(["workload", "workload-params", "env", "env-params"])["duration"] with pd_print_all(): for group, data in sorted(grouped, key=lambda item: item[0]): result = data.describe().to_frame().transpose() @@ -285,9 +267,7 @@ def two_level_summary( print(file=file) -def generate_comparison_html( - benchmarks: List[str], database: Database, directory: Path -): +def generate_comparison_html(benchmarks: List[str], database: Database, directory: Path): entry_map = pregenerate_entries(database, directory) ensure_directory(directory.joinpath("comparisons")) @@ -321,9 +301,7 @@ def generate(f): two_level_summary(df, groupby_workload, groupby_environment, f) print("Grouped by environment:", file=f) - two_level_summary( - df, groupby_environment, groupby_workload, f, print_total=True - ) + two_level_summary(df, groupby_environment, groupby_workload, f, print_total=True) print("Grouped by benchmark:", file=f) summary_by_benchmark(df, f) @@ -361,9 +339,7 @@ def create_href(addr, location, name): comparisons = {} for comparison in glob(str(directory.joinpath("comparisons/*"))): name = os.path.basename(comparison) - comparisons[name] = create_href( - addr, Path("comparisons").joinpath(name), os.path.splitext(name)[0] - ) + comparisons[name] = create_href(addr, Path("comparisons").joinpath(name), os.path.splitext(name)[0]) comparisons_col = column(list(comparisons.values())) diff --git a/benchmarks/src/postprocessing/report.py b/benchmarks/src/postprocessing/report.py index f3612b973..40c40128c 100644 --- a/benchmarks/src/postprocessing/report.py +++ b/benchmarks/src/postprocessing/report.py @@ -52,9 +52,7 @@ def load_profiling_data(cluster: Cluster) -> ProfilingData: if file.is_file(): process_records[tag] = file else: - logging.warning( - f"Profiler record `{tag}` for `{process.key}` not found at {file}" - ) + logging.warning(f"Profiler record `{tag}` for `{process.key}` not found at {file}") data[process.key] = process_records return data @@ -69,7 +67,5 @@ def load_monitoring_data(directory: Path, cluster: Cluster) -> MonitoringData: with open(trace_file) as f: data[node] = MonitoringRecord.deserialize_records(f) else: - 
logging.warning( - f"Monitoring trace for {node.hostname} not found at {trace_file}" - ) + logging.warning(f"Monitoring trace for {node.hostname} not found at {trace_file}") return data diff --git a/benchmarks/src/postprocessing/serve.py b/benchmarks/src/postprocessing/serve.py index 8ab0af920..2a682f6c8 100644 --- a/benchmarks/src/postprocessing/serve.py +++ b/benchmarks/src/postprocessing/serve.py @@ -61,9 +61,7 @@ def get(self, key: str): class ComparisonHandler(web.RequestHandler): def get(self, key: str): - html_file = open( - Path("summary/comparisons").joinpath(key), "r", encoding="utf-8" - ) + html_file = open(Path("summary/comparisons").joinpath(key), "r", encoding="utf-8") source_code = html_file.read() self.write(source_code) @@ -79,16 +77,10 @@ def get(self, key: str): compare0 = tables[1] > tables[0] def color_0(x): - return [ - "background-color:red" if i[1][0] else "" - for i in compare0.iterrows() - ] + return ["background-color:red" if i[1][0] else "" for i in compare0.iterrows()] def color_1(x): - return [ - "background-color:red" if i[1][0] else "" - for i in compare1.iterrows() - ] + return ["background-color:red" if i[1][0] else "" for i in compare1.iterrows()] # tables[1].style.apply_index(color_b) output = "" @@ -104,9 +96,7 @@ def color_1(x): plt.savefig(img, format="png") plt.clf() - with open( - os.path.join(os.path.dirname(__file__), "templates/compare_table.html") - ) as fp: + with open(os.path.join(os.path.dirname(__file__), "templates/compare_table.html")) as fp: file = fp.read() self.write(render(file, tables=output)) @@ -127,9 +117,7 @@ class OverViewHandler(web.RequestHandler): def summary_reader_buffer(self): df = create_database_df(database) print("Grouped by workload:", file=self.summary_txt) - two_level_summary( - df, groupby_workload, groupby_environment, self.summary_txt - ) + two_level_summary(df, groupby_workload, groupby_environment, self.summary_txt) print("Grouped by environment:", file=self.summary_txt) two_level_summary( df, @@ -160,7 +148,6 @@ def summary_reader(self): key = line_parsed[0] entries = [] continue - entries.append(line_parsed[0]) if line_parsed[0] != "" else None i += 1 if line == "": @@ -175,24 +162,16 @@ def get(self): self.summary_reader_buffer() data = self.summary_reader() df = create_database_df(database) - grouped = df.groupby( - ["workload", "workload-params", "env", "env-params", "index"] - )["duration"] + grouped = df.groupby(["workload", "workload-params", "env", "env-params", "index"])["duration"] pairs = [] for pair in list(grouped): - pairs.append( - [pair[0], pair[1].describe().dropna().to_frame().T.to_html()] - ) + pairs.append([pair[0], pair[1].describe().dropna().to_frame().T.to_html()]) data["Grouped by benchmark:"] = pairs for i in range(len(data["Grouped by benchmark:"])): - formated_str = "-".join( - [str(z) for z in data["Grouped by benchmark:"][i][0]] - ) + formated_str = "-".join([str(z) for z in data["Grouped by benchmark:"][i][0]]) formated_str = formated_str.replace(",", "_") data["Grouped by benchmark:"][i][0] = formated_str - with open( - os.path.join(os.path.dirname(__file__), "templates/summary.html") - ) as fp: + with open(os.path.join(os.path.dirname(__file__), "templates/summary.html")) as fp: file = fp.read() self.write(render(file, data=data, keys=list(data.keys())[:2])) # self.write(data) diff --git a/benchmarks/src/submit/execute_script.py b/benchmarks/src/submit/execute_script.py index 87b6a2078..2ad5bf593 100644 --- a/benchmarks/src/submit/execute_script.py +++ 
b/benchmarks/src/submit/execute_script.py @@ -35,33 +35,23 @@ def execute( options = deserialize_submit_options(submit_options) database_file = (directory / "result.json").resolve() - if ( - database_path - and database_path.is_file() - and database_file != database_path.resolve() - ): + if database_path and database_path.is_file() and database_file != database_path.resolve(): database_file = directory / database_path.name shutil.copyfile(database_path, database_file) database = Database(database_file) - runner = BenchmarkRunner( - database, workdir=directory, materialize_fn=materialize_benchmark - ) + runner = BenchmarkRunner(database, workdir=directory, materialize_fn=materialize_benchmark) benchmark_count = len(identifiers) def run(): - for identifier, benchmark, result in tqdm( - runner.compute(identifiers), total=benchmark_count - ): + for identifier, benchmark, result in tqdm(runner.compute(identifiers), total=benchmark_count): logging.info(f"Finished benchmark {identifier}: {result}") max_runtime = options.walltime if max_runtime.total_seconds() > 60: max_runtime = max_runtime - timedelta(minutes=1) - logging.info( - f"Starting to benchmark {benchmark_count} benchmarks, max time is {max_runtime}" - ) + logging.info(f"Starting to benchmark {benchmark_count} benchmarks, max time is {max_runtime}") start = time.time() try: @@ -84,11 +74,7 @@ def run(): root_dir = directory / "resubmits" directory = generate_job_dir(root_dir) - remaining = [ - identifier - for identifier in identifiers - if not database.has_record_for(identifier) - ] + remaining = [identifier for identifier in identifiers if not database.has_record_for(identifier)] logging.warning( f"Benchmark didn't finish in {max_runtime}, computed {benchmark_count - len(remaining)}" diff --git a/benchmarks/src/submit/submit.py b/benchmarks/src/submit/submit.py index e86c59f26..45bd020f5 100644 --- a/benchmarks/src/submit/submit.py +++ b/benchmarks/src/submit/submit.py @@ -65,11 +65,7 @@ def create_submit_script_body( workdir = Path(os.getcwd()).absolute() resubmit_flag = "--resubmit" if resubmit else "--no-resubmit" - init_cmd = ( - f"source {options.init_script.absolute()} || exit 1" - if options.init_script - else "" - ) + init_cmd = f"source {options.init_script.absolute()} || exit 1" if options.init_script else "" command = f"""{header} diff --git a/benchmarks/src/utils/benchmark.py b/benchmarks/src/utils/benchmark.py index ee2a17bdb..4707abd7d 100644 --- a/benchmarks/src/utils/benchmark.py +++ b/benchmarks/src/utils/benchmark.py @@ -17,18 +17,14 @@ def run_benchmarks(workdir: Path, descriptors: List[BenchmarkDescriptor]) -> Dat runner = BenchmarkRunner(database, workdir=workdir) materialized = runner.materialize_and_skip(descriptors) - for _info, _result in tqdm.tqdm( - runner.compute_materialized(materialized), total=len(materialized) - ): + for _info, _result in tqdm.tqdm(runner.compute_materialized(materialized), total=len(materialized)): pass runner.save() return database -def run_benchmarks_with_postprocessing( - workdir: Path, descriptors: List[BenchmarkDescriptor] -): +def run_benchmarks_with_postprocessing(workdir: Path, descriptors: List[BenchmarkDescriptor]): database = run_benchmarks(workdir, descriptors) summary_txt = workdir / "summary.txt" diff --git a/benchmarks/src/utils/process.py b/benchmarks/src/utils/process.py index edd3ac182..21eccc00c 100644 --- a/benchmarks/src/utils/process.py +++ b/benchmarks/src/utils/process.py @@ -24,10 +24,8 @@ def execute_process( if result.returncode != 0: with open(stdout) as 
stdout_file:
            with open(stderr) as stderr_file:
-                raise Exception(
-                    f"""The process {args} has exited with error code {result.returncode}
+                raise Exception(f"""The process {args} has exited with error code {result.returncode}
 Stdout: {stdout_file.read()}
 Stderr: {stderr_file.read()}
-""".strip()
-                )
+""".strip())
     return result
diff --git a/benchmarks/src/workloads/sleep.py b/benchmarks/src/workloads/sleep.py
index 00e23c6a2..e881c1e0e 100644
--- a/benchmarks/src/workloads/sleep.py
+++ b/benchmarks/src/workloads/sleep.py
@@ -21,9 +21,7 @@ def name(self) -> str:
 
 class SleepHQ(Sleep):
     def execute(self, env: HqEnvironment) -> WorkloadExecutionResult:
-        return measure_hq_tasks(
-            env, ["sleep", str(self.sleep_duration)], task_count=self.task_count
-        )
+        return measure_hq_tasks(env, ["sleep", str(self.sleep_duration)], task_count=self.task_count)
 
 
 class SleepSnake(Sleep):
diff --git a/benchmarks/src/workloads/utils.py b/benchmarks/src/workloads/utils.py
index 3ece93bb5..5cebe4060 100644
--- a/benchmarks/src/workloads/utils.py
+++ b/benchmarks/src/workloads/utils.py
@@ -47,9 +47,7 @@ def measure_hq_tasks(
     return result(timer.duration())
 
 
-def measure_snake_tasks(
-    env: Environment, command: str, task_count: int, cpus_per_task=1
-) -> WorkloadExecutionResult:
+def measure_snake_tasks(env: Environment, command: str, task_count: int, cpus_per_task=1) -> WorkloadExecutionResult:
     assert isinstance(env, SnakeEnvironment)
 
     args = f"""rule all:
diff --git a/crates/hyperqueue/src/client/commands/autoalloc.rs b/crates/hyperqueue/src/client/commands/autoalloc.rs
index ee4bb4a41..78f127949 100644
--- a/crates/hyperqueue/src/client/commands/autoalloc.rs
+++ b/crates/hyperqueue/src/client/commands/autoalloc.rs
@@ -1,6 +1,7 @@
 use std::time::Duration;
 
 use clap::Parser;
+use humantime::format_duration;
 
 use crate::client::commands::worker::{ArgServerLostPolicy, SharedWorkerStartOpts};
 use crate::client::globalsettings::GlobalSettings;
@@ -121,6 +122,15 @@ struct SharedQueueOpts {
     #[arg(long)]
     worker_stop_cmd: Option<String>,
 
+    /// Time limit after which workers in the submitted allocations will be stopped.
+    /// By default, it is set to the time limit of the allocation.
+    /// However, if you want the workers to be stopped sooner, for example to give `worker_stop_cmd`
+    /// more time to execute before the allocation is killed, you can lower the worker time limit.
+    ///
+    /// The limit must not be larger than the allocation time limit.
+    #[arg(long, value_parser = parse_hms_or_human_time)]
+    worker_time_limit: Option<Duration>,
+
     /// Additional arguments passed to the submit command
     #[arg(trailing_var_arg(true))]
     additional_args: Vec<String>,
@@ -208,7 +218,7 @@ pub async fn command_autoalloc(
     Ok(())
 }
 
-fn args_to_params(args: SharedQueueOpts) -> AllocationQueueParams {
+fn args_to_params(args: SharedQueueOpts) -> anyhow::Result<AllocationQueueParams> {
     let SharedQueueOpts {
         backlog,
         time_limit,
@@ -218,6 +228,7 @@ fn args_to_params(args: SharedQueueOpts) -> AllocationQueueParams {
         worker_args,
         worker_start_cmd,
         worker_stop_cmd,
+        worker_time_limit,
         additional_args,
         on_server_lost,
         no_dry_run: _,
@@ -232,6 +243,14 @@ wasted allocation duration."
         }
     }
 
+    if let Some(ref worker_time_limit) = worker_time_limit {
+        if worker_time_limit > &time_limit {
+            return Err(anyhow::anyhow!(
+                "Worker time limit cannot be larger than queue time limit"
+            ));
+        }
+    }
+
     let SharedWorkerStartOpts {
         cpus,
         resource,
@@ -275,7 +294,13 @@ wasted allocation duration."
format!("\"{}\"", server_lost_policy_to_str(&on_server_lost.into())), ]); - AllocationQueueParams { + let worker_time_limit = worker_time_limit.unwrap_or(time_limit); + worker_args.extend([ + "--time-limit".to_string(), + format!("\"{}\"", format_duration(worker_time_limit)), + ]); + + Ok(AllocationQueueParams { workers_per_alloc, backlog, timelimit: time_limit, @@ -286,7 +311,7 @@ wasted allocation duration." max_worker_count, worker_args, idle_timeout, - } + }) } async fn dry_run_command(mut session: ClientSession, opts: DryRunOpts) -> anyhow::Result<()> { @@ -296,7 +321,7 @@ async fn dry_run_command(mut session: ClientSession, opts: DryRunOpts) -> anyhow }; let message = FromClientMessage::AutoAlloc(AutoAllocRequest::DryRun { manager, - parameters, + parameters: parameters?, }); rpc_call!(session.connection(), message, @@ -325,7 +350,7 @@ async fn add_queue(mut session: ClientSession, opts: AddQueueOpts) -> anyhow::Re let message = FromClientMessage::AutoAlloc(AutoAllocRequest::AddQueue { manager, - parameters, + parameters: parameters?, dry_run, }); diff --git a/crates/hyperqueue/src/server/autoalloc/queue/common.rs b/crates/hyperqueue/src/server/autoalloc/queue/common.rs index ad7902c75..0e4f001ee 100644 --- a/crates/hyperqueue/src/server/autoalloc/queue/common.rs +++ b/crates/hyperqueue/src/server/autoalloc/queue/common.rs @@ -144,12 +144,8 @@ pub fn build_worker_args( .unwrap_or_else(get_default_worker_idle_time); let idle_timeout = humantime::format_duration(idle_timeout); - // Allow some time for cleanup - let time_limit = queue_info.timelimit.saturating_sub(Duration::from_secs(10)); - let time_limit = humantime::format_duration(time_limit); - let mut args = format!( - "{} worker start --idle-timeout \"{idle_timeout}\" --time-limit \"{time_limit}\" --manager \"{manager}\" --server-dir \"{}\"", + "{} worker start --idle-timeout \"{idle_timeout}\" --manager \"{manager}\" --server-dir \"{}\"", hq_path.display(), server_dir.display() ); diff --git a/crates/pyhq/python/hyperqueue/client.py b/crates/pyhq/python/hyperqueue/client.py index 3a9aff9e1..619154ef9 100644 --- a/crates/pyhq/python/hyperqueue/client.py +++ b/crates/pyhq/python/hyperqueue/client.py @@ -83,9 +83,7 @@ def submit(self, job: Job) -> SubmittedJob: raise Exception("Submitted job must have at least a single task") job_id = self.connection.submit_job(job_desc) - logging.info( - f"Submitted job {job_id} with {task_count} {pluralize('task', task_count)}" - ) + logging.info(f"Submitted job {job_id} with {task_count} {pluralize('task', task_count)}") return SubmittedJob(job=job, id=job_id) def wait_for_jobs(self, jobs: Sequence[SubmittedJob], raise_on_error=True) -> bool: @@ -95,9 +93,7 @@ def wait_for_jobs(self, jobs: Sequence[SubmittedJob], raise_on_error=True) -> bo job_ids_str = ",".join(str(id) for id in job_ids) if len(jobs) > 1: job_ids_str = "{" + job_ids_str + "}" - logging.info( - f"Waiting for {pluralize('job', len(jobs))} {job_ids_str} to finish" - ) + logging.info(f"Waiting for {pluralize('job', len(jobs))} {job_ids_str} to finish") callback = create_progress_callback() diff --git a/crates/pyhq/python/hyperqueue/job.py b/crates/pyhq/python/hyperqueue/job.py index c1db5c53e..d2ba9f014 100644 --- a/crates/pyhq/python/hyperqueue/job.py +++ b/crates/pyhq/python/hyperqueue/job.py @@ -31,11 +31,7 @@ def __init__( self.tasks: List[Task] = [] self.task_map: Dict[TaskId, Task] = {} self.max_fails = max_fails - self.default_workdir = ( - Path(default_workdir).resolve() - if default_workdir is not None - else 
default_workdir - ) + self.default_workdir = Path(default_workdir).resolve() if default_workdir is not None else default_workdir self.default_env = default_env or {} def task_by_id(self, id: TaskId) -> Optional[Task]: diff --git a/crates/pyhq/python/hyperqueue/output.py b/crates/pyhq/python/hyperqueue/output.py index df6bd5faf..ed507dd27 100644 --- a/crates/pyhq/python/hyperqueue/output.py +++ b/crates/pyhq/python/hyperqueue/output.py @@ -19,13 +19,9 @@ def default_stderr() -> str: # TODO: how to resolve TASK_ID in the context of some other task? class Output: - def __init__( - self, name: str, filepath: Optional[str] = None, extension: Optional[str] = None - ): + def __init__(self, name: str, filepath: Optional[str] = None, extension: Optional[str] = None): if filepath and extension: - raise ValidationException( - "Parameters `filepath` and `extension` are mutually exclusive" - ) + raise ValidationException("Parameters `filepath` and `extension` are mutually exclusive") self.name = name self.filepath = filepath diff --git a/crates/pyhq/python/hyperqueue/task/function/wrapper.py b/crates/pyhq/python/hyperqueue/task/function/wrapper.py index 5919c6bd9..e99b26c56 100644 --- a/crates/pyhq/python/hyperqueue/task/function/wrapper.py +++ b/crates/pyhq/python/hyperqueue/task/function/wrapper.py @@ -9,9 +9,7 @@ class CloudWrapper: Wraps a callable so that cloudpickle is used to pickle it, caching the pickle. """ - def __init__( - self, fn, pickled_fn=None, cache=True, protocol=cloudpickle.DEFAULT_PROTOCOL - ): + def __init__(self, fn, pickled_fn=None, cache=True, protocol=cloudpickle.DEFAULT_PROTOCOL): if fn is None: if pickled_fn is None: raise ValueError("Pass at least one of `fn` and `pickled_fn`") @@ -27,9 +25,7 @@ def __init__( self.pickled_fn = pickled_fn self.cache = cache self.protocol = protocol - self.__doc__ = "CloudWrapper for {!r}. Original doc:\n\n{}".format( - self.fn, self.fn.__doc__ - ) + self.__doc__ = "CloudWrapper for {!r}. Original doc:\n\n{}".format(self.fn, self.fn.__doc__) if hasattr(self.fn, "__name__"): self.__name__ = self.fn.__name__ @@ -56,9 +52,7 @@ def _get_pickled_fn(self): return pfn def __call__(self, *args, **kwargs): - logging.debug( - f"Running function {self.fn} using args {args} and kwargs {kwargs}" - ) + logging.debug(f"Running function {self.fn} using args {args} and kwargs {kwargs}") return self.fn(*args, **kwargs) def __reduce__(self): diff --git a/crates/pyhq/python/hyperqueue/task/program.py b/crates/pyhq/python/hyperqueue/task/program.py index c376301de..f079de73b 100644 --- a/crates/pyhq/python/hyperqueue/task/program.py +++ b/crates/pyhq/python/hyperqueue/task/program.py @@ -95,8 +95,6 @@ def get_task_outputs(task: ExternalProgram) -> Dict[str, Output]: outputs = gather_outputs(task.args) + gather_outputs(task.env) for output in outputs: if output.name in output_map: - raise ValidationException( - f"Output `{output.name}` has been defined multiple times" - ) + raise ValidationException(f"Output `{output.name}` has been defined multiple times") output_map[output.name] = output return output_map diff --git a/crates/pyhq/python/hyperqueue/utils/package.py b/crates/pyhq/python/hyperqueue/utils/package.py index 783710dea..a7b8c26f7 100644 --- a/crates/pyhq/python/hyperqueue/utils/package.py +++ b/crates/pyhq/python/hyperqueue/utils/package.py @@ -3,7 +3,4 @@ def __init__(self, package: str): self.package = package def __str__(self): - return ( - f"Unable to import `{self.package}`. You have to install the " - f"`{self.package}` package." 
-        )
+        return f"Unable to import `{self.package}`. You have to install the `{self.package}` package."
diff --git a/docs/deployment/allocation.md b/docs/deployment/allocation.md
index 539b429fe..ac80aa0d6 100644
--- a/docs/deployment/allocation.md
+++ b/docs/deployment/allocation.md
@@ -53,52 +53,105 @@ create multiple allocation queues, and you can even combine PBS queues with Slur
 In addition to arguments that are passed to `qsub`/`sbatch`, you can also use several other command line options when
 creating a new allocation queue:
 
-- **`--time-limit <duration>`** Sets the walltime of created allocations[^1].
-
-    This parameter is **required**, as HyperQueue must know the duration of the individual allocations.
-    Make sure that you pass a time limit that does not exceed the limit of the PBS/Slurm queue
-    that you intend to use, otherwise the allocation submissions will fail. You can use the
-    [`dry-run` command](#dry-run-command) to debug this.
-
-- `--backlog <count>` How many allocations should be queued (waiting to be started) in PBS/Slurm at any given time.
-- `--workers-per-alloc <count>` How many workers should be requested in each allocation. This corresponds
-  to the number of requested nodes, as the allocator will always create a single worker per node.
-- `--max-worker-count <count>` Maximum number of workers that can be queued or running in the created
-  allocation queue. The amount of workers will be limited by the manager (PBS/Slurm), but you can
-  use this parameter to make the limit smaller, for example if you also want to create manager allocations
-  outside HyperQueue.
-- **Worker resources** You can specify [CPU](../jobs/cresources.md) and [generic](../jobs/resources.md)
-  resources of workers spawned in the created allocation queue. The name and syntax of these parameters
-  is the same as when you create a worker manually:
-
-    === "PBS"
-
-        ```bash
-        $ hq alloc add pbs --time-limit 1h --cpus 4x4 --resource "gpus/nvidia=range(1-2)" -- -qqprod -AAccount1
-        ```
-
-    === "Slurm"
-
-        ``` bash
-        $ hq alloc add slurm --time-limit 1h --cpus 4x4 --resource "gpus/nvidia=range(1-2)" -- --partition=p1
-        ```
+#### Time limit
+Format[^1]: **`--time-limit <duration>`**
+
+Sets the walltime of created allocations.
+
+This parameter is **required**, as HyperQueue must know the duration of the individual allocations.
+Make sure that you pass a time limit that does not exceed the limit of the PBS/Slurm queue
+that you intend to use, otherwise the allocation submissions will fail. You can use the
+[`dry-run` command](#dry-run-command) to debug this.
+
+Workers in this allocation queue will by default be created with a time limit equal to the time limit of the queue
+(unless overridden with [Worker time limit](#worker-time-limit)).
+
+!!! Important
+    If you specify a [time request](../jobs/jobs.md#time-management)
+    for a task, you should be aware that the time limit for the allocation queue **should be larger than the time request**
+    if you want to run this task on workers created by this allocation queue, because it will always take some time before
+    a worker is fully initialized. For example, if you set `--time-request 1h` when submitting a task, and `--time-limit 1h`
+    when creating an allocation queue, this task will never get scheduled on workers from this queue.
+
+#### Backlog
+Format: `--backlog <count>`
+
+How many allocations should be queued (waiting to be started) in PBS/Slurm at any given time. It has to be a positive integer.
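+
+For illustration, the following command creates a queue that keeps at most four allocations queued in PBS at any given
+time (the backlog value here is only an example; `--time-limit` is always required):
+
+```console
+$ hq alloc add pbs --time-limit 1h --backlog 4
+```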
+
+#### Workers per allocation
+Format: `--workers-per-alloc <count>`
+
+How many workers should be requested in each allocation. This corresponds to the number of requested nodes, as the allocator
+will always create a single worker per node.
+
+#### Max worker count
+Format: `--max-worker-count <count>`
+
+Maximum number of workers that can be queued or running in the allocation queue. The total number of workers will usually
+be limited by the manager (PBS/Slurm), but you can use this parameter to make the limit smaller, for example if you also
+want to create manager allocations outside HyperQueue.
+
+#### Worker resources
+You can specify [CPU](../jobs/cresources.md) and [generic](../jobs/resources.md) resources of workers spawned by the
+allocation queue. The name and syntax of these parameters is the same as when you create a
+[worker manually](../jobs/resources.md#worker-resources):
+
+=== "PBS"
+
+    ```bash
+    $ hq alloc add pbs --time-limit 1h --cpus 4x4 --resource "gpus/nvidia=range(1-2)" -- -qqprod -AAccount1
+    ```
+
+=== "Slurm"
+
+    ```bash
+    $ hq alloc add slurm --time-limit 1h --cpus 4x4 --resource "gpus/nvidia=range(1-2)" -- --partition=p1
+    ```
 
 If you do not pass any resources, they will be detected automatically (same as it works with `hq worker start`).
 
-- `--idle-timeout <duration>` [Idle timeout](worker.md#idle-timeout) for workers started by the
-automatic allocator. We suggest that you do not use a long duration for this parameter, as it can
-result in wasting precious allocation time.
+#### Idle timeout
+Format[^1]: `--idle-timeout <duration>`
+
+Sets the [idle timeout](worker.md#idle-timeout) for workers started by the allocation queue. We suggest that you do not
+use a long duration for this parameter, as it can result in wasting precious allocation time.
+
+#### Worker start command
+Format: `--worker-start-cmd <cmd>`
+
+Specifies a shell command that will be executed on each allocated node just before a worker is started
+on that node. You can use it e.g. to initialize some shared environment for the node, or to load software modules.
+
+#### Worker stop command
+Format: `--worker-stop-cmd <cmd>`
+
+Specifies a shell command that will be executed on each allocated node just after the worker stops on
+that node. You can use it e.g. to clean up a previously initialized environment for the node.
+
+!!! Warning
+
+    The execution of this command is best-effort! It is not guaranteed that the command will always be executed. For example,
+    PBS/Slurm can kill the allocation without giving HQ a chance to run the command.
+
+#### Worker time limit
+Format[^1]: `--worker-time-limit <duration>`
+
+Sets the time limit of workers spawned by the allocation queue. After the time limit expires, the worker will be stopped.
+By default, the worker time limit is set to the time limit of the allocation queue. If you want, you can shorten it
+with this flag to make the worker exit sooner, for example to give more time for a [worker stop command](#worker-stop-command)
+to execute.
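+
+For illustration, the following command (echoing the CHANGELOG example) creates a queue whose workers are stopped two
+minutes before each one-hour allocation ends, leaving headroom for the stop command (the command path is a placeholder):
+
+```console
+$ hq alloc add pbs --time-limit 1h --worker-time-limit 58m --worker-stop-cmd "/project/xxxx/slow-command.sh"
+```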
+
+!!! Note
+
-- `--worker-start-cmd <cmd>` This shell command will be executed on each allocated node just before a worker is started
-on the given node. You can use it e.g. to initialize some shared environment for the node, or to load software modules.
-
-- `--worker-stop-cmd <cmd>` This shell command will be executed on each allocated node just after the worker stops on
-that node. You can use it e.g. to clean up a previously initialized environment for the node. **Note that the execution of
-this command is best-effort! It is not guaranteed that the command will always be executed.** For example, PBS/Slurm can
-kill the allocation without giving HQ a chance to run the command.
+
+    This command is not designed to stop workers early if they have nothing to do. This functionality is
+    provided by [idle timeout](#idle-timeout).
+
+#### Name
+Format: `--name <name>`
+
+Name of the allocation queue. It will be used to name allocations submitted to the job manager. Serves for debug purposes
+only.
 
-- `--name <name>` Name of the allocation queue. Will be used to name allocations. Serves for debug purposes only.
 
 [^1]: You can use various [shortcuts](../cli/shortcuts.md#duration) for the duration value.
diff --git a/docs/deployment/worker.md b/docs/deployment/worker.md
index 25d3b95ef..b1c886a65 100644
--- a/docs/deployment/worker.md
+++ b/docs/deployment/worker.md
@@ -88,14 +88,17 @@ they are often started inside PBS/Slurm jobs that have a limited duration.
 It is very useful for the workers to know how much remaining time ("lifetime") do they have until they will be stopped.
 This duration is called the `Worker time limit`.
 
-When a worker is started inside a PBS or Slurm job, it will automatically calculate the time limit from the job's
-metadata. If you want to set time limit manually for workers started outside of PBS/Slurm jobs or if you want to
+When a worker is started manually inside a PBS or Slurm job, it will automatically calculate the time limit from the job's
+metadata. If you want to set a time limit for workers started outside of PBS/Slurm jobs or if you want to
 override the detected settings, you can use the `--time-limit=<duration>` option[^1] when starting the worker.
 
 [^1]: You can use various [shortcuts](../cli/shortcuts.md#duration) for the duration value.
 
 When the time limit is reached, the worker is automatically terminated.
 
+The time limit of a worker affects what tasks can be scheduled onto it. For example, a task submitted with `--time-request 10m`
+will not be scheduled onto a worker that only has a remaining time limit of 5 minutes.
+
 ## Idle timeout
 When you deploy *HQ* workers inside a PBS or Slurm job, keeping the worker alive will drain resources from your
 accounting project (unless you use a free queue). If a worker has nothing to do, it might be better to terminate it
diff --git a/scripts/bless_tests.sh b/scripts/bless_tests.sh
new file mode 100755
index 000000000..7f4cd10e6
--- /dev/null
+++ b/scripts/bless_tests.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+set -e
+
+cd `dirname $0`/..
+
+python3 -m pytest tests --inline-snapshot=fix
+
+# Reformat code to wrap long lines
+./check.sh
diff --git a/scripts/check.sh b/scripts/check.sh
index 1bee66c7c..364d62895 100755
--- a/scripts/check.sh
+++ b/scripts/check.sh
@@ -9,7 +9,7 @@ cargo fmt --all
 
 # Format Python code
 isort --profile black scripts tests benchmarks crates/pyhq/python
-black scripts tests benchmarks crates/pyhq/python
+black --preview -l 120 scripts tests benchmarks crates/pyhq/python
 
 # Lint Python code
 flake8 scripts tests benchmarks crates/pyhq/python
diff --git a/scripts/check_package_versions.py b/scripts/check_package_versions.py
index cfcb9ea12..af343c2cd 100644
--- a/scripts/check_package_versions.py
+++ b/scripts/check_package_versions.py
@@ -5,18 +5,11 @@
 """
 Checks that HyperQueue and its binding have the same version.
""" - output = ( - subprocess.check_output(["cargo", "metadata", "--no-deps", "-q"]) - .decode() - .strip() - ) + output = subprocess.check_output(["cargo", "metadata", "--no-deps", "-q"]).decode().strip() metadata = json.loads(output) packages = metadata["packages"] hq_version = [p["version"] for p in packages if p["name"] == "hyperqueue"][0] pyhq_version = [p["version"] for p in packages if p["name"] == "pyhq"][0] if hq_version != pyhq_version: - raise Exception( - f"Hyperqueue has a different version ({hq_version}) " - f"than its Python binding ({pyhq_version})" - ) + raise Exception(f"Hyperqueue has a different version ({hq_version}) than its Python binding ({pyhq_version})") diff --git a/scripts/get_docs_version.py b/scripts/get_docs_version.py index 67e121006..5516a23e3 100644 --- a/scripts/get_docs_version.py +++ b/scripts/get_docs_version.py @@ -10,9 +10,7 @@ def get_version(output: str): if not output: return latest_version() else: - tags = [ - t.strip() for t in output.splitlines(keepends=False) if t.startswith("v") - ] + tags = [t.strip() for t in output.splitlines(keepends=False) if t.startswith("v")] # Ignore pre-release versions tags = [tag for tag in tags if "-" not in tag] if not tags: @@ -27,8 +25,6 @@ def get_version(output: str): Calculates whether the current commit is a stable version (=there is some tag pointing to it) or an unstable one. """ - output = ( - subprocess.check_output(["git", "tag", "--points-at", "HEAD"]).decode().strip() - ) + output = subprocess.check_output(["git", "tag", "--points-at", "HEAD"]).decode().strip() version = get_version(output) print(json.dumps(version)) diff --git a/tests/autoalloc/conftest.py b/tests/autoalloc/conftest.py index 2aea6d370..9f6a7ae6a 100644 --- a/tests/autoalloc/conftest.py +++ b/tests/autoalloc/conftest.py @@ -33,9 +33,7 @@ def create_env_in_home(request): # We cannot create server directory in /tmp, because it wouldn't be available to compute nodes testname = request.node.name now = datetime.now() - directory = ( - Path.home() / ".hq-pytest" / f"{testname}-{now.strftime('%d-%m-%y-%H-%M-%S')}" - ) + directory = Path.home() / ".hq-pytest" / f"{testname}-{now.strftime('%d-%m-%y-%H-%M-%S')}" directory.mkdir(parents=True, exist_ok=False) with run_hq_env(directory) as env: try: @@ -53,13 +51,9 @@ def cluster_hq_env(request): def pbs_test(fn): signature = inspect.signature(fn) if "hq_env" in signature.parameters: - raise Exception( - "Do not use the `hq_env` fixture with `pbs_test`. Use `cluster_hq_env` instead." - ) + raise Exception("Do not use the `hq_env` fixture with `pbs_test`. Use `cluster_hq_env` instead.") - @pytest.mark.skipif( - not PBS_AVAILABLE, reason="This test can be run only if PBS is available" - ) + @pytest.mark.skipif(not PBS_AVAILABLE, reason="This test can be run only if PBS is available") @pytest.mark.pbs @functools.wraps(fn) def wrapped(*args, **kwargs): @@ -84,13 +78,9 @@ def slurm_credentials() -> str: def slurm_test(fn): signature = inspect.signature(fn) if "hq_env" in signature.parameters: - raise Exception( - "Do not use the `hq_env` fixture with `slurm_test`. Use `cluster_hq_env` instead." - ) + raise Exception("Do not use the `hq_env` fixture with `slurm_test`. 
Use `cluster_hq_env` instead.") - @pytest.mark.skipif( - not SLURM_AVAILABLE, reason="This test can be run only if SLURM is available" - ) + @pytest.mark.skipif(not SLURM_AVAILABLE, reason="This test can be run only if SLURM is available") @pytest.mark.slurm @functools.wraps(fn) def wrapped(*args, **kwargs): diff --git a/tests/autoalloc/mock/handler.py b/tests/autoalloc/mock/handler.py index 57c4b21a4..9baf541cd 100644 --- a/tests/autoalloc/mock/handler.py +++ b/tests/autoalloc/mock/handler.py @@ -23,9 +23,7 @@ def response_error(stdout="", stderr="", code=1): class CommandHandler(ABC): @abc.abstractmethod - async def handle_command( - self, request: Request, cmd: str - ) -> Optional[CommandResponse]: + async def handle_command(self, request: Request, cmd: str) -> Optional[CommandResponse]: return response_error() diff --git a/tests/autoalloc/mock/pbs.py b/tests/autoalloc/mock/pbs.py index 9e5c6ba8c..57ddf07a4 100644 --- a/tests/autoalloc/mock/pbs.py +++ b/tests/autoalloc/mock/pbs.py @@ -24,9 +24,7 @@ class PbsCommandAdapter(CommandHandler): def __init__(self, handler: Manager): self.handler = handler - async def handle_command( - self, request: Request, cmd: str - ) -> Optional[CommandResponse]: + async def handle_command(self, request: Request, cmd: str) -> Optional[CommandResponse]: input = await extract_mock_input(request) if cmd == "qsub": diff --git a/tests/autoalloc/mock/slurm.py b/tests/autoalloc/mock/slurm.py index cbb3313fe..ac7768058 100644 --- a/tests/autoalloc/mock/slurm.py +++ b/tests/autoalloc/mock/slurm.py @@ -19,9 +19,7 @@ class SlurmCommandAdapter(CommandHandler): def __init__(self, handler: Manager): self.handler = handler - async def handle_command( - self, request: Request, cmd: str - ) -> Optional[CommandResponse]: + async def handle_command(self, request: Request, cmd: str) -> Optional[CommandResponse]: input = await extract_mock_input(request) if cmd == "sbatch": diff --git a/tests/autoalloc/test_autoalloc.py b/tests/autoalloc/test_autoalloc.py index e005dc6af..06d25d314 100644 --- a/tests/autoalloc/test_autoalloc.py +++ b/tests/autoalloc/test_autoalloc.py @@ -1,9 +1,10 @@ import time from os.path import dirname, join from pathlib import Path -from typing import List +from typing import List, Literal import pytest +from inline_snapshot import snapshot from ..conftest import HqEnv, get_hq_binary from ..utils.wait import ( @@ -146,6 +147,18 @@ def test_autoalloc_require_timelimit(hq_env: HqEnv, spec: ManagerSpec): ) +@all_managers +def test_autoalloc_worker_time_limit_too_large(hq_env: HqEnv, spec: ManagerSpec): + hq_env.start_server() + add_queue( + hq_env, + manager=spec.manager_type(), + time_limit="1h", + worker_time_limit="2h", + expect_fail="Worker time limit cannot be larger than queue time limit", + ) + + def test_autoalloc_remove_queue(hq_env: HqEnv): hq_env.start_server() add_queue(hq_env, manager="pbs") @@ -190,9 +203,7 @@ def test_add_queue(hq_env: HqEnv, spec: ManagerSpec): def test_pbs_queue_qsub_args(hq_env: HqEnv): queue = ManagerQueue() - with MockJobManager( - hq_env, adapt_pbs(ExtractSubmitScriptPath(queue, PbsManager())) - ): + with MockJobManager(hq_env, adapt_pbs(ExtractSubmitScriptPath(queue, PbsManager()))): hq_env.start_server() prepare_tasks(hq_env) @@ -265,12 +276,18 @@ def test_submit_time_request_equal_to_time_limit(hq_env: HqEnv, spec: ManagerSpe wait_for_alloc(hq_env, "QUEUED", "1.job") +def normalize_output(hq_env: HqEnv, manager: Literal["pbs", "slurm"], output: str) -> str: + return ( + output.replace(get_hq_binary(), "") + 
.replace(hq_env.server_dir, "") + .replace(f'"{manager}"', '""') + ) + + def test_pbs_multinode_allocation(hq_env: HqEnv): queue = ManagerQueue() - with MockJobManager( - hq_env, adapt_pbs(ExtractSubmitScriptPath(queue, PbsManager())) - ): + with MockJobManager(hq_env, adapt_pbs(ExtractSubmitScriptPath(queue, PbsManager()))): hq_env.start_server() prepare_tasks(hq_env) @@ -278,12 +295,10 @@ def test_pbs_multinode_allocation(hq_env: HqEnv): qsub_script_path = queue.get() with open(qsub_script_path) as f: - commands = extract_script_commands(f.read()) - assert ( - commands - == f"""pbsdsh -- bash -l -c '{get_hq_binary()} worker start --idle-timeout "5m" \ ---time-limit "59m 50s" --manager "pbs" --server-dir "{hq_env.server_dir}/001" --on-server-lost \ -"finish-running"'""" + commands = normalize_output(hq_env, "pbs", extract_script_commands(f.read())) + assert commands == snapshot( + 'pbsdsh -- bash -l -c \' worker start --idle-timeout "5m" --manager "" --server-dir' + ' "/001" --on-server-lost "finish-running" --time-limit "1h"\'' ) @@ -298,12 +313,10 @@ def test_slurm_multinode_allocation(hq_env: HqEnv): add_queue(hq_env, manager="slurm", workers_per_alloc=2) sbatch_script_path = queue.get() with open(sbatch_script_path) as f: - commands = extract_script_commands(f.read()) - assert ( - commands - == f"""srun --overlap {get_hq_binary()} worker start --idle-timeout "5m" \ ---time-limit "59m 50s" --manager "slurm" --server-dir "{hq_env.server_dir}/001" --on-server-lost "finish-running" -""".rstrip() + commands = normalize_output(hq_env, "slurm", extract_script_commands(f.read())) + assert commands == snapshot( + 'srun --overlap worker start --idle-timeout "5m" --manager "" --server-dir' + ' "/001" --on-server-lost "finish-running" --time-limit "1h"' ) @@ -410,8 +423,10 @@ def test_fail_on_remove_queue_with_running_jobs(hq_env: HqEnv, spec: ManagerSpec remove_queue( hq_env, 1, - expect_fail="Allocation queue has running jobs, so it will not be removed. " - "Use `--force` if you want to remove the queue anyway", + expect_fail=( + "Allocation queue has running jobs, so it will not be removed. 
" + "Use `--force` if you want to remove the queue anyway" + ), ) wait_for_alloc(hq_env, "RUNNING", job_id) @@ -502,9 +517,7 @@ def dry_run_cmd(spec: ManagerSpec) -> List[str]: return ["alloc", "dry-run", spec.manager_type(), "--time-limit", "1h"] -@pytest.mark.skipif( - PBS_AVAILABLE, reason="This test will not work properly if `qsub` is available" -) +@pytest.mark.skipif(PBS_AVAILABLE, reason="This test will not work properly if `qsub` is available") def test_pbs_dry_run_missing_qsub(hq_env: HqEnv): hq_env.start_server() hq_env.command( @@ -584,11 +597,7 @@ def get_exec_script(script_path: str): """ with open(script_path) as f: data = f.read() - return [ - line - for line in data.splitlines(keepends=False) - if line and not line.startswith("#") - ][0] + return [line for line in data.splitlines(keepends=False) if line and not line.startswith("#")][0] def get_worker_args(script_path: str): @@ -625,31 +634,11 @@ def test_pass_cpu_and_resources_to_worker(hq_env: HqEnv, spec: ManagerSpec): ) script = queue.get() - assert get_worker_args(script) == [ - "worker", - "start", - "--idle-timeout", - '"5m"', - "--time-limit", - '"59m', - '50s"', - "--manager", - f'"{spec.manager_type()}"', - "--server-dir", - f'"{hq_env.server_dir}/001"', - "--cpus", - '"2x8"', - "--resource", - '"x=sum(100)"', - "--resource", - '"y=range(1-4)"', - "--resource", - '"z=[1,2,4]"', - "--no-hyper-threading", - "--no-detect-resources", - "--on-server-lost", - '"finish-running"', - ] + assert normalize_output(hq_env, spec.manager_type(), " ".join(get_worker_args(script))) == snapshot( + 'worker start --idle-timeout "5m" --manager "" --server-dir "/001" --cpus "2x8"' + ' --resource "x=sum(100)" --resource "y=range(1-4)" --resource "z=[1,2,4]" --no-hyper-threading' + ' --no-detect-resources --on-server-lost "finish-running" --time-limit "1h"' + ) @all_managers @@ -671,21 +660,10 @@ def test_pass_idle_timeout_to_worker(hq_env: HqEnv, spec: ManagerSpec): ) script_path = queue.get() - assert get_worker_args(script_path) == [ - "worker", - "start", - "--idle-timeout", - '"30m"', - "--time-limit", - '"59m', - '50s"', - "--manager", - f'"{spec.manager_type()}"', - "--server-dir", - f'"{hq_env.server_dir}/001"', - "--on-server-lost", - '"finish-running"', - ] + assert normalize_output(hq_env, spec.manager_type(), " ".join(get_worker_args(script_path))) == snapshot( + 'worker start --idle-timeout "30m" --manager "" --server-dir "/001" --on-server-lost' + ' "finish-running" --time-limit "1h"' + ) @all_managers @@ -703,21 +681,27 @@ def test_pass_on_server_lost(hq_env: HqEnv, spec: ManagerSpec): additional_worker_args=["--on-server-lost=stop"], ) qsub_script_path = queue.get() - assert get_worker_args(qsub_script_path) == [ - "worker", - "start", - "--idle-timeout", - '"5m"', - "--time-limit", - '"59m', - '50s"', - "--manager", - f'"{spec.manager_type()}"', - "--server-dir", - f'"{hq_env.server_dir}/001"', - "--on-server-lost", - '"stop"', - ] + assert normalize_output(hq_env, spec.manager_type(), " ".join(get_worker_args(qsub_script_path))) == snapshot( + 'worker start --idle-timeout "5m" --manager "" --server-dir "/001" --on-server-lost' + ' "stop" --time-limit "1h"' + ) + + +@all_managers +def test_pass_worker_time_limit(hq_env: HqEnv, spec: ManagerSpec): + queue = ManagerQueue() + manager = ExtractSubmitScriptPath(queue, PbsManager()) + + with MockJobManager(hq_env, spec.adapt(manager)): + hq_env.start_server() + prepare_tasks(hq_env) + + add_queue(hq_env, manager=spec.manager_type(), worker_time_limit="30m") + qsub_script_path = 
queue.get() + assert normalize_output(hq_env, spec.manager_type(), " ".join(get_worker_args(qsub_script_path))) == snapshot( + 'worker start --idle-timeout "5m" --manager "" --server-dir "/001" --on-server-lost' + ' "finish-running" --time-limit "30m"' + ) @all_managers @@ -737,11 +721,9 @@ def test_start_stop_cmd(hq_env: HqEnv, spec: ManagerSpec): ) script = queue.get() - assert ( - get_exec_script(script) - == f"""init.sh && {get_hq_binary()} worker start --idle-timeout "5m" \ ---time-limit "59m 50s" --manager "{spec.manager_type()}" --server-dir "{hq_env.server_dir}/001" \ ---on-server-lost "finish-running"; unload.sh""" + assert normalize_output(hq_env, spec.manager_type(), get_exec_script(script)) == snapshot( + 'init.sh && worker start --idle-timeout "5m" --manager "" --server-dir' + ' "/001" --on-server-lost "finish-running" --time-limit "1h"; unload.sh' ) @@ -793,9 +775,7 @@ def test_external_pbs_submit_single_worker(cluster_hq_env: HqEnv, pbs_credential @pbs_test -def test_external_pbs_submit_multiple_workers( - cluster_hq_env: HqEnv, pbs_credentials: str -): +def test_external_pbs_submit_multiple_workers(cluster_hq_env: HqEnv, pbs_credentials: str): cluster_hq_env.start_server() prepare_tasks(cluster_hq_env, count=100) @@ -812,9 +792,7 @@ def test_external_pbs_submit_multiple_workers( @slurm_test -def test_external_slurm_submit_single_worker( - cluster_hq_env: HqEnv, slurm_credentials: str -): +def test_external_slurm_submit_single_worker(cluster_hq_env: HqEnv, slurm_credentials: str): cluster_hq_env.start_server() prepare_tasks(cluster_hq_env, count=10) @@ -830,9 +808,7 @@ def test_external_slurm_submit_single_worker( @slurm_test -def test_external_slurm_submit_multiple_workers( - cluster_hq_env: HqEnv, slurm_credentials: str -): +def test_external_slurm_submit_multiple_workers(cluster_hq_env: HqEnv, slurm_credentials: str): cluster_hq_env.start_server() prepare_tasks(cluster_hq_env, count=100) @@ -848,9 +824,7 @@ def test_external_slurm_submit_multiple_workers( wait_for_job_state(cluster_hq_env, 1, "FINISHED") -def wait_for_alloc( - hq_env: HqEnv, state: str, allocation_id: str, timeout=DEFAULT_TIMEOUT -): +def wait_for_alloc(hq_env: HqEnv, state: str, allocation_id: str, timeout=DEFAULT_TIMEOUT): """ Wait until an allocation has the given `state`. Assumes a single allocation queue. @@ -878,9 +852,7 @@ def wait(): raise e -def start_server_with_quick_refresh( - hq_env: HqEnv, autoalloc_refresh_ms=100, autoalloc_status_check_ms=100 -): +def start_server_with_quick_refresh(hq_env: HqEnv, autoalloc_refresh_ms=100, autoalloc_status_check_ms=100): hq_env.start_server( env={ "HQ_AUTOALLOC_REFRESH_INTERVAL_MS": str(autoalloc_refresh_ms), diff --git a/tests/autoalloc/utils.py b/tests/autoalloc/utils.py index a70b9b1af..e70cbc6a3 100644 --- a/tests/autoalloc/utils.py +++ b/tests/autoalloc/utils.py @@ -7,11 +7,7 @@ def extract_script_args(script: str, prefix: str) -> List[str]: - return [ - line[len(prefix) :].strip() - for line in script.splitlines(keepends=False) - if line.startswith(prefix) - ] + return [line[len(prefix) :].strip() for line in script.splitlines(keepends=False) if line.startswith(prefix)] def extract_script_commands(script: str) -> str: @@ -19,9 +15,7 @@ def extract_script_commands(script: str) -> str: Returns all non-empty lines as text from `script` that do not start with a bash comment (`#`). 
""" return "\n".join( - line.strip() - for line in script.splitlines(keepends=False) - if not line.startswith("#") and line.strip() + line.strip() for line in script.splitlines(keepends=False) if not line.startswith("#") and line.strip() ) @@ -37,6 +31,7 @@ def add_queue( additional_worker_args: List[str] = None, additional_args=None, time_limit="1h", + worker_time_limit: Optional[str] = None, dry_run=False, start_cmd: Optional[str] = None, stop_cmd: Optional[str] = None, @@ -57,6 +52,8 @@ def add_queue( ) if time_limit is not None: args.extend(["--time-limit", time_limit]) + if worker_time_limit is not None: + args.extend(["--worker-time-limit", worker_time_limit]) if start_cmd is not None: args.extend(["--worker-start-cmd", start_cmd]) if stop_cmd is not None: diff --git a/tests/conftest.py b/tests/conftest.py index 4bde85ddd..1af5de375 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -59,9 +59,7 @@ def is_process_alive(): elif expected_code is not None: assert process.returncode == expected_code - self.processes = [ - (n, p) for (n, p) in self.processes if p is not process - ] + self.processes = [(n, p) for (n, p) in self.processes if p is not process] return False raise Exception(f"Process with pid {process.pid} not found") @@ -71,11 +69,7 @@ def check_running_processes(self): """Checks that everything is still running""" for name, process in self.processes: if process.poll() is not None: - raise Exception( - "Process {0} crashed (log in {1}/{0}.out)".format( - name, self.work_path - ) - ) + raise Exception("Process {0} crashed (log in {1}/{0}.out)".format(name, self.work_path)) def kill_all(self): self.sort_processes_for_kill() @@ -84,9 +78,7 @@ def kill_all(self): if not process.poll(): os.killpg(os.getpgid(process.pid), signal.SIGTERM) - def get_processes_by_name( - self, name: str - ) -> Iterable[Tuple[int, subprocess.Popen]]: + def get_processes_by_name(self, name: str) -> Iterable[Tuple[int, subprocess.Popen]]: for i, (n, p) in enumerate(self.processes): if n == name: yield (i, p) @@ -143,9 +135,7 @@ def server_args(server_dir="hq-server", debug=True): args += ["server", "start"] return args - def start_server( - self, server_dir="hq-server", args=None, env=None - ) -> subprocess.Popen: + def start_server(self, server_dir="hq-server", args=None, env=None) -> subprocess.Popen: self.server_dir = os.path.join(self.work_path, server_dir) environment = self.make_default_env() if env: @@ -290,15 +280,11 @@ def command( if process.returncode != 0: if expect_fail: if expect_fail not in stdout: - raise Exception( - f"Command should failed with message '{expect_fail}' but got:\n{stdout}" - ) + raise Exception(f"Command should failed with message '{expect_fail}' but got:\n{stdout}") else: return print(f"Process output: {stdout}") - raise Exception( - f"Process failed with exit-code {process.returncode}\n\n{stdout}" - ) + raise Exception(f"Process failed with exit-code {process.returncode}\n\n{stdout}") if expect_fail is not None: raise Exception("Command should failed") if as_table: diff --git a/tests/job/test_job_forget.py b/tests/job/test_job_forget.py index f631f0bab..3d4e7e07d 100644 --- a/tests/job/test_job_forget.py +++ b/tests/job/test_job_forget.py @@ -61,9 +61,7 @@ def test_forget_multiple_jobs(hq_env: HqEnv): hq_env.command(["submit", "/non-existing"]) wait_for_job_state(hq_env, 7, "FAILED") - forget_jobs( - hq_env, "all", forgotten=6, ignored=1, statutes=["finished", "canceled"] - ) + forget_jobs(hq_env, "all", forgotten=6, ignored=1, statutes=["finished", "canceled"]) 
wait_for_job_list_count(hq_env, 1) diff --git a/tests/output/test_json.py b/tests/output/test_json.py index 0dd3c17dd..de4a3771f 100644 --- a/tests/output/test_json.py +++ b/tests/output/test_json.py @@ -246,9 +246,7 @@ def test_print_task_placeholders(hq_env: HqEnv): output = parse_json_output(hq_env, ["--output-mode=json", "job", "info", "1"]) - schema = Schema( - [{"id": id, "state": "finished"} for id in range(1, 5)], ignore_extra_keys=True - ) + schema = Schema([{"id": id, "state": "finished"} for id in range(1, 5)], ignore_extra_keys=True) schema.validate(output[0]["tasks"]) tasks = sorted(output[0]["tasks"], key=lambda t: t["id"]) diff --git a/tests/pyapi/__init__.py b/tests/pyapi/__init__.py index ddc10750c..3761bd4c1 100644 --- a/tests/pyapi/__init__.py +++ b/tests/pyapi/__init__.py @@ -9,9 +9,7 @@ from ..utils.mock import ProgramMock -def prepare_job_client( - hq_env: HqEnv, with_worker=True, **job_args -) -> Tuple[Job, Client]: +def prepare_job_client(hq_env: HqEnv, with_worker=True, **job_args) -> Tuple[Job, Client]: hq_env.start_server() if with_worker: hq_env.start_worker() diff --git a/tests/pyapi/test_function.py b/tests/pyapi/test_function.py index 58ff3ac32..68d940515 100644 --- a/tests/pyapi/test_function.py +++ b/tests/pyapi/test_function.py @@ -33,9 +33,7 @@ def test_submit_python_prologue(hq_env: HqEnv): hq_env.start_server() hq_env.start_worker() - client = Client( - hq_env.server_dir, python_env=PythonEnv(prologue=f"source {init_sh.resolve()}") - ) + client = Client(hq_env.server_dir, python_env=PythonEnv(prologue=f"source {init_sh.resolve()}")) def body(): print(os.environ.get("ABC")) @@ -58,9 +56,7 @@ def body(): client.wait_for_jobs([job_id], raise_on_error=False) errors = client.get_failed_tasks(job_id) assert list(errors.keys()) == [0] - assert errors[0].error.endswith( - ' raise Exception("MyException")\nException: MyException\n' - ) + assert errors[0].error.endswith(' raise Exception("MyException")\nException: MyException\n') assert errors[0].stderr == os.path.abspath("err") diff --git a/tests/pyapi/test_job.py b/tests/pyapi/test_job.py index 607a7b997..1028864b3 100644 --- a/tests/pyapi/test_job.py +++ b/tests/pyapi/test_job.py @@ -19,9 +19,7 @@ def test_submit_empty_job(hq_env: HqEnv): (job, client) = prepare_job_client(hq_env) - with pytest.raises( - Exception, match="Submitted job must have at least a single task" - ): + with pytest.raises(Exception, match="Submitted job must have at least a single task"): client.submit(job) @@ -138,12 +136,8 @@ def test_job_generic_resources(hq_env: HqEnv): "fairy=sum(1000)", ] ) - t1 = job.program( - args=bash("echo Hello"), resources=ResourceRequest(resources={"gpus": 1}) - ) - job.program( - args=bash("echo Hello"), resources=ResourceRequest(resources={"gpus": 4}) - ) + t1 = job.program(args=bash("echo Hello"), resources=ResourceRequest(resources={"gpus": 1})) + job.program(args=bash("echo Hello"), resources=ResourceRequest(resources={"gpus": 4})) job.program( args=bash("echo Hello"), resources=ResourceRequest(resources={"gpus": 2}), @@ -181,9 +175,7 @@ def test_task_priorities(hq_env: HqEnv): job.function(lambda: 0, priority=p) job_id = client.submit(job) client.wait_for_jobs([job_id]) - data = hq_env.command(["--output-mode=json", "task", "list", "1"], as_json=True)[ - "1" - ] + data = hq_env.command(["--output-mode=json", "task", "list", "1"], as_json=True)["1"] starts1 = [(props["id"], iso8601.parse_date(props["started_at"])) for props in data] starts2 = starts1[:] @@ -212,9 +204,7 @@ def 
test_resource_uniqueness_priorities(hq_env: HqEnv): job_id = client.submit(job) client.wait_for_jobs([job_id]) - data = hq_env.command(["--output-mode=json", "task", "list", "1"], as_json=True)[ - "1" - ] + data = hq_env.command(["--output-mode=json", "task", "list", "1"], as_json=True)["1"] print(data) tasks_on_worker_2 = [t["id"] for t in data if t["worker"] == 1] print(tasks_on_worker_2) diff --git a/tests/requirements.txt b/tests/requirements.txt index 119441dc2..d506352e3 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -10,3 +10,4 @@ psutil==5.8.0 jinja2==3.0.3 requests==2.31.0 aiohttp==3.8.3 +inline-snapshot==0.2.0 diff --git a/tests/test_array.py b/tests/test_array.py index a945a7495..9d2ec52a3 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -13,15 +13,11 @@ def test_job_array_submit(hq_env: HqEnv): hq_env.start_server() hq_env.start_worker(cpus=4) - hq_env.command( - ["submit", "--array=30-36", "--", "bash", "-c", "echo $HQ_JOB_ID-$HQ_TASK_ID"] - ) + hq_env.command(["submit", "--array=30-36", "--", "bash", "-c", "echo $HQ_JOB_ID-$HQ_TASK_ID"]) wait_for_job_state(hq_env, 1, "FINISHED") for i in list(range(0, 30)) + list(range(37, 40)): - assert not os.path.isfile( - os.path.join(hq_env.work_path, default_task_output(job_id=1, task_id=i)) - ) + assert not os.path.isfile(os.path.join(hq_env.work_path, default_task_output(job_id=1, task_id=i))) assert not os.path.isfile( os.path.join( hq_env.work_path, @@ -30,9 +26,7 @@ def test_job_array_submit(hq_env: HqEnv): ) for i in range(36, 37): - stdout = os.path.join( - hq_env.work_path, default_task_output(job_id=1, task_id=i) - ) + stdout = os.path.join(hq_env.work_path, default_task_output(job_id=1, task_id=i)) assert os.path.isfile(stdout) assert os.path.isfile( os.path.join( @@ -94,22 +88,13 @@ def test_job_array_error_some(hq_env: HqEnv): assert table.header == ["Task ID", "Worker", "Error"] assert table.get_column_value("Task ID")[0] == "2" - assert ( - table.get_column_value("Error")[0] - == "Error: Program terminated with exit code 1" - ) + assert table.get_column_value("Error")[0] == "Error: Program terminated with exit code 1" assert table.get_column_value("Task ID")[1] == "3" - assert ( - table.get_column_value("Error")[1] - == "Error: Program terminated with exit code 1" - ) + assert table.get_column_value("Error")[1] == "Error: Program terminated with exit code 1" assert table.get_column_value("Task ID")[2] == "7" - assert ( - table.get_column_value("Error")[2] - == "Error: Program terminated with exit code 1" - ) + assert table.get_column_value("Error")[2] == "Error: Program terminated with exit code 1" table = hq_env.command(["task", "list", "1"], as_table=True) for i, state in enumerate( @@ -229,9 +214,7 @@ def test_array_times(hq_env: HqEnv): hq_env.command(["submit", "--array=1-3", "sleep", "1"]) wait_for_job_state(hq_env, 1, "FINISHED") - time.sleep( - 1.2 - ) # This sleep is not redundant, we check that after finished time is not moving + time.sleep(1.2) # This sleep is not redundant, we check that after finished time is not moving for i in range(1, 4): table = hq_env.command(["task", "info", "1", str(i)], as_table=True) diff --git a/tests/test_cpus.py b/tests/test_cpus.py index ea8d8f72f..57791f9f1 100644 --- a/tests/test_cpus.py +++ b/tests/test_cpus.py @@ -33,17 +33,11 @@ def test_job_num_of_cpus(hq_env: HqEnv): "echo $HQ_CPUS", ] ) - hq_env.command( - ["submit", "--cpus", "5 scatter", "--", "bash", "-c", "echo $HQ_CPUS"] - ) + hq_env.command(["submit", "--cpus", "5 scatter", "--", 
"bash", "-c", "echo $HQ_CPUS"]) - hq_env.command( - ["submit", "--cpus", "4 compact!", "--", "bash", "-c", "echo $HQ_CPUS"] - ) + hq_env.command(["submit", "--cpus", "4 compact!", "--", "bash", "-c", "echo $HQ_CPUS"]) - hq_env.command( - ["submit", "--cpus", "5 compact!", "--", "bash", "-c", "echo $HQ_CPUS"] - ) + hq_env.command(["submit", "--cpus", "5 compact!", "--", "bash", "-c", "echo $HQ_CPUS"]) hq_env.command(["submit", "--cpus", "all", "--", "bash", "-c", "echo $HQ_CPUS"]) @@ -185,9 +179,7 @@ def test_job_no_pin(hq_env: HqEnv): def test_job_show_pin_in_table(hq_env: HqEnv): hq_env.start_server() - hq_env.command( - ["submit", "--pin", "taskset", "--cpus", "2 compact!", "--", "hostname"] - ) + hq_env.command(["submit", "--pin", "taskset", "--cpus", "2 compact!", "--", "hostname"]) hq_env.start_worker(cpus=2) wait_for_job_state(hq_env, 1, "FINISHED") diff --git a/tests/test_events.py b/tests/test_events.py index df23d5762..c8291957d 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -76,9 +76,7 @@ def body(): print("BUS1, 10.0 %, 100 MiB, 200 MiB") """, ): - hq_env.start_worker( - args=["--overview-interval", "10ms", "--resource", "gpus/nvidia=[0]"] - ) + hq_env.start_worker(args=["--overview-interval", "10ms", "--resource", "gpus/nvidia=[0]"]) wait_for_worker_state(hq_env, 1, "RUNNING") time.sleep(0.2) hq_env.command(["worker", "stop", "1"]) @@ -122,9 +120,7 @@ def body(): print(json.dumps(data)) """, ): - hq_env.start_worker( - args=["--overview-interval", "10ms", "--resource", "gpus/amd=[0]"] - ) + hq_env.start_worker(args=["--overview-interval", "10ms", "--resource", "gpus/amd=[0]"]) wait_for_worker_state(hq_env, 1, "RUNNING") time.sleep(0.2) hq_env.command(["worker", "stop", "1"]) diff --git a/tests/test_job.py b/tests/test_job.py index e7fb10c10..741d43eab 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -88,9 +88,7 @@ def test_job_custom_name(hq_env: HqEnv): table = list_jobs(hq_env) assert len(table) == 1 - table.check_columns_value( - ["ID", "Name", "State"], 0, ["1", "sleep_prog", "WAITING"] - ) + table.check_columns_value(["ID", "Name", "State"], 0, ["1", "sleep_prog", "WAITING"]) with pytest.raises(Exception): hq_env.command(["submit", "--name=second_sleep \n", "sleep", "1"]) @@ -153,29 +151,17 @@ def test_job_output_default(hq_env: HqEnv, tmp_path): wait_for_job_state(hq_env, [1, 2, 3], ["FINISHED", "FAILED"]) - check_file_contents( - os.path.join(tmp_path, default_task_output(job_id=1, type="stdout")), "hello\n" - ) - check_file_contents( - os.path.join(tmp_path, default_task_output(job_id=1, type="stderr")), "" - ) - check_file_contents( - os.path.join(tmp_path, default_task_output(job_id=2, type="stdout")), "" - ) + check_file_contents(os.path.join(tmp_path, default_task_output(job_id=1, type="stdout")), "hello\n") + check_file_contents(os.path.join(tmp_path, default_task_output(job_id=1, type="stderr")), "") + check_file_contents(os.path.join(tmp_path, default_task_output(job_id=2, type="stdout")), "") - with open( - os.path.join(tmp_path, default_task_output(job_id=2, type="stderr")) - ) as f: + with open(os.path.join(tmp_path, default_task_output(job_id=2, type="stderr"))) as f: data = f.read() assert "No such file or directory" in data assert data.startswith("ls:") - check_file_contents( - os.path.join(tmp_path, default_task_output(job_id=3, type="stdout")), "" - ) - check_file_contents( - os.path.join(tmp_path, default_task_output(job_id=3, type="stderr")), "" - ) + check_file_contents(os.path.join(tmp_path, default_task_output(job_id=3, 
type="stdout")), "") + check_file_contents(os.path.join(tmp_path, default_task_output(job_id=3, type="stderr")), "") def test_job_create_output_folders(hq_env: HqEnv): @@ -203,9 +189,7 @@ def test_job_create_output_folders(hq_env: HqEnv): def test_job_output_configured(hq_env: HqEnv, tmp_path): hq_env.start_server() hq_env.start_worker(cpus=1) - hq_env.command( - ["submit", "--stdout=abc", "--stderr=xyz", "--", "bash", "-c", "echo 'hello'"] - ) + hq_env.command(["submit", "--stdout=abc", "--stderr=xyz", "--", "bash", "-c", "echo 'hello'"]) wait_for_job_state(hq_env, 1, "FINISHED") check_file_contents(os.path.join(tmp_path, "abc"), "hello\n") @@ -235,18 +219,12 @@ def test_job_output_absolute_path(hq_env: HqEnv, tmp_path): def test_job_output_none(hq_env: HqEnv, tmp_path): hq_env.start_server() hq_env.start_worker(cpus=1) - hq_env.command( - ["submit", "--stdout=none", "--stderr=none", "--", "bash", "-c", "echo 'hello'"] - ) + hq_env.command(["submit", "--stdout=none", "--stderr=none", "--", "bash", "-c", "echo 'hello'"]) wait_for_job_state(hq_env, 1, "FINISHED") assert not os.path.exists(os.path.join(tmp_path, "none")) - assert not os.path.exists( - os.path.join(tmp_path, default_task_output(job_id=1, task_id=0, type="stdout")) - ) - assert not os.path.exists( - os.path.join(tmp_path, default_task_output(job_id=1, task_id=0, type="stderr")) - ) + assert not os.path.exists(os.path.join(tmp_path, default_task_output(job_id=1, task_id=0, type="stdout"))) + assert not os.path.exists(os.path.join(tmp_path, default_task_output(job_id=1, task_id=0, type="stderr"))) def test_job_filters(hq_env: HqEnv): @@ -270,14 +248,10 @@ def test_job_filters(hq_env: HqEnv): table.check_column_value("State", 2, "WAITING") assert len(table) == 3 - table_canceled = hq_env.command( - ["job", "list", "--filter", "canceled"], as_table=True - ) + table_canceled = hq_env.command(["job", "list", "--filter", "canceled"], as_table=True) assert len(table_canceled) == 1 - table_waiting = hq_env.command( - ["job", "list", "--filter", "waiting"], as_table=True - ) + table_waiting = hq_env.command(["job", "list", "--filter", "waiting"], as_table=True) assert len(table_waiting) == 2 hq_env.start_worker(cpus=1) @@ -285,22 +259,16 @@ def test_job_filters(hq_env: HqEnv): wait_for_job_state(hq_env, 4, "RUNNING") - table_running = hq_env.command( - ["job", "list", "--filter", "running"], as_table=True - ) + table_running = hq_env.command(["job", "list", "--filter", "running"], as_table=True) assert len(table_running) == 1 - table_finished = hq_env.command( - ["job", "list", "--filter", "finished"], as_table=True - ) + table_finished = hq_env.command(["job", "list", "--filter", "finished"], as_table=True) assert len(table_finished) == 1 table_failed = hq_env.command(["job", "list", "--filter", "failed"], as_table=True) assert len(table_failed) == 1 - table_failed = hq_env.command( - ["job", "list", "--filter", "failed,finished"], as_table=True - ) + table_failed = hq_env.command(["job", "list", "--filter", "failed,finished"], as_table=True) assert len(table_failed) == 2 @@ -486,8 +454,7 @@ def test_cancel_send_sigint(hq_env: HqEnv): [ "submit", "--", - *python( - """ + *python(""" import sys import time import signal @@ -500,8 +467,7 @@ def signal_handler(sig, frame): print("ready", flush=True) time.sleep(3600) -""" - ), +"""), ] ) wait_for_job_state(hq_env, 1, "RUNNING") @@ -510,10 +476,7 @@ def signal_handler(sig, frame): hq_env.command(["job", "cancel", "1"]) wait_for_job_state(hq_env, 1, "CANCELED") - wait_until( - lambda: 
read_file(default_task_output()).splitlines(keepends=False)[1] - == "sigint" - ) + wait_until(lambda: read_file(default_task_output()).splitlines(keepends=False)[1] == "sigint") def test_cancel_kill_if_sigint_fails(hq_env: HqEnv): @@ -524,8 +487,7 @@ def test_cancel_kill_if_sigint_fails(hq_env: HqEnv): [ "submit", "--", - *python( - """ + *python(""" import os import sys import time @@ -539,8 +501,7 @@ def signal_handler(sig, frame): print("ready", flush=True) time.sleep(3600) -""" - ), +"""), ] ) wait_for_job_state(hq_env, 1, "RUNNING") @@ -612,9 +573,7 @@ def test_set_env(hq_env: HqEnv): ) wait_for_job_state(hq_env, 1, "FINISHED") - check_file_contents( - os.path.join(hq_env.work_path, default_task_output()), "BAR BAR2\n" - ) + check_file_contents(os.path.join(hq_env.work_path, default_task_output()), "BAR BAR2\n") table = hq_env.command(["job", "info", "1"], as_table=True) table.check_row_value("Environment", "FOO=BAR\nFOO2=BAR2") @@ -773,9 +732,7 @@ def test_job_resubmit_empty(hq_env: HqEnv): def test_job_resubmit_with_log(hq_env: HqEnv): hq_env.start_server() - hq_env.command( - ["submit", "--array=1-10", "--log", "foo.bin", "--", "/bin/nonexisting"] - ) + hq_env.command(["submit", "--array=1-10", "--log", "foo.bin", "--", "/bin/nonexisting"]) hq_env.start_workers(1) wait_for_job_state(hq_env, 1, "FAILED") @@ -875,12 +832,8 @@ def test_job_tasks_makespan(hq_env: HqEnv): hq_env.command(["job", "cancel", "1"]) wait_for_job_state(hq_env, 1, "CANCELED") - times_1 = hq_env.command(["task", "list", "1"], as_table=True).get_column_value( - "Makespan" - ) - times_2 = hq_env.command(["task", "list", "1"], as_table=True).get_column_value( - "Makespan" - ) + times_1 = hq_env.command(["task", "list", "1"], as_table=True).get_column_value("Makespan") + times_2 = hq_env.command(["task", "list", "1"], as_table=True).get_column_value("Makespan") assert times_1 == times_2 @@ -939,9 +892,7 @@ def test_job_completion_time(hq_env: HqEnv): assert not table.get_row_value("Makespan").startswith("1s") wait_for_job_state(hq_env, 1, "FINISHED") - time.sleep( - 1.2 - ) # This sleep is not redundant, we check that after finished time is not moving + time.sleep(1.2) # This sleep is not redundant, we check that after finished time is not moving table = hq_env.command(["job", "info", "1"], as_table=True) assert table.get_row_value("Makespan").startswith("1s") @@ -1060,25 +1011,18 @@ def test_job_shell_script_fail_not_executable(hq_env: HqEnv): wait_for_job_state(hq_env, 1, "FAILED") table = hq_env.command(["task", "list", "1", "-v"], as_table=True) - assert ( - """Error: Cannot execute "./test.sh": Permission denied (os error 13) + assert """Error: Cannot execute "./test.sh": Permission denied (os error 13) The script that you have tried to execute (`./test.sh`) is not executable. 
-Try making it executable or add a shebang line to it.""" - == table.get_column_value("Error")[0] - ) +Try making it executable or add a shebang line to it.""" == table.get_column_value("Error")[0] def test_job_shell_script_read_interpreter(hq_env: HqEnv): hq_env.start_server() hq_env.start_worker() - Path("test.sh").write_text( - """#!/bin/bash + Path("test.sh").write_text("""#!/bin/bash echo 'Hello' > out.txt -""" - ) - for job_id, path in enumerate( - ("test.sh", "./test.sh", os.path.realpath("test.sh")) - ): +""") + for job_id, path in enumerate(("test.sh", "./test.sh", os.path.realpath("test.sh"))): hq_env.command(["submit", path]) wait_for_job_state(hq_env, job_id + 1, "FINISHED") @@ -1172,9 +1116,7 @@ def test_job_cat_header(hq_env: HqEnv): output = hq_env.command(["job", "cat", "1", "stdout", "--print-task-header"]) print(output) - assert ( - output - == """ + assert output == """ # Task 1 1 out1 @@ -1188,12 +1130,9 @@ def test_job_cat_header(hq_env: HqEnv): out1 out2 """.lstrip() - ) output = hq_env.command(["job", "cat", "1", "stderr", "--print-task-header"]) - assert ( - output - == """ + assert output == """ # Task 1 1 err1 @@ -1204,7 +1143,6 @@ def test_job_cat_header(hq_env: HqEnv): 3 err1 """.lstrip() - ) def test_job_cat_status(hq_env: HqEnv): @@ -1230,12 +1168,8 @@ def test_job_cat_status(hq_env: HqEnv): ) wait_for_job_state(hq_env, 1, "FAILED") - output = hq_env.command( - ["job", "cat", "--task-status=finished", "1", "stdout", "--print-task-header"] - ) - assert ( - output - == """ + output = hq_env.command(["job", "cat", "--task-status=finished", "1", "stdout", "--print-task-header"]) + assert output == """ # Task 3 3 out @@ -1246,14 +1180,9 @@ def test_job_cat_status(hq_env: HqEnv): 9 out """.lstrip() - ) - output = hq_env.command( - ["job", "cat", "--task-status=failed", "--tasks", "3-7", "1", "stdout"] - ) - assert ( - output - == """ + output = hq_env.command(["job", "cat", "--task-status=failed", "--tasks", "3-7", "1", "stdout"]) + assert output == """ 4 out 5 @@ -1261,16 +1190,10 @@ def test_job_cat_status(hq_env: HqEnv): 6 out """.lstrip() - ) - output_selected = hq_env.command( - ["job", "cat", "--task-status", "finished,failed", "1", "stdout"] - ) + output_selected = hq_env.command(["job", "cat", "--task-status", "finished,failed", "1", "stdout"]) output_default = hq_env.command(["job", "cat", "1", "stdout"]) - assert ( - output_selected - == output_default - == """ + assert output_selected == output_default == """ 3 out 4 @@ -1286,7 +1209,6 @@ def test_job_cat_status(hq_env: HqEnv): 9 out """.lstrip() - ) def test_job_cat_last(hq_env: HqEnv): @@ -1321,9 +1243,7 @@ def test_submit_task_dir(hq_env: HqEnv, mode): "--", "bash", "-c", - "echo $HQ_TASK_DIR; touch $HQ_TASK_DIR/xyz; sleep 2; {}".format( - "exit 1" if mode == "FAILED" else "" - ), + "echo $HQ_TASK_DIR; touch $HQ_TASK_DIR/xyz; sleep 2; {}".format("exit 1" if mode == "FAILED" else ""), ] ) wait_for_job_state(hq_env, 1, "RUNNING") @@ -1377,16 +1297,13 @@ def test_long_custom_error_message(hq_env: HqEnv): "--", "python", "-c", - "import os; f = open(os.environ['HQ_ERROR_FILENAME'], 'w');" - "f.write('a' * 10_000); f.flush(); sys.exit(1)", + "import os; f = open(os.environ['HQ_ERROR_FILENAME'], 'w');f.write('a' * 10_000); f.flush(); sys.exit(1)", ] ) wait_for_job_state(hq_env, 1, "FAILED") table = hq_env.command(["task", "list", "1", "-v"], as_table=True) - assert table.get_column_value("Error")[0].endswith( - "aaaaaa\n[The message was truncated]" - ) + assert 
table.get_column_value("Error")[0].endswith("aaaaaa\n[The message was truncated]") def test_zero_custom_error_message(hq_env: HqEnv): @@ -1400,16 +1317,13 @@ def test_zero_custom_error_message(hq_env: HqEnv): "--", "python", "-c", - "import os; f = open(os.environ['HQ_ERROR_FILENAME'], 'w');" "sys.exit(1)", + "import os; f = open(os.environ['HQ_ERROR_FILENAME'], 'w');sys.exit(1)", ] ) wait_for_job_state(hq_env, 1, "FAILED") table = hq_env.command(["task", "list", "1", "-v"], as_table=True) - assert ( - table.get_column_value("Error")[0] - == "Error: Task created an error file, but it is empty" - ) + assert table.get_column_value("Error")[0] == "Error: Task created an error file, but it is empty" # print(table) @@ -1462,15 +1376,13 @@ def test_kill_task_when_worker_dies(hq_env: HqEnv): [ "submit", "--", - *python( - """ + *python(""" import os import time print(os.getpid(), flush=True) time.sleep(3600) -""" - ), +"""), ] ) wait_for_job_state(hq_env, 1, "RUNNING") @@ -1519,9 +1431,7 @@ def stop_worker(worker_process): check_child_process_exited(hq_env, stop_worker) -def check_child_process_exited( - hq_env: HqEnv, stop_fn: Callable[[subprocess.Popen], None] -): +def check_child_process_exited(hq_env: HqEnv, stop_fn: Callable[[subprocess.Popen], None]): """ Creates a task that spawns a child, and then calls `stop_fn`, which should kill either the task or the worker. The function then checks that both the task process and its child have been killed. @@ -1533,8 +1443,7 @@ def check_child_process_exited( [ "submit", "--", - *python( - """ + *python(""" import os import sys import time @@ -1543,8 +1452,7 @@ def check_child_process_exited( if pid > 0: print(pid, flush=True) time.sleep(3600) -""" - ), +"""), ] ) wait_for_job_state(hq_env, 1, "RUNNING") diff --git a/tests/test_job_mn.py b/tests/test_job_mn.py index 4dc306ccb..278d6548d 100644 --- a/tests/test_job_mn.py +++ b/tests/test_job_mn.py @@ -11,9 +11,7 @@ def test_submit_mn(hq_env: HqEnv): hq_env.start_server() hq_env.start_workers(2) - hq_env.command( - ["submit", "--nodes=3", "--", "bash", "-c", "sleep 1; cat ${HQ_NODE_FILE}"] - ) + hq_env.command(["submit", "--nodes=3", "--", "bash", "-c", "sleep 1; cat ${HQ_NODE_FILE}"]) time.sleep(0.5) table = hq_env.command(["job", "info", "1"], as_table=True) table.check_row_value("Resources", "nodes: 3") diff --git a/tests/test_jobfile.py b/tests/test_jobfile.py index 88ce095e9..434936644 100644 --- a/tests/test_jobfile.py +++ b/tests/test_jobfile.py @@ -9,12 +9,10 @@ def test_job_file_submit_minimal(hq_env: HqEnv, tmp_path): hq_env.start_server() hq_env.start_worker() - tmp_path.joinpath("job.toml").write_text( - """ + tmp_path.joinpath("job.toml").write_text(""" [[task]] command = ["sleep", "0"] - """ - ) + """) hq_env.command(["job", "submit-file", "job.toml"]) wait_for_job_state(hq_env, 1, "FINISHED") @@ -22,8 +20,7 @@ def test_job_file_submit_minimal(hq_env: HqEnv, tmp_path): def test_job_file_submit_maximal(hq_env: HqEnv, tmp_path): hq_env.start_server() hq_env.start_workers(3, cpus=4, args=["--resource", "gpus=[0,1]"]) - tmp_path.joinpath("job.toml").write_text( - """ + tmp_path.joinpath("job.toml").write_text(""" name = "test-job" stream_log = "output.log" max_fails = 11 @@ -56,8 +53,7 @@ def test_job_file_submit_maximal(hq_env: HqEnv, tmp_path): id = 13 pin = "omp" command = ["sleep", "0"] -""" - ) +""") hq_env.command(["job", "submit-file", "job.toml"]) wait_for_job_state(hq_env, 1, "FINISHED") @@ -101,8 +97,7 @@ def test_job_file_resource_variants1(hq_env: HqEnv, tmp_path): 
hq_env.start_worker(cpus=2, args=["--resource", "gpus=[0,1]"]) hq_env.start_workers(2, cpus=4) - tmp_path.joinpath("job.toml").write_text( - """ + tmp_path.joinpath("job.toml").write_text(""" [[task]] id = 0 command = ["sleep", "1"] @@ -112,8 +107,7 @@ def test_job_file_resource_variants1(hq_env: HqEnv, tmp_path): [[task.request]] resources = { "cpus" = "1", "gpus" = "1" } -""" - ) +""") hq_env.command(["job", "submit-file", "job.toml"]) wait_for_job_state(hq_env, 1, "RUNNING") @@ -132,8 +126,7 @@ def test_job_file_resource_variants2(hq_env: HqEnv, tmp_path): hq_env.start_workers(1, cpus=4, args=["--resource", "x=[0,1]"]) hq_env.start_workers(2, cpus=2, args=["--resource", "x=[0]", "--resource", "y=[0]"]) - tmp_path.joinpath("job.toml").write_text( - """ + tmp_path.joinpath("job.toml").write_text(""" [[task]] id = 0 command = ["/bin/bash", @@ -147,8 +140,7 @@ def test_job_file_resource_variants2(hq_env: HqEnv, tmp_path): [[task.request]] resources = { "cpus" = "4", "x" = "1" } - """ - ) + """) hq_env.command(["job", "submit-file", "job.toml"]) wait_for_job_state(hq_env, 1, "FINISHED") table = hq_env.command(["task", "info", "1", "0"], as_table=True) @@ -161,10 +153,7 @@ def test_job_file_resource_variants3(hq_env: HqEnv, tmp_path): hq_env.start_server() hq_env.start_worker(cpus=16, args=["--resource", "x=[0,1]"]) - tmp_path.joinpath("job.toml").write_text( - "\n".join( - [ - f""" + tmp_path.joinpath("job.toml").write_text("\n".join([f""" [[task]] id = {x} command = ["/bin/bash", @@ -174,11 +163,7 @@ def test_job_file_resource_variants3(hq_env: HqEnv, tmp_path): resources = {{ "cpus" = "1", "x"=1 }} [[task.request]] resources = {{ "cpus" = "4" }} - """ - for x in range(5) - ] - ) - ) + """ for x in range(5)])) hq_env.command(["job", "submit-file", "job.toml"]) wait_for_job_state(hq_env, 1, "FINISHED") @@ -191,8 +176,7 @@ def test_job_file_resource_variants3(hq_env: HqEnv, tmp_path): def test_job_file_auto_id(hq_env: HqEnv, tmp_path): hq_env.start_server() hq_env.start_worker() - tmp_path.joinpath("job.toml").write_text( - """ + tmp_path.joinpath("job.toml").write_text(""" [[task]] command = ["sleep", "0"] @@ -209,8 +193,7 @@ def test_job_file_auto_id(hq_env: HqEnv, tmp_path): [[task]] command = ["sleep", "0"] - """ - ) + """) hq_env.command(["job", "submit-file", "job.toml"]) wait_for_job_state(hq_env, 1, "FINISHED") r = hq_env.command(["--output-mode=json", "job", "info", "1"], as_json=True) @@ -220,13 +203,11 @@ def test_job_file_auto_id(hq_env: HqEnv, tmp_path): def test_job_file_array(hq_env: HqEnv, tmp_path): hq_env.start_server() - tmp_path.joinpath("job.toml").write_text( - """ + tmp_path.joinpath("job.toml").write_text(""" [array] ids = "2, 10-14, 120" command = ["sleep", "0"] - """ - ) + """) hq_env.command(["job", "submit-file", "job.toml"]) r = hq_env.command(["job", "info", "1"], as_table=True) r.check_row_value("Tasks", "7; Ids: 2, 10-14, 120") @@ -234,8 +215,7 @@ def test_job_file_array(hq_env: HqEnv, tmp_path): def test_job_file_fail_mixing_array_and_tasks(hq_env: HqEnv, tmp_path): hq_env.start_server() - tmp_path.joinpath("job.toml").write_text( - """ + tmp_path.joinpath("job.toml").write_text(""" [array] ids = "2" command = ["sleep", "0"] @@ -243,8 +223,7 @@ def test_job_file_fail_mixing_array_and_tasks(hq_env: HqEnv, tmp_path): [[task]] id = 1 command = ["sleep", "0"] - """ - ) + """) hq_env.command( ["job", "submit-file", "job.toml"], expect_fail="Definition of array job and individual task cannot be mixed", @@ -254,13 +233,11 @@ def 
test_job_file_fail_mixing_array_and_tasks(hq_env: HqEnv, tmp_path): def test_job_file_array_entries_without_ids(hq_env: HqEnv, tmp_path): hq_env.start_server() hq_env.start_worker() - tmp_path.joinpath("job.toml").write_text( - """ + tmp_path.joinpath("job.toml").write_text(""" [array] entries = ["a", "bb", "ccc"] command = ["/bin/bash", "-c", "echo $HQ_ENTRY"] - """ - ) + """) expected = {0: "a", 1: "bb", 2: "ccc"} @@ -275,14 +252,12 @@ def test_job_file_array_entries_without_ids(hq_env: HqEnv, tmp_path): def test_job_file_array_entries_with_ids(hq_env: HqEnv, tmp_path): hq_env.start_server() hq_env.start_worker() - tmp_path.joinpath("job.toml").write_text( - """ + tmp_path.joinpath("job.toml").write_text(""" [array] ids = "2,10-12" entries = ["a", "bb", "ccc", "x"] command = ["/bin/bash", "-c", "echo $HQ_ENTRY"] - """ - ) + """) expected = {2: "a", 10: "bb", 11: "ccc", 12: "x"} @@ -297,8 +272,7 @@ def test_job_file_array_entries_with_ids(hq_env: HqEnv, tmp_path): def test_job_file_dependencies(hq_env: HqEnv, tmp_path): hq_env.start_server() hq_env.start_worker() - tmp_path.joinpath("job.toml").write_text( - """ + tmp_path.joinpath("job.toml").write_text(""" [[task]] id = 1 command = ["sleep", "0"] @@ -311,8 +285,7 @@ def test_job_file_dependencies(hq_env: HqEnv, tmp_path): id = 5 command = ["sleep", "0"] deps = [1, 3] - """ - ) + """) hq_env.command(["job", "submit-file", "job.toml"]) table = hq_env.command(["task", "info", "1", "5"], as_table=True) table.check_row_value("Dependencies", "1,3") diff --git a/tests/test_manager.py b/tests/test_manager.py index 463422152..503af9fb3 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -57,19 +57,13 @@ def test_manager_autodetect(hq_env: HqEnv): with hq_env.mock.mock_program_with_code("qstat", qstat_return_walltime("x1234")): with hq_env.mock.mock_program_with_code("scontrol", scontrol_return("y5678")): hq_env.start_worker(cpus=1) - hq_env.start_worker( - cpus=1, env={"PBS_ENVIRONMENT": "PBS_BATCH", "PBS_JOBID": "x1234"} - ) + hq_env.start_worker(cpus=1, env={"PBS_ENVIRONMENT": "PBS_BATCH", "PBS_JOBID": "x1234"}) hq_env.start_worker(cpus=1, env={"SLURM_JOB_ID": "y5678"}) table = hq_env.command(["worker", "list"], as_table=True) table.check_columns_value(["Manager", "Manager Job ID"], 0, ["None", "N/A"]) - table.check_columns_value( - ["Manager", "Manager Job ID"], 1, ["PBS", "x1234"] - ) - table.check_columns_value( - ["Manager", "Manager Job ID"], 2, ["SLURM", "y5678"] - ) + table.check_columns_value(["Manager", "Manager Job ID"], 1, ["PBS", "x1234"]) + table.check_columns_value(["Manager", "Manager Job ID"], 2, ["SLURM", "y5678"]) table = hq_env.command(["worker", "info", "2"], as_table=True) table.check_row_value("Manager", "PBS") @@ -84,9 +78,7 @@ def test_manager_set_none(hq_env: HqEnv): hq_env.start_server() args = ["--manager", "none"] hq_env.start_worker(cpus=1, args=args) - hq_env.start_worker( - cpus=1, args=args, env={"PBS_ENVIRONMENT": "PBS_BATCH", "PBS_JOBID": "x1234"} - ) + hq_env.start_worker(cpus=1, args=args, env={"PBS_ENVIRONMENT": "PBS_BATCH", "PBS_JOBID": "x1234"}) hq_env.start_worker(cpus=1, args=args, env={"SLURM_JOB_ID": "y5678"}) table = hq_env.command(["worker", "list"], as_table=True) @@ -139,9 +131,7 @@ def test_manager_slurm(hq_env: HqEnv): hq_env.start_server() with hq_env.mock.mock_program_with_code("scontrol", scontrol_return("abcd")): - hq_env.start_worker( - cpus=1, args=["--manager", "slurm"], env={"SLURM_JOB_ID": "abcd"} - ) + hq_env.start_worker(cpus=1, args=["--manager", "slurm"], 
env={"SLURM_JOB_ID": "abcd"}) table = hq_env.command(["worker", "list"], as_table=True) table.check_columns_value(["Manager", "Manager Job ID"], 0, ["SLURM", "abcd"]) diff --git a/tests/test_placeholders.py b/tests/test_placeholders.py index 853d5e088..6761fc206 100644 --- a/tests/test_placeholders.py +++ b/tests/test_placeholders.py @@ -15,8 +15,7 @@ def test_cwd_recursive_placeholder(hq_env: HqEnv): hq_env.start_server() hq_env.command( ["submit", "--cwd", "%{CWD}/foo", "--", "bash", "-c", "echo 'hello'"], - expect_fail="Working directory path cannot contain the working " - "directory placeholder `%{CWD}`.", + expect_fail="Working directory path cannot contain the working directory placeholder `%{CWD}`.", ) @@ -134,9 +133,7 @@ def test_task_resolve_worker_placeholders(hq_env: HqEnv): def test_stream_submit_placeholder(hq_env: HqEnv): hq_env.start_server() - hq_env.command( - ["submit", "--log", "log-%{JOB_ID}", "--", "bash", "-c", "echo Hello"] - ) + hq_env.command(["submit", "--log", "log-%{JOB_ID}", "--", "bash", "-c", "echo Hello"]) hq_env.start_workers(1) wait_for_job_state(hq_env, 1, "FINISHED") @@ -186,12 +183,9 @@ def test_server_uid_placeholder(hq_env: HqEnv, tmp_path): @pytest.mark.parametrize("channel", ("stdout", "stderr")) def test_warning_missing_placeholder_in_output(hq_env: HqEnv, channel: str): hq_env.start_server() - output = hq_env.command( - ["submit", "--array=1-4", f"--{channel}=foo", "/bin/hostname"] - ) + output = hq_env.command(["submit", "--array=1-4", f"--{channel}=foo", "/bin/hostname"]) assert ( - f"You have submitted an array job, but the `{channel}` " - + "path does not contain the task ID placeholder." + f"You have submitted an array job, but the `{channel}` " + "path does not contain the task ID placeholder." in output ) assert f"Consider adding `%{{TASK_ID}}` to the `--{channel}` value." 
diff --git a/tests/test_resources.py b/tests/test_resources.py index e1be507a1..11ef22f84 100644 --- a/tests/test_resources.py +++ b/tests/test_resources.py @@ -24,16 +24,11 @@ def test_worker_resources_display(hq_env: HqEnv): ], ) table = hq_env.command(["worker", "list"], as_table=True) - assert table.get_column_value("Resources") == [ - "cpus 4x2; fairy 10001000; potato 12; shark 4" - ] + assert table.get_column_value("Resources") == ["cpus 4x2; fairy 10001000; potato 12; shark 4"] table = hq_env.command(["worker", "info", "1"], as_table=True) print(table.get_row_value("Resources")) - assert ( - table.get_row_value("Resources") - == "cpus: 4x2\nfairy: 10001000\npotato: 12\nshark: 4" - ) + assert table.get_row_value("Resources") == "cpus: 4x2\nfairy: 10001000\npotato: 12\nshark: 4" def test_task_resources_ignore_worker_without_resource(hq_env: HqEnv): @@ -61,9 +56,7 @@ def check_unscheduled(): hq_env.start_worker(cpus=4, args=["--resource", "fairy=sum(1000)"]) check_unscheduled() - hq_env.start_worker( - cpus=4, args=["--resource", "fairy=sum(2)", "--resource", "potato=sum(500)"] - ) + hq_env.start_worker(cpus=4, args=["--resource", "fairy=sum(2)", "--resource", "potato=sum(500)"]) check_unscheduled() @@ -80,8 +73,10 @@ def test_task_resources_allocate(hq_env: HqEnv): "--", "bash", "-c", - "echo $HQ_RESOURCE_REQUEST_fairy:$HQ_RESOURCE_REQUEST_potato" # no comma - ":$HQ_RESOURCE_VALUES_fairy:$HQ_RESOURCE_VALUES_potato", + ( # no comma + "echo $HQ_RESOURCE_REQUEST_fairy:$HQ_RESOURCE_REQUEST_potato" + ":$HQ_RESOURCE_VALUES_fairy:$HQ_RESOURCE_VALUES_potato" + ), ] ) @@ -188,10 +183,9 @@ def test_worker_set_gpu_env_for_task(hq_env: HqEnv): ] ) wait_for_job_state(hq_env, 1, "FINISHED") - assert list( - set(int(v) for v in line.split(",")) - for line in read_file(default_task_output()).splitlines() - ) == [{0, 1}] + assert list(set(int(v) for v in line.split(",")) for line in read_file(default_task_output()).splitlines()) == [ + {0, 1} + ] @pytest.mark.parametrize( @@ -210,9 +204,7 @@ def test_worker_detect_gpus_from_env(hq_env: HqEnv, env_and_res: str): def test_worker_detect_uuid_gpus_from_env(hq_env: HqEnv): hq_env.start_server() - resources = hq_env.command( - ["worker", "hwdetect"], env={"CUDA_VISIBLE_DEVICES": "foo,bar"} - ) + resources = hq_env.command(["worker", "hwdetect"], env={"CUDA_VISIBLE_DEVICES": "foo,bar"}) assert "gpus/nvidia: [foo,bar]" in resources @@ -235,9 +227,7 @@ def test_worker_detect_multiple_gpus_from_env(hq_env: HqEnv): ) def test_worker_detect_respect_cpu_mask(hq_env: HqEnv): hq_env.start_server() - resources = hq_env.command( - ["worker", "hwdetect"], cmd_prefix=["taskset", "-c", "1"] - ) + resources = hq_env.command(["worker", "hwdetect"], cmd_prefix=["taskset", "-c", "1"]) assert "cpus: [1]" in resources @@ -295,14 +285,12 @@ def test_resource_name_ensure_normalization(hq_env: HqEnv): "--resource", f"{res_name}=1", "--", - *python( - """ + *python(""" import os import sys print(os.environ["HQ_RESOURCE_REQUEST_gpus_amd"], flush=True) print(os.environ["HQ_RESOURCE_VALUES_gpus_amd"], flush=True) -""" - ), +"""), ] ) hq_env.start_worker(args=["--resource", f"{res_name}=[0]"]) diff --git a/tests/test_stream.py b/tests/test_stream.py index b72996cea..700bef00c 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -39,11 +39,7 @@ def test_stream_submit(hq_env: HqEnv): result = hq_env.command(["log", "mylog", "show", "--channel=stderr"]) assert result == "" - lines = set( - hq_env.command( - ["log", "mylog", "show", "--channel=stderr", "--show-empty"], 
as_lines=True
-        )
-    )
+    lines = set(hq_env.command(["log", "mylog", "show", "--channel=stderr", "--show-empty"], as_lines=True))
     for i in range(1, 21):
         assert "{0:02}: > stream closed".format(i) in lines
@@ -57,9 +53,7 @@ def test_stream_submit(hq_env: HqEnv):
 
     result = hq_env.command(["log", "mylog", "cat", "stdout"])
     assert result == "".join(["Hello from {}\n".format(i) for i in range(1, 21)])
-    result = hq_env.command(
-        ["log", "mylog", "cat", "stdout", "--task=3-4,2"]
-    ).splitlines()
+    result = hq_env.command(["log", "mylog", "cat", "stdout", "--task=3-4,2"]).splitlines()
     assert result[0] == "Hello from 3"
     assert result[1] == "Hello from 4"
     assert result[2] == "Hello from 2"
@@ -292,8 +286,10 @@ def test_stream_unfinished_large(hq_env: HqEnv):
             "--",
             "python3",
             "-c",
-            "import time; import sys; sys.stdout.write('ab' * (16 * 1024 + 3));"
-            "sys.stdout.flush(); time.sleep(2); print('end')",
+            (
+                "import time; import sys; sys.stdout.write('ab' * (16 * 1024 + 3));"
+                "sys.stdout.flush(); time.sleep(2); print('end')"
+            ),
         ]
     )
     hq_env.start_workers(1)
@@ -330,10 +326,7 @@ def test_stream_task_fail(hq_env: HqEnv):
     )
     wait_for_job_state(hq_env, 1, "FAILED")
 
-    wait_until(
-        lambda: hq_env.command(["log", "mylog", "show"])
-        == "0:0> Start\n0: > stream closed\n"
-    )
+    wait_until(lambda: hq_env.command(["log", "mylog", "show"]) == "0:0> Start\n0: > stream closed\n")
     check_no_stream_connections(hq_env)
 
 
@@ -361,10 +354,7 @@ def test_stream_task_cancel(hq_env: HqEnv):
     hq_env.command(["job", "cancel", "1"])
     wait_for_job_state(hq_env, 1, "CANCELED")
 
-    wait_until(
-        lambda: hq_env.command(["log", "mylog", "show"])
-        == "0:0> Start\n0: > stream closed\n"
-    )
+    wait_until(lambda: hq_env.command(["log", "mylog", "show"]) == "0:0> Start\n0: > stream closed\n")
     check_no_stream_connections(hq_env)
 
 
@@ -391,10 +381,7 @@ def test_stream_worker_killed(hq_env: HqEnv):
     hq_env.kill_worker(1)
     wait_for_job_state(hq_env, 1, "WAITING")
 
-    wait_until(
-        lambda: hq_env.command(["log", "mylog", "show"])
-        == "0:0> Start\n0: > stream closed\n"
-    )
+    wait_until(lambda: hq_env.command(["log", "mylog", "show"]) == "0:0> Start\n0: > stream closed\n")
 
     table = hq_env.command(["server", "info", "--stats"], as_table=True)
     table.check_row_value("Stream connections", "")
@@ -421,9 +408,6 @@ def test_stream_timeout(hq_env: HqEnv):
 
     wait_for_job_state(hq_env, 1, "FAILED")
 
-    wait_until(
-        lambda: hq_env.command(["log", "mylog", "show"])
-        == "0:0> Start\n0: > stream closed\n"
-    )
+    wait_until(lambda: hq_env.command(["log", "mylog", "show"]) == "0:0> Start\n0: > stream closed\n")
 
     check_no_stream_connections(hq_env)
diff --git a/tests/test_utils.py b/tests/test_utils.py
index c6b550b32..313bc6f60 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -2,15 +2,13 @@
 
 
 def test_parse_table_horizontal():
-    table = parse_table(
-        """+---------+--------------+
+    table = parse_table("""+---------+--------------+
 | Id      | Name         |
 +---------+--------------+
 | a       | b            |
 | c       | d            |
 +---------+--------------+
-"""
-    )
+""")
     assert table.header == ["Id", "Name"]
     assert table.rows == [
         ["a", "b"],
@@ -19,25 +17,21 @@ def test_parse_table_horizontal():
 
 
 def test_parse_table_horizontal_empty():
-    table = parse_table(
-        """+---------+--------------+
+    table = parse_table("""+---------+--------------+
 | Id      | Name         |
 +---------+--------------+
-"""
-    )
+""")
     assert table.header == ["Id", "Name"]
     assert table.rows == []
 
 
 def test_parse_table_vertical():
-    table = parse_table(
-        """+---------+--------------+
+    table = parse_table("""+---------+--------------+
 | Id      | 1            |
 | Name    | 2            |
 | Value   | c            |
 +---------+--------------+
-"""
-    )
+""")
     assert table.header is None
     assert table.rows == [
         ["Id", "1"],
@@ -47,16 +41,14 @@
 
 
 def test_parse_table_multiline_value():
-    table = parse_table(
-        """+---------+--------------+
+    table = parse_table("""+---------+--------------+
 | Id      | 1            |
 | Name    | line1        |
 |         | line2        |
 |         | line3        |
 | Value   | c            |
 +---------+--------------+
-"""
-    )
+""")
     assert table.header is None
     assert table.rows == [
         ["Id", "1"],
@@ -66,33 +58,28 @@
 
 
 def test_parse_table_empty():
-    table = parse_table(
-        """
+    table = parse_table("""
 +---------+--------------+
 +---------+--------------+
-    """
-    )
+    """)
     assert table is None
 
 
 def test_parse_table_ignore_suffix():
-    table, remainder = parse_table(
-        """
+    table, remainder = parse_table("""
 +---------+--------------+
 |a|b|
 +---------+--------------+
 |c|d|
 +---------+--------------+
 hello world
-    """
-    )
+    """)
     assert table.header == ["a", "b"]
     assert table.rows == [["c", "d"]]
 
 
 def test_parse_tables_horizontal():
-    tables = parse_tables(
-        """+---------+--------------+
+    tables = parse_tables("""+---------+--------------+
 | Id      | Name         |
 +---------+--------------+
 | a       | b            |
@@ -104,8 +91,7 @@
 | e       | f            |
 | g       | h            |
 +---------+--------------+
-"""
-    )
+""")
     print("Found tables:", len(tables))
     assert tables[0].header == ["Id", "Name"]
     assert tables[0].rows == [
@@ -120,8 +106,7 @@
 
 
 def test_parse_tables_vertical():
-    tables = parse_tables(
-        """+---------+--------------+
+    tables = parse_tables("""+---------+--------------+
 | Id      | 1            |
 | Name    | 2            |
 | Value   | a            |
@@ -131,8 +116,7 @@
 | Id      | 3            |
 | Name    | 4            |
 | Value   | b            |
 +---------+--------------+
-"""
-    )
+""")
     assert tables[0].header is None
     assert tables[0].rows == [
         ["Id", "1"],
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 3ac430327..890b77815 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -158,18 +158,10 @@ def test_worker_list_resources(hq_env: HqEnv):
 
     table = list_all_workers(hq_env)
     assert len(table) == 4
-    table.check_columns_value(
-        ["ID", "State", "Resources"], 0, ["1", "RUNNING", "cpus 10"]
-    )
-    table.check_columns_value(
-        ["ID", "State", "Resources"], 1, ["2", "RUNNING", "cpus 4x5"]
-    )
-    table.check_columns_value(
-        ["ID", "State", "Resources"], 2, ["3", "RUNNING", "cpus 2x3"]
-    )
-    table.check_columns_value(
-        ["ID", "State", "Resources"], 3, ["4", "RUNNING", "cpus 1x1 1x2 1x3"]
-    )
+    table.check_columns_value(["ID", "State", "Resources"], 0, ["1", "RUNNING", "cpus 10"])
+    table.check_columns_value(["ID", "State", "Resources"], 1, ["2", "RUNNING", "cpus 4x5"])
+    table.check_columns_value(["ID", "State", "Resources"], 2, ["3", "RUNNING", "cpus 2x3"])
+    table.check_columns_value(["ID", "State", "Resources"], 3, ["4", "RUNNING", "cpus 1x1 1x2 1x3"])
 
 
 def test_idle_timeout_server_cfg(hq_env: HqEnv):
@@ -259,9 +251,7 @@ def test_server_lost_stop_with_task(hq_env: HqEnv):
     hq_env.start_server()
     worker = hq_env.start_worker(on_server_lost="stop")
     path = os.path.join(hq_env.work_path, "finished")
-    hq_env.command(
-        ["submit", "--array=1-10", "--", "bash", "-c", f"sleep 3; touch {path}"]
-    )
+    hq_env.command(["submit", "--array=1-10", "--", "bash", "-c", f"sleep 3; touch {path}"])
     wait_for_job_state(hq_env, 1, "RUNNING")
     hq_env.kill_server()
     hq_env.check_process_exited(worker, expected_code=1)
@@ -289,9 +279,7 @@ def test_server_lost_finish_running_with_task(hq_env: HqEnv):
 
     time.sleep(2.0)
     hq_env.check_process_exited(worker, expected_code=1)
-    files = [
-        path for path in os.listdir(hq_env.work_path) if path.startswith("finished-")
-    ]
+    files = [path for path in os.listdir(hq_env.work_path) if path.startswith("finished-")]
     assert len(files) == 1
 
 
diff --git a/tests/utils/job.py b/tests/utils/job.py
index 0e8367d8d..58503195c 100644
--- a/tests/utils/job.py
+++ b/tests/utils/job.py
@@ -5,9 +5,7 @@
 from .table import Table
 
 
-def default_task_output(
-    job_id=1, task_id=0, type="stdout", working_dir: Optional[str] = None
-) -> str:
+def default_task_output(job_id=1, task_id=0, type="stdout", working_dir: Optional[str] = None) -> str:
     working_dir = working_dir if working_dir else os.getcwd()
     return f"{working_dir}/job-{job_id}/{task_id}.{type}"
 
diff --git a/tests/utils/table.py b/tests/utils/table.py
index f54e25d0b..fa3c84349 100644
--- a/tests/utils/table.py
+++ b/tests/utils/table.py
@@ -103,9 +103,7 @@ def parse_table(table_info):
             current_rows = []
             divider_count += 1
             # End early
-            if ((divider_count == 3) or (divider_count == 2 and header is None)) and (
-                i + 1
-            ) < len(lines):
+            if ((divider_count == 3) or (divider_count == 2 and header is None)) and (i + 1) < len(lines):
                 return Table(rows, header=header), lines[(i + 1) :]
             continue
         items = [x.strip() for x in line.split("|")[1:-1]]
diff --git a/tests/utils/wait.py b/tests/utils/wait.py
index 173ccd80e..b3a2b88ee 100644
--- a/tests/utils/wait.py
+++ b/tests/utils/wait.py
@@ -47,9 +47,7 @@ def check():
         table = env.command(commands, as_table=True)
         last_table = table
         items = [row for row in table if row[0] in ids]
-        return len(items) >= len(ids) and all(
-            j[state_index].lower() in target_states for j in items
-        )
+        return len(items) >= len(ids) and all(j[state_index].lower() in target_states for j in items)
 
     try:
         wait_until(check, **kwargs)
@@ -59,15 +57,11 @@ def check():
         raise e
 
 
-def wait_for_job_state(
-    env, ids: Union[int, List[int]], target_states: Union[str, List[str]], **kwargs
-):
+def wait_for_job_state(env, ids: Union[int, List[int]], target_states: Union[str, List[str]], **kwargs):
     wait_for_state(env, ids, target_states, ["job", "list", "--all"], 2, **kwargs)
 
 
-def wait_for_worker_state(
-    env, ids: Union[int, List[int]], target_states: Union[str, List[str]], **kwargs
-):
+def wait_for_worker_state(env, ids: Union[int, List[int]], target_states: Union[str, List[str]], **kwargs):
     wait_for_state(env, ids, target_states, ["worker", "list", "--all"], 1, **kwargs)
 
 
@@ -79,6 +73,4 @@ def wait_for_pid_exit(pid: int):
 
 
 def wait_for_job_list_count(env, count: int):
-    wait_until(
-        lambda: len(env.command(["job", "list", "--all"], as_table=True)) == count
-    )
+    wait_until(lambda: len(env.command(["job", "list", "--all"], as_table=True)) == count)