diff --git a/CHANGES b/CHANGES
index a656fc6..ba652f0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -10,6 +10,12 @@ may have moved code there. Many features of Curio were simply
 higher-level modules implemented on top of an existing core and can be
 added back to your code without much effort. -- Dave
 
+04/11/2024 Removed undocumented ProcessPool and ThreadPool names.
+
+04/11/2024 Removed block_in_thread(). This functionality can be
+           replicated by protecting calls to run_in_thread() with a
+           semaphore.
+
 04/10/2024 Removed run_in_executor(). If you want to wait on work
            submitted to a traditional executor (from concurrent.futures),
            use curio.traps._future_wait().
diff --git a/curio/queue.py b/curio/queue.py
index 5f084c4..cd46b62 100644
--- a/curio/queue.py
+++ b/curio/queue.py
@@ -293,7 +293,7 @@ def join_sync(self):
 
     @awaitable(join_sync)
     async def join(self):
-        await workers.block_in_thread(self.join_sync)
+        return await workers.run_in_thread(self.join_sync)
 
     @asyncioable(join)
     async def join(self):
diff --git a/curio/workers.py b/curio/workers.py
index c553341..e86f948 100644
--- a/curio/workers.py
+++ b/curio/workers.py
@@ -1,10 +1,9 @@
 # curio/workers.py
 #
 # Functions for performing work outside of curio. This includes
-# running functions in threads, processes, and executors from the
-# concurrent.futures module.
+# running functions in threads and processes.
 
-__all__ = ['run_in_thread', 'run_in_process', 'block_in_thread']
+__all__ = ['run_in_thread', 'run_in_process']
 
 # -- Standard Library
@@ -13,7 +12,6 @@
 import threading
 import traceback
 import signal
-from collections import Counter, defaultdict
 
 # -- Curio
@@ -28,7 +26,6 @@
 # as originating from curio as opposed to multiprocessing.pool).
 
 class RemoteTraceback(Exception):
-
     def __init__(self, tb):
         self.tb = tb
@@ -37,7 +34,6 @@ def __str__(self):
 
 class ExceptionWithTraceback:
-
     def __init__(self, exc, tb):
         tb = traceback.format_exception(type(exc), exc, tb)
         tb = ''.join(tb)
@@ -72,6 +68,7 @@ async def run_in_thread(callable, *args, call_on_cancel=None):
     executed. If it start running, it will run fully to completion as
     a kind of zombie.
     '''
+    assert call_on_cancel is None, call_on_cancel
     worker = None
     try:
         worker = await reserve_thread_worker()
@@ -80,85 +77,6 @@
         if worker:
             await worker.release()
 
-# Support for blocking in threads.
-#
-# Discussion:
-#
-# The run_in_thread() function can be used to run any synchronous function
-# in a separate thread. However, certain kinds of operations are
-# inherently unsafe. For example, consider a worker task that wants
-# to wait on a threading Event like this:
-#
-#     evt = threading.Event()      # Foreign Event...
-#
-#     async def worker():
-#         await run_in_thread(evt.wait)
-#         print('Alive!')
-#
-# Now suppose Curio spins up a huge number of workers:
-#
-#     for n in range(1000):
-#         await spawn(worker())
-#
-# At this point, you're in a bad situation. The worker tasks have all
-# called run_in_thread() and are blocked indefinitely. Because the
-# pool of worker threads is limited, you've exhausted all available
-# resources. Nobody can now call run_in_thread() without blocking.
-# There's a pretty good chance that your code is permanently
-# deadlocked. There are dark clouds.
-#
-# This problem can be solved by wrapping run_in_thread() with a
-# semaphore. Like this:
-#
-#     _barrier = curio.Semaphore()
-#
-#     async def worker():
-#         async with _barrier:
-#             await run_in_thread(evt.wait)
-#
-# However, to make it much more convenient, we can take care of
-# a lot of fiddly details. We can cache the requested callable,
-# build a set of semaphores and synchronize things in the background.
-# That's what the block_in_thread() function is doing. For example:
-#
-#     async def worker():
-#         await block_in_thread(evt.wait)
-#         print('Alive!')
-#
-# Unlike run_in_thread(), spawning up 1000 workers creates a
-# situation where only 1 worker is actually blocked in a thread.
-# The other 999 workers are blocked on a semaphore waiting for service.
-
-_pending = Counter()
-_barrier = defaultdict(sync.Semaphore)
-
-async def block_in_thread(callable, *args, call_on_cancel=None):
-    '''
-    Run callable(*args) in a thread with the expectation that the
-    operation is going to block for an indeterminate amount of time.
-    Guarantees that at most only one background thread is used
-    regardless of how many curio tasks are actually waiting on the
-    same callable (e.g., if 1000 Curio tasks all decide to call
-    block_on_thread on the same callable, they'll all be handled by a
-    single thread). Primary use of this function is on foreign locks,
-    queues, and other synchronization primitives where you have to use
-    a thread, but you just don't have any idea when the operation will
-    complete.
-    '''
-    if hasattr(callable, '__self__'):
-        call_key = (callable.__name__, id(callable.__self__))
-    else:
-        call_key = id(callable)
-    _pending[call_key] += 1
-    async with _barrier[call_key]:
-        try:
-            return await run_in_thread(callable, *args, call_on_cancel=call_on_cancel)
-        finally:
-            _pending[call_key] -= 1
-            if not _pending[call_key]:
-                del _pending[call_key]
-                del _barrier[call_key]
-
 MAX_WORKER_PROCESSES = multiprocessing.cpu_count()
@@ -436,7 +354,6 @@ async def apply(self, func, args=()):
 # control over the whole process of how workers get managed.
 
 class WorkerPool(object):
-
     def __init__(self, workercls, nworkers):
         self.nworkers = sync.Semaphore(nworkers)
         self.workercls = workercls
@@ -459,7 +376,3 @@ async def release(self, worker):
         self.workers.append(worker)
         await self.nworkers.release()
 
-
-# Pool definitions should anyone want to use them directly
-ProcessPool = lambda nworkers: WorkerPool(ProcessWorker, nworkers)
-ThreadPool = lambda nworkers: WorkerPool(ThreadWorker, nworkers)
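
For code that used the removed block_in_thread(), the CHANGES entry above suggests
guarding run_in_thread() with a semaphore. Below is a minimal sketch of that pattern,
reusing the evt/worker example from the deleted comment block in workers.py; the
_barrier name and the main() driver are illustrative only, not part of this commit.

    import threading
    import curio

    evt = threading.Event()        # Foreign (non-Curio) synchronization primitive
    _barrier = curio.Semaphore()   # Admits one task at a time into run_in_thread()

    async def worker():
        # Only the task holding the semaphore occupies a worker thread;
        # the other tasks queue on the semaphore instead of exhausting
        # the thread pool.
        async with _barrier:
            await curio.run_in_thread(evt.wait)
        print('Alive!')

    async def main():
        tasks = [await curio.spawn(worker) for _ in range(1000)]
        evt.set()                  # Non-blocking; safe to call directly
        for t in tasks:
            await t.join()

    curio.run(main)

As with the old block_in_thread(), at most one task is blocked in a background
thread at any moment; spawning many workers only queues them on the semaphore.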