Skip to content

Commit

Permalink
Update address.py
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler authored Dec 16, 2024
1 parent 8c8f37c commit c5b9568
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions eth_portfolio/_ledgers/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from aiohttp import ClientResponseError
from async_lru import alru_cache
from dank_mids.eth import TraceFilterParams
from eth_typing import ChecksumAddress
from evmspec import FilterTrace
from evmspec.structs.receipt import Status
from evmspec.structs.trace import call, reward
Expand Down Expand Up @@ -475,29 +476,33 @@ def _ensure_workers(self, num_workers: int) -> None:
len_workers = len(self._workers)
if len_workers < num_workers:
logger.info("ensuring %s workers for %s", num_workers, self)
queue = self._queue
ready = self._ready

create_task = asyncio.create_task
worker_coro = self.__worker_coro
worker_fn = self.__worker_fn
address = self.address
load_prices = self.load_prices
queue_get = stuck_coro_debugger(self._queue.get)
put_ready = self._ready.put_nowait

self._workers.extend(
create_task(worker_coro(queue, ready)) for _ in range(num_workers - len_workers)
create_task(worker_fn(address, load_prices, queue_get, put_ready)) for _ in range(num_workers - len_workers)
)
logger.info(f"{self} workers: {self._workers}")

@staticmethod
async def __worker_coro(queue: asyncio.Queue, ready_queue: asyncio.Queue) -> NoReturn:
address = self.address
load_prices = self.load_prices
get_next_job = queue.get
put_result = ready_queue.put_nowait

async def __worker_fn(
address: ChecksumAddress,
load_prices: bool,
queue_get: Callable[[], Nonce],
put_ready: Callable[[Nonce, Optional[structs.Transaction]], None],
) -> NoReturn:
try:
while True:
nonce = await get_next_job()
nonce = await queue_get()
try:
put_result(await load_transaction(address, nonce, load_prices))
put_ready(await load_transaction(address, nonce, load_prices))
except Exception as e:
put_result((nonce, e))
put_ready((nonce, e))
except Exception as e:
logger.error(f"%s in %s __worker_coro", type(e), self)
logger.exception(e)
Expand Down

0 comments on commit c5b9568

Please sign in to comment.