feat: unblock cache_to_disk with separate executors for each decorated
BobTheBuidler authored Dec 20, 2024
1 parent 761ce6a commit babdc81
Showing 1 changed file with 13 additions and 8 deletions.
eth_portfolio/_cache.py: 13 additions & 8 deletions (21 changes)
@@ -1,6 +1,7 @@
 import functools
 import inspect
 from asyncio import PriorityQueue, get_event_loop, sleep
+from concurrent.futures import Executor
 from hashlib import md5
 from logging import getLogger
 from os import makedirs
@@ -17,7 +18,8 @@
 from brownie import chain
 
 BASE_PATH = f"./cache/{chain.id}/"
-EXECUTOR = PruningThreadPoolExecutor(32, "eth-portfolio-cache-decorator")
+_THREAD_NAME_PREFIX = "eth-portfolio-cache-decorator"
+_EXISTS_EXECUTOR = PruningThreadPoolExecutor(8, f"{_THREAD_NAME_PREFIX}-exists")
 
 
 def cache_to_disk(fn: Callable[P, T]) -> Callable[P, T]:
@@ -31,18 +33,21 @@ def get_cache_file_path(args, kwargs):
         key = md5(dumps((args, sorted(kwargs.items())))).hexdigest()
         return join(cache_path_for_fn, f"{key}.json")
 
+    write_executor = PruningThreadPoolExecutor(8, f"{_THREAD_NAME_PREFIX}-{fn.__qualname__}-write")
+
     makedirs(cache_path_for_fn, exist_ok=True)
 
     if inspect.iscoroutinefunction(fn):
-
+        read_executor = PruningThreadPoolExecutor(8, f"{_THREAD_NAME_PREFIX}-{fn.__qualname__}-read")
+
         queue = PriorityQueue()
 
         @log_broken
         async def cache_deco_worker_coro(func) -> NoReturn:
             while True:
                 _, fut, cache_path, args, kwargs = await queue.get()
                 try:
-                    async with _aio_open(cache_path, "rb", executor=EXECUTOR) as f:
+                    async with _aio_open(cache_path, "rb", executor=read_executor) as f:
                         fut.set_result(loads(await f.read()))
                 except Exception as e:
                     fut.set_exception(e)
@@ -52,7 +57,7 @@ async def cache_deco_worker_coro(func) -> NoReturn:
         @functools.wraps(fn)
         async def disk_cache_wrap(*args: P.args, **kwargs: P.kwargs) -> T:
             cache_path = get_cache_file_path(args, kwargs)
-            if await EXECUTOR.run(exists, cache_path):
+            if await _EXISTS_EXECUTOR.run(exists, cache_path):
                 fut = get_event_loop().create_future()
                 # we intentionally mix up the order to break up heavy load block ranges
                 queue.put_nowait((random(), fut, cache_path, args, kwargs))
@@ -65,7 +70,7 @@ async def disk_cache_wrap(*args: P.args, **kwargs: P.kwargs) -> T:
 
             async_result: T = await fn(*args, **kwargs)
             try:
-                await __cache_write(cache_path, async_result)
+                await __cache_write(cache_path, async_result, write_executor)
             except OSError as e:
                 # I was having some weird issues in docker that I don't want to debug,
                 # so I'm going to assume you have another means to let you know you're
@@ -89,7 +94,7 @@ def disk_cache_wrap(*args: P.args, **kwargs: P.kwargs) -> T:
             sync_result: T = fn(*args, **kwargs)  # type: ignore [assignment, return-value]
             try:
                 create_task(
-                    coro=__cache_write(cache_path, sync_result),
+                    coro=__cache_write(cache_path, sync_result, write_executor),
                     skip_gc_until_done=True,
                 )
             except RuntimeError:
@@ -99,7 +104,7 @@ def disk_cache_wrap(*args: P.args, **kwargs: P.kwargs) -> T:
     return disk_cache_wrap
 
 
-async def __cache_write(cache_path: str, result: Any) -> None:
+async def __cache_write(cache_path: str, result: Any, executor: Executor) -> None:
     result = dumps(result)
-    async with _aio_open(cache_path, "wb", executor=EXECUTOR) as f:
+    async with _aio_open(cache_path, "wb", executor=executor) as f:
         await f.write(result)
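
Note on the pattern: below is a minimal, stdlib-only sketch of what this commit does. PruningThreadPoolExecutor is a third-party executor, so concurrent.futures.ThreadPoolExecutor stands in for it here, and cache_to_disk_sketch, its helpers, and the cache-key scheme are illustrative names, not the project's API. The point of the change is that cheap exists() checks share one small pool while every decorated function gets its own read pool and write pool, so heavy disk traffic from one function can no longer block the cache I/O of the others.

import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor
from json import dumps, loads
from os import makedirs
from os.path import exists, join

# One small shared pool for the cheap exists() checks.
_EXISTS_EXECUTOR = ThreadPoolExecutor(8, thread_name_prefix="cache-exists")


def cache_to_disk_sketch(fn):
    # Dedicated pools per decorated function, so slow reads or writes
    # for one function cannot starve the pools used by the others.
    read_executor = ThreadPoolExecutor(8, thread_name_prefix=f"{fn.__qualname__}-read")
    write_executor = ThreadPoolExecutor(8, thread_name_prefix=f"{fn.__qualname__}-write")
    makedirs("./cache", exist_ok=True)

    def _read(path):
        with open(path, "rb") as f:
            return f.read()

    def _write(path, data):
        with open(path, "wb") as f:
            f.write(data)

    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        # Illustrative key scheme; the real code md5-hashes the serialized args.
        key = hash((fn.__qualname__, args, tuple(sorted(kwargs.items()))))
        cache_path = join("./cache", f"{key}.json")
        if await loop.run_in_executor(_EXISTS_EXECUTOR, exists, cache_path):
            return loads(await loop.run_in_executor(read_executor, _read, cache_path))
        result = await fn(*args, **kwargs)
        await loop.run_in_executor(write_executor, _write, cache_path, dumps(result).encode())
        return result

    return wrapper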
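
The async read path has one more trick worth noting: cache hits are not read inline but parked on an asyncio.PriorityQueue whose priority is random(), so, per the comment in the diff, bursts of requests for adjacent block ranges are serviced in shuffled order. A self-contained sketch of that queue-and-worker shape, with illustrative names:

import asyncio
from random import random


async def read_worker(queue, read_file):
    # Drain the queue forever, fulfilling each waiter's future.
    while True:
        _, fut, path = await queue.get()
        try:
            fut.set_result(await read_file(path))
        except Exception as e:
            fut.set_exception(e)


async def queued_read(queue, path):
    fut = asyncio.get_running_loop().create_future()
    # random() as the priority deliberately shuffles the service order.
    queue.put_nowait((random(), fut, path))
    return await fut

A long-lived task created with asyncio.create_task(read_worker(queue, read_file)) services the queue; in the real code there is one queue per decorated function, and the worker startup is outside the range shown in this diff.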
