Skip to content

Commit

Permalink
Revert "do not use thread pool executor, use loop executor instead"
Browse files Browse the repository at this point in the history
This reverts commit ba7a6bf.
  • Loading branch information
lobis committed Oct 17, 2023
1 parent ba7a6bf commit 80049db
Showing 1 changed file with 22 additions and 22 deletions.
44 changes: 22 additions & 22 deletions src/uproot/source/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import concurrent.futures
import queue

import uproot
Expand All @@ -26,6 +27,7 @@ def __init__(self, file_path, **options):

default_options = uproot.reading.open.defaults
self._use_threads = options.get("use_threads", default_options["use_threads"])
self._num_workers = options.get("num_workers", default_options["num_workers"])

# TODO: is timeout always valid?

Expand All @@ -36,17 +38,22 @@ def __init__(self, file_path, **options):
self._fs, self._file_path = fsspec.core.url_to_fs(file_path, **opts)

if self._use_threads:
self._executor = uproot.source.futures.LoopExecutor()

# Bind the loop to the filesystem
async def make_fs():
return fsspec.filesystem(
protocol=self._fs.protocol, loop=self._executor.loop
if self._fs.async_impl:
self._executor = uproot.source.futures.LoopExecutor()

# Bind the loop to the filesystem
async def make_fs():
return fsspec.filesystem(
protocol=self._fs.protocol, loop=self._executor.loop
)

self._fs = self._executor.submit(make_fs).result()
assert self._fs.loop is self._executor.loop, "loop not bound"
assert self._fs.loop.is_running(), "loop not running"
else:
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=self._num_workers
)

self._fs = self._executor.submit(make_fs).result()
assert self._executor.loop is self._executor.loop, "loop not bound"
assert self._executor.loop.is_running(), "loop not running"
else:
self._executor = uproot.source.futures.TrivialExecutor()

Expand Down Expand Up @@ -138,18 +145,11 @@ def chunks(
self._num_requested_bytes += sum(stop - start for start, stop in ranges)

chunks = []

async def cat_file_wrapped(*args):
# wrapper for the filesystems without async_impl
return await self._executor.loop.run_in_executor(
None, self._fs.cat_file, *args
)

# The fsspec source will always use the LoopExecutor if `use_threads` is True,
# this means that the `cat_file` method must be async, so we wrap it in case the filesystem is not async-capable
cat_file_async = self._fs._cat_file if self._fs.async_impl else cat_file_wrapped
cat_file = cat_file_async if self._use_threads else self._fs.cat_file

# _cat_file is async while cat_file is not
use_async = self._fs.async_impl and isinstance(
self._executor, uproot.source.futures.LoopExecutor
)
cat_file = self._fs._cat_file if use_async else self._fs.cat_file
for start, stop in ranges:
future = self._executor.submit(cat_file, self._file_path, start, stop)
chunk = uproot.source.chunk.Chunk(self, start, stop, future)
Expand Down

0 comments on commit 80049db

Please sign in to comment.