From 548da7c6db9666b6effc2a6f8ed7a08574cd4040 Mon Sep 17 00:00:00 2001 From: BobTheBuidler <70677534+BobTheBuidler@users.noreply.github.com> Date: Mon, 16 Dec 2024 18:31:10 -0400 Subject: [PATCH] feat: use queue instead of semaphore for transaction loading (#142) * feat: use queue instead of semaphore for transaction loading faster, less blocking, less ram usage * feat: AddressLedgerBase repr * fix: AddressTransactionLedger.__stop_workers * chore: reduce num workers 50k -> 25k * chore: `black .` --------- Co-authored-by: github-actions[bot] --- eth_portfolio/_ledgers/address.py | 85 ++++++++++++-- eth_portfolio/_loaders/transaction.py | 158 ++++++++++++++------------ eth_portfolio/address.py | 15 ++- eth_portfolio/portfolio.py | 79 ++++++++----- 4 files changed, 222 insertions(+), 115 deletions(-) diff --git a/eth_portfolio/_ledgers/address.py b/eth_portfolio/_ledgers/address.py index b0d10de5..2a4cf285 100644 --- a/eth_portfolio/_ledgers/address.py +++ b/eth_portfolio/_ledgers/address.py @@ -19,8 +19,10 @@ TYPE_CHECKING, AsyncGenerator, AsyncIterator, + Callable, Generic, List, + NoReturn, Optional, Tuple, Type, @@ -34,10 +36,12 @@ from aiohttp import ClientResponseError from async_lru import alru_cache from dank_mids.eth import TraceFilterParams +from eth_typing import ChecksumAddress from evmspec import FilterTrace from evmspec.structs.receipt import Status from evmspec.structs.trace import call, reward from pandas import DataFrame # type: ignore +from tqdm import tqdm from y import ERC20 from y._decorators import stuck_coro_debugger from y.datatypes import Block @@ -46,7 +50,7 @@ from eth_portfolio import _exceptions, _loaders from eth_portfolio._cache import cache_to_disk from eth_portfolio._decorators import set_end_block_if_none -from eth_portfolio._loaders.transaction import get_nonce_at_block +from eth_portfolio._loaders.transaction import get_nonce_at_block, load_transaction from eth_portfolio._utils import PandableList, _AiterMixin, get_buffered_chain_height from eth_portfolio._ydb.token_transfers import TokenTransfers from eth_portfolio.structs import InternalTransfer, TokenTransfer, Transaction @@ -146,6 +150,9 @@ def __hash__(self) -> int: """ return hash(self.address) + def __repr__(self) -> str: + return f"<{type(self).__name__} for {self.address} at {hex(id(self))}>" + @abc.abstractproperty def _list_type(self) -> Type[_LedgerEntryList]: """ @@ -387,15 +394,18 @@ def _df(self) -> DataFrame: return df +Nonce = int + + class AddressTransactionsLedger(AddressLedgerBase[TransactionsList, Transaction]): """ A ledger for managing transaction entries. """ _list_type = TransactionsList - __slots__ = ("cached_thru_nonce",) + __slots__ = ("cached_thru_nonce", "_queue", "_ready", "_num_workers", "_workers") - def __init__(self, portfolio_address: "PortfolioAddress"): + def __init__(self, portfolio_address: "PortfolioAddress", num_workers: int = 25_000): """ Initializes the AddressTransactionsLedger instance. @@ -407,6 +417,13 @@ def __init__(self, portfolio_address: "PortfolioAddress"): """ The nonce through which all transactions have been loaded into memory. """ + self._queue = asyncio.Queue() + self._ready = asyncio.Queue() + self._num_workers = num_workers + self._workers = [] + + def __del__(self) -> None: + self.__stop_workers() @set_end_block_if_none @stuck_coro_debugger @@ -425,16 +442,18 @@ async def _load_new_objects(self, _: Block, end_block: Block) -> AsyncIterator[T return end_block_nonce: int = await get_nonce_at_block(self.address, end_block) if nonces := list(range(self.cached_thru_nonce + 1, end_block_nonce + 1)): - coros = [ - _loaders.load_transaction(self.address, nonce, self.load_prices) for nonce in nonces - ] + for nonce in nonces: + self._queue.put_nowait(nonce) + + self._ensure_workers(min(len(nonces), self._num_workers)) transactions = [] - transaction: Transaction - async for nonce, transaction in a_sync.as_completed( # type: ignore [assignment] - coros, aiter=True, tqdm=True, desc=f"Transactions {self.address}" - ): + transaction: Optional[Transaction] + for _ in tqdm(nonces, desc=f"Transactions {self.address}"): + nonce, transaction = await self._ready.get() if transaction: + if isinstance(transaction, Exception): + raise transaction transactions.append(transaction) yield transaction elif nonce == 0 and self.cached_thru_nonce == -1: @@ -444,6 +463,8 @@ async def _load_new_objects(self, _: Block, end_block: Block) -> AsyncIterator[T # NOTE Are we sure this is the correct way to handle this scenario? Are we sure it will ever even occur with the new gnosis handling? logger.warning("No transaction with nonce %s for %s", nonce, self.address) + self.__stop_workers() + if transactions: self.objects.extend(transactions) if self.objects: @@ -455,6 +476,50 @@ async def _load_new_objects(self, _: Block, end_block: Block) -> AsyncIterator[T if self.cached_thru is None or end_block > self.cached_thru: self.cached_thru = end_block + def _ensure_workers(self, num_workers: int) -> None: + len_workers = len(self._workers) + if len_workers < num_workers: + logger.info("ensuring %s workers for %s", num_workers, self) + + create_task = asyncio.create_task + worker_fn = self.__worker_fn + address = self.address + load_prices = self.load_prices + queue_get = stuck_coro_debugger(self._queue.get) + put_ready = self._ready.put_nowait + + self._workers.extend( + create_task(worker_fn(address, load_prices, queue_get, put_ready)) + for _ in range(num_workers - len_workers) + ) + logger.info(f"{self} workers: {self._workers}") + + @staticmethod + async def __worker_fn( + address: ChecksumAddress, + load_prices: bool, + queue_get: Callable[[], Nonce], + put_ready: Callable[[Nonce, Optional[Transaction]], None], + ) -> NoReturn: + try: + while True: + nonce = await queue_get() + try: + put_ready(await load_transaction(address, nonce, load_prices)) + except Exception as e: + put_ready((nonce, e)) + except Exception as e: + logger.error(f"%s in %s __worker_coro", type(e), self) + logger.exception(e) + raise + + def __stop_workers(self) -> None: + logger.info("stopping workers for %s", self) + workers = self._workers + pop_next = workers.pop + for _ in range(len(workers)): + pop_next().cancel() + class InternalTransfersList(PandableList[InternalTransfer]): """ diff --git a/eth_portfolio/_loaders/transaction.py b/eth_portfolio/_loaders/transaction.py index 086b7f7b..7f2f7202 100644 --- a/eth_portfolio/_loaders/transaction.py +++ b/eth_portfolio/_loaders/transaction.py @@ -7,6 +7,7 @@ """ import asyncio +import itertools import logging from collections import defaultdict from typing import DefaultDict, Dict, List, Optional, Tuple @@ -39,8 +40,6 @@ nonces: DefaultDict[Address, Nonces] = defaultdict(lambda: defaultdict(int)) -_semaphore = a_sync.Semaphore(10_000, name="load transaction db semaphore") - @eth_retry.auto_retry @stuck_coro_debugger @@ -66,12 +65,11 @@ async def load_transaction( >>> print(await load_transaction(address="0x1234567890abcdef1234567890abcdef12345678", nonce=5, load_prices=True)) (5, Transaction(...)) """ - async with _semaphore: - if transaction := await db.get_transaction(address, nonce): - if load_prices and transaction.price is None: - await db.delete_transaction(transaction) - else: - return nonce, transaction + if transaction := await db.get_transaction(address, nonce): + if load_prices and transaction.price is None: + await db.delete_transaction(transaction) + else: + return nonce, transaction block = await get_block_for_nonce(address, nonce) tx = await get_transaction_by_nonce_and_block(address, nonce, block) @@ -96,78 +94,90 @@ async def load_transaction( return nonce, transaction -_nonce_semaphores: DefaultDict[Address, asyncio.Semaphore] = defaultdict( - lambda: asyncio.Semaphore(50_000) -) -_nonce_cache_semaphores: DefaultDict[Address, asyncio.Semaphore] = defaultdict( - lambda: asyncio.Semaphore(100) -) +@alru_cache(maxsize=None, ttl=5) +async def _get_block_number(): + return await dank_mids.eth.block_number + + +_nonce_cache_locks: DefaultDict[Address, asyncio.Lock] = defaultdict(asyncio.Lock) async def get_block_for_nonce(address: Address, nonce: Nonce) -> int: hi = None + async with _nonce_cache_locks[address]: + highest_known_nonce_lower_than_query = None + lowest_known_nonce_greater_than_query = None + + # it is impossible for n to == nonce + for less_than, ns in itertools.groupby(nonces[address], lambda n: n < nonce): + if less_than: + max_value = max(ns) + if ( + highest_known_nonce_lower_than_query is None + or max_value > highest_known_nonce_lower_than_query + ): + highest_known_nonce_lower_than_query = max_value - async with _nonce_semaphores[address]: - - async with _nonce_cache_semaphores[address]: - if known_nonces_less_than_query := [n for n in nonces[address] if n < nonce]: - highest_known_nonce_lower_than_query = max(known_nonces_less_than_query) - block_at_known_nonce = nonces[address][highest_known_nonce_lower_than_query] - lo = block_at_known_nonce - del highest_known_nonce_lower_than_query, block_at_known_nonce else: - lo = 0 - - if known_nonces_greater_than_query := [n for n in nonces[address] if n > nonce]: - lowest_known_nonce_greater_than_query = min(known_nonces_greater_than_query) - block_at_known_nonce = nonces[address][lowest_known_nonce_greater_than_query] - hi = block_at_known_nonce - del lowest_known_nonce_greater_than_query, block_at_known_nonce - - del known_nonces_less_than_query, known_nonces_greater_than_query - - # lets find the general area first before we proceed with our binary search - range_size = hi - lo + 1 - if range_size > 4: - num_chunks = _get_num_chunks(range_size) - chunk_size = range_size // num_chunks - points: Dict[int, Nonce] = await a_sync.gather( - { - point: get_nonce_at_block(address, point) - for point in (lo + i * chunk_size for i in range(num_chunks)) - } - ) - - for block, _nonce in points.items(): - if _nonce >= nonce: - hi = block - break - lo = block - - del num_chunks, chunk_size, points, block - - del range_size - - hi = hi or await dank_mids.eth.block_number - - while True: - _nonce = await get_nonce_at_block(address, lo) - - if _nonce < nonce: - old_lo = lo - lo += int((hi - lo) / 2) or 1 - logger.debug("Nonce at %s is %s, checking higher block %s", old_lo, _nonce, lo) - continue - - prev_block_nonce: int = await get_nonce_at_block(address, lo - 1) - if prev_block_nonce >= nonce: - hi = lo - lo = int(lo / 2) - logger.debug("Nonce at %s is %s, checking lower block %s", hi, _nonce, lo) - continue - - logger.debug("Found nonce %s at block %s", nonce, lo) - return lo + min_value = min(ns) + if ( + lowest_known_nonce_greater_than_query is None + or min_value < lowest_known_nonce_greater_than_query + ): + lowest_known_nonce_greater_than_query = min_value + + if highest_known_nonce_lower_than_query is not None: + lo = nonces[address][highest_known_nonce_lower_than_query] + else: + lo = 0 + + if lowest_known_nonce_greater_than_query is not None: + hi = nonces[address][lowest_known_nonce_greater_than_query] + + del highest_known_nonce_lower_than_query, lowest_known_nonce_greater_than_query + + # lets find the general area first before we proceed with our binary search + range_size = hi - lo + 1 + if range_size > 4: + num_chunks = _get_num_chunks(range_size) + chunk_size = range_size // num_chunks + points: Dict[int, Nonce] = await a_sync.gather( + { + point: get_nonce_at_block(address, point) + for point in (lo + i * chunk_size for i in range(num_chunks)) + } + ) + + for block, _nonce in points.items(): + if _nonce >= nonce: + hi = block + break + lo = block + + del num_chunks, chunk_size, points, block + + del range_size + + hi = hi or await _get_block_number() + + while True: + _nonce = await get_nonce_at_block(address, lo) + + if _nonce < nonce: + old_lo = lo + lo += int((hi - lo) / 2) or 1 + logger.debug("Nonce at %s is %s, checking higher block %s", old_lo, _nonce, lo) + continue + + prev_block_nonce: int = await get_nonce_at_block(address, lo - 1) + if prev_block_nonce >= nonce: + hi = lo + lo = int(lo / 2) + logger.debug("Nonce at %s is %s, checking lower block %s", hi, _nonce, lo) + continue + + logger.debug("Found nonce %s at block %s", nonce, lo) + return lo async def _insert_to_db(transaction: structs.Transaction, load_prices: bool) -> None: diff --git a/eth_portfolio/address.py b/eth_portfolio/address.py index e60e33de..ffecdd7e 100644 --- a/eth_portfolio/address.py +++ b/eth_portfolio/address.py @@ -50,7 +50,14 @@ class PortfolioAddress(_LedgeredBase[AddressLedgerBase]): Represents a portfolio address within the eth-portfolio system. """ - def __init__(self, address: Address, portfolio: "Portfolio", num_workers_transactions: int = 25_000, asynchronous: bool = False) -> None: # type: ignore + def __init__( + self, + address: Address, + start_block: Block, + load_prices: bool, + num_workers_transactions: int = 25_000, + asynchronous: bool = False, + ) -> None: # type: ignore """ Initializes the PortfolioAddress instance. @@ -76,14 +83,14 @@ def __init__(self, address: Address, portfolio: "Portfolio", num_workers_transac """ Flag indicating if the operations are asynchronous. """ - self.load_prices = portfolio.load_prices + self.load_prices = load_prices """ Indicates if price loading is enabled. """ - super().__init__(portfolio._start_block) + super().__init__(start_block) - self.transactions = AddressTransactionsLedger(self) + self.transactions = AddressTransactionsLedger(self, num_workers_transactions) """ Ledger for tracking transactions. """ diff --git a/eth_portfolio/portfolio.py b/eth_portfolio/portfolio.py index 9b054930..9622fe11 100644 --- a/eth_portfolio/portfolio.py +++ b/eth_portfolio/portfolio.py @@ -50,7 +50,13 @@ class PortfolioWallets(Iterable[PortfolioAddress], Dict[Address, PortfolioAddres _wallets: ChecksumAddressDict[PortfolioAddress] - def __init__(self, portfolio: "Portfolio", addresses: Iterable[Address]) -> None: + def __init__( + self, + addresses: Iterable[Address], + start_block: Block, + load_prices: bool, + num_workers_transactions: int, + ) -> None: """ Initialize a PortfolioWallets instance. @@ -68,7 +74,11 @@ def __init__(self, portfolio: "Portfolio", addresses: Iterable[Address]) -> None for address in addresses: self._wallets[address] = PortfolioAddress( - address, portfolio, asynchronous=portfolio.asynchronous + address, + start_block, + load_prices, + num_workers_transactions=num_workers_transactions, + asynchronous=portfolio.asynchronous, ) def __repr__(self) -> str: @@ -159,6 +169,9 @@ def items(self) -> Iterable[Tuple[Address, PortfolioAddress]]: return self._wallets.items() +_DEFAULT_LABEL = "your portfolio" + + class Portfolio(a_sync.ASyncGenericBase): """ Used to export information about a group of :class:`~eth_portfolio.address.PortfolioAddress` objects. @@ -167,12 +180,33 @@ class Portfolio(a_sync.ASyncGenericBase): - All calls to `function(*args, **kwargs)` will return `{address: PortfolioAddress(Address).function(*args, **kwargs)}` """ + label: str = _DEFAULT_LABEL + """ + A label for the portfolio. Defaults to "your portfolio" + """ + + load_prices: bool = True + """ + Whether to load prices. Defaults to True. + """ + + asynchronous: bool = False + """ + Whether to use asynchronous operations. Defaults to False. + """ + + _start_block = 0 + """ + The starting block number. Defaults to 0. + """ + def __init__( self, addresses: Addresses, start_block: int = 0, - label: str = "your portfolio", + label: str = _DEFAULT_LABEL, load_prices: bool = True, + num_workers_transactions: int = 25_000, asynchronous: bool = False, ) -> None: """ @@ -190,40 +224,31 @@ def __init__( raise ValueError("`start_block` must be >= 0") super().__init__() - self._start_block = start_block - """ - The starting block number. Defaults to 0. - """ + if start_block: + self._start_block = start_block assert isinstance(label, str), f"`label` must be a string, you passed {type(label)}" - self.label = label - """ - A label for the portfolio. Defaults to "your portfolio" - """ + if label != _DEFAULT_LABEL: + self.label = label - if not isinstance(load_prices, bool): + if load_prices is False: + self.load_prices = False + elif load_prices is not True: raise TypeError(f"`load_prices` must be a boolean, you passed {type(load_prices)}") - self.load_prices: bool = load_prices - """ - Whether to load prices. Defaults to True. - """ - if not isinstance(asynchronous, bool): + if asynchronous is True: + self.asynchronous = True + elif asynchronous is not False: raise TypeError(f"`asynchronous` must be a boolean, you passed {type(asynchronous)}") - self.asynchronous: bool = asynchronous - """ - Whether to use asynchronous operations. Defaults to False. - """ - if not isinstance(addresses, Iterable): - raise TypeError(f"`addresses` must be an iterable, not {type(addresses)}") if isinstance(addresses, str): addresses = [addresses] - """ - The addresses to include in your portfolio - """ + elif not isinstance(addresses, Iterable): + raise TypeError(f"`addresses` must be an iterable, not {type(addresses)}") - self.addresses = PortfolioWallets(self, addresses) + self.addresses = PortfolioWallets( + addresses, start_block, load_prices, num_workers_transactions + ) """ A container for the :class:`~eth_portfolio.Portfolio`'s :class:`~eth_portfolio.address.PortfolioAddress` objects.