From 6abbdc3ff22522a4936252020f67ff7c96cc4a0d Mon Sep 17 00:00:00 2001 From: Angus Hollands Date: Wed, 11 Oct 2023 14:15:08 +0100 Subject: [PATCH 01/10] feat: add support for shape touching in Dask (#966) * adapt to new shape touching in awkward/dask-awkward * style: pre-commit fixes * change set to list when selecting form columns * checkpoitn * wip * wip 2 * fix: use trivial form mapping if missing * fix: type hints * refactor: add mixin to share code * chore: fix typos * fix: take first field, not last * fix: update to newest dask-awkward API * fix: update to return report * fix: aggregate keys to read ahead of time * Revert "fix: aggregate keys to read ahead of time" This reverts commit 4a69863412e52db363adfccd0de2d7e69b3a2878. * fix: update to reflect changes to dask-awkward PR * feat: add support for "necessary_columns" * fix: use typing_extensions * fix: don't convert to array via asarray * fix: don't import `dask_awkward` * chore: update dependency warning * test: fix typo * The next release will be 5.1.0rc1. * chore: bump dask-awkward to include the pre-release * chore: bump awkward dependency * feat: export interfaces * The next release will be 5.1.0rc2. --------- Co-authored-by: Lindsey Gray Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Jim Pivarski --- pyproject.toml | 5 +- src/uproot/__init__.py | 2 +- src/uproot/_dask.py | 523 ++++++++++++++++----------- src/uproot/extras.py | 8 +- src/uproot/version.py | 2 +- src/uproot/writing/_cascadetree.py | 2 +- tests/test_0603-dask-delayed-open.py | 2 +- 7 files changed, 322 insertions(+), 222 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b0da5b2c1..89c4667db 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,10 +38,11 @@ classifiers = [ "Topic :: Utilities", ] dependencies = [ - "awkward>=2.0.0", + "awkward>=2.4.5", "importlib-metadata;python_version<\"3.8\"", "numpy", "packaging", + "typing_extensions>=4.1.0; python_version < \"3.11\"" ] dynamic = [ "version", @@ -50,7 +51,7 @@ dynamic = [ [project.optional-dependencies] dev = [ "boost_histogram>=0.13", - "dask-awkward>=2023.9.0;python_version >= \"3.8\"", + "dask-awkward>=2023.10.0a1;python_version >= \"3.8\"", "dask[array];python_version >= \"3.8\"", "hist>=1.2", "pandas", diff --git a/src/uproot/__init__.py b/src/uproot/__init__.py index 7e21ccd14..8777b4122 100644 --- a/src/uproot/__init__.py +++ b/src/uproot/__init__.py @@ -185,7 +185,7 @@ from uproot.behavior import behavior_of from uproot._util import no_filter -from uproot._dask import dask +from uproot._dask import dask, ImplementsFormMapping, ImplementsFormMappingInfo from uproot.pyroot import from_pyroot from uproot.pyroot import to_pyroot diff --git a/src/uproot/_dask.py b/src/uproot/_dask.py index 0737159f5..21009ee52 100644 --- a/src/uproot/_dask.py +++ b/src/uproot/_dask.py @@ -1,12 +1,26 @@ +from __future__ import annotations + import math from collections.abc import Callable, Iterable, Mapping +try: + from typing import TYPE_CHECKING + + from typing_extensions import Any, Final, Protocol, TypeVar +except ImportError: + from typing import TYPE_CHECKING, Any, Final, Protocol, TypeVar + import numpy import uproot from uproot._util import no_filter, unset from uproot.behaviors.TBranch import HasBranches, TBranch, _regularize_step_size +if TYPE_CHECKING: + from awkward._nplikes.typetracer import TypeTracerReport + from awkward.forms import Form + from awkward.highlevel import Array as AwkArray + def dask( files, @@ -731,142 +745,325 @@ def _get_dask_array_delay_open( return dask_dict -class _UprootRead: - def __init__( - self, - ttrees, - common_keys, - common_base_keys, - interp_options, - form_mapping, - rendered_form, - original_form=None, - ) -> None: - self.ttrees = ttrees - self.common_keys = common_keys - self.common_base_keys = common_base_keys - self.interp_options = interp_options - self.form_mapping = form_mapping - self.rendered_form = rendered_form - self.original_form = original_form +class ImplementsFormMappingInfo(Protocol): + @property + def behavior(self) -> dict | None: + ... - def __call__(self, i_start_stop): - i, start, stop = i_start_stop + buffer_key: str | Callable + + def parse_buffer_key(self, buffer_key: str) -> tuple[str, str]: + ... - if self.form_mapping is not None: - awkward = uproot.extras.awkward() - dask_awkward = uproot.extras.dask_awkward() + def keys_for_buffer_keys(self, buffer_keys: frozenset[str]) -> frozenset[str]: + ... - if set(self.common_keys) != set(self.rendered_form.columns()): - actual_form = self.rendered_form.select_columns(self.common_keys) + def load_buffers( + self, + tree: HasBranches, + keys: frozenset[str], + start: int, + stop: int, + options: Any, + ) -> Mapping[str, AwkArray]: + ... + + +class ImplementsFormMapping(Protocol): + def __call__(self, form: Form) -> tuple[Form, ImplementsFormMappingInfo]: + ... + + +class TrivialFormMappingInfo(ImplementsFormMappingInfo): + def __init__(self, form): + awkward = uproot.extras.awkward() + assert isinstance(form, awkward.forms.RecordForm) + + self._form = form + self._form_key_to_key = self.build_form_key_to_key(form) + + @property + def behavior(self) -> None: + return None + + @staticmethod + def build_form_key_to_key(form: Form) -> dict[str, str | None]: + form_key_to_path: dict[str, str | None] = {} + + def impl(form, column_path): + # Store columnar path + form_key_to_path[form.form_key] = column_path[0] if column_path else None + + if form.is_union: + for _i, entry in enumerate(form.contents): + impl(entry, column_path) + elif form.is_indexed: + impl(form.content, column_path) + elif form.is_list: + impl(form.content, column_path) + elif form.is_option: + impl(form.content, column_path) + elif form.is_record: + for field in form.fields: + impl(form.content(field), (*column_path, field)) + elif form.is_unknown or form.is_numpy: + pass else: - actual_form = self.rendered_form + raise AssertionError(form) - mapping, buffer_key = self.form_mapping.create_column_mapping_and_key( - self.ttrees[i], start, stop, self.interp_options - ) + impl(form, ()) - layout = awkward.from_buffers( - actual_form, - stop - start, - mapping, - buffer_key=buffer_key, - highlevel=False, - ) + return form_key_to_path - return awkward.Array( - dask_awkward.lib.unproject_layout.unproject_layout( - self.rendered_form, - layout, - ), - behavior=self.form_mapping.behavior, - ) + buffer_key: Final[str] = "{form_key}-{attribute}" - array = self.ttrees[i].arrays( - self.common_keys, + def parse_buffer_key(self, buffer_key: str) -> tuple[str, str]: + form_key, attribute = buffer_key.rsplit("-", maxsplit=1) + return form_key, attribute + + def keys_for_buffer_keys(self, buffer_keys: frozenset[str]) -> frozenset[str]: + keys: set[str] = set() + for buffer_key in buffer_keys: + # Identify form key + form_key, attribute = buffer_key.rsplit("-", maxsplit=1) + # Identify key from form_key + keys.add(self._form_key_to_key[form_key]) + return frozenset(keys) + + def load_buffers( + self, + tree: HasBranches, + keys: frozenset[str], + start: int, + stop: int, + options: Any, + ) -> Mapping[str, AwkArray]: + # First, let's read the arrays as a tuple (to associate with each key) + arrays = tree.arrays( + keys, entry_start=start, entry_stop=stop, - ak_add_doc=self.interp_options["ak_add_doc"], + ak_add_doc=options["ak_add_doc"], + how=tuple, ) - if self.original_form is not None: - awkward = uproot.extras.awkward() - dask_awkward = uproot.extras.dask_awkward() + awkward = uproot.extras.awkward() + + # The subform generated by awkward.to_buffers() has different form keys + # from those used to perform buffer projection. However, the subform + # structure should be identical to the projection optimisation + # subform, as they're derived from `branch.interpretation.awkward_form` + # Therefore, we can correlate the subform keys using `expected_from_buffers` + container = {} + for key, array in zip(keys, arrays): + # First, convert the sub-array into buffers + ttree_subform, length, ttree_container = awkward.to_buffers(array) + + # Load the associated projection subform + projection_subform = self._form.content(key) + + # Correlate each TTree form key with the projection form key + for (src, src_dtype), (dst, dst_dtype) in zip( + ttree_subform.expected_from_buffers().items(), + projection_subform.expected_from_buffers(self.buffer_key).items(), + ): + assert src_dtype == dst_dtype # Sanity check! + container[dst] = ttree_container[src] - return awkward.Array( - dask_awkward.lib.unproject_layout.unproject_layout( - self.original_form, - array.layout, + return container + + +class TrivialFormMapping(ImplementsFormMapping): + def __call__(self, form: Form) -> tuple[Form, TrivialFormMappingInfo]: + dask_awkward = uproot.extras.dask_awkward() + new_form = dask_awkward.lib.utils.form_with_unique_keys(form, "") + return new_form, TrivialFormMappingInfo(new_form) + + +T = TypeVar("T") + + +class UprootReadMixin: + form_mapping: ImplementsFormMapping + base_form: Form + common_keys: frozenset[str] + interp_options: dict[str, Any] + + def read_tree(self, tree: HasBranches, start: int, stop: int) -> AwkArray: + assert start <= stop + + from awkward._nplikes.numpy import Numpy + + awkward = uproot.extras.awkward() + nplike = Numpy.instance() + + form, form_info = self.form_mapping(self.base_form) + + # The remap implementation should correctly populate the generated + # buffer mapping in __call__, such that the high-level form can be + # used in `from_buffers` + mapping = form_info.load_buffers( + tree, self.common_keys, start, stop, self.interp_options + ) + + # Populate container with placeholders if keys aren't required + # Otherwise, read from disk + container = {} + for buffer_key, dtype in form.expected_from_buffers( + buffer_key=form_info.buffer_key + ).items(): + # Which key(s) does this buffer require. This code permits the caller + # to require multiple keys to compute a single buffer. + keys_for_buffer = form_info.keys_for_buffer_keys(frozenset({buffer_key})) + # If reading this buffer loads a permitted key, read from the tree + # We might not have _all_ keys if e.g. buffer A requires one + # but not two of the keys required for buffer B + if all(k in self.common_keys for k in keys_for_buffer): + container[buffer_key] = mapping[buffer_key] + # Otherwise, introduce a placeholder + else: + container[buffer_key] = awkward.typetracer.PlaceholderArray( + nplike=nplike, + shape=(awkward.typetracer.unknown_length,), + dtype=dtype, ) - ) - return array + return awkward.from_buffers( + form, + stop - start, + container, + behavior=form_info.behavior, + buffer_key=form_info.buffer_key, + ) - def project_columns(self, common_keys=None, original_form=None): - common_base_keys = self.common_base_keys - if self.form_mapping is not None: - awkward = uproot.extras.awkward() + def mock(self) -> AwkArray: + awkward = uproot.extras.awkward() + high_level_form, form_info = self.form_mapping(self.base_form) + return awkward.typetracer.typetracer_from_form( + high_level_form, + highlevel=True, + behavior=form_info.behavior, + ) - tt, report = awkward.typetracer.typetracer_with_report( - self.rendered_form, highlevel=True - ) + def prepare_for_projection(self) -> tuple[AwkArray, TypeTracerReport, dict]: + awkward = uproot.extras.awkward() + dask_awkward = uproot.extras.dask_awkward() - if common_keys is not None: - for key in common_keys: - tt[tuple(key.split("."))].layout._touch_data(recursive=True) + # A form mapping will (may) remap the base form into a new form + # The remapped form can be queried for structural information + high_level_form, form_info = self.form_mapping(self.base_form) - common_base_keys = [ - x - for x in self.form_mapping.extract_form_keys_base_columns( - set(report.data_touched) - ) - if x in self.common_base_keys - ] - elif common_keys is not None: - common_keys = [x for x in common_keys if x in self.common_keys] + # Build typetracer and associated report object + meta, report = awkward.typetracer.typetracer_with_report( + high_level_form, + highlevel=True, + behavior=form_info.behavior, + buffer_key=form_info.buffer_key, + ) + + return ( + meta, + report, + { + "trace": dask_awkward.lib.utils.trace_form_structure( + high_level_form, + buffer_key=form_info.buffer_key, + ), + "form_info": form_info, + }, + ) + + def project(self: T, *, report: TypeTracerReport, state: dict) -> T: + keys = self.necessary_columns(report=report, state=state) + return self.project_keys(keys) + + def necessary_columns( + self, *, report: TypeTracerReport, state: dict + ) -> frozenset[str]: + ## Read from stash + # Form hierarchy information + form_key_to_parent_form_key: dict = state["trace"][ + "form_key_to_parent_form_key" + ] + # Buffer hierarchy information + form_key_to_buffer_keys: dict = state["trace"]["form_key_to_buffer_keys"] + # Restructured form information + form_info = state["form_info"] + + # Require the data of metadata buffers above shape-only requests + dask_awkward = uproot.extras.dask_awkward() + data_buffers = { + *report.data_touched, + *dask_awkward.lib.utils.buffer_keys_required_to_compute_shapes( + form_info.parse_buffer_key, + report.shape_touched, + form_key_to_parent_form_key, + form_key_to_buffer_keys, + ), + } + + # Determine which TTree keys need to be read + return form_info.keys_for_buffer_keys(data_buffers) & frozenset( + self.common_keys + ) + + def project_keys(self: T, keys: frozenset[str]) -> T: + raise NotImplementedError + + +class _UprootRead(UprootReadMixin): + def __init__( + self, + ttrees, + common_keys, + interp_options, + form_mapping: ImplementsFormMapping, + base_form, + ) -> None: + self.ttrees = ttrees + self.common_keys = frozenset(common_keys) + self.interp_options = interp_options + self.form_mapping = form_mapping + self.base_form = base_form + def project_keys(self: T, keys: frozenset[str]) -> T: return _UprootRead( - self.ttrees, - common_keys, - common_keys if self.form_mapping is None else common_base_keys, - self.interp_options, - self.form_mapping, - self.rendered_form, - original_form, + self.ttrees, keys, self.interp_options, self.form_mapping, self.base_form ) + def __call__(self, i_start_stop) -> AwkArray: + i, start, stop = i_start_stop + + return self.read_tree(self.ttrees[i], start, stop) -class _UprootOpenAndRead: + +class _UprootOpenAndRead(UprootReadMixin): def __init__( self, custom_classes, allow_missing, real_options, common_keys, - common_base_keys, interp_options, - form_mapping, - rendered_form, - original_form=None, + form_mapping: ImplementsFormMapping, + base_form: Form, ) -> None: self.custom_classes = custom_classes self.allow_missing = allow_missing self.real_options = real_options - self.common_keys = common_keys - self.common_base_keys = common_base_keys + self.common_keys = frozenset(common_keys) self.interp_options = interp_options self.form_mapping = form_mapping - self.rendered_form = rendered_form - self.original_form = original_form + self.base_form = base_form - def __call__(self, file_path_object_path_istep_nsteps_ischunk): + def __call__(self, blockwise_args) -> AwkArray: ( file_path, object_path, - istep_or_start, - nsteps_or_stop, - ischunk, - ) = file_path_object_path_istep_nsteps_ischunk + i_step_or_start, + n_steps_or_stop, + is_chunk, + ) = blockwise_args ttree = uproot._util.regularize_object_path( file_path, object_path, @@ -875,8 +1072,8 @@ def __call__(self, file_path_object_path_istep_nsteps_ischunk): self.real_options, ) num_entries = ttree.num_entries - if ischunk: - start, stop = istep_or_start, nsteps_or_stop + if is_chunk: + start, stop = i_step_or_start, n_steps_or_stop if (not 0 <= start < num_entries) or (not 0 <= stop <= num_entries): raise ValueError( f"""explicit entry start ({start}) or stop ({stop}) from uproot.dask 'files' argument is out of bounds for file @@ -890,103 +1087,31 @@ def __call__(self, file_path_object_path_istep_nsteps_ischunk): which has {num_entries} entries""" ) else: - events_per_step = math.ceil(num_entries / nsteps_or_stop) - start, stop = min((istep_or_start * events_per_step), num_entries), min( - (istep_or_start + 1) * events_per_step, num_entries + events_per_step = math.ceil(num_entries / n_steps_or_stop) + start, stop = min((i_step_or_start * events_per_step), num_entries), min( + (i_step_or_start + 1) * events_per_step, num_entries ) assert start <= stop - if self.form_mapping is not None: - awkward = uproot.extras.awkward() - dask_awkward = uproot.extras.dask_awkward() - if set(self.common_keys) != set(self.rendered_form.columns()): - actual_form = self.rendered_form.select_columns(self.common_keys) - else: - actual_form = self.rendered_form - - mapping, buffer_key = self.form_mapping.create_column_mapping_and_key( - ttree, start, stop, self.interp_options - ) - - layout = awkward.from_buffers( - actual_form, - stop - start, - mapping, - buffer_key=buffer_key, - highlevel=False, - ) - return awkward.Array( - dask_awkward.lib.unproject_layout.unproject_layout( - self.rendered_form, - layout, - ), - behavior=self.form_mapping.behavior, - ) - - array = ttree.arrays( - self.common_keys, - entry_start=start, - entry_stop=stop, - ak_add_doc=self.interp_options["ak_add_doc"], - ) - - if self.original_form is not None: - awkward = uproot.extras.awkward() - dask_awkward = uproot.extras.dask_awkward() - - return awkward.Array( - dask_awkward.lib.unproject_layout.unproject_layout( - self.original_form, - array.layout, - ) - ) - - return array - - def project_columns(self, columns=None, original_form=None): - common_base_keys = self.common_base_keys - if self.form_mapping is not None: - awkward = uproot.extras.awkward() - - tt, report = awkward.typetracer.typetracer_with_report( - self.rendered_form, highlevel=True - ) - - if columns is not None: - for key in columns: - tt[tuple(key.split("."))].layout._touch_data(recursive=True) - - common_base_keys = [ - x - for x in self.form_mapping.extract_form_keys_base_columns( - set(report.data_touched) - ) - if x in self.common_base_keys - ] - - elif columns is not None: - columns = [x for x in columns if x in self.common_keys] + return self.read_tree(ttree, start, stop) + def project_keys(self: T, keys: frozenset[str]) -> T: return _UprootOpenAndRead( self.custom_classes, self.allow_missing, self.real_options, - columns, - columns if self.form_mapping is None else common_base_keys, + keys, self.interp_options, self.form_mapping, - self.rendered_form, - original_form=original_form, + self.base_form, ) -def _get_meta_array( +def _get_ttree_form( awkward, - dask_awkward, ttree, common_keys, - form_mapping, ak_add_doc, ): contents = [] @@ -999,17 +1124,7 @@ def _get_meta_array( parameters = {"__doc__": ttree.title} if ak_add_doc else None - form = awkward.forms.RecordForm(contents, common_keys, parameters=parameters) - - if form_mapping is not None: - form = form_mapping(form) - - empty_arr = awkward.Array( - form.length_zero_array(highlevel=False), - behavior=None if form_mapping is None else form_mapping.behavior, - ) - - return dask_awkward.core.typetracer_array(empty_arr), form + return awkward.forms.RecordForm(contents, common_keys, parameters=parameters) def _get_dak_array( @@ -1165,13 +1280,8 @@ def real_filter_branch(branch): divisions.append(divisions[-1] + length) partition_args.append((i, start, stop)) - meta, form = _get_meta_array( - awkward, - dask_awkward, - ttrees[0], - common_keys, - form_mapping, - interp_options.get("ak_add_doc"), + base_form = _get_ttree_form( + awkward, ttrees[0], common_keys, interp_options.get("ak_add_doc") ) if len(partition_args) == 0: @@ -1181,17 +1291,14 @@ def real_filter_branch(branch): return dask_awkward.from_map( _UprootRead( ttrees, - common_keys if form_mapping is None else form.columns(), common_keys, interp_options, - form_mapping=form_mapping, - rendered_form=None if form_mapping is None else form, + form_mapping=TrivialFormMapping() if form_mapping is None else form_mapping, + base_form=base_form, ), partition_args, divisions=tuple(divisions), label="from-uproot", - behavior=None if form_mapping is None else form_mapping.behavior, - meta=meta, ) @@ -1225,13 +1332,8 @@ def _get_dak_array_delay_open( full_paths=full_paths, ) - meta, form = _get_meta_array( - awkward, - dask_awkward, - obj, - common_keys, - form_mapping, - interp_options.get("ak_add_doc"), + base_form = _get_ttree_form( + awkward, obj, common_keys, interp_options.get("ak_add_doc") ) divisions = [0] @@ -1272,15 +1374,12 @@ def _get_dak_array_delay_open( custom_classes, allow_missing, real_options, - common_keys if form_mapping is None else form.columns(), common_keys, interp_options, - form_mapping=form_mapping, - rendered_form=None if form_mapping is None else form, + form_mapping=TrivialFormMapping() if form_mapping is None else form_mapping, + base_form=base_form, ), partition_args, divisions=None if divisions is None else tuple(divisions), label="from-uproot", - behavior=None if form_mapping is None else form_mapping.behavior, - meta=meta, ) diff --git a/src/uproot/extras.py b/src/uproot/extras.py index 10b6c0970..39b6e8c18 100644 --- a/src/uproot/extras.py +++ b/src/uproot/extras.py @@ -37,11 +37,11 @@ def awkward(): to output as NumPy arrays, rather than Awkward arrays. """ ) from err - if parse_version("2") <= parse_version(awkward.__version__): + if parse_version(awkward.__version__) >= parse_version("2.4.5"): return awkward else: raise ModuleNotFoundError( - "Uproot 5.x can only be used with Awkward 2.x; you have Awkward {}".format( + "Uproot 5.x can only be used with Awkward 2.4.5 or newer; you have Awkward {}".format( awkward.__version__ ) ) @@ -325,11 +325,11 @@ def dask_awkward(): or conda install -c conda-forge dask dask-awkward""" ) from err - if parse_version("2023.9.0") <= parse_version(dask_awkward.__version__): + if parse_version(dask_awkward.__version__) >= parse_version("2023.10.0a1"): return dask_awkward else: raise ModuleNotFoundError( - "Uproot 5.x can only be used with dask-awkward 2023.9.0 or newer; you have dask-awkward {}".format( + "Uproot 5.x can only be used with dask-awkward 2023.10.0a1 or newer; you have dask-awkward {}".format( dask_awkward.__version__ ) ) diff --git a/src/uproot/version.py b/src/uproot/version.py index 74738c2e6..0a777ab5e 100644 --- a/src/uproot/version.py +++ b/src/uproot/version.py @@ -12,7 +12,7 @@ import re -__version__ = "5.0.12" +__version__ = "5.1.0rc2" version = __version__ version_info = tuple(re.split(r"[-\.]", __version__)) diff --git a/src/uproot/writing/_cascadetree.py b/src/uproot/writing/_cascadetree.py index e8e53cd59..231b68df7 100644 --- a/src/uproot/writing/_cascadetree.py +++ b/src/uproot/writing/_cascadetree.py @@ -780,7 +780,7 @@ def extend(self, file, sink, data): "how did this pass the type check?\n\n" + repr(content) ) - big_endian = numpy.asarray(content, dtype=datum["dtype"]) + big_endian = numpy.asarray(content.data, dtype=datum["dtype"]) shape = tuple(shape) + big_endian.shape[1:] if shape[1:] != datum["shape"]: diff --git a/tests/test_0603-dask-delayed-open.py b/tests/test_0603-dask-delayed-open.py index ee076ff8a..8befaabd2 100644 --- a/tests/test_0603-dask-delayed-open.py +++ b/tests/test_0603-dask-delayed-open.py @@ -8,7 +8,7 @@ dask = pytest.importorskip("dask") da = pytest.importorskip("dask.array") -dask_awkward = pytest.importorskip("dask-awkward") +dask_awkward = pytest.importorskip("dask_awkward") def test_single_delay_open(): From 808baf77b15e7fac34ac1f015f7b2653aadc3fd8 Mon Sep 17 00:00:00 2001 From: Angus Hollands Date: Wed, 11 Oct 2023 14:50:31 +0100 Subject: [PATCH 02/10] chore: bump Python version (#980) * chore: bump Python version * remove python 3.7 from ci * add python 3.11 to matrix build * add python 3.12 * remove python 3.12 * remove python 3.11 from matrix build (already in vanilla build) * fix: pull out `.data` from `NumpyArray` * ci: test 3.12 * fix: don't require aiohttp for 3.12 * ci: fix line breaks Windows PS1 does not like this. * fix: deprecated `ast.Str` * fix: deprecated parameters to HTTPConnection * fix: check `node.value` * test: require aiohttp * fix: `ask.Num` is deprecated * fix: use `ast.Constant` in return * style: pre-commit fixes --------- Co-authored-by: Luis Antonio Obis Aparicio Co-authored-by: Jim Pivarski Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- .github/workflows/build-test.yml | 6 +----- .pre-commit-config.yaml | 7 +++++++ pyproject.toml | 12 ++++++------ src/uproot/_dask.py | 4 ++-- src/uproot/extras.py | 7 +------ src/uproot/interpretation/identify.py | 12 +++++++----- src/uproot/language/python.py | 11 ++++++++--- src/uproot/source/http.py | 4 +--- tests/test_0692_fsspec.py | 2 ++ 9 files changed, 35 insertions(+), 30 deletions(-) diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index f7fd78e76..83a86d9b6 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -16,10 +16,7 @@ jobs: fail-fast: false matrix: platform: ["windows-latest", "macos-latest", "ubuntu-latest"] - python-version: ["3.7", "3.8", "3.9", "3.10"] - exclude: - - platform: "windows-latest" - python-version: "3.7" + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] runs-on: "${{ matrix.platform }}" timeout-minutes: 30 @@ -73,7 +70,6 @@ jobs: runs-on: "${{ matrix.platform }}" timeout-minutes: 30 - steps: - uses: "actions/checkout@v4" diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3f167b2e5..16347ae46 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -27,3 +27,10 @@ repos: hooks: - id: ruff args: ["--fix", "--show-fixes"] + + +- repo: https://github.com/asottile/pyupgrade + rev: v3.13.0 + hooks: + - id: pyupgrade + args: ["--py38-plus"] diff --git a/pyproject.toml b/pyproject.toml index 89c4667db..d7ae3aa8c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ name = "uproot" description = "ROOT I/O in pure Python and NumPy." readme = "README.md" license = "BSD-3-Clause" -requires-python = ">=3.7" +requires-python = ">=3.8" authors = [ { name = "Jim Pivarski", email = "pivarski@princeton.edu" }, ] @@ -25,11 +25,11 @@ classifiers = [ "Programming Language :: Python", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3 :: Only", - "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", "Topic :: Scientific/Engineering", "Topic :: Scientific/Engineering :: Information Analysis", "Topic :: Scientific/Engineering :: Mathematics", @@ -51,16 +51,16 @@ dynamic = [ [project.optional-dependencies] dev = [ "boost_histogram>=0.13", - "dask-awkward>=2023.10.0a1;python_version >= \"3.8\"", - "dask[array];python_version >= \"3.8\"", + "dask-awkward>=2023.9.0", + "dask[array]", "hist>=1.2", "pandas", - "awkward-pandas;python_version >= \"3.8\"", + "awkward-pandas", ] test = [ "lz4", "minio", - "aiohttp", + "aiohttp; python_version<\"3.12\"", "fsspec", "fsspec-xrootd", "pytest>=6", diff --git a/src/uproot/_dask.py b/src/uproot/_dask.py index 21009ee52..2a2a11f62 100644 --- a/src/uproot/_dask.py +++ b/src/uproot/_dask.py @@ -4,9 +4,9 @@ from collections.abc import Callable, Iterable, Mapping try: - from typing import TYPE_CHECKING + from typing import TYPE_CHECKING, Final - from typing_extensions import Any, Final, Protocol, TypeVar + from typing_extensions import Any, Protocol, TypeVar except ImportError: from typing import TYPE_CHECKING, Any, Final, Protocol, TypeVar diff --git a/src/uproot/extras.py b/src/uproot/extras.py index 39b6e8c18..cf26d7510 100644 --- a/src/uproot/extras.py +++ b/src/uproot/extras.py @@ -10,16 +10,11 @@ import atexit +import importlib.metadata as importlib_metadata import os -import sys from uproot._util import parse_version -if sys.version_info < (3, 8): - import importlib_metadata -else: - import importlib.metadata as importlib_metadata - def awkward(): """ diff --git a/src/uproot/interpretation/identify.py b/src/uproot/interpretation/identify.py index 13c50751d..1be3bdb79 100644 --- a/src/uproot/interpretation/identify.py +++ b/src/uproot/interpretation/identify.py @@ -14,6 +14,7 @@ import ast +import numbers import re import numpy @@ -164,15 +165,15 @@ def _float16_double32_walk_ast(node, branch, source): and isinstance(node.ctx, ast.Load) and node.id.lower() == "pi" ): - out = ast.Num(3.141592653589793) # TMath::Pi() + out = ast.Constant(3.141592653589793) # TMath::Pi() elif ( isinstance(node, ast.Name) and isinstance(node.ctx, ast.Load) and node.id.lower() == "twopi" ): - out = ast.Num(6.283185307179586) # TMath::TwoPi() - elif isinstance(node, ast.Num): - out = ast.Num(float(node.n)) + out = ast.Constant(6.283185307179586) # TMath::TwoPi() + elif isinstance(node, ast.Constant) and isinstance(node.value, numbers.Number): + out = ast.Constant(float(node.value)) elif isinstance(node, ast.BinOp) and isinstance( node.op, (ast.Add, ast.Sub, ast.Mult, ast.Div) ): @@ -201,7 +202,8 @@ def _float16_double32_walk_ast(node, branch, source): isinstance(node, ast.List) and isinstance(node.ctx, ast.Load) and len(node.elts) == 3 - and isinstance(node.elts[2], ast.Num) + and isinstance(node.elts[2], ast.Constant) + and isinstance(node.elts[2].value, numbers.Number) ): out = ast.List( [ diff --git a/src/uproot/language/python.py b/src/uproot/language/python.py index 61ddcb751..8c0085ae6 100644 --- a/src/uproot/language/python.py +++ b/src/uproot/language/python.py @@ -57,8 +57,12 @@ def _walk_ast_yield_symbols(node, keys, aliases, functions, getter): and isinstance(node.func, ast.Name) and node.func.id == getter ): - if len(node.args) == 1 and isinstance(node.args[0], ast.Str): - yield node.args[0].s + if ( + len(node.args) == 1 + and isinstance(node.args[0], ast.Constant) + and isinstance(node.args[0].value, str) + ): + yield node.args[0].value else: raise TypeError( f"expected a constant string as the only argument of {getter!r}; " @@ -104,7 +108,8 @@ def _ast_as_branch_expression(node, keys, aliases, functions, getter): and isinstance(node.func, ast.Name) and node.func.id == getter and len(node.args) == 1 - and isinstance(node.args[0], ast.Str) + and isinstance(node.args[0], ast.Constant) + and isinstance(node.args[0].value, str) ): return node diff --git a/src/uproot/source/http.py b/src/uproot/source/http.py index 6278e0733..ecfd6d135 100644 --- a/src/uproot/source/http.py +++ b/src/uproot/source/http.py @@ -39,9 +39,7 @@ def make_connection(parsed_url, timeout): from http.client import HTTPConnection, HTTPSConnection if parsed_url.scheme == "https": - return HTTPSConnection( - parsed_url.hostname, parsed_url.port, None, None, timeout - ) + return HTTPSConnection(parsed_url.hostname, parsed_url.port, timeout=timeout) elif parsed_url.scheme == "http": return HTTPConnection(parsed_url.hostname, parsed_url.port, timeout) diff --git a/tests/test_0692_fsspec.py b/tests/test_0692_fsspec.py index e9c6adacf..eb6dd08c8 100644 --- a/tests/test_0692_fsspec.py +++ b/tests/test_0692_fsspec.py @@ -10,6 +10,8 @@ @pytest.mark.network def test_open_fsspec_http(): + pytest.importorskip("aiohttp") + with uproot.open( "https://github.com/scikit-hep/scikit-hep-testdata/raw/v0.4.33/src/skhep_testdata/data/uproot-issue121.root", http_handler=uproot.source.fsspec.FSSpecSource, From e133be69e5129744705104ff275cb444e31cac32 Mon Sep 17 00:00:00 2001 From: Jim Pivarski Date: Wed, 11 Oct 2023 08:52:06 -0500 Subject: [PATCH 03/10] The next version will be 5.1.0rc3. --- src/uproot/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/uproot/version.py b/src/uproot/version.py index 0a777ab5e..c5984ac81 100644 --- a/src/uproot/version.py +++ b/src/uproot/version.py @@ -12,7 +12,7 @@ import re -__version__ = "5.1.0rc2" +__version__ = "5.1.0rc3" version = __version__ version_info = tuple(re.split(r"[-\.]", __version__)) From 3522acc35e1025b17acfbedf539240d03da7e52f Mon Sep 17 00:00:00 2001 From: Angus Hollands Date: Wed, 11 Oct 2023 17:27:40 +0100 Subject: [PATCH 04/10] fix: require newer dask-awkward --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d7ae3aa8c..59c315667 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,7 +51,7 @@ dynamic = [ [project.optional-dependencies] dev = [ "boost_histogram>=0.13", - "dask-awkward>=2023.9.0", + "dask-awkward>=2023.10.0a1", "dask[array]", "hist>=1.2", "pandas", From 9f5e4c75ffb8fbb366c6c04ac56e8ecea7f523a5 Mon Sep 17 00:00:00 2001 From: Luis Antonio Obis Aparicio <35803280+lobis@users.noreply.github.com> Date: Thu, 12 Oct 2023 08:30:13 -0500 Subject: [PATCH 05/10] feat: improve `uproot.futures` compatibility with `concurrent.futures` (#983) * notifier return function compatible with futures `add_done_callback` api * rename argument `num_workers` to `max_workers` for compatibility with concurrent.futures api * add _max_workers attribute to class * Update src/uproot/source/futures.py Co-authored-by: Angus Hollands * update dev deps in attempt to fix pipeline * Revert "update dev deps in attempt to fix pipeline" This reverts commit 7f33b274e798529957e9d5d1874c21882687e6ac. --------- Co-authored-by: Angus Hollands --- src/uproot/source/chunk.py | 29 +++++++++++++++++++---------- src/uproot/source/futures.py | 28 +++++++++++++++++++--------- 2 files changed, 38 insertions(+), 19 deletions(-) diff --git a/src/uproot/source/chunk.py b/src/uproot/source/chunk.py index 2d9b0d08c..398ccec22 100644 --- a/src/uproot/source/chunk.py +++ b/src/uproot/source/chunk.py @@ -10,8 +10,8 @@ :doc:`uproot.source.chunk.Source`, the primary types of the "physical layer." """ - import numbers +import queue import numpy @@ -58,7 +58,7 @@ def chunk(self, start, stop): :doc:`uproot.source.chunk.Chunk`. """ - def chunks(self, ranges, notifications): + def chunks(self, ranges, notifications: queue.Queue): """ Args: ranges (list of (int, int) 2-tuples): Intervals to fetch @@ -162,7 +162,7 @@ def chunk(self, start, stop): self._executor.submit(future) return chunk - def chunks(self, ranges, notifications): + def chunks(self, ranges, notifications: queue.Queue): self._num_requests += 1 self._num_requested_chunks += len(ranges) self._num_requested_bytes += sum(stop - start for start, stop in ranges) @@ -207,13 +207,6 @@ def __exit__(self, exception_type, exception_value, traceback): self._executor.__exit__(exception_type, exception_value, traceback) -def notifier(chunk, notifications): - def notify(): - notifications.put(chunk) - - return notify - - class Chunk: """ Args: @@ -457,3 +450,19 @@ def remainder(self, start, cursor, context): context, self._source.file_path, ) + + +def notifier(chunk: Chunk, notifications: queue.Queue): + """ + Returns a function that puts the chunk on the notifications queue when called. + The function has a 'future' argument to be compatible with the `concurrent.futures.Future.add_done_callback` method. + + Args: + chunk (:doc:`uproot.source.chunk.Chunk`): The chunk to put on the queue. + notifications (``queue.Queue``): The notifications queue. + """ + + def notify(future=None): + notifications.put(chunk) + + return notify diff --git a/src/uproot/source/futures.py b/src/uproot/source/futures.py index d74ad2f62..584be65b2 100644 --- a/src/uproot/source/futures.py +++ b/src/uproot/source/futures.py @@ -21,7 +21,6 @@ These classes implement a *subset* of Python's Future and Executor interfaces. """ - import os import queue import sys @@ -172,8 +171,9 @@ def run(self): class ThreadPoolExecutor: """ Args: - num_workers (None or int): The number of workers to start. If None, - use ``os.cpu_count()``. + max_workers (None or int): The maximum number of workers to start. + In the current implementation this is exactly the number of workers. + If None, use ``os.cpu_count()``. Like Python 3 ``concurrent.futures.ThreadPoolExecutor`` except that it has only the subset of the interface Uproot needs and is available in Python 2. @@ -182,18 +182,18 @@ class ThreadPoolExecutor: class. """ - def __init__(self, num_workers=None): - if num_workers is None: + def __init__(self, max_workers=None): + if max_workers is None: if hasattr(os, "cpu_count"): - num_workers = os.cpu_count() + self._max_workers = os.cpu_count() else: import multiprocessing - num_workers = multiprocessing.cpu_count() + self._max_workers = multiprocessing.cpu_count() self._work_queue = queue.Queue() self._workers = [] - for _ in range(num_workers): + for _ in range(self._max_workers): self._workers.append(Worker(self._work_queue)) for worker in self._workers: worker.start() @@ -204,7 +204,14 @@ def __repr__(self): ) @property - def num_workers(self): + def max_workers(self) -> int: + """ + The maximum number of workers. + """ + return self._max_workers + + @property + def num_workers(self) -> int: """ The number of workers. """ @@ -263,6 +270,9 @@ def __init__(self, task): self._notify = None def _set_notify(self, notify): + """ + Set the ``notify`` function that is called when this task is complete. + """ self._notify = notify def _set_excinfo(self, excinfo): From f999bd984ba5ce2362919c9520a775c850a44456 Mon Sep 17 00:00:00 2001 From: Angus Hollands Date: Thu, 12 Oct 2023 19:54:39 +0100 Subject: [PATCH 06/10] fix: catch `Content` objects (#986) --- src/uproot/_util.py | 14 ++++++++++---- src/uproot/writing/_cascadetree.py | 4 +++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/uproot/_util.py b/src/uproot/_util.py index 96d5bac92..b45b47ac0 100644 --- a/src/uproot/_util.py +++ b/src/uproot/_util.py @@ -77,12 +77,18 @@ def ensure_numpy(array, types=(numpy.bool_, numpy.integer, numpy.floating)): Returns an ``np.ndarray`` if ``array`` can be converted to an array of the desired type and raises TypeError if it cannot. """ + import uproot + + awkward = uproot.extras.awkward() with warnings.catch_warnings(): warnings.simplefilter("error", numpy.VisibleDeprecationWarning) - try: - out = numpy.asarray(array) - except (ValueError, numpy.VisibleDeprecationWarning) as err: - raise TypeError("cannot be converted to a NumPy array") from err + if isinstance(array, awkward.contents.Content): + out = awkward.to_numpy(array) + else: + try: + out = numpy.asarray(array) + except (ValueError, numpy.VisibleDeprecationWarning) as err: + raise TypeError("cannot be converted to a NumPy array") from err if not issubclass(out.dtype.type, types): raise TypeError(f"cannot be converted to a NumPy array of type {types}") return out diff --git a/src/uproot/writing/_cascadetree.py b/src/uproot/writing/_cascadetree.py index 231b68df7..80105f2c3 100644 --- a/src/uproot/writing/_cascadetree.py +++ b/src/uproot/writing/_cascadetree.py @@ -720,7 +720,9 @@ def extend(self, file, sink, data): ) ) else: - big_endian = numpy.asarray(branch_array, dtype=datum["dtype"]) + big_endian = uproot._util.ensure_numpy(branch_array).astype( + datum["dtype"] + ) if big_endian.shape != (len(branch_array),) + datum["shape"]: raise ValueError( "'extend' must fill branches with a consistent shape: has {}, trying to fill with {}".format( From 80718abb50a2c911eca5e179959b4f569b441beb Mon Sep 17 00:00:00 2001 From: Luis Antonio Obis Aparicio <35803280+lobis@users.noreply.github.com> Date: Thu, 12 Oct 2023 14:52:20 -0500 Subject: [PATCH 07/10] feat: Use a single `handler` argument on `uproot.reading.open` (#971) * single argument for handlers * style: pre-commit fixes * fix missing object source * add deprecation warnings for handler options * style: pre-commit fixes * use handler instead of *_handler * use file in skhep testdata * remove hyphen * use tag instead of branch (https://github.com/scikit-hep/uproot5/pull/973#discussion_r1346510055) * remove network pytest mark (https://github.com/scikit-hep/uproot5/pull/973#discussion_r1346504192) * use proper http links instead of local paths * use fsspec to split url * add comment * fix bad strip * working github test * add test for path split object * Update src/uproot/_util.py Co-authored-by: Jim Pivarski * use urllib instead of fsspec for url parsing * move tests to dedicated file * do not shadow `object` * add more test cases * fsspec source as default * fix test * add xrootd handler default * correctly strip obj * revert merge * Revert "revert merge" This reverts commit 60300d63b943083f0a2579917c7c76a24edb7bf7. * Revert "Revert "revert merge"" This reverts commit 2884e02d82b7eb8d57b2da1c28b5c7ac6a9d6bb8. * update docstrings * update docstrings * direct the user to the handler option in docs * explain order of handler options --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Jim Pivarski --- src/uproot/_dask.py | 13 ++- src/uproot/_util.py | 104 ++++++++++++++++++ src/uproot/behaviors/TBranch.py | 26 +++-- src/uproot/reading.py | 34 +++--- ...st_0017-multi-basket-multi-branch-fetch.py | 24 ++-- tests/test_0692_fsspec.py | 10 +- 6 files changed, 160 insertions(+), 51 deletions(-) diff --git a/src/uproot/_dask.py b/src/uproot/_dask.py index 2a2a11f62..3d77d5f78 100644 --- a/src/uproot/_dask.py +++ b/src/uproot/_dask.py @@ -133,17 +133,18 @@ def dask( Options (type; default): - * file_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.file.MemmapSource`) - * xrootd_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.xrootd.XRootDSource`) - * s3_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.s3.S3Source`) - * http_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.http.HTTPSource`) - * object_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.object.ObjectSource`) + * handler (:doc:`uproot.source.chunk.Source` class; None) + * file_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * xrootd_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * s3_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * http_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * object_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) * timeout (float for HTTP, int for XRootD; 30) * max_num_elements (None or int; None) * num_workers (int; 1) * use_threads (bool; False on the emscripten platform (i.e. in a web browser), else True) * num_fallback_workers (int; 10) - * begin_chunk_size (memory_size; 512) + * begin_chunk_size (memory_size; 403, the smallest a ROOT file can be) * minimal_ttree_metadata (bool; True) Other file entry points: diff --git a/src/uproot/_util.py b/src/uproot/_util.py index b45b47ac0..bf946ea3d 100644 --- a/src/uproot/_util.py +++ b/src/uproot/_util.py @@ -315,22 +315,65 @@ def file_object_path_split(path): def file_path_to_source_class(file_path, options): """ Use a file path to get the :doc:`uproot.source.chunk.Source` class that would read it. + + Returns a tuple of (class, file_path) where the class is a subclass of :doc:`uproot.source.chunk.Source`. + + The "handler" option is the preferred way to specify a custom source class. + The "*_handler" options are for backwards compatibility and will override the "handler" option if set. """ import uproot.source.chunk file_path = regularize_path(file_path) + out = options["handler"] + if out is not None: + if not (isinstance(out, type) and issubclass(out, uproot.source.chunk.Source)): + raise TypeError( + "'handler' is not a class object inheriting from Source: " + repr(out) + ) + # check if "object_handler" is set + if ( + options["object_handler"] is not None + or options["file_handler"] is not None + or options["xrootd_handler"] is not None + or options["s3_handler"] is not None + or options["http_handler"] is not None + ): + # These options will override the "handler" option for backwards compatibility + warnings.warn( + """In version 5.2.0, the '*_handler' argument ('http_handler`, 's3_handler', etc.) will be removed from 'uproot.open'. Use 'handler' instead.""", + stacklevel=1, + ) + else: + return out, file_path + if ( not isstr(file_path) and hasattr(file_path, "read") and hasattr(file_path, "seek") ): out = options["object_handler"] + if out is None: + out = uproot.source.object.ObjectSource + else: + warnings.warn( + f"""In version 5.2.0, the 'object_handler' argument will be removed from 'uproot.open'. Use +uproot.open(..., handler={out!r}) +instead. + +To raise these warnings as errors (and get stack traces to find out where they're called), run +import warnings +warnings.filterwarnings("error", module="uproot.*") +after the first `import uproot` or use `@pytest.mark.filterwarnings("error:::uproot.*")` in pytest.""", + DeprecationWarning, + stacklevel=1, + ) if not (isinstance(out, type) and issubclass(out, uproot.source.chunk.Source)): raise TypeError( "'object_handler' is not a class object inheriting from Source: " + repr(out) ) + return out, file_path windows_absolute_path = None @@ -363,6 +406,22 @@ def file_path_to_source_class(file_path, options): file_path = windows_absolute_path out = options["file_handler"] + if out is None: + out = uproot.source.file.MemmapSource + else: + warnings.warn( + f"""In version 5.2.0, the 'file_handler' argument will be removed from 'uproot.open'. Use + uproot.open(..., handler={out!r} + instead. + + To raise these warnings as errors (and get stack traces to find out where they're called), run + import warnings + warnings.filterwarnings("error", module="uproot.*") + after the first `import uproot` or use `@pytest.mark.filterwarnings("error:::uproot.*")` in pytest.""", + DeprecationWarning, + stacklevel=1, + ) + if not (isinstance(out, type) and issubclass(out, uproot.source.chunk.Source)): raise TypeError( "'file_handler' is not a class object inheriting from Source: " @@ -372,6 +431,21 @@ def file_path_to_source_class(file_path, options): elif parsed_url.scheme.upper() == "ROOT": out = options["xrootd_handler"] + if out is None: + out = uproot.source.root.XRootDSource + else: + warnings.warn( + f"""In version 5.2.0, the 'xrootd_handler' argument will be removed from 'uproot.open'. Use + uproot.open(..., handler={out!r} + instead. + + To raise these warnings as errors (and get stack traces to find out where they're called), run + import warnings + warnings.filterwarnings("error", module="uproot.*") + after the first `import uproot` or use `@pytest.mark.filterwarnings("error:::uproot.*")` in pytest.""", + DeprecationWarning, + stacklevel=1, + ) if not (isinstance(out, type) and issubclass(out, uproot.source.chunk.Source)): raise TypeError( "'xrootd_handler' is not a class object inheriting from Source: " @@ -381,6 +455,21 @@ def file_path_to_source_class(file_path, options): elif parsed_url.scheme.upper() in {"S3"}: out = options["s3_handler"] + if out is None: + out = uproot.source.s3.S3Source + else: + warnings.warn( + f"""In version 5.2.0, the 's3_handler' argument will be removed from 'uproot.open'. Use +uproot.open(..., handler={out!r} +instead. + +To raise these warnings as errors (and get stack traces to find out where they're called), run +import warnings +warnings.filterwarnings("error", module="uproot.*") +after the first `import uproot` or use `@pytest.mark.filterwarnings("error:::uproot.*")` in pytest.""", + DeprecationWarning, + stacklevel=1, + ) if not (isinstance(out, type) and issubclass(out, uproot.source.chunk.Source)): raise TypeError( "'s3' is not a class object inheriting from Source: " + repr(out) @@ -389,6 +478,21 @@ def file_path_to_source_class(file_path, options): elif parsed_url.scheme.upper() in {"HTTP", "HTTPS"}: out = options["http_handler"] + if out is None: + out = uproot.source.http.HTTPSource + else: + warnings.warn( + f"""In version 5.2.0, the 'http_handler' argument will be removed from 'uproot.open'. Use +uproot.open(..., handler={out!r} +instead. + +To raise these warnings as errors (and get stack traces to find out where they're called), run +import warnings +warnings.filterwarnings("error", module="uproot.*") +after the first `import uproot` or use `@pytest.mark.filterwarnings("error:::uproot.*")` in pytest.""", + DeprecationWarning, + stacklevel=1, + ) if not (isinstance(out, type) and issubclass(out, uproot.source.chunk.Source)): raise TypeError( "'http_handler' is not a class object inheriting from Source: " diff --git a/src/uproot/behaviors/TBranch.py b/src/uproot/behaviors/TBranch.py index c277d3b30..bb66be634 100644 --- a/src/uproot/behaviors/TBranch.py +++ b/src/uproot/behaviors/TBranch.py @@ -152,17 +152,18 @@ def iterate( Options (type; default): - * file_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.file.MemmapSource`) - * xrootd_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.xrootd.XRootDSource`) - * s3_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.s3.S3Source`) - * http_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.http.HTTPSource`) - * object_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.object.ObjectSource`) + * handler (:doc:`uproot.source.chunk.Source` class; None) + * file_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * xrootd_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * s3_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * http_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * object_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) * timeout (float for HTTP, int for XRootD; 30) * max_num_elements (None or int; None) * num_workers (int; 1) * use_threads (bool; False on the emscripten platform (i.e. in a web browser), else True) * num_fallback_workers (int; 10) - * begin_chunk_size (memory_size; 512) + * begin_chunk_size (memory_size; 403, the smallest a ROOT file can be) * minimal_ttree_metadata (bool; True) See also :ref:`uproot.behaviors.TBranch.HasBranches.iterate` to iterate @@ -325,17 +326,18 @@ def concatenate( Options (type; default): - * file_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.file.MemmapSource`) - * xrootd_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.xrootd.XRootDSource`) - * s3_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.s3.S3Source`) - * http_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.http.HTTPSource`) - * object_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.object.ObjectSource`) + * handler (:doc:`uproot.source.chunk.Source` class; None) + * file_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * xrootd_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * s3_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * http_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * object_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) * timeout (float for HTTP, int for XRootD; 30) * max_num_elements (None or int; None) * num_workers (int; 1) * use_threads (bool; False on the emscripten platform (i.e. in a web browser), else True) * num_fallback_workers (int; 10) - * begin_chunk_size (memory_size; 512) + * begin_chunk_size (memory_size; 403, the smallest a ROOT file can be) * minimal_ttree_metadata (bool; True) Other file entry points: diff --git a/src/uproot/reading.py b/src/uproot/reading.py index 83d9487cb..2620245ed 100644 --- a/src/uproot/reading.py +++ b/src/uproot/reading.py @@ -8,7 +8,6 @@ and :doc:`uproot.reading.ReadOnlyKey` (``TKey``). """ - import struct import sys import uuid @@ -77,11 +76,12 @@ def open( Options (type; default): - * file_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.file.MemmapSource`) - * xrootd_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.xrootd.XRootDSource`) - * s3_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.s3.S3Source`) - * http_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.http.HTTPSource`) - * object_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.object.ObjectSource`) + * handler (:doc:`uproot.source.chunk.Source` class; None) + * file_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * xrootd_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * s3_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * http_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * object_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) * timeout (float for HTTP, int for XRootD; 30) * max_num_elements (None or int; None) * num_workers (int; 1) @@ -178,10 +178,12 @@ def __getitem__(self, where): open.defaults = _OpenDefaults( { - "file_handler": uproot.source.file.MemmapSource, - "s3_handler": uproot.source.s3.S3Source, - "http_handler": uproot.source.http.HTTPSource, - "object_handler": uproot.source.object.ObjectSource, + "handler": None, # To be updated to fsspec source + "file_handler": None, # Deprecated + "s3_handler": None, # Deprecated + "http_handler": None, # Deprecated + "object_handler": None, # Deprecated + "xrootd_handler": None, # Deprecated "timeout": 30, "max_num_elements": None, "num_workers": 1, @@ -192,7 +194,6 @@ def __getitem__(self, where): } ) - must_be_attached = [ "TROOT", "TDirectory", @@ -534,11 +535,12 @@ class ReadOnlyFile(CommonFileMethods): Options (type; default): - * file_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.file.MemmapSource`) - * xrootd_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.xrootd.XRootDSource`) - * s3_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.xrootd.S3Source`) - * http_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.http.HTTPSource`) - * object_handler (:doc:`uproot.source.chunk.Source` class; :doc:`uproot.source.object.ObjectSource`) + * handler (:doc:`uproot.source.chunk.Source` class; None) + * file_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * xrootd_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * s3_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * http_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) + * object_handler (:doc:`uproot.source.chunk.Source` class; None) (Deprecated: Use `handler` instead. If set, this will take precedence over `handler`) * timeout (float for HTTP, int for XRootD; 30) * max_num_elements (None or int; None) * num_workers (int; 1) diff --git a/tests/test_0017-multi-basket-multi-branch-fetch.py b/tests/test_0017-multi-basket-multi-branch-fetch.py index c8df24263..b42f6db4b 100644 --- a/tests/test_0017-multi-basket-multi-branch-fetch.py +++ b/tests/test_0017-multi-basket-multi-branch-fetch.py @@ -165,13 +165,13 @@ def test_ranges_or_baskets_to_arrays(): @pytest.mark.parametrize( - "file_handler", + "handler", [uproot.source.file.MultithreadedFileSource, uproot.source.file.MemmapSource], ) -def test_branch_array_1(file_handler): +def test_branch_array_1(handler): with uproot.open( skhep_testdata.data_path("uproot-sample-6.20.04-uncompressed.root"), - file_handler=file_handler, + handler=handler, )["sample/i4"] as branch: assert branch.array( uproot.interpretation.numerical.AsDtype(">i4"), library="np" @@ -210,13 +210,13 @@ def test_branch_array_1(file_handler): @pytest.mark.parametrize( - "file_handler", + "handler", [uproot.source.file.MultithreadedFileSource, uproot.source.file.MemmapSource], ) -def test_branch_array_2(file_handler): +def test_branch_array_2(handler): with uproot.open( skhep_testdata.data_path("uproot-sample-6.20.04-uncompressed.root"), - file_handler=file_handler, + handler=handler, )["sample/i4"] as branch: assert branch.array( uproot.interpretation.numerical.AsDtype(">i4"), @@ -250,14 +250,14 @@ def test_branch_array_2(file_handler): @pytest.mark.parametrize( - "file_handler", + "handler", [uproot.source.file.MultithreadedFileSource, uproot.source.file.MemmapSource], ) -def test_branch_array_3(file_handler): +def test_branch_array_3(handler): executor = uproot.ThreadPoolExecutor() with uproot.open( skhep_testdata.data_path("uproot-sample-6.20.04-uncompressed.root"), - file_handler=file_handler, + handler=handler, interpretation_executor=executor, decompression_executor=executor, )["sample/i4"] as branch: @@ -293,13 +293,13 @@ def test_branch_array_3(file_handler): @pytest.mark.parametrize( - "file_handler", + "handler", [uproot.source.file.MultithreadedFileSource, uproot.source.file.MemmapSource], ) -def test_branch_array_4(file_handler): +def test_branch_array_4(handler): with uproot.open( skhep_testdata.data_path("uproot-sample-6.20.04-uncompressed.root"), - file_handler=file_handler, + handler=handler, )["sample/i4"] as branch: with pytest.raises(ValueError): branch.array(uproot.interpretation.numerical.AsDtype(">i8"), library="np") diff --git a/tests/test_0692_fsspec.py b/tests/test_0692_fsspec.py index eb6dd08c8..c426673e7 100644 --- a/tests/test_0692_fsspec.py +++ b/tests/test_0692_fsspec.py @@ -14,7 +14,7 @@ def test_open_fsspec_http(): with uproot.open( "https://github.com/scikit-hep/scikit-hep-testdata/raw/v0.4.33/src/skhep_testdata/data/uproot-issue121.root", - http_handler=uproot.source.fsspec.FSSpecSource, + handler=uproot.source.fsspec.FSSpecSource, ) as f: data = f["Events/MET_pt"].array(library="np") assert len(data) == 40 @@ -27,7 +27,7 @@ def test_open_fsspec_github(): ) with uproot.open( "github://scikit-hep:scikit-hep-testdata@v0.4.33/src/skhep_testdata/data/uproot-issue121.root", - http_handler=uproot.source.fsspec.FSSpecSource, + handler=uproot.source.fsspec.FSSpecSource, ) as f: data = f["Events/MET_pt"].array(library="np") assert len(data) == 40 @@ -38,7 +38,7 @@ def test_open_fsspec_local(tmp_path): with uproot.open( local_path, - file_handler=uproot.source.fsspec.FSSpecSource, + handler=uproot.source.fsspec.FSSpecSource, ) as f: data = f["Events/MET_pt"].array(library="np") assert len(data) == 40 @@ -51,7 +51,7 @@ def test_open_fsspec_s3(): with uproot.open( "s3://pivarski-princeton/pythia_ppZee_run17emb.picoDst.root:PicoDst", anon=True, - s3_handler=uproot.source.fsspec.FSSpecSource, + handler=uproot.source.fsspec.FSSpecSource, ) as f: data = f["Event/Event.mEventId"].array(library="np") assert len(data) == 8004 @@ -63,7 +63,7 @@ def test_open_fsspec_xrootd(): pytest.importorskip("XRootD") with uproot.open( "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root", - xrootd_handler=uproot.source.fsspec.FSSpecSource, + handler=uproot.source.fsspec.FSSpecSource, ) as f: data = f["Events/run"].array(library="np", entry_stop=20) assert len(data) == 20 From 9ea709be91b6d25b5e44cbbda0a216bffb840429 Mon Sep 17 00:00:00 2001 From: Luis Antonio Obis Aparicio <35803280+lobis@users.noreply.github.com> Date: Thu, 12 Oct 2023 15:33:39 -0500 Subject: [PATCH 08/10] feat: `fsspec` source non-blocking chunks (#979) * attempt at non-blocking * call Future constructor correctly * working future * remove comment (https://github.com/fsspec/filesystem_spec/discussions/1388) * add test for chunks * add missing import * now chunks are retrieved correctly but is blocking * now it's non-blocking * remove print from test * remove timings from test * add executor to class (no cleanup) * cleanup executor * use concurrent thread pool * Update src/uproot/source/fsspec.py Co-authored-by: Jim Pivarski * made sources not pickleable * Revert "made sources not pickleable" This reverts commit fc2759f40f4c1677f30db7730064beffd1ac4423. * new callback * remove comment * add return type hint to `closed` and `num_bytes` * more return type hints * move "Chunk" class position in file to allow type hinting (no code changes) * add `add_done_callback` to TrivialFuture * remove tmp_path * call callback immediately in trivial future * Revert "move "Chunk" class position in file to allow type hinting (no code changes)" This reverts commit d122e2040b20ec02a15e95278569e7e49be18b1c. * import annotations from __future__, type hints * chunks return type hint * import skip for aiohttp * add annotations * add license string * remove check for http * test with "use_threads" enabled and disabled --------- Co-authored-by: Jim Pivarski --- src/uproot/source/chunk.py | 22 +++++++------ src/uproot/source/file.py | 14 ++++++--- src/uproot/source/fsspec.py | 61 +++++++++++++++++++++++++++--------- src/uproot/source/futures.py | 10 ++++-- src/uproot/source/http.py | 13 +++++--- src/uproot/source/object.py | 2 +- src/uproot/source/xrootd.py | 18 ++++++----- tests/test_0692_fsspec.py | 30 ++++++++++++++++-- 8 files changed, 122 insertions(+), 48 deletions(-) diff --git a/src/uproot/source/chunk.py b/src/uproot/source/chunk.py index 398ccec22..85905d8d4 100644 --- a/src/uproot/source/chunk.py +++ b/src/uproot/source/chunk.py @@ -10,6 +10,8 @@ :doc:`uproot.source.chunk.Source`, the primary types of the "physical layer." """ +from __future__ import annotations + import numbers import queue @@ -47,7 +49,7 @@ class Source: the file. """ - def chunk(self, start, stop): + def chunk(self, start, stop) -> Chunk: """ Args: start (int): Seek position of the first byte to include. @@ -58,7 +60,7 @@ def chunk(self, start, stop): :doc:`uproot.source.chunk.Chunk`. """ - def chunks(self, ranges, notifications: queue.Queue): + def chunks(self, ranges, notifications: queue.Queue) -> list[Chunk]: """ Args: ranges (list of (int, int) 2-tuples): Intervals to fetch @@ -95,21 +97,21 @@ def file_path(self): return self._file_path @property - def num_bytes(self): + def num_bytes(self) -> int: """ The number of bytes in the file. """ return self._num_bytes @property - def num_requests(self): + def num_requests(self) -> int: """ The number of requests that have been made (performance counter). """ return self._num_requests @property - def num_requested_chunks(self): + def num_requested_chunks(self) -> int: """ The number of :doc:`uproot.source.chunk.Chunk` objects that have been requested (performance counter). @@ -117,7 +119,7 @@ def num_requested_chunks(self): return self._num_requested_chunks @property - def num_requested_bytes(self): + def num_requested_bytes(self) -> int: """ The number of bytes that have been requested (performance counter). """ @@ -130,7 +132,7 @@ def close(self): self.__exit__(None, None, None) @property - def closed(self): + def closed(self) -> bool: """ True if the associated file/connection/thread pool is closed; False otherwise. @@ -152,7 +154,7 @@ def __repr__(self): type(self).__name__, path, self.num_workers, id(self) ) - def chunk(self, start, stop): + def chunk(self, start, stop) -> Chunk: self._num_requests += 1 self._num_requested_chunks += 1 self._num_requested_bytes += stop - start @@ -162,7 +164,7 @@ def chunk(self, start, stop): self._executor.submit(future) return chunk - def chunks(self, ranges, notifications: queue.Queue): + def chunks(self, ranges, notifications: queue.Queue) -> list[Chunk]: self._num_requests += 1 self._num_requested_chunks += len(ranges) self._num_requested_bytes += sum(stop - start for start, stop in ranges) @@ -192,7 +194,7 @@ def num_workers(self): return self._executor.num_workers @property - def closed(self): + def closed(self) -> bool: """ True if the :doc:`uproot.source.futures.ResourceThreadPoolExecutor` has been shut down and the file handles have been closed. diff --git a/src/uproot/source/file.py b/src/uproot/source/file.py index f5c0630db..feae3d0b9 100644 --- a/src/uproot/source/file.py +++ b/src/uproot/source/file.py @@ -12,8 +12,10 @@ :doc:`uproot.source.file.MultithreadedFileSource` is an automatic fallback. """ +from __future__ import annotations import os.path +import queue import numpy @@ -45,7 +47,7 @@ def file(self): return self._file @property - def closed(self): + def closed(self) -> bool: return self._file.closed def __enter__(self): @@ -137,7 +139,7 @@ def __repr__(self): fallback = " with fallback" return f"<{type(self).__name__} {path}{fallback} at 0x{id(self):012x}>" - def chunk(self, start, stop): + def chunk(self, start, stop) -> uproot.source.chunk.Chunk: if self._fallback is None: if self.closed: raise OSError(f"memmap is closed for file {self._file_path}") @@ -153,7 +155,9 @@ def chunk(self, start, stop): else: return self._fallback.chunk(start, stop) - def chunks(self, ranges, notifications): + def chunks( + self, ranges, notifications: queue.Queue + ) -> list[uproot.source.chunk.Chunk]: if self._fallback is None: if self.closed: raise OSError(f"memmap is closed for file {self._file_path}") @@ -195,7 +199,7 @@ def fallback(self): return self._fallback @property - def closed(self): + def closed(self) -> bool: if self._fallback is None: return self._file._mmap.closed else: @@ -219,7 +223,7 @@ def __exit__(self, exception_type, exception_value, traceback): self._fallback.__exit__(exception_type, exception_value, traceback) @property - def num_bytes(self): + def num_bytes(self) -> int: if self._fallback is None: return self._file._mmap.size() else: diff --git a/src/uproot/source/fsspec.py b/src/uproot/source/fsspec.py index 4ebb8889b..7f4680f3a 100644 --- a/src/uproot/source/fsspec.py +++ b/src/uproot/source/fsspec.py @@ -1,4 +1,13 @@ +# BSD 3-Clause License; see https://github.com/scikit-hep/uproot5/blob/main/LICENSE + +from __future__ import annotations + +import concurrent.futures +import queue + +import uproot import uproot.source.chunk +import uproot.source.futures class FSSpecSource(uproot.source.chunk.Source): @@ -13,16 +22,28 @@ class FSSpecSource(uproot.source.chunk.Source): to get many chunks in one request. """ - def __init__(self, file_path, **kwargs): + def __init__(self, file_path, **options): import fsspec.core + default_options = uproot.reading.open.defaults + self._use_threads = options.get("use_threads", default_options["use_threads"]) + self._num_workers = options.get("num_workers", default_options["num_workers"]) + # TODO: is timeout always valid? # Remove uproot-specific options (should be done earlier) - exclude_keys = set(uproot.reading.open.defaults.keys()) - opts = {k: v for k, v in kwargs.items() if k not in exclude_keys} + exclude_keys = set(default_options.keys()) + opts = {k: v for k, v in options.items() if k not in exclude_keys} self._fs, self._file_path = fsspec.core.url_to_fs(file_path, **opts) + + if self._use_threads: + self._executor = concurrent.futures.ThreadPoolExecutor( + max_workers=self._num_workers + ) + else: + self._executor = uproot.source.futures.TrivialExecutor() + # TODO: set mode to "read-only" in a way that works for all filesystems self._file = self._fs.open(self._file_path) self._fh = None @@ -37,15 +58,25 @@ def __repr__(self): path = repr("..." + self._file_path[-10:]) return f"<{type(self).__name__} {path} at 0x{id(self):012x}>" + def __getstate__(self): + state = dict(self.__dict__) + state.pop("_executor") + return state + + def __setstate__(self, state): + self.__dict__ = state + self._open() + def __enter__(self): self._fh = self._file.__enter__() return self def __exit__(self, exception_type, exception_value, traceback): self._fh = None + self._executor.shutdown() self._file.__exit__(exception_type, exception_value, traceback) - def chunk(self, start, stop): + def chunk(self, start, stop) -> uproot.source.chunk.Chunk: """ Args: start (int): Seek position of the first byte to include. @@ -66,7 +97,9 @@ def chunk(self, start, stop): future = uproot.source.futures.TrivialFuture(data) return uproot.source.chunk.Chunk(self, start, stop, future) - def chunks(self, ranges, notifications): + def chunks( + self, ranges, notifications: queue.Queue + ) -> list[uproot.source.chunk.Chunk]: """ Args: ranges (list of (int, int) 2-tuples): Intervals to fetch @@ -97,28 +130,26 @@ def chunks(self, ranges, notifications): self._num_requests += 1 self._num_requested_chunks += len(ranges) self._num_requested_bytes += sum(stop - start for start, stop in ranges) - data = self._fs.cat_ranges( - [self._file_path] * len(ranges), - [start for start, _ in ranges], - [stop for _, stop in ranges], - ) + chunks = [] - for item, (start, stop) in zip(data, ranges): - future = uproot.source.futures.TrivialFuture(item) + for start, stop in ranges: + future = self._executor.submit( + self._fs.cat_file, self._file_path, start, stop + ) chunk = uproot.source.chunk.Chunk(self, start, stop, future) - uproot.source.chunk.notifier(chunk, notifications)() + future.add_done_callback(uproot.source.chunk.notifier(chunk, notifications)) chunks.append(chunk) return chunks @property - def num_bytes(self): + def num_bytes(self) -> int: """ The number of bytes in the file. """ return self._fs.size(self._file_path) @property - def closed(self): + def closed(self) -> bool: """ True if the associated file/connection/thread pool is closed; False otherwise. diff --git a/src/uproot/source/futures.py b/src/uproot/source/futures.py index 584be65b2..e5bca5d53 100644 --- a/src/uproot/source/futures.py +++ b/src/uproot/source/futures.py @@ -47,6 +47,12 @@ class TrivialFuture: def __init__(self, result): self._result = result + def add_done_callback(self, callback, *, context=None): + """ + The callback is called immediately. + """ + callback(self) + def result(self, timeout=None): """ The result of this (Trivial)Future. @@ -389,7 +395,7 @@ def close(self): self.__exit__(None, None, None) @property - def closed(self): + def closed(self) -> bool: """ True if the :doc:`uproot.source.futures.ResourceWorker` threads have been stopped and their @@ -449,7 +455,7 @@ def close(self): self.__exit__(None, None, None) @property - def closed(self): + def closed(self) -> bool: """ True if the :doc:`uproot.source.futures.ResourceTrivialExecutor` has been stopped. diff --git a/src/uproot/source/http.py b/src/uproot/source/http.py index ecfd6d135..ac80cf038 100644 --- a/src/uproot/source/http.py +++ b/src/uproot/source/http.py @@ -14,6 +14,7 @@ Despite the name, both sources support secure HTTPS (selected by URL scheme). """ +from __future__ import annotations import base64 import queue @@ -593,7 +594,7 @@ def __repr__(self): fallback = " with fallback" return f"<{type(self).__name__} {path}{fallback} at 0x{id(self):012x}>" - def chunk(self, start, stop): + def chunk(self, start, stop) -> uproot.source.chunk.Chunk: self._num_requests += 1 self._num_requested_chunks += 1 self._num_requested_bytes += stop - start @@ -603,7 +604,9 @@ def chunk(self, start, stop): self._executor.submit(future) return chunk - def chunks(self, ranges, notifications): + def chunks( + self, ranges, notifications: queue.Queue + ) -> list[uproot.source.chunk.Chunk]: if self._fallback is None: self._num_requests += 1 self._num_requested_chunks += len(ranges) @@ -639,7 +642,7 @@ def executor(self): return self._executor @property - def closed(self): + def closed(self) -> bool: return self._executor.closed def __enter__(self): @@ -658,7 +661,7 @@ def timeout(self): return self._timeout @property - def num_bytes(self): + def num_bytes(self) -> int: if self._num_bytes is None: self._num_bytes = get_num_bytes( self._file_path, self.parsed_url, self._timeout @@ -756,7 +759,7 @@ def timeout(self): return self._timeout @property - def num_bytes(self): + def num_bytes(self) -> int: if self._num_bytes is None: self._num_bytes = get_num_bytes( self._file_path, self.parsed_url, self._timeout diff --git a/src/uproot/source/object.py b/src/uproot/source/object.py index 019d1f57f..25d02a74a 100644 --- a/src/uproot/source/object.py +++ b/src/uproot/source/object.py @@ -40,7 +40,7 @@ def obj(self): return self._obj @property - def closed(self): + def closed(self) -> bool: return getattr(self._obj, "closed", False) def __enter__(self): diff --git a/src/uproot/source/xrootd.py b/src/uproot/source/xrootd.py index be0897b17..0efd8d883 100644 --- a/src/uproot/source/xrootd.py +++ b/src/uproot/source/xrootd.py @@ -10,8 +10,10 @@ :doc:`uproot.source.xrootd.MultithreadedXRootDSource`. """ +from __future__ import annotations import contextlib +import queue import sys import uproot @@ -141,14 +143,14 @@ def file(self): return self._file @property - def num_bytes(self): + def num_bytes(self) -> int: status, info = self._file.stat(self._xrd_timeout()) if status.error: self._xrd_error(status) return info.size @property - def closed(self): + def closed(self) -> bool: return not self._file.is_open() def __enter__(self): @@ -313,7 +315,7 @@ def __repr__(self): path = repr("..." + self._file_path[-10:]) return f"<{type(self).__name__} {path} at 0x{id(self):012x}>" - def chunk(self, start, stop): + def chunk(self, start, stop) -> uproot.source.chunk.Chunk: self._num_requests += 1 self._num_requested_chunks += 1 self._num_requested_bytes += stop - start @@ -322,7 +324,9 @@ def chunk(self, start, stop): future = uproot.source.futures.TrivialFuture(data) return uproot.source.chunk.Chunk(self, start, stop, future) - def chunks(self, ranges, notifications): + def chunks( + self, ranges, notifications: queue.Queue + ) -> list[uproot.source.chunk.Chunk]: self._num_requests += 1 self._num_requested_chunks += len(ranges) self._num_requested_bytes += sum(stop - start for start, stop in ranges) @@ -422,7 +426,7 @@ def file(self): return self._resource.file @property - def closed(self): + def closed(self) -> bool: return self._resource.closed def __enter__(self): @@ -433,7 +437,7 @@ def __exit__(self, exception_type, exception_value, traceback): self._resource.__exit__(exception_type, exception_value, traceback) @property - def num_bytes(self): + def num_bytes(self) -> int: if self._num_bytes is None: self._num_bytes = self._resource.num_bytes return self._num_bytes @@ -493,7 +497,7 @@ def timeout(self): return self._timeout @property - def num_bytes(self): + def num_bytes(self) -> int: if self._num_bytes is None: self._num_bytes = self._executor.workers[0].resource.num_bytes return self._num_bytes diff --git a/tests/test_0692_fsspec.py b/tests/test_0692_fsspec.py index c426673e7..255db6524 100644 --- a/tests/test_0692_fsspec.py +++ b/tests/test_0692_fsspec.py @@ -1,20 +1,22 @@ # BSD 3-Clause License; see https://github.com/scikit-hep/uproot4/blob/main/LICENSE import pytest - import uproot import uproot.source.fsspec import skhep_testdata +import queue @pytest.mark.network -def test_open_fsspec_http(): +@pytest.mark.parametrize("use_threads", [True, False]) +def test_open_fsspec_http(use_threads): pytest.importorskip("aiohttp") with uproot.open( "https://github.com/scikit-hep/scikit-hep-testdata/raw/v0.4.33/src/skhep_testdata/data/uproot-issue121.root", handler=uproot.source.fsspec.FSSpecSource, + use_threads=use_threads, ) as f: data = f["Events/MET_pt"].array(library="np") assert len(data) == 40 @@ -33,12 +35,14 @@ def test_open_fsspec_github(): assert len(data) == 40 -def test_open_fsspec_local(tmp_path): +@pytest.mark.parametrize("use_threads", [True, False]) +def test_open_fsspec_local(use_threads): local_path = skhep_testdata.data_path("uproot-issue121.root") with uproot.open( local_path, handler=uproot.source.fsspec.FSSpecSource, + use_threads=use_threads, ) as f: data = f["Events/MET_pt"].array(library="np") assert len(data) == 40 @@ -68,3 +72,23 @@ def test_open_fsspec_xrootd(): data = f["Events/run"].array(library="np", entry_stop=20) assert len(data) == 20 assert (data == 194778).all() + + +@pytest.mark.network +def test_fsspec_chunks(): + pytest.importorskip("aiohttp") + + url = "https://github.com/scikit-hep/scikit-hep-testdata/raw/v0.4.33/src/skhep_testdata/data/uproot-issue121.root" + + notifications = queue.Queue() + with uproot.source.fsspec.FSSpecSource(url) as source: + chunks = source.chunks( + [(0, 100), (50, 55), (200, 400)], notifications=notifications + ) + expected = {(chunk.start, chunk.stop): chunk for chunk in chunks} + while len(expected) > 0: + chunk = notifications.get() + expected.pop((chunk.start, chunk.stop)) + + chunk_data_sum = {sum(chunk.raw_data) for chunk in chunks} + assert chunk_data_sum == {3967, 413, 10985}, "Chunk data does not match" From 80e392c26bb0724b22ef131deb2441fbbeb9259c Mon Sep 17 00:00:00 2001 From: Jim Pivarski Date: Fri, 13 Oct 2023 12:40:29 -0500 Subject: [PATCH 09/10] Increase minimum versions of awkward and dask-awkward; improve message. --- pyproject.toml | 4 ++-- src/uproot/extras.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 59c315667..622e95125 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,7 +38,7 @@ classifiers = [ "Topic :: Utilities", ] dependencies = [ - "awkward>=2.4.5", + "awkward>=2.4.6", "importlib-metadata;python_version<\"3.8\"", "numpy", "packaging", @@ -51,7 +51,7 @@ dynamic = [ [project.optional-dependencies] dev = [ "boost_histogram>=0.13", - "dask-awkward>=2023.10.0a1", + "dask-awkward>=2023.10.0", "dask[array]", "hist>=1.2", "pandas", diff --git a/src/uproot/extras.py b/src/uproot/extras.py index cf26d7510..495a63cc4 100644 --- a/src/uproot/extras.py +++ b/src/uproot/extras.py @@ -32,11 +32,11 @@ def awkward(): to output as NumPy arrays, rather than Awkward arrays. """ ) from err - if parse_version(awkward.__version__) >= parse_version("2.4.5"): + if parse_version(awkward.__version__) >= parse_version("2.4.6"): return awkward else: raise ModuleNotFoundError( - "Uproot 5.x can only be used with Awkward 2.4.5 or newer; you have Awkward {}".format( + "Uproot 5.1+ can only be used with Awkward 2.4.6 or newer; you have Awkward {}".format( awkward.__version__ ) ) @@ -320,11 +320,11 @@ def dask_awkward(): or conda install -c conda-forge dask dask-awkward""" ) from err - if parse_version(dask_awkward.__version__) >= parse_version("2023.10.0a1"): + if parse_version(dask_awkward.__version__) >= parse_version("2023.10.0"): return dask_awkward else: raise ModuleNotFoundError( - "Uproot 5.x can only be used with dask-awkward 2023.10.0a1 or newer; you have dask-awkward {}".format( + "Uproot 5.1+ can only be used with dask-awkward 2023.10.0 or newer; you have dask-awkward {}".format( dask_awkward.__version__ ) ) From 491d330bb09f0be6dd448eec0c5443b72674649b Mon Sep 17 00:00:00 2001 From: Jim Pivarski Date: Fri, 13 Oct 2023 12:43:27 -0500 Subject: [PATCH 10/10] The next release will be 5.1.0rc4. --- src/uproot/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/uproot/version.py b/src/uproot/version.py index c5984ac81..33ee69d05 100644 --- a/src/uproot/version.py +++ b/src/uproot/version.py @@ -12,7 +12,7 @@ import re -__version__ = "5.1.0rc3" +__version__ = "5.1.0rc4" version = __version__ version_info = tuple(re.split(r"[-\.]", __version__))