Skip to content

Commit

Permalink
feat: use semaphore for load_transaction db reads (#137)
Browse files Browse the repository at this point in the history
* feat: use semaphore for load_transaction db reads

* fix: typo

* chore: `black .`

* Update transaction.py

* chore: `black .`

* Update transaction.py

* Update transaction.py

* chore: `black .`

* Update transaction.py

* Update transaction.py

* chore: `black .`

* Update address.py

* chore: `black .`

* Update address.py

* chore: `black .`

* Update portfolio.py

* chore: `black .`

* Update portfolio.py

* Update portfolio.py

* Update portfolio.py

* Update portfolio.py

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
BobTheBuidler and github-actions[bot] authored Dec 11, 2024
1 parent c9cf9cc commit 2e2505e
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 32 deletions.
16 changes: 16 additions & 0 deletions eth_portfolio/_ledgers/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,22 @@ async def new(self) -> _LedgerEntryList:
end_block = await get_buffered_chain_height()
return self[start_block, end_block] # type: ignore [index, return-value]

async def sent(
self, start_block: Optional[Block] = None, end_block: Optional[Block] = None
) -> AsyncIterator[T]:
address = self.portfolio_address.address
async for obj in self[start_block:end_block]:
if obj.from_address == address:
yield obj

async def received(
self, start_block: Optional[Block] = None, end_block: Optional[Block] = None
) -> AsyncIterator[T]:
address = self.portfolio_address.address
async for obj in self[start_block:end_block]:
if obj.from_address != address:
yield obj

@set_end_block_if_none
@stuck_coro_debugger
async def _get_new_objects(self, start_block: Block, end_block: Block) -> AsyncIterator[T]:
Expand Down
24 changes: 23 additions & 1 deletion eth_portfolio/_ledgers/portfolio.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import TYPE_CHECKING, AsyncIterator, Dict, Generic, TypeVar
from typing import TYPE_CHECKING, AsyncIterator, Dict, Generic, Optional, TypeVar

import a_sync
from pandas import DataFrame, concat # type: ignore
Expand Down Expand Up @@ -124,6 +124,28 @@ async def df(self, start_block: Block, end_block: Block) -> DataFrame:
df = self._cleanup_df(df)
return df

async def sent(
self, start_block: Optional[Block] = None, end_block: Optional[Block] = None
) -> AsyncIterator[T]:
portfolio_addresses = set(self.portfolio.addresses.keys())
async for obj in self[start_block:end_block]:
if (
obj.from_address in portfolio_addresses
and obj.to_address not in portfolio_addresses
):
yield obj

async def received(
self, start_block: Optional[Block] = None, end_block: Optional[Block] = None
) -> AsyncIterator[T]:
portfolio_addresses = set(self.portfolio.addresses.keys())
async for obj in self[start_block:end_block]:
if (
obj.to_address in portfolio_addresses
and obj.from_address not in portfolio_addresses
):
yield obj

async def _df_base(self, start_block: Block, end_block: Block) -> DataFrame:
"""
Fetches and concatenates raw ledger data into a :class:`~DataFrame` for all addresses in the portfolio.
Expand Down
87 changes: 57 additions & 30 deletions eth_portfolio/_loaders/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
The primary focus of this module is to support eth-portfolio's internal operations such as loading transactions by address and nonce, retrieving transaction details from specific blocks, and managing transaction-related data.
"""

import asyncio
import logging
from collections import defaultdict
from typing import Awaitable, DefaultDict, Dict, List, Optional, Tuple
from typing import DefaultDict, Dict, List, Optional, Tuple

import a_sync
import dank_mids
Expand Down Expand Up @@ -38,6 +39,8 @@

nonces: DefaultDict[Address, Nonces] = defaultdict(lambda: defaultdict(int))

_semaphore = a_sync.Semaphore(10_000, name="load transaction db semaphore")


@eth_retry.auto_retry
@stuck_coro_debugger
Expand All @@ -63,11 +66,12 @@ async def load_transaction(
>>> print(await load_transaction(address="0x1234567890abcdef1234567890abcdef12345678", nonce=5, load_prices=True))
(5, Transaction(...))
"""
if transaction := await db.get_transaction(address, nonce):
if load_prices and transaction.price is None:
await db.delete_transaction(transaction)
else:
return nonce, transaction
async with _semaphore:
if transaction := await db.get_transaction(address, nonce):
if load_prices and transaction.price is None:
await db.delete_transaction(transaction)
else:
return nonce, transaction

block = await get_block_for_nonce(address, nonce)
tx = await get_transaction_by_nonce_and_block(address, nonce, block)
Expand All @@ -92,31 +96,54 @@ async def load_transaction(
return nonce, transaction


_nonce_cache_semaphores: DefaultDict[Address, asyncio.Semaphore] = defaultdict(
lambda: asyncio.Semaphore(100)
)


async def get_block_for_nonce(address: Address, nonce: Nonce) -> int:
if known_nonces_lower_than_query := [n for n in nonces[address] if n < nonce]:
highest_known_nonce_lower_than_query_nonce = max(known_nonces_lower_than_query)
block_at_known_nonce = nonces[address][highest_known_nonce_lower_than_query_nonce]
lo = block_at_known_nonce
else:
lo = 0

hi = await dank_mids.eth.block_number

# lets find the general area first before we proceed with our binary search
range_size = hi - lo + 1
if range_size > 4:
num_chunks = _get_num_chunks(range_size)
chunk_size = range_size // num_chunks
coros: Dict[int, Awaitable[Nonce]] = {
point: get_nonce_at_block(address, point)
for point in [lo + i * chunk_size for i in range(num_chunks)]
}
points: Dict[int, Nonce] = await a_sync.gather(coros)

for block, _nonce in points.items():
if _nonce >= nonce:
break
lo = block
hi = None

async with _nonce_cache_semaphores[address]:
if known_nonces_less_than_query := [n for n in nonces[address] if n < nonce]:
highest_known_nonce_lower_than_query = max(known_nonces_less_than_query)
block_at_known_nonce = nonces[address][highest_known_nonce_lower_than_query]
lo = block_at_known_nonce
del highest_known_nonce_lower_than_query, block_at_known_nonce
else:
lo = 0

if known_nonces_greater_than_query := [n for n in nonces[address] if n > nonce]:
lowest_known_nonce_greater_than_query = min(known_nonces_greater_than_query)
block_at_known_nonce = nonces[address][lowest_known_nonce_greater_than_query]
hi = block_at_known_nonce
del lowest_known_nonce_greater_than_query, block_at_known_nonce

del known_nonces_less_than_query, known_nonces_greater_than_query

# lets find the general area first before we proceed with our binary search
range_size = hi - lo + 1
if range_size > 4:
num_chunks = _get_num_chunks(range_size)
chunk_size = range_size // num_chunks
points: Dict[int, Nonce] = await a_sync.gather(
{
point: get_nonce_at_block(address, point)
for point in (lo + i * chunk_size for i in range(num_chunks))
}
)

for block, _nonce in points.items():
if _nonce >= nonce:
hi = block
break
lo = block

del num_chunks, chunk_size, points, block

del range_size

hi = hi or await dank_mids.eth.block_number

while True:
_nonce = await get_nonce_at_block(address, lo)
Expand Down
37 changes: 36 additions & 1 deletion eth_portfolio/portfolio.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import asyncio
import logging
from functools import wraps
from typing import Any, Dict, Iterable, Iterator, List, Tuple, Union
from typing import Any, AsyncIterator, Dict, Iterable, Iterator, List, Optional, Tuple, Union

import a_sync
from a_sync.a_sync import ASyncFunction
Expand All @@ -31,6 +31,7 @@
from eth_portfolio._utils import _LedgeredBase
from eth_portfolio.address import PortfolioAddress
from eth_portfolio.constants import ADDRESSES
from eth_portfolio.structs import LedgerEntry
from eth_portfolio.typing import Addresses, PortfolioBalances

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -345,6 +346,18 @@ async def describe(self, block: int) -> PortfolioBalances:
block=block,
)

async def sent(
self, start_block: Optional[Block] = None, end_block: Optional[Block] = None
) -> AsyncIterator[LedgerEntry]:
async for obj in self.ledger.sent(start_block, end_block):
yield obj

async def received(
self, start_block: Optional[Block] = None, end_block: Optional[Block] = None
) -> AsyncIterator[LedgerEntry]:
async for obj in self.ledger.received(start_block, end_block):
yield obj


async_functions = {
name: obj for name, obj in PortfolioAddress.__dict__.items() if isinstance(obj, ASyncFunction)
Expand Down Expand Up @@ -562,6 +575,28 @@ async def df(self, start_block: Block, end_block: Block, full: bool = False) ->
logger.error(df.columns)
raise

async def sent(
self, start_block: Optional[Block] = None, end_block: Optional[Block] = None
) -> AsyncIterator[LedgerEntry]:
portfolio_addresses = set(self.portfolio.addresses.keys())
async for obj in self[start_block:end_block]:
if (
obj.from_address in portfolio_addresses
and obj.to_address not in portfolio_addresses
):
yield obj

async def received(
self, start_block: Optional[Block] = None, end_block: Optional[Block] = None
) -> AsyncIterator[LedgerEntry]:
portfolio_addresses = set(self.portfolio.addresses.keys())
async for obj in self[start_block:end_block]:
if (
obj.to_address in portfolio_addresses
and obj.from_address not in portfolio_addresses
):
yield obj


# Use this var for a convenient way to set up your portfolio using env vars.
portfolio = Portfolio(ADDRESSES)

0 comments on commit 2e2505e

Please sign in to comment.