diff --git a/src/uproot/source/futures.py b/src/uproot/source/futures.py index 20673a293..53c474d0e 100644 --- a/src/uproot/source/futures.py +++ b/src/uproot/source/futures.py @@ -217,6 +217,8 @@ def __init__(self, max_workers: int | None = None): import multiprocessing self._max_workers = multiprocessing.cpu_count() + else: + self._max_workers = max_workers self._work_queue = queue.Queue() self._workers = [] diff --git a/tests/test_1254_test_threadpool_executor_for_dask.py b/tests/test_1254_test_threadpool_executor_for_dask.py new file mode 100644 index 000000000..67852de0e --- /dev/null +++ b/tests/test_1254_test_threadpool_executor_for_dask.py @@ -0,0 +1,44 @@ +import pytest +import skhep_testdata + +import uproot + +pytest.importorskip("pandas") + + +def test_decompression_threadpool_executor_for_dask(): + + class TestThreadPoolExecutor(uproot.source.futures.ThreadPoolExecutor): + def __init__(self, max_workers=None): + super().__init__(max_workers=max_workers) + self.submit_count = 0 + + def submit(self, task, /, *args, **kwargs): + self.submit_count += 1 + super().submit(task, *args, **kwargs) + + implicitexecutor = TestThreadPoolExecutor(max_workers=None) + + a = uproot.dask( + {skhep_testdata.data_path("uproot-sample-6.20.04-uncompressed.root"): "sample"}, + decompression_executor=implicitexecutor, + ) + + a["i4"].compute() + + assert implicitexecutor.max_workers > 0 + + assert implicitexecutor.submit_count > 0 + + explicitexecutor = TestThreadPoolExecutor(max_workers=1) + + b = uproot.dask( + {skhep_testdata.data_path("uproot-sample-6.20.04-uncompressed.root"): "sample"}, + decompression_executor=explicitexecutor, + ) + + b["i4"].compute() + + assert explicitexecutor.max_workers == 1 + + assert explicitexecutor.submit_count > 0