Skip to content

Commit

Permalink
feat: ypm 4 (#54)
Browse files Browse the repository at this point in the history
* feat: implement SmartProcessingQueue

* fix: match dict api with `__contains__`

* fix: missing import

* fix: pin numpy < 2.0.0

* fix: attempt to fix missing logs

* fix: bump ypricemagic to v4

* fix: NonStandardERC20

* chore: bump ypm
  • Loading branch information
BobTheBuidler authored Jul 29, 2024
1 parent 8eae8d1 commit 20f85dc
Show file tree
Hide file tree
Showing 19 changed files with 77 additions and 54 deletions.
6 changes: 6 additions & 0 deletions eth_portfolio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@

import a_sync._smart

a_sync._smart.set_smart_task_factory()


from eth_portfolio.portfolio import Portfolio, portfolio
# make sure we init the extended db before we init ypm somewhere
from eth_portfolio._db import utils
Expand Down
19 changes: 8 additions & 11 deletions eth_portfolio/_ledgers/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
import dank_mids
import eth_retry
from pandas import DataFrame # type: ignore
from tqdm.asyncio import tqdm_asyncio
from y import ERC20
from y._decorators import stuck_coro_debugger
from y.datatypes import Block
from y.decorators import stuck_coro_debugger
from y.utils.events import BATCH_SIZE

from eth_portfolio import _loaders
Expand All @@ -22,8 +21,7 @@
from eth_portfolio._loaders.transaction import get_nonce_at_block
from eth_portfolio._ydb.token_transfers import TokenTransfers
from eth_portfolio.structs import InternalTransfer, TokenTransfer, Transaction
from eth_portfolio.utils import (_AiterMixin, PandableList, _unpack_indicies,
get_buffered_chain_height)
from eth_portfolio.utils import _AiterMixin, PandableList, get_buffered_chain_height

if TYPE_CHECKING:
from eth_portfolio.address import PortfolioAddress
Expand Down Expand Up @@ -235,19 +233,18 @@ async def _load_new_objects(self, _: Block, end_block: Block) -> AsyncIterator[T
class InternalTransfersList(PandableList[InternalTransfer]):
pass

trace_semaphore = a_sync.Semaphore(32, __name__ + ".trace_semaphore")

@cache_to_disk
# we double stack these because high-volume wallets will likely need it
@eth_retry.auto_retry
@eth_retry.auto_retry
@stuck_coro_debugger
@a_sync.Semaphore(32, __name__ + ".trace_semaphore")
@eth_retry.auto_retry
async def get_traces(params: list) -> List[dict]:
async with trace_semaphore:
traces = await dank_mids.web3.provider.make_request("trace_filter", params) # type: ignore [arg-type, misc]
if 'result' not in traces:
raise BadResponse(traces)
return [trace for trace in traces['result'] if "error" not in trace]
traces = await dank_mids.web3.provider.make_request("trace_filter", params) # type: ignore [arg-type, misc]
if 'result' not in traces:
raise BadResponse(traces)
return [trace for trace in traces['result'] if "error" not in trace]

class AddressInternalTransfersLedger(AddressLedgerBase[InternalTransfersList, InternalTransfer]):
_list_type = InternalTransfersList
Expand Down
2 changes: 1 addition & 1 deletion eth_portfolio/_loaders/balances.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import dank_mids
import eth_retry
from y import ERC20, NonStandardERC20, get_price
from y._decorators import stuck_coro_debugger
from y.constants import WRAPPED_GAS_COIN
from y.datatypes import Address, Block
from y.decorators import stuck_coro_debugger

from eth_portfolio.typing import Balance
from eth_portfolio.utils import _get_price
Expand Down
2 changes: 1 addition & 1 deletion eth_portfolio/_loaders/internal_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from typing import Optional

from brownie import chain
from y._decorators import stuck_coro_debugger
from y.constants import EEE_ADDRESS
from y.decorators import stuck_coro_debugger

from eth_portfolio._loaders.utils import checksum, get_transaction_receipt, underscore
from eth_portfolio.structs import InternalTransfer
Expand Down
18 changes: 12 additions & 6 deletions eth_portfolio/_loaders/token_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from brownie.network.event import _EventItem as brownie_EventItem
from pony.orm import TransactionIntegrityError
from y import ERC20, Contract
from y.decorators import stuck_coro_debugger
from y._decorators import stuck_coro_debugger
from y.exceptions import ContractNotVerified, NonStandardERC20
from y.utils.events import decode_logs

Expand Down Expand Up @@ -41,11 +41,17 @@ async def load_token_transfer(transfer_log: dict, load_prices: bool) -> Optional
return None
token = ERC20(decoded.address, asynchronous=True)
coros = [token.scale, get_symbol(token), get_transaction_index(decoded.transaction_hash)]
if load_prices:
coros.append(_get_price(token.address, decoded.block_number))
scale, symbol, transaction_index, price = await asyncio.gather(*coros)
else:
scale, symbol, transaction_index = await asyncio.gather(*coros)

try:
if load_prices:
coros.append(_get_price(token.address, decoded.block_number))
scale, symbol, transaction_index, price = await asyncio.gather(*coros)
else:
scale, symbol, transaction_index = await asyncio.gather(*coros)
except NonStandardERC20 as e:
# NOTE: if we cant fetch scale or symbol or both, this is probably either a shitcoin or an NFT (which we don't support at this time)
logger.debug(f"{e} for {transfer_log}, skipping.")
return None

sender, receiver, value = decoded.values()
value = Decimal(value) / Decimal(scale)
Expand Down
16 changes: 10 additions & 6 deletions eth_portfolio/_loaders/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
from pony.orm import TransactionIntegrityError
from web3.types import TxData
from y import get_price
from y._decorators import stuck_coro_debugger
from y.constants import EEE_ADDRESS
from y.datatypes import Address, Block
from y.decorators import stuck_coro_debugger

from eth_portfolio._db import utils as db
from eth_portfolio._loaders.utils import get_transaction_receipt, underscore
Expand Down Expand Up @@ -105,11 +105,15 @@ async def get_nonce_at_block(address: Address, block: Block) -> int:
return -1
raise ValueError(f"For {address} at {block}: {e}")

@alru_cache(ttl=60*60)
@eth_retry.auto_retry
@stuck_coro_debugger
async def get_block_transactions(block: Block) -> List[TxData]:
async with _full_block_semaphore:
block = await dank_mids.eth.get_block(block, full_transactions=True)
return block.transactions
async def _get_block_transactions(block: Block) -> List[TxData]:
block = await dank_mids.eth.get_block(block, full_transactions=True)
return block.transactions

_full_block_semaphore = a_sync.Semaphore(1_000, name = __name__ + "._full_block_semaphore")
get_block_transactions = a_sync.SmartProcessingQueue(
_get_block_transactions,
num_workers=1_000,
name=__name__ + ".get_block_transactions",
)
16 changes: 10 additions & 6 deletions eth_portfolio/_loaders/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,20 @@
from async_lru import alru_cache
from eth_utils import to_checksum_address
from web3.types import TxReceipt
from y.decorators import stuck_coro_debugger
from y._decorators import stuck_coro_debugger

receipt_semaphore = a_sync.Semaphore(1_000)

@eth_retry.auto_retry
@alru_cache
@alru_cache(ttl=60*60)
@stuck_coro_debugger
async def get_transaction_receipt(txhash: str) -> TxReceipt:
async with receipt_semaphore:
return await dank_mids.eth.get_transaction_receipt(txhash)
async def _get_transaction_receipt(txhash: str) -> TxReceipt:
return await dank_mids.eth.get_transaction_receipt(txhash)

get_transaction_receipt = a_sync.SmartProcessingQueue(
_get_transaction_receipt,
num_workers=1000,
name=__name__ + ".get_transaction_receipt",
)

def checksum(addr: str) -> str:
"""We keep a mapping here to save cpu cycles, checksumming is arduous."""
Expand Down
6 changes: 3 additions & 3 deletions eth_portfolio/_ydb/token_transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def yield_thru_block(self, block) -> AsyncIterator["asyncio.Task[TokenTran
logger.debug("yielding %s at block %s [thru: %s, lock: %s]", task, task.block, block, self._lock.value)
yield task
logger.debug("%s yield thru %s complete", self, block)
def _extend(self, objs: List[LogReceipt]) -> None:
async def _extend(self, objs: List[LogReceipt]) -> None:
for log in objs:
task = asyncio.create_task(
coro=_loaders.load_token_transfer(log, self._load_prices),
Expand All @@ -66,13 +66,13 @@ class InboundTokenTransfers(_TokenTransfers):
"""A container that fetches and iterates over all inbound token transfers for a particular wallet address"""
@property
def _topics(self) -> List:
return [TRANSFER_SIGS, None, [encode_address(self.address)]]
return [TRANSFER_SIGS, None, encode_address(self.address)]

class OutboundTokenTransfers(_TokenTransfers):
"""A container that fetches and iterates over all outbound token transfers for a particular wallet address"""
@property
def _topics(self) -> List:
return [TRANSFER_SIGS, [encode_address(self.address)]]
return [TRANSFER_SIGS, encode_address(self.address)]

class TokenTransfers(a_sync.ASyncIterable[TokenTransfer]):
"""
Expand Down
2 changes: 1 addition & 1 deletion eth_portfolio/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import a_sync
from a_sync.exceptions import MappingIsEmptyError
from y import convert
from y._decorators import stuck_coro_debugger
from y.constants import EEE_ADDRESS
from y.datatypes import Address, Block
from y.decorators import stuck_coro_debugger

from eth_portfolio._ledgers.address import (AddressInternalTransfersLedger,
AddressLedgerBase,
Expand Down
2 changes: 1 addition & 1 deletion eth_portfolio/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os

from brownie import chain
from a_sync.primitives.executor import PruningThreadPoolExecutor
from a_sync.executor import PruningThreadPoolExecutor
from y import Network, convert, weth

ERC20_TRANSFER_EVENT_HASH = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
Expand Down
8 changes: 5 additions & 3 deletions eth_portfolio/portfolio.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import asyncio
import logging
from functools import cached_property, wraps
from typing import Any, Dict, Iterable, Iterator, List, Tuple
from typing import Any, Dict, Iterable, Iterator, List, Tuple, Union

import a_sync
import a_sync.modified
from a_sync.a_sync import ASyncFunction
from brownie import web3
from checksum_dict import ChecksumAddressDict
from pandas import DataFrame, concat # type: ignore
Expand Down Expand Up @@ -41,6 +41,8 @@ def __init__(self, portfolio: "Portfolio", addresses: Iterable[Address]) -> None
})
def __repr__(self) -> str:
return f"<{type(self).__name__} wallets={list(self._wallets.values())}>"
def __contains__(self, address: Union[Address, PortfolioAddress]) -> bool:
return address in self._wallets
def __getitem__(self, address: Address) -> PortfolioAddress:
return self._wallets[address]
def __iter__(self) -> Iterator[PortfolioAddress]:
Expand Down Expand Up @@ -133,7 +135,7 @@ async def describe(self, block: int) -> PortfolioBalances:
return PortfolioBalances(await a_sync.gather({address: address.describe(block, sync=False) for address in self}))


async_functions = {name: obj for name, obj in PortfolioAddress.__dict__.items() if isinstance(obj, a_sync.modified.ASyncFunction)}
async_functions = {name: obj for name, obj in PortfolioAddress.__dict__.items() if isinstance(obj, ASyncFunction)}
for func_name, func in async_functions.items():
if not callable(getattr(PortfolioAddress, func_name)):
raise RuntimeError(f"A PortfolioAddress object should not have a non-callable attribute suffixed with '_async'")
Expand Down
2 changes: 2 additions & 0 deletions eth_portfolio/protocols/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ def __init__(self) -> None:

@a_sync.future
async def balances(self, address: Address, block: Optional[Block] = None) -> RemoteTokenBalances:
if not self.protocols:
return RemoteTokenBalances()
return RemoteTokenBalances({
type(protocol).__name__: protocol_balances
async for protocol, protocol_balances
Expand Down
5 changes: 2 additions & 3 deletions eth_portfolio/protocols/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
from typing import List, Optional

import a_sync
from a_sync.modified import ASyncFunctionSyncDefault
from a_sync.property import HiddenMethod
from a_sync.a_sync import ASyncFunctionSyncDefault, HiddenMethod
from brownie.network.contract import ContractCall
from eth_portfolio.typing import Balance, TokenBalances
from eth_portfolio.utils import Decimal
from y import ERC20, Contract
from y._decorators import stuck_coro_debugger
from y.contracts import contract_creation_block
from y.datatypes import Address, Block
from y.decorators import stuck_coro_debugger


class ProtocolABC(metaclass=abc.ABCMeta):
Expand Down
4 changes: 3 additions & 1 deletion eth_portfolio/protocols/lending/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from eth_portfolio.typing import RemoteTokenBalances
from eth_portfolio.utils import (_get_protocols_for_submodule,
_import_submodules)
from y._decorators import stuck_coro_debugger
from y.datatypes import Address, Block
from y.decorators import stuck_coro_debugger

_import_submodules()

Expand All @@ -31,6 +31,8 @@ async def collateral(self, address: Address, block: Optional[Block] = None) -> R
@a_sync.future
@stuck_coro_debugger
async def debt(self, address: Address, block: Optional[Block] = None) -> RemoteTokenBalances:
if not self.protocols:
return RemoteTokenBalances()
return RemoteTokenBalances({
type(protocol).__name__: token_balances
async for protocol, token_balances in a_sync.map(lambda p: p.debt(address, block), self.protocols)
Expand Down
12 changes: 6 additions & 6 deletions eth_portfolio/protocols/lending/compound.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import a_sync
from async_lru import alru_cache
from brownie import ZERO_ADDRESS, Contract
from y import ERC20, Contract, get_prices, weth
from y import ERC20, Contract, map_prices, weth
from y._decorators import stuck_coro_debugger
from y.datatypes import Block
from y.decorators import stuck_coro_debugger
from y.exceptions import ContractNotVerified
from y.prices.lending.compound import CToken, compound

Expand Down Expand Up @@ -65,11 +65,11 @@ async def _debt(self, address: Address, block: Optional[Block] = None) -> TokenB
asyncio.gather(*[underlying.__scale__ for underlying in underlyings]),
)

debts = {underlying: Decimal(debt) / scale for underlying, scale, debt in zip(underlyings, underlying_scale, debt_data) if debt}
prices = await get_prices(debts.keys(), block=block, sync=False)
balances: TokenBalances = TokenBalances()
for (underlying, debt), price in zip(debts.items(), prices):
balances[underlying] += Balance(debt, debt * Decimal(price))
if debts := {underlying: Decimal(debt) / scale for underlying, scale, debt in zip(underlyings, underlying_scale, debt_data) if debt}:
async for underlying, price in map_prices(debts, block=block):
debt = debts.pop(underlying)
balances[underlying] += Balance(debt, debt * Decimal(price))
return balances

@stuck_coro_debugger
Expand Down
2 changes: 1 addition & 1 deletion eth_portfolio/protocols/lending/liquity.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

from async_lru import alru_cache
from y import Contract, Network, get_price
from y._decorators import stuck_coro_debugger
from y.constants import EEE_ADDRESS
from y.datatypes import Address, Block
from y.decorators import stuck_coro_debugger

from eth_portfolio.protocols.lending._base import \
LendingProtocolWithLockedCollateral
Expand Down
2 changes: 1 addition & 1 deletion eth_portfolio/protocols/lending/maker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

from async_lru import alru_cache
from y import Network, get_price
from y._decorators import stuck_coro_debugger
from y.constants import dai
from y.contracts import Contract
from y.datatypes import Address, Block
from y.decorators import stuck_coro_debugger

from eth_portfolio.protocols.lending._base import \
LendingProtocolWithLockedCollateral
Expand Down
2 changes: 1 addition & 1 deletion eth_portfolio/protocols/lending/unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

from brownie import chain
from y import Contract, Network, get_price
from y._decorators import stuck_coro_debugger
from y.datatypes import Address, Block
from y.decorators import stuck_coro_debugger

from eth_portfolio.protocols.lending._base import \
LendingProtocolWithLockedCollateral
Expand Down
5 changes: 3 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ checksum_dict>=1.1.2
dank_mids>=4.20.85
eth-brownie>=1.19.3,<1.21
eth_retry>=0.1.15,<1
ez-a-sync>=0.19.5,<0.21
ez-a-sync>=0.22.1
numpy<2
pandas>=1.4.3,<1.6
ypricemagic>=3.3.0,<4
ypricemagic>=4.0.0,<4.1

0 comments on commit 20f85dc

Please sign in to comment.