Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fsspec source non-blocking chunks #979

Merged
merged 36 commits into from
Oct 12, 2023
Merged

feat: fsspec source non-blocking chunks #979

merged 36 commits into from
Oct 12, 2023

Conversation

lobis
Copy link
Collaborator

@lobis lobis commented Oct 6, 2023

This PR adds a non-blocking implementation of the chunks interface.

It uses the AbstractFileSystem.cat_file api to get a byte range of the file. This api should only request the selected range (as opposed to the whole file, then slicing). In order to make sure this is the case I opened a discussion here.

It also adds some type annotations to the uproot.source files.

@lobis lobis marked this pull request as ready for review October 9, 2023 14:14
@lobis lobis requested review from jpivarski and nsmith- October 9, 2023 14:14
@lobis lobis mentioned this pull request Oct 6, 2023
Copy link
Member

@jpivarski jpivarski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't actually make the Source non-blocking (see below).

It could help to see how these chunks are going to be used. Calls to the singular chunk method are all over the rest-of-Uproot codebase, but most of those use the single chunk immediately after requesting it, so it's not very important for them to be non-blocking, but they're non-blocking anyway.

The plural chunks method is only used by the array-fetching loop:

notifications = queue.Queue()
branchid_arrays = {}
branchid_num_baskets = {}
ranges = []
range_args = {}
range_original_index = {}
original_index = 0
branchid_to_branch = {}
for cache_key in branchid_interpretation:
branchid_num_baskets[cache_key] = 0
for branch, basket_num, range_or_basket in ranges_or_baskets:
branchid_num_baskets[branch.cache_key] += 1
if branch.cache_key not in branchid_arrays:
branchid_arrays[branch.cache_key] = {}
if isinstance(range_or_basket, tuple) and len(range_or_basket) == 2:
range_or_basket = ( # noqa: PLW2901 (overwriting range_or_basket)
int(range_or_basket[0]),
int(range_or_basket[1]),
)
ranges.append(range_or_basket)
range_args[range_or_basket] = (branch, basket_num)
range_original_index[range_or_basket] = original_index
else:
notifications.put(range_or_basket)
original_index += 1
branchid_to_branch[branch.cache_key] = branch
for cache_key, interpretation in branchid_interpretation.items():
if branchid_num_baskets[cache_key] == 0 and cache_key not in arrays:
arrays[cache_key] = interpretation.final_array(
{}, 0, 0, [0], library, None, interp_options
)
# check for CannotBeAwkward errors on the main thread before reading any data
if isinstance(library, uproot.interpretation.library.Awkward) and isinstance(
interpretation, uproot.interpretation.objects.AsObjects
):
branchid_to_branch[cache_key]._awkward_check(interpretation)
hasbranches._file.source.chunks(ranges, notifications=notifications)
def replace(ranges_or_baskets, original_index, basket):
branch, basket_num, range_or_basket = ranges_or_baskets[original_index]
ranges_or_baskets[original_index] = branch, basket_num, basket
def chunk_to_basket(chunk, branch, basket_num):
try:
cursor = uproot.source.cursor.Cursor(chunk.start)
basket = uproot.models.TBasket.Model_TBasket.read(
chunk,
cursor,
{"basket_num": basket_num},
hasbranches._file,
hasbranches._file,
branch,
)
original_index = range_original_index[(chunk.start, chunk.stop)]
if update_ranges_or_baskets:
replace(ranges_or_baskets, original_index, basket)
except Exception:
notifications.put(sys.exc_info())
else:
notifications.put(basket)
# all threads (if multithreaded), per branch, share a thread-local context for Forth
forth_context = {x: threading.local() for x in branchid_interpretation}
def basket_to_array(basket):
try:
assert basket.basket_num is not None
branch = basket.parent
interpretation = branchid_interpretation[branch.cache_key]
basket_arrays = branchid_arrays[branch.cache_key]
context = dict(branch.context)
context["forth"] = forth_context[branch.cache_key]
basket_arrays[basket.basket_num] = interpretation.basket_array(
basket.data,
basket.byte_offsets,
basket,
branch,
context,
basket.member("fKeylen"),
library,
interp_options,
)
if basket.num_entries != len(basket_arrays[basket.basket_num]):
raise ValueError(
"""basket {} in tree/branch {} has the wrong number of entries """
"""(expected {}, obtained {}) when interpreted as {}
in file {}""".format(
basket.basket_num,
branch.object_path,
basket.num_entries,
len(basket_arrays[basket.basket_num]),
interpretation,
branch.file.file_path,
)
)
basket = None
if len(basket_arrays) == branchid_num_baskets[branch.cache_key]:
arrays[branch.cache_key] = interpretation.final_array(
basket_arrays,
entry_start,
entry_stop,
branch.entry_offsets,
library,
branch,
interp_options,
)
# no longer needed, save memory
basket_arrays.clear()
except Exception:
notifications.put(sys.exc_info())
else:
notifications.put(None)
while len(arrays) < len(branchid_interpretation):
obj = notifications.get()
if isinstance(obj, uproot.source.chunk.Chunk):
args = range_args[(obj.start, obj.stop)]
decompression_executor.submit(chunk_to_basket, obj, *args)
elif isinstance(obj, uproot.models.TBasket.Model_TBasket):
interpretation_executor.submit(basket_to_array, obj)
elif obj is None:
pass
elif isinstance(obj, tuple) and len(obj) == 3:
uproot.source.futures.delayed_raise(*obj)
else:
raise AssertionError(obj)
obj = None # release before blocking

