diff --git a/eth_portfolio/__init__.py b/eth_portfolio/__init__.py index c25be218..b2f17e4c 100644 --- a/eth_portfolio/__init__.py +++ b/eth_portfolio/__init__.py @@ -1,3 +1,9 @@ + +import a_sync._smart + +a_sync._smart.set_smart_task_factory() + + from eth_portfolio.portfolio import Portfolio, portfolio # make sure we init the extended db before we init ypm somewhere from eth_portfolio._db import utils diff --git a/eth_portfolio/_ledgers/address.py b/eth_portfolio/_ledgers/address.py index c5424b01..6d58ffd2 100644 --- a/eth_portfolio/_ledgers/address.py +++ b/eth_portfolio/_ledgers/address.py @@ -10,10 +10,9 @@ import dank_mids import eth_retry from pandas import DataFrame # type: ignore -from tqdm.asyncio import tqdm_asyncio from y import ERC20 +from y._decorators import stuck_coro_debugger from y.datatypes import Block -from y.decorators import stuck_coro_debugger from y.utils.events import BATCH_SIZE from eth_portfolio import _loaders @@ -22,8 +21,7 @@ from eth_portfolio._loaders.transaction import get_nonce_at_block from eth_portfolio._ydb.token_transfers import TokenTransfers from eth_portfolio.structs import InternalTransfer, TokenTransfer, Transaction -from eth_portfolio.utils import (_AiterMixin, PandableList, _unpack_indicies, - get_buffered_chain_height) +from eth_portfolio.utils import _AiterMixin, PandableList, get_buffered_chain_height if TYPE_CHECKING: from eth_portfolio.address import PortfolioAddress @@ -235,19 +233,18 @@ async def _load_new_objects(self, _: Block, end_block: Block) -> AsyncIterator[T class InternalTransfersList(PandableList[InternalTransfer]): pass -trace_semaphore = a_sync.Semaphore(32, __name__ + ".trace_semaphore") @cache_to_disk # we double stack these because high-volume wallets will likely need it @eth_retry.auto_retry -@eth_retry.auto_retry @stuck_coro_debugger +@a_sync.Semaphore(32, __name__ + ".trace_semaphore") +@eth_retry.auto_retry async def get_traces(params: list) -> List[dict]: - async with trace_semaphore: - traces = await dank_mids.web3.provider.make_request("trace_filter", params) # type: ignore [arg-type, misc] - if 'result' not in traces: - raise BadResponse(traces) - return [trace for trace in traces['result'] if "error" not in trace] + traces = await dank_mids.web3.provider.make_request("trace_filter", params) # type: ignore [arg-type, misc] + if 'result' not in traces: + raise BadResponse(traces) + return [trace for trace in traces['result'] if "error" not in trace] class AddressInternalTransfersLedger(AddressLedgerBase[InternalTransfersList, InternalTransfer]): _list_type = InternalTransfersList diff --git a/eth_portfolio/_loaders/balances.py b/eth_portfolio/_loaders/balances.py index edc86da8..923704b5 100644 --- a/eth_portfolio/_loaders/balances.py +++ b/eth_portfolio/_loaders/balances.py @@ -6,9 +6,9 @@ import dank_mids import eth_retry from y import ERC20, NonStandardERC20, get_price +from y._decorators import stuck_coro_debugger from y.constants import WRAPPED_GAS_COIN from y.datatypes import Address, Block -from y.decorators import stuck_coro_debugger from eth_portfolio.typing import Balance from eth_portfolio.utils import _get_price diff --git a/eth_portfolio/_loaders/internal_transfer.py b/eth_portfolio/_loaders/internal_transfer.py index cbd5694d..42af180a 100644 --- a/eth_portfolio/_loaders/internal_transfer.py +++ b/eth_portfolio/_loaders/internal_transfer.py @@ -3,8 +3,8 @@ from typing import Optional from brownie import chain +from y._decorators import stuck_coro_debugger from y.constants import EEE_ADDRESS -from y.decorators import stuck_coro_debugger from eth_portfolio._loaders.utils import checksum, get_transaction_receipt, underscore from eth_portfolio.structs import InternalTransfer diff --git a/eth_portfolio/_loaders/token_transfer.py b/eth_portfolio/_loaders/token_transfer.py index 3a54563f..6678bb28 100644 --- a/eth_portfolio/_loaders/token_transfer.py +++ b/eth_portfolio/_loaders/token_transfer.py @@ -10,7 +10,7 @@ from brownie.network.event import _EventItem as brownie_EventItem from pony.orm import TransactionIntegrityError from y import ERC20, Contract -from y.decorators import stuck_coro_debugger +from y._decorators import stuck_coro_debugger from y.exceptions import ContractNotVerified, NonStandardERC20 from y.utils.events import decode_logs @@ -41,11 +41,17 @@ async def load_token_transfer(transfer_log: dict, load_prices: bool) -> Optional return None token = ERC20(decoded.address, asynchronous=True) coros = [token.scale, get_symbol(token), get_transaction_index(decoded.transaction_hash)] - if load_prices: - coros.append(_get_price(token.address, decoded.block_number)) - scale, symbol, transaction_index, price = await asyncio.gather(*coros) - else: - scale, symbol, transaction_index = await asyncio.gather(*coros) + + try: + if load_prices: + coros.append(_get_price(token.address, decoded.block_number)) + scale, symbol, transaction_index, price = await asyncio.gather(*coros) + else: + scale, symbol, transaction_index = await asyncio.gather(*coros) + except NonStandardERC20 as e: + # NOTE: if we cant fetch scale or symbol or both, this is probably either a shitcoin or an NFT (which we don't support at this time) + logger.debug(f"{e} for {transfer_log}, skipping.") + return None sender, receiver, value = decoded.values() value = Decimal(value) / Decimal(scale) diff --git a/eth_portfolio/_loaders/transaction.py b/eth_portfolio/_loaders/transaction.py index c403a03a..f5b429da 100644 --- a/eth_portfolio/_loaders/transaction.py +++ b/eth_portfolio/_loaders/transaction.py @@ -11,9 +11,9 @@ from pony.orm import TransactionIntegrityError from web3.types import TxData from y import get_price +from y._decorators import stuck_coro_debugger from y.constants import EEE_ADDRESS from y.datatypes import Address, Block -from y.decorators import stuck_coro_debugger from eth_portfolio._db import utils as db from eth_portfolio._loaders.utils import get_transaction_receipt, underscore @@ -105,11 +105,15 @@ async def get_nonce_at_block(address: Address, block: Block) -> int: return -1 raise ValueError(f"For {address} at {block}: {e}") +@alru_cache(ttl=60*60) @eth_retry.auto_retry @stuck_coro_debugger -async def get_block_transactions(block: Block) -> List[TxData]: - async with _full_block_semaphore: - block = await dank_mids.eth.get_block(block, full_transactions=True) - return block.transactions +async def _get_block_transactions(block: Block) -> List[TxData]: + block = await dank_mids.eth.get_block(block, full_transactions=True) + return block.transactions -_full_block_semaphore = a_sync.Semaphore(1_000, name = __name__ + "._full_block_semaphore") +get_block_transactions = a_sync.SmartProcessingQueue( + _get_block_transactions, + num_workers=1_000, + name=__name__ + ".get_block_transactions", +) diff --git a/eth_portfolio/_loaders/utils.py b/eth_portfolio/_loaders/utils.py index d882c556..9fd94d50 100644 --- a/eth_portfolio/_loaders/utils.py +++ b/eth_portfolio/_loaders/utils.py @@ -8,16 +8,20 @@ from async_lru import alru_cache from eth_utils import to_checksum_address from web3.types import TxReceipt -from y.decorators import stuck_coro_debugger +from y._decorators import stuck_coro_debugger -receipt_semaphore = a_sync.Semaphore(1_000) @eth_retry.auto_retry -@alru_cache +@alru_cache(ttl=60*60) @stuck_coro_debugger -async def get_transaction_receipt(txhash: str) -> TxReceipt: - async with receipt_semaphore: - return await dank_mids.eth.get_transaction_receipt(txhash) +async def _get_transaction_receipt(txhash: str) -> TxReceipt: + return await dank_mids.eth.get_transaction_receipt(txhash) + +get_transaction_receipt = a_sync.SmartProcessingQueue( + _get_transaction_receipt, + num_workers=1000, + name=__name__ + ".get_transaction_receipt", +) def checksum(addr: str) -> str: """We keep a mapping here to save cpu cycles, checksumming is arduous.""" diff --git a/eth_portfolio/_ydb/token_transfers.py b/eth_portfolio/_ydb/token_transfers.py index ec5f9b14..203dcc38 100644 --- a/eth_portfolio/_ydb/token_transfers.py +++ b/eth_portfolio/_ydb/token_transfers.py @@ -44,7 +44,7 @@ async def yield_thru_block(self, block) -> AsyncIterator["asyncio.Task[TokenTran logger.debug("yielding %s at block %s [thru: %s, lock: %s]", task, task.block, block, self._lock.value) yield task logger.debug("%s yield thru %s complete", self, block) - def _extend(self, objs: List[LogReceipt]) -> None: + async def _extend(self, objs: List[LogReceipt]) -> None: for log in objs: task = asyncio.create_task( coro=_loaders.load_token_transfer(log, self._load_prices), @@ -66,13 +66,13 @@ class InboundTokenTransfers(_TokenTransfers): """A container that fetches and iterates over all inbound token transfers for a particular wallet address""" @property def _topics(self) -> List: - return [TRANSFER_SIGS, None, [encode_address(self.address)]] + return [TRANSFER_SIGS, None, encode_address(self.address)] class OutboundTokenTransfers(_TokenTransfers): """A container that fetches and iterates over all outbound token transfers for a particular wallet address""" @property def _topics(self) -> List: - return [TRANSFER_SIGS, [encode_address(self.address)]] + return [TRANSFER_SIGS, encode_address(self.address)] class TokenTransfers(a_sync.ASyncIterable[TokenTransfer]): """ diff --git a/eth_portfolio/address.py b/eth_portfolio/address.py index afac1b86..bdb17b14 100644 --- a/eth_portfolio/address.py +++ b/eth_portfolio/address.py @@ -6,9 +6,9 @@ import a_sync from a_sync.exceptions import MappingIsEmptyError from y import convert +from y._decorators import stuck_coro_debugger from y.constants import EEE_ADDRESS from y.datatypes import Address, Block -from y.decorators import stuck_coro_debugger from eth_portfolio._ledgers.address import (AddressInternalTransfersLedger, AddressLedgerBase, diff --git a/eth_portfolio/constants.py b/eth_portfolio/constants.py index 8ff7a965..8d796abc 100644 --- a/eth_portfolio/constants.py +++ b/eth_portfolio/constants.py @@ -2,7 +2,7 @@ import os from brownie import chain -from a_sync.primitives.executor import PruningThreadPoolExecutor +from a_sync.executor import PruningThreadPoolExecutor from y import Network, convert, weth ERC20_TRANSFER_EVENT_HASH = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef' diff --git a/eth_portfolio/portfolio.py b/eth_portfolio/portfolio.py index 9fd0bd0b..28315aa5 100644 --- a/eth_portfolio/portfolio.py +++ b/eth_portfolio/portfolio.py @@ -2,10 +2,10 @@ import asyncio import logging from functools import cached_property, wraps -from typing import Any, Dict, Iterable, Iterator, List, Tuple +from typing import Any, Dict, Iterable, Iterator, List, Tuple, Union import a_sync -import a_sync.modified +from a_sync.a_sync import ASyncFunction from brownie import web3 from checksum_dict import ChecksumAddressDict from pandas import DataFrame, concat # type: ignore @@ -41,6 +41,8 @@ def __init__(self, portfolio: "Portfolio", addresses: Iterable[Address]) -> None }) def __repr__(self) -> str: return f"<{type(self).__name__} wallets={list(self._wallets.values())}>" + def __contains__(self, address: Union[Address, PortfolioAddress]) -> bool: + return address in self._wallets def __getitem__(self, address: Address) -> PortfolioAddress: return self._wallets[address] def __iter__(self) -> Iterator[PortfolioAddress]: @@ -133,7 +135,7 @@ async def describe(self, block: int) -> PortfolioBalances: return PortfolioBalances(await a_sync.gather({address: address.describe(block, sync=False) for address in self})) -async_functions = {name: obj for name, obj in PortfolioAddress.__dict__.items() if isinstance(obj, a_sync.modified.ASyncFunction)} +async_functions = {name: obj for name, obj in PortfolioAddress.__dict__.items() if isinstance(obj, ASyncFunction)} for func_name, func in async_functions.items(): if not callable(getattr(PortfolioAddress, func_name)): raise RuntimeError(f"A PortfolioAddress object should not have a non-callable attribute suffixed with '_async'") diff --git a/eth_portfolio/protocols/__init__.py b/eth_portfolio/protocols/__init__.py index 7effa75a..c9cbd5c7 100644 --- a/eth_portfolio/protocols/__init__.py +++ b/eth_portfolio/protocols/__init__.py @@ -19,6 +19,8 @@ def __init__(self) -> None: @a_sync.future async def balances(self, address: Address, block: Optional[Block] = None) -> RemoteTokenBalances: + if not self.protocols: + return RemoteTokenBalances() return RemoteTokenBalances({ type(protocol).__name__: protocol_balances async for protocol, protocol_balances diff --git a/eth_portfolio/protocols/_base.py b/eth_portfolio/protocols/_base.py index 86e278fd..cbb217fb 100644 --- a/eth_portfolio/protocols/_base.py +++ b/eth_portfolio/protocols/_base.py @@ -4,15 +4,14 @@ from typing import List, Optional import a_sync -from a_sync.modified import ASyncFunctionSyncDefault -from a_sync.property import HiddenMethod +from a_sync.a_sync import ASyncFunctionSyncDefault, HiddenMethod from brownie.network.contract import ContractCall from eth_portfolio.typing import Balance, TokenBalances from eth_portfolio.utils import Decimal from y import ERC20, Contract +from y._decorators import stuck_coro_debugger from y.contracts import contract_creation_block from y.datatypes import Address, Block -from y.decorators import stuck_coro_debugger class ProtocolABC(metaclass=abc.ABCMeta): diff --git a/eth_portfolio/protocols/lending/__init__.py b/eth_portfolio/protocols/lending/__init__.py index fd0ad758..b7689c7d 100644 --- a/eth_portfolio/protocols/lending/__init__.py +++ b/eth_portfolio/protocols/lending/__init__.py @@ -8,8 +8,8 @@ from eth_portfolio.typing import RemoteTokenBalances from eth_portfolio.utils import (_get_protocols_for_submodule, _import_submodules) +from y._decorators import stuck_coro_debugger from y.datatypes import Address, Block -from y.decorators import stuck_coro_debugger _import_submodules() @@ -31,6 +31,8 @@ async def collateral(self, address: Address, block: Optional[Block] = None) -> R @a_sync.future @stuck_coro_debugger async def debt(self, address: Address, block: Optional[Block] = None) -> RemoteTokenBalances: + if not self.protocols: + return RemoteTokenBalances() return RemoteTokenBalances({ type(protocol).__name__: token_balances async for protocol, token_balances in a_sync.map(lambda p: p.debt(address, block), self.protocols) diff --git a/eth_portfolio/protocols/lending/compound.py b/eth_portfolio/protocols/lending/compound.py index 3cfc0831..55ae26c8 100644 --- a/eth_portfolio/protocols/lending/compound.py +++ b/eth_portfolio/protocols/lending/compound.py @@ -5,9 +5,9 @@ import a_sync from async_lru import alru_cache from brownie import ZERO_ADDRESS, Contract -from y import ERC20, Contract, get_prices, weth +from y import ERC20, Contract, map_prices, weth +from y._decorators import stuck_coro_debugger from y.datatypes import Block -from y.decorators import stuck_coro_debugger from y.exceptions import ContractNotVerified from y.prices.lending.compound import CToken, compound @@ -65,11 +65,11 @@ async def _debt(self, address: Address, block: Optional[Block] = None) -> TokenB asyncio.gather(*[underlying.__scale__ for underlying in underlyings]), ) - debts = {underlying: Decimal(debt) / scale for underlying, scale, debt in zip(underlyings, underlying_scale, debt_data) if debt} - prices = await get_prices(debts.keys(), block=block, sync=False) balances: TokenBalances = TokenBalances() - for (underlying, debt), price in zip(debts.items(), prices): - balances[underlying] += Balance(debt, debt * Decimal(price)) + if debts := {underlying: Decimal(debt) / scale for underlying, scale, debt in zip(underlyings, underlying_scale, debt_data) if debt}: + async for underlying, price in map_prices(debts, block=block): + debt = debts.pop(underlying) + balances[underlying] += Balance(debt, debt * Decimal(price)) return balances @stuck_coro_debugger diff --git a/eth_portfolio/protocols/lending/liquity.py b/eth_portfolio/protocols/lending/liquity.py index 012ce7bd..9e5aa937 100644 --- a/eth_portfolio/protocols/lending/liquity.py +++ b/eth_portfolio/protocols/lending/liquity.py @@ -3,9 +3,9 @@ from async_lru import alru_cache from y import Contract, Network, get_price +from y._decorators import stuck_coro_debugger from y.constants import EEE_ADDRESS from y.datatypes import Address, Block -from y.decorators import stuck_coro_debugger from eth_portfolio.protocols.lending._base import \ LendingProtocolWithLockedCollateral diff --git a/eth_portfolio/protocols/lending/maker.py b/eth_portfolio/protocols/lending/maker.py index fbefeb0e..9626d27a 100644 --- a/eth_portfolio/protocols/lending/maker.py +++ b/eth_portfolio/protocols/lending/maker.py @@ -4,10 +4,10 @@ from async_lru import alru_cache from y import Network, get_price +from y._decorators import stuck_coro_debugger from y.constants import dai from y.contracts import Contract from y.datatypes import Address, Block -from y.decorators import stuck_coro_debugger from eth_portfolio.protocols.lending._base import \ LendingProtocolWithLockedCollateral diff --git a/eth_portfolio/protocols/lending/unit.py b/eth_portfolio/protocols/lending/unit.py index fd685cda..b06ae63d 100644 --- a/eth_portfolio/protocols/lending/unit.py +++ b/eth_portfolio/protocols/lending/unit.py @@ -3,8 +3,8 @@ from brownie import chain from y import Contract, Network, get_price +from y._decorators import stuck_coro_debugger from y.datatypes import Address, Block -from y.decorators import stuck_coro_debugger from eth_portfolio.protocols.lending._base import \ LendingProtocolWithLockedCollateral diff --git a/requirements.txt b/requirements.txt index 348341bc..34e2b8ba 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ checksum_dict>=1.1.2 dank_mids>=4.20.85 eth-brownie>=1.19.3,<1.21 eth_retry>=0.1.15,<1 -ez-a-sync>=0.19.5,<0.21 +ez-a-sync>=0.22.1 +numpy<2 pandas>=1.4.3,<1.6 -ypricemagic>=3.3.0,<4 +ypricemagic>=4.0.0,<4.1