diff --git a/eth_portfolio/_cache.py b/eth_portfolio/_cache.py index b18210aa..344ea6a2 100644 --- a/eth_portfolio/_cache.py +++ b/eth_portfolio/_cache.py @@ -1,6 +1,6 @@ import functools import inspect -from asyncio import Queue, create_task +from asyncio import Queue from hashlib import md5 from logging import getLogger from os import makedirs @@ -37,27 +37,12 @@ def get_cache_file_path(args, kwargs): async def cache_deco_worker_coro() -> NoReturn: try: while True: - fut, args, kwargs = await queue.get() + fut, cache_path, args, kwargs = await queue.get() try: - cache_path = get_cache_file_path(args, kwargs) + cache_path = if await EXECUTOR.run(exists, cache_path): - async with _aio_open(cache_path, "rb", executor=EXECUTOR) as f: - try: - return loads(await f.read()) - except EOFError: - pass - - async_result: T = await fn(*args, **kwargs) - try: - await __cache_write(cache_path, async_result) - except OSError as e: - # I was having some weird issues in docker that I don't want to debug, - # so I'm going to assume you have another means to let you know you're - # out of disk space and will pass right on through here so my script - # can continue - if e.strerror != "No space left on device": - raise - fut.set_result(async_result) + async with _aio_open(cache_path, "rb", executor=EXECUTOR) as f: + fut.set_result(loads(await f.read())) except Exception as e: fut.set_result(e) except Exception as e: @@ -65,13 +50,29 @@ async def cache_deco_worker_coro() -> NoReturn: logger.exception(e) raise - workers = tuple(create_task(cache_deco_worker_coro()) for _ in range(5000)) + workers = tuple(create_task(cache_deco_worker_coro()) for _ in range(1000)) @functools.wraps(fn) async def disk_cache_wrap(*args: P.args, **kwargs: P.kwargs) -> T: - fut = get_event_loop().create_future() - queue.put_nowait((fut, args, kwargs)) - return await fut + cache_path = get_cache_file_path(args, kwargs) + if await EXECUTOR.run(exists, cache_path): + fut = get_event_loop().create_future() + queue.put_nowait((fut, cache_path, args, kwargs)) + try: + return await fut + except EOFError: + pass + + async_result: T = await fn(*args, **kwargs) + try: + await __cache_write(cache_path, async_result) + except OSError as e: + # I was having some weird issues in docker that I don't want to debug, + # so I'm going to assume you have another means to let you know you're + # out of disk space and will pass right on through here so my script + # can continue + if e.strerror != "No space left on device": + raise else: