Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
do not use thread pool executor, use loop executor instead
Browse files Browse the repository at this point in the history
lobis committed Oct 17, 2023

Verified

This commit was signed with the committer’s verified signature.
lobis Luis Antonio Obis Aparicio
1 parent ffbd7cd commit ba7a6bf
Showing 1 changed file with 22 additions and 22 deletions.
44 changes: 22 additions & 22 deletions src/uproot/source/fsspec.py
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@

from __future__ import annotations

import concurrent.futures
import queue

import uproot
@@ -27,7 +26,6 @@ def __init__(self, file_path, **options):

default_options = uproot.reading.open.defaults
self._use_threads = options.get("use_threads", default_options["use_threads"])
self._num_workers = options.get("num_workers", default_options["num_workers"])

# TODO: is timeout always valid?

@@ -38,22 +36,17 @@ def __init__(self, file_path, **options):
self._fs, self._file_path = fsspec.core.url_to_fs(file_path, **opts)

if self._use_threads:
if self._fs.async_impl:
self._executor = uproot.source.futures.LoopExecutor()

# Bind the loop to the filesystem
async def make_fs():
return fsspec.filesystem(
protocol=self._fs.protocol, loop=self._executor.loop
)

self._fs = self._executor.submit(make_fs).result()
assert self._fs.loop is self._executor.loop, "loop not bound"
assert self._fs.loop.is_running(), "loop not running"
else:
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=self._num_workers
self._executor = uproot.source.futures.LoopExecutor()

# Bind the loop to the filesystem
async def make_fs():
return fsspec.filesystem(
protocol=self._fs.protocol, loop=self._executor.loop
)

self._fs = self._executor.submit(make_fs).result()
assert self._executor.loop is self._executor.loop, "loop not bound"
assert self._executor.loop.is_running(), "loop not running"
else:
self._executor = uproot.source.futures.TrivialExecutor()

@@ -145,11 +138,18 @@ def chunks(
self._num_requested_bytes += sum(stop - start for start, stop in ranges)

chunks = []
# _cat_file is async while cat_file is not
use_async = self._fs.async_impl and isinstance(
self._executor, uproot.source.futures.LoopExecutor
)
cat_file = self._fs._cat_file if use_async else self._fs.cat_file

async def cat_file_wrapped(*args):
# wrapper for the filesystems without async_impl
return await self._executor.loop.run_in_executor(
None, self._fs.cat_file, *args
)

# The fsspec source always uses the LoopExecutor when `use_threads` is True.
# The callable submitted to it must therefore be async, so we wrap `cat_file` when the filesystem is not async-capable.
cat_file_async = self._fs._cat_file if self._fs.async_impl else cat_file_wrapped
cat_file = cat_file_async if self._use_threads else self._fs.cat_file

for start, stop in ranges:
future = self._executor.submit(cat_file, self._file_path, start, stop)
chunk = uproot.source.chunk.Chunk(self, start, stop, future)

0 comments on commit ba7a6bf

Please sign in to comment.