Skip to content

Commit

Permalink
feat: replace run_in_executor with executor.run to reduce overhead (#148
Browse files Browse the repository at this point in the history
)

* feat: replace run_in_executor with executor.run to reduce overhead

* chore: `black .`

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
BobTheBuidler and github-actions[bot] authored Dec 16, 2024
1 parent 28b2ade commit eb54f35
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 8 deletions.
13 changes: 6 additions & 7 deletions eth_portfolio/_cache.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
import functools
import inspect
import os
from hashlib import md5
from os import makedirs
from os.path import exists, join
from pickle import dumps, load, loads
from typing import Any
from typing import Any, Callable

from a_sync import PruningThreadPoolExecutor
from a_sync._typing import AnyFn, P, T
from a_sync._typing import P, T
from a_sync.asyncio import create_task
from aiofiles import open as _aio_open
from aiofiles.ospath import exists as _aio_path_exists
from brownie import chain

BASE_PATH = f"./cache/{chain.id}/"
EXECUTOR = PruningThreadPoolExecutor(32)


def cache_to_disk(fn: AnyFn[P, T]) -> AnyFn[P, T]:
def cache_to_disk(fn: Callable[P, T]) -> Callable[P, T]:
# sourcery skip: use-contextlib-suppress
cache_path_for_fn = f"{BASE_PATH}{fn.__module__.replace('.', '/')}/{fn.__name__}"

def get_cache_file_path(args, kwargs):
Expand All @@ -33,7 +32,7 @@ def get_cache_file_path(args, kwargs):
@functools.wraps(fn)
async def disk_cache_wrap(*args: P.args, **kwargs: P.kwargs) -> T:
cache_path = get_cache_file_path(args, kwargs)
if await _aio_path_exists(cache_path, executor=EXECUTOR):
if await EXECUTOR.run(exists, cache_path):
async with _aio_open(cache_path, "rb", executor=EXECUTOR) as f:
try:
return loads(await f.read())
Expand All @@ -59,7 +58,7 @@ def disk_cache_wrap(*args: P.args, **kwargs: P.kwargs) -> T:
sync_result: T = fn(*args, **kwargs) # type: ignore [assignment, return-value]
try:
create_task(
coro=__cache_write(cache_path, result),
coro=__cache_write(cache_path, sync_result),
skip_gc_until_done=True,
)
except RuntimeError:
Expand Down
2 changes: 1 addition & 1 deletion eth_portfolio/_ydb/token_transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async def yield_thru_block(self, block) -> AsyncIterator["Task[TokenTransfer]"]:
async for task in self._objects_thru(block=block):
yield task
return

_logger_log(DEBUG, "%s yielding all objects thru block %s", (self, block))
async for task in self._objects_thru(block=block):
_logger_log(
Expand Down

0 comments on commit eb54f35

Please sign in to comment.