Skip to content

Commit

Permalink
Don't call extend_report_with_coverage_gains in apply_async callback. (
Browse files Browse the repository at this point in the history
…#709)

Per

https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.apply_async,
callbacks should return immediately or they will otherwise block the
entire Pool from making progress.

For large experiments, this likely causes our throughput to slow to a
crawl as the experiment runs, since every single benchmark experiment
that finishes triggers this expensive calculation.

From debugging with GDB on
#692, it looks like a large
number of worker processes are stuck waiting to report results:

```
(gdb) py-bt
Traceback (most recent call first):
  File "/usr/lib/python3.11/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/usr/lib/python3.11/multiprocessing/queues.py", line 376, in put
    with self._wlock:
  File "/usr/lib/python3.11/multiprocessing/pool.py", line 131, in worker
    put((job, i, result))
  File "/usr/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
```

This partially reverts #566.
We instead just create a new sub-process to periodically call this in
the background to avoid blocking anything.
  • Loading branch information
oliverchang authored Nov 8, 2024
1 parent 6c04059 commit be6a32a
Showing 1 changed file with 32 additions and 14 deletions.
46 changes: 32 additions & 14 deletions run_all_experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import time
import traceback
from datetime import timedelta
from multiprocessing import Pool
from multiprocessing import Pool, Process
from typing import Any

import run_one_experiment
Expand Down Expand Up @@ -60,8 +60,6 @@
LOG_FMT = ('%(asctime)s.%(msecs)03d %(levelname)s '
'%(module)s - %(funcName)s: %(message)s')

EXPERIMENT_RESULTS = []


class Result:
benchmark: benchmarklib.Benchmark
Expand Down Expand Up @@ -335,15 +333,24 @@ def extend_report_with_coverage_gains() -> None:
comparative_cov_gains)


def _print_and_dump_experiment_result(result: Result):
def extend_report_with_coverage_gains_process():
  """Periodically refreshes report coverage gains in the background.

  Runs forever in a dedicated child process so the expensive aggregation
  never blocks the experiment worker pool; the parent kills this process
  once all experiments have finished.
  """
  while True:
    time.sleep(300)  # 5 minutes.
    try:
      extend_report_with_coverage_gains()
    except Exception:
      # logger.exception logs the message together with the full traceback,
      # replacing the separate logger.error + traceback.print_exc() pair.
      logger.exception('Failed to extend report with coverage gains')


def _print_experiment_result(result: Result):
  """Logs the outcome of one finished benchmark experiment."""
  benchmark = result.benchmark
  logger.info('\n**** Finished benchmark %s, %s ****\n%s', benchmark.project,
              benchmark.function_signature, result.result)

EXPERIMENT_RESULTS.append(result)
extend_report_with_coverage_gains()


def _print_experiment_results(results: list[Result],
cov_gain: dict[str, dict[str, Any]]):
Expand Down Expand Up @@ -503,7 +510,7 @@ def _process_total_coverage_gain() -> dict[str, dict[str, Any]]:


def main():
global WORK_DIR, EXPERIMENT_RESULTS
global WORK_DIR

args = parse_args()
_setup_logging(args.log_level)
Expand All @@ -528,27 +535,38 @@ def main():
len(experiment_targets), str(NUM_EXP))

# Set global variables that are updated throughout experiment runs.
EXPERIMENT_RESULTS = []
WORK_DIR = args.work_dir

coverage_gains_process = Process(
target=extend_report_with_coverage_gains_process)
coverage_gains_process.start()

experiment_results = []
if NUM_EXP == 1:
for target_benchmark in experiment_targets:
result = run_experiments(target_benchmark, args)
_print_and_dump_experiment_result(result)
_print_experiment_result(result)
experiment_results.append(result)
else:
experiment_tasks = []
with Pool(NUM_EXP, maxtasksperchild=1) as p:
for target_benchmark in experiment_targets:
experiment_task = p.apply_async(
run_experiments, (target_benchmark, args),
callback=_print_and_dump_experiment_result)
experiment_task = p.apply_async(run_experiments,
(target_benchmark, args),
callback=_print_experiment_result)
experiment_tasks.append(experiment_task)
time.sleep(args.delay)

experiment_results = [task.get() for task in experiment_tasks]

      # Signal that no more work will be submitted to the pool.
p.close()

# Wait for all workers to complete.
p.join()

# Do a final coverage aggregation.
coverage_gains_process.kill()
extend_report_with_coverage_gains()

# Capture time at end
Expand All @@ -559,7 +577,7 @@ def main():
str(timedelta(seconds=end - start)))

coverage_gain_dict = _process_total_coverage_gain()
_print_experiment_results(EXPERIMENT_RESULTS, coverage_gain_dict)
_print_experiment_results(experiment_results, coverage_gain_dict)


if __name__ == '__main__':
Expand Down

0 comments on commit be6a32a

Please sign in to comment.