From 2e2505ed05433461a44cac0145a0529ae98b2198 Mon Sep 17 00:00:00 2001 From: BobTheBuidler <70677534+BobTheBuidler@users.noreply.github.com> Date: Wed, 11 Dec 2024 19:08:50 -0400 Subject: [PATCH] feat: use semaphore for load_transaction db reads (#137) * feat: use semaphore for load_transaction db reads * fix: typo * chore: `black .` * Update transaction.py * chore: `black .` * Update transaction.py * Update transaction.py * chore: `black .` * Update transaction.py * Update transaction.py * chore: `black .` * Update address.py * chore: `black .` * Update address.py * chore: `black .` * Update portfolio.py * chore: `black .` * Update portfolio.py * Update portfolio.py * Update portfolio.py * Update portfolio.py --------- Co-authored-by: github-actions[bot] --- eth_portfolio/_ledgers/address.py | 16 +++++ eth_portfolio/_ledgers/portfolio.py | 24 +++++++- eth_portfolio/_loaders/transaction.py | 87 ++++++++++++++++++--------- eth_portfolio/portfolio.py | 37 +++++++++++- 4 files changed, 132 insertions(+), 32 deletions(-) diff --git a/eth_portfolio/_ledgers/address.py b/eth_portfolio/_ledgers/address.py index 82d0e10d..11762dff 100644 --- a/eth_portfolio/_ledgers/address.py +++ b/eth_portfolio/_ledgers/address.py @@ -253,6 +253,22 @@ async def new(self) -> _LedgerEntryList: end_block = await get_buffered_chain_height() return self[start_block, end_block] # type: ignore [index, return-value] + async def sent( + self, start_block: Optional[Block] = None, end_block: Optional[Block] = None + ) -> AsyncIterator[T]: + address = self.portfolio_address.address + async for obj in self[start_block:end_block]: + if obj.from_address == address: + yield obj + + async def received( + self, start_block: Optional[Block] = None, end_block: Optional[Block] = None + ) -> AsyncIterator[T]: + address = self.portfolio_address.address + async for obj in self[start_block:end_block]: + if obj.from_address != address: + yield obj + @set_end_block_if_none @stuck_coro_debugger async def _get_new_objects(self, start_block: Block, end_block: Block) -> AsyncIterator[T]: diff --git a/eth_portfolio/_ledgers/portfolio.py b/eth_portfolio/_ledgers/portfolio.py index 16418548..58849e12 100644 --- a/eth_portfolio/_ledgers/portfolio.py +++ b/eth_portfolio/_ledgers/portfolio.py @@ -1,5 +1,5 @@ import logging -from typing import TYPE_CHECKING, AsyncIterator, Dict, Generic, TypeVar +from typing import TYPE_CHECKING, AsyncIterator, Dict, Generic, Optional, TypeVar import a_sync from pandas import DataFrame, concat # type: ignore @@ -124,6 +124,28 @@ async def df(self, start_block: Block, end_block: Block) -> DataFrame: df = self._cleanup_df(df) return df + async def sent( + self, start_block: Optional[Block] = None, end_block: Optional[Block] = None + ) -> AsyncIterator[T]: + portfolio_addresses = set(self.portfolio.addresses.keys()) + async for obj in self[start_block:end_block]: + if ( + obj.from_address in portfolio_addresses + and obj.to_address not in portfolio_addresses + ): + yield obj + + async def received( + self, start_block: Optional[Block] = None, end_block: Optional[Block] = None + ) -> AsyncIterator[T]: + portfolio_addresses = set(self.portfolio.addresses.keys()) + async for obj in self[start_block:end_block]: + if ( + obj.to_address in portfolio_addresses + and obj.from_address not in portfolio_addresses + ): + yield obj + async def _df_base(self, start_block: Block, end_block: Block) -> DataFrame: """ Fetches and concatenates raw ledger data into a :class:`~DataFrame` for all addresses in the portfolio. diff --git a/eth_portfolio/_loaders/transaction.py b/eth_portfolio/_loaders/transaction.py index a7cfe677..02e2a5d2 100644 --- a/eth_portfolio/_loaders/transaction.py +++ b/eth_portfolio/_loaders/transaction.py @@ -6,9 +6,10 @@ The primary focus of this module is to support eth-portfolio's internal operations such as loading transactions by address and nonce, retrieving transaction details from specific blocks, and managing transaction-related data. """ +import asyncio import logging from collections import defaultdict -from typing import Awaitable, DefaultDict, Dict, List, Optional, Tuple +from typing import DefaultDict, Dict, List, Optional, Tuple import a_sync import dank_mids @@ -38,6 +39,8 @@ nonces: DefaultDict[Address, Nonces] = defaultdict(lambda: defaultdict(int)) +_semaphore = a_sync.Semaphore(10_000, name="load transaction db semaphore") + @eth_retry.auto_retry @stuck_coro_debugger @@ -63,11 +66,12 @@ async def load_transaction( >>> print(await load_transaction(address="0x1234567890abcdef1234567890abcdef12345678", nonce=5, load_prices=True)) (5, Transaction(...)) """ - if transaction := await db.get_transaction(address, nonce): - if load_prices and transaction.price is None: - await db.delete_transaction(transaction) - else: - return nonce, transaction + async with _semaphore: + if transaction := await db.get_transaction(address, nonce): + if load_prices and transaction.price is None: + await db.delete_transaction(transaction) + else: + return nonce, transaction block = await get_block_for_nonce(address, nonce) tx = await get_transaction_by_nonce_and_block(address, nonce, block) @@ -92,31 +96,54 @@ async def load_transaction( return nonce, transaction +_nonce_cache_semaphores: DefaultDict[Address, asyncio.Semaphore] = defaultdict( + lambda: asyncio.Semaphore(100) +) + + async def get_block_for_nonce(address: Address, nonce: Nonce) -> int: - if known_nonces_lower_than_query := [n for n in nonces[address] if n < nonce]: - highest_known_nonce_lower_than_query_nonce = max(known_nonces_lower_than_query) - block_at_known_nonce = nonces[address][highest_known_nonce_lower_than_query_nonce] - lo = block_at_known_nonce - else: - lo = 0 - - hi = await dank_mids.eth.block_number - - # lets find the general area first before we proceed with our binary search - range_size = hi - lo + 1 - if range_size > 4: - num_chunks = _get_num_chunks(range_size) - chunk_size = range_size // num_chunks - coros: Dict[int, Awaitable[Nonce]] = { - point: get_nonce_at_block(address, point) - for point in [lo + i * chunk_size for i in range(num_chunks)] - } - points: Dict[int, Nonce] = await a_sync.gather(coros) - - for block, _nonce in points.items(): - if _nonce >= nonce: - break - lo = block + hi = None + + async with _nonce_cache_semaphores[address]: + if known_nonces_less_than_query := [n for n in nonces[address] if n < nonce]: + highest_known_nonce_lower_than_query = max(known_nonces_less_than_query) + block_at_known_nonce = nonces[address][highest_known_nonce_lower_than_query] + lo = block_at_known_nonce + del highest_known_nonce_lower_than_query, block_at_known_nonce + else: + lo = 0 + + if known_nonces_greater_than_query := [n for n in nonces[address] if n > nonce]: + lowest_known_nonce_greater_than_query = min(known_nonces_greater_than_query) + block_at_known_nonce = nonces[address][lowest_known_nonce_greater_than_query] + hi = block_at_known_nonce + del lowest_known_nonce_greater_than_query, block_at_known_nonce + + del known_nonces_less_than_query, known_nonces_greater_than_query + + # lets find the general area first before we proceed with our binary search + range_size = hi - lo + 1 + if range_size > 4: + num_chunks = _get_num_chunks(range_size) + chunk_size = range_size // num_chunks + points: Dict[int, Nonce] = await a_sync.gather( + { + point: get_nonce_at_block(address, point) + for point in (lo + i * chunk_size for i in range(num_chunks)) + } + ) + + for block, _nonce in points.items(): + if _nonce >= nonce: + hi = block + break + lo = block + + del num_chunks, chunk_size, points, block + + del range_size + + hi = hi or await dank_mids.eth.block_number while True: _nonce = await get_nonce_at_block(address, lo) diff --git a/eth_portfolio/portfolio.py b/eth_portfolio/portfolio.py index 7110fef7..0727cdfc 100644 --- a/eth_portfolio/portfolio.py +++ b/eth_portfolio/portfolio.py @@ -9,7 +9,7 @@ import asyncio import logging from functools import wraps -from typing import Any, Dict, Iterable, Iterator, List, Tuple, Union +from typing import Any, AsyncIterator, Dict, Iterable, Iterator, List, Optional, Tuple, Union import a_sync from a_sync.a_sync import ASyncFunction @@ -31,6 +31,7 @@ from eth_portfolio._utils import _LedgeredBase from eth_portfolio.address import PortfolioAddress from eth_portfolio.constants import ADDRESSES +from eth_portfolio.structs import LedgerEntry from eth_portfolio.typing import Addresses, PortfolioBalances logger = logging.getLogger(__name__) @@ -345,6 +346,18 @@ async def describe(self, block: int) -> PortfolioBalances: block=block, ) + async def sent( + self, start_block: Optional[Block] = None, end_block: Optional[Block] = None + ) -> AsyncIterator[LedgerEntry]: + async for obj in self.ledger.sent(start_block, end_block): + yield obj + + async def received( + self, start_block: Optional[Block] = None, end_block: Optional[Block] = None + ) -> AsyncIterator[LedgerEntry]: + async for obj in self.ledger.received(start_block, end_block): + yield obj + async_functions = { name: obj for name, obj in PortfolioAddress.__dict__.items() if isinstance(obj, ASyncFunction) @@ -562,6 +575,28 @@ async def df(self, start_block: Block, end_block: Block, full: bool = False) -> logger.error(df.columns) raise + async def sent( + self, start_block: Optional[Block] = None, end_block: Optional[Block] = None + ) -> AsyncIterator[LedgerEntry]: + portfolio_addresses = set(self.portfolio.addresses.keys()) + async for obj in self[start_block:end_block]: + if ( + obj.from_address in portfolio_addresses + and obj.to_address not in portfolio_addresses + ): + yield obj + + async def received( + self, start_block: Optional[Block] = None, end_block: Optional[Block] = None + ) -> AsyncIterator[LedgerEntry]: + portfolio_addresses = set(self.portfolio.addresses.keys()) + async for obj in self[start_block:end_block]: + if ( + obj.to_address in portfolio_addresses + and obj.from_address not in portfolio_addresses + ): + yield obj + # Use this var for a convenient way to set up your portfolio using env vars. portfolio = Portfolio(ADDRESSES)