The notifications queue is the central object of the event loop. Although the Source is only responsible for putting Chunk objects on that queue, the same queue is used for all steps of array-building:

  1. start-stop pair of seek positions → Chunk
  2. Chunkuproot.models.TBasket, which is chunk_to_basket
  3. uproot.models.TBasket → per-basket array, which is basket_to_array

Outside of the loop, the per-basket arrays are concatenated into one array, which is returned to the user.

The event loop itself is this while loop.

So—what you want to do is return not-necessarily-filled Chunks from the chunks method and instrument the response of a request-response pair with a notifier function. As soon as a response comes in, call the notifier so that the Chunk goes onto the notifications queue, and the above code will get past the

obj = notifications.get()

line.

src/uproot/source/fsspec.py Outdated Show resolved Hide resolved
src/uproot/source/fsspec.py Outdated Show resolved Hide resolved
src/uproot/source/fsspec.py Outdated Show resolved Hide resolved
tests/test_0692_fsspec.py Outdated Show resolved Hide resolved
@lobis lobis requested a review from jpivarski October 10, 2023 16:06
@lobis
Copy link
Collaborator Author

lobis commented Oct 10, 2023

The PR is again ready to be reviewed.

Now it should work as intended, the chunks method should be non-blocking.

I have some questions regarding the management of the executor. Currently I am creating a local executor in the chunks method but perhaps I should do proper lifecycle management in a similar fashion to what is being done in the http source.

Looks like doing the lifecycle management would mean refactoring the current code though, since my ResourceThreadPoolExecutor is constructed from the requested byte ranges...

Copy link
Member

@jpivarski jpivarski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comes down to a fundamental question: what is fsspec's scope? How much does it do? There's this cat_file interface, which does the whole process from URL to file bytes, but it's a blocking interface. Is there an async interface? If so, does it have a way to attach a function to call when the async function is done?

If so, then FSSpecSource doesn't need to have an executor. Suppose there's a function

FS.async_cat_file(path: str, start: int, stop: int) -> concurrent.futures.Future

then you could use it like this:

for item, (start, stop) in zip(data, ranges):
    future = self._fs.async_cat_file(self._file_path, start, stop)
    chunk = uproot.source.chunk.Chunk(self, start, stop, future)
    future.add_done_callback(uproot.source.chunk.notifier(chunk, notifications))
    chunks.append(chunk)

(although I think that add_done_callback expects the function to take 1 argument and our notifier takes 0 arguments, but these things are easy to work around). If the fsspec interface uses asyncio.Future instead of concurrent.futures.Future (which we designed around), then it would be a little different, which could be done by wrapping it with the interface we expect (i.e. future.result() instead of await future).

So... what can fsspec do for us?

src/uproot/source/fsspec.py Outdated Show resolved Hide resolved
@lobis
Copy link
Collaborator Author

lobis commented Oct 10, 2023

If so, then FSSpecSource doesn't need to have an executor

I don't understand this, don't you always need some kind of executor to run futures?

@jpivarski
Copy link
Member

The executor might be inside of the fsspec library. I don't know that—I'm just thinking it might be, and if it is, we should take advantage of it.

Also, it might use async instead of an executor, but the effect would be the same: we might be able to get it to do the concurrent waiting-for-I/O for us.

@lobis
Copy link
Collaborator Author

lobis commented Oct 10, 2023

This comes down to a fundamental question: what is fsspec's scope? How much does it do? There's this cat_file interface, which does the whole process from URL to file bytes, but it's a blocking interface. Is there an async interface? If so, does it have a way to attach a function to call when the async function is done?

From my understanding fsspec supports an async interface only for the http file system. This is enabled by passing the option asynchronous=True on creation. If this option is set to true, some methods of the filesystem will become asynchronous (such as cat_file). Running in asynchronous mode would need an event loop.

I imagine it would be possible to work with the sync filesystem but use an async one inside the chunks method so all the asyncio logic will be contained, otherwise it can get complicated. I will look into this.

Regarding this PR: I think it's done (unless there is some mistake) as currently the chunks interface is non-blocking. Depending on the complexity of leveraging the fsspec asyncio interface I will include it into this PR, a separate PR, or do nothing.

Copy link
Member

@jpivarski jpivarski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking good. Does it depend on #983?

src/uproot/source/fsspec.py Outdated Show resolved Hide resolved
src/uproot/source/fsspec.py Show resolved Hide resolved
@lobis
Copy link
Collaborator Author

lobis commented Oct 11, 2023

This is looking good. Does it depend on #983?

It does not.

@lobis lobis changed the base branch from main to main-v510 October 12, 2023 13:22
@jpivarski
Copy link
Member

test_0692_fsspec.py::test_fsspec_chunks needs

pytest.importorskip("aiohttp")

in the failing function. That's a non-strict dependency, so any of our tests that use it have to be skipped when the package isn't available.

This one failed because it's Python 3.12 and presumably the package isn't available yet, so I'm glad we got this chance to check.

If there are any tests that need fsspec or another non-strict dependency to run, those tests need to have pytest.importorskip, either in the individual function or at the top of the Python file for module-level.

@jpivarski
Copy link
Member

I think file.py needs from __future__ import annotations.

@lobis lobis merged commit 9ea709b into main-v510 Oct 12, 2023
@lobis lobis deleted the fsspec-chunks branch October 12, 2023 20:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants