From 379a7a9875f0a50d515e886f2545c7cb0ee6a7ff Mon Sep 17 00:00:00 2001 From: BobTheBuidler <70677534+BobTheBuidler@users.noreply.github.com> Date: Thu, 19 Dec 2024 01:08:44 -0400 Subject: [PATCH] Update _cache.py --- eth_portfolio/_cache.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/eth_portfolio/_cache.py b/eth_portfolio/_cache.py index 61dcc49a..3ff3bb0f 100644 --- a/eth_portfolio/_cache.py +++ b/eth_portfolio/_cache.py @@ -11,6 +11,7 @@ from a_sync import PruningThreadPoolExecutor from a_sync._typing import P, T from a_sync.asyncio import create_task +from a_sync.primitives.queue import log_broken from aiofiles import open as _aio_open from brownie import chain @@ -34,19 +35,15 @@ def get_cache_file_path(args, kwargs): if inspect.iscoroutinefunction(fn): queue = Queue() - async def cache_deco_worker_coro() -> NoReturn: - try: - while True: - fut, cache_path, args, kwargs = await queue.get() - try: - async with _aio_open(cache_path, "rb", executor=EXECUTOR) as f: - fut.set_result(loads(await f.read())) - except Exception as e: - fut.set_result(e) - except Exception as e: - logger.error(f"{type(e).__name__} in {fn}:") - logger.exception(e) - raise + @log_broken + async def cache_deco_worker_coro(func) -> NoReturn: + while True: + fut, cache_path, args, kwargs = await queue.get() + try: + async with _aio_open(cache_path, "rb", executor=EXECUTOR) as f: + fut.set_result(loads(await f.read())) + except Exception as e: + fut.set_result(e) workers = [] @@ -56,7 +53,7 @@ async def disk_cache_wrap(*args: P.args, **kwargs: P.kwargs) -> T: fut = get_event_loop().create_future() queue.put_nowait((fut, cache_path, args, kwargs)) if not workers: - workers.extend(create_task(cache_deco_worker_coro()) for _ in range(1000)) + workers.extend(create_task(cache_deco_worker_coro(fn)) for _ in range(1000)) try: return await fut except (FileNotFoundError, EOFError):