diff --git a/eth_portfolio/_cache.py b/eth_portfolio/_cache.py index 1ca879b0..01fe4edb 100644 --- a/eth_portfolio/_cache.py +++ b/eth_portfolio/_cache.py @@ -1,6 +1,7 @@ import functools import inspect from asyncio import PriorityQueue, get_event_loop, sleep +from concurrent.futures import Executor from hashlib import md5 from logging import getLogger from os import makedirs @@ -17,7 +18,8 @@ from brownie import chain BASE_PATH = f"./cache/{chain.id}/" -EXECUTOR = PruningThreadPoolExecutor(32, "eth-portfolio-cache-decorator") +_THREAD_NAME_PREFIX = "eth-portfolio-cache-decorator" +_EXISTS_EXECUTOR = PruningThreadPoolExecutor(8, f"{_THREAD_NAME_PREFIX}-exists") def cache_to_disk(fn: Callable[P, T]) -> Callable[P, T]: @@ -31,10 +33,13 @@ def get_cache_file_path(args, kwargs): key = md5(dumps((args, sorted(kwargs.items())))).hexdigest() return join(cache_path_for_fn, f"{key}.json") + write_executor = PruningThreadPoolExecutor(8, f"{_THREAD_NAME_PREFIX}-{fn.__qualname__}-write") + makedirs(cache_path_for_fn, exist_ok=True) if inspect.iscoroutinefunction(fn): - + read_executor = PruningThreadPoolExecutor(8, f"{_THREAD_NAME_PREFIX}-{fn.__qualname__}-read") + queue = PriorityQueue() @log_broken @@ -42,7 +47,7 @@ async def cache_deco_worker_coro(func) -> NoReturn: while True: _, fut, cache_path, args, kwargs = await queue.get() try: - async with _aio_open(cache_path, "rb", executor=EXECUTOR) as f: + async with _aio_open(cache_path, "rb", executor=read_executor) as f: fut.set_result(loads(await f.read())) except Exception as e: fut.set_exception(e) @@ -52,7 +57,7 @@ async def cache_deco_worker_coro(func) -> NoReturn: @functools.wraps(fn) async def disk_cache_wrap(*args: P.args, **kwargs: P.kwargs) -> T: cache_path = get_cache_file_path(args, kwargs) - if await EXECUTOR.run(exists, cache_path): + if await _EXISTS_EXECUTOR.run(exists, cache_path): fut = get_event_loop().create_future() # we intentionally mix up the order to break up heavy load block ranges queue.put_nowait((random(), fut, cache_path, args, kwargs)) @@ -65,7 +70,7 @@ async def disk_cache_wrap(*args: P.args, **kwargs: P.kwargs) -> T: async_result: T = await fn(*args, **kwargs) try: - await __cache_write(cache_path, async_result) + await __cache_write(cache_path, async_result, write_executor) except OSError as e: # I was having some weird issues in docker that I don't want to debug, # so I'm going to assume you have another means to let you know you're @@ -89,7 +94,7 @@ def disk_cache_wrap(*args: P.args, **kwargs: P.kwargs) -> T: sync_result: T = fn(*args, **kwargs) # type: ignore [assignment, return-value] try: create_task( - coro=__cache_write(cache_path, sync_result), + coro=__cache_write(cache_path, sync_result, write_executor), skip_gc_until_done=True, ) except RuntimeError: @@ -99,7 +104,7 @@ def disk_cache_wrap(*args: P.args, **kwargs: P.kwargs) -> T: return disk_cache_wrap -async def __cache_write(cache_path: str, result: Any) -> None: +async def __cache_write(cache_path: str, result: Any, executor: Executor) -> None: result = dumps(result) - async with _aio_open(cache_path, "wb", executor=EXECUTOR) as f: + async with _aio_open(cache_path, "wb", executor=executor) as f: await f.write(result)