Skip to content

Commit

Permalink
Update _cache.py
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler authored Dec 19, 2024
1 parent 558b73b commit 379a7a9
Showing 1 changed file with 11 additions and 14 deletions.
25 changes: 11 additions & 14 deletions eth_portfolio/_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from a_sync import PruningThreadPoolExecutor
from a_sync._typing import P, T
from a_sync.asyncio import create_task
from a_sync.primitives.queue import log_broken
from aiofiles import open as _aio_open
from brownie import chain

Expand All @@ -34,19 +35,15 @@ def get_cache_file_path(args, kwargs):
if inspect.iscoroutinefunction(fn):
queue = Queue()

async def cache_deco_worker_coro() -> NoReturn:
try:
while True:
fut, cache_path, args, kwargs = await queue.get()
try:
async with _aio_open(cache_path, "rb", executor=EXECUTOR) as f:
fut.set_result(loads(await f.read()))
except Exception as e:
fut.set_result(e)
except Exception as e:
logger.error(f"{type(e).__name__} in {fn}:")
logger.exception(e)
raise
@log_broken
async def cache_deco_worker_coro(func) -> NoReturn:
while True:
fut, cache_path, args, kwargs = await queue.get()
try:
async with _aio_open(cache_path, "rb", executor=EXECUTOR) as f:
fut.set_result(loads(await f.read()))
except Exception as e:
fut.set_result(e)

workers = []

Expand All @@ -56,7 +53,7 @@ async def disk_cache_wrap(*args: P.args, **kwargs: P.kwargs) -> T:
fut = get_event_loop().create_future()
queue.put_nowait((fut, cache_path, args, kwargs))
if not workers:
workers.extend(create_task(cache_deco_worker_coro()) for _ in range(1000))
workers.extend(create_task(cache_deco_worker_coro(fn)) for _ in range(1000))
try:
return await fut
except (FileNotFoundError, EOFError):
Expand Down

0 comments on commit 379a7a9

Please sign in to comment.