Skip to content

Commit

Permalink
feat: use queue instead of semaphore for transaction loading (#142)
Browse files Browse the repository at this point in the history
* feat: use queue instead of semaphore for transaction loading

faster, less blocking, less ram usage

* feat: AddressLedgerBase repr

* fix: AddressTransactionsLedger.__stop_workers

* chore: reduce num workers 50k -> 25k

* chore: `black .`
---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
BobTheBuidler and github-actions[bot] authored Dec 16, 2024
1 parent eb54f35 commit 548da7c
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 115 deletions.
85 changes: 75 additions & 10 deletions eth_portfolio/_ledgers/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
TYPE_CHECKING,
AsyncGenerator,
AsyncIterator,
Callable,
Generic,
List,
NoReturn,
Optional,
Tuple,
Type,
Expand All @@ -34,10 +36,12 @@
from aiohttp import ClientResponseError
from async_lru import alru_cache
from dank_mids.eth import TraceFilterParams
from eth_typing import ChecksumAddress
from evmspec import FilterTrace
from evmspec.structs.receipt import Status
from evmspec.structs.trace import call, reward
from pandas import DataFrame # type: ignore
from tqdm import tqdm
from y import ERC20
from y._decorators import stuck_coro_debugger
from y.datatypes import Block
Expand All @@ -46,7 +50,7 @@
from eth_portfolio import _exceptions, _loaders
from eth_portfolio._cache import cache_to_disk
from eth_portfolio._decorators import set_end_block_if_none
from eth_portfolio._loaders.transaction import get_nonce_at_block
from eth_portfolio._loaders.transaction import get_nonce_at_block, load_transaction
from eth_portfolio._utils import PandableList, _AiterMixin, get_buffered_chain_height
from eth_portfolio._ydb.token_transfers import TokenTransfers
from eth_portfolio.structs import InternalTransfer, TokenTransfer, Transaction
Expand Down Expand Up @@ -146,6 +150,9 @@ def __hash__(self) -> int:
"""
return hash(self.address)

def __repr__(self) -> str:
    """
    Returns a debug representation of the form ``<ClassName for ADDRESS at 0xID>``.

    ``ClassName`` is the concrete type of this object, ``ADDRESS`` is ``self.address``,
    and ``0xID`` is ``id(self)`` rendered in hexadecimal.
    """
    cls_name = type(self).__name__
    identity = hex(id(self))
    return f"<{cls_name} for {self.address} at {identity}>"

@abc.abstractproperty
def _list_type(self) -> Type[_LedgerEntryList]:
"""
Expand Down Expand Up @@ -387,15 +394,18 @@ def _df(self) -> DataFrame:
return df


# Alias for `int`, used in annotations where the integer denotes a transaction nonce.
Nonce = int


class AddressTransactionsLedger(AddressLedgerBase[TransactionsList, Transaction]):
"""
A ledger for managing transaction entries.
"""

_list_type = TransactionsList
__slots__ = ("cached_thru_nonce",)
__slots__ = ("cached_thru_nonce", "_queue", "_ready", "_num_workers", "_workers")

def __init__(self, portfolio_address: "PortfolioAddress"):
def __init__(self, portfolio_address: "PortfolioAddress", num_workers: int = 25_000):
"""
Initializes the AddressTransactionsLedger instance.
Expand All @@ -407,6 +417,13 @@ def __init__(self, portfolio_address: "PortfolioAddress"):
"""
The nonce through which all transactions have been loaded into memory.
"""
self._queue = asyncio.Queue()
self._ready = asyncio.Queue()
self._num_workers = num_workers
self._workers = []

def __del__(self) -> None:
    """
    Stops this ledger's worker tasks when the ledger object is finalized.
    """
    # Delegates to the private `__stop_workers` helper (name-mangled, so this
    # call only resolves from inside the class body).
    self.__stop_workers()

@set_end_block_if_none
@stuck_coro_debugger
Expand All @@ -425,16 +442,18 @@ async def _load_new_objects(self, _: Block, end_block: Block) -> AsyncIterator[T
return
end_block_nonce: int = await get_nonce_at_block(self.address, end_block)
if nonces := list(range(self.cached_thru_nonce + 1, end_block_nonce + 1)):
coros = [
_loaders.load_transaction(self.address, nonce, self.load_prices) for nonce in nonces
]
for nonce in nonces:
self._queue.put_nowait(nonce)

self._ensure_workers(min(len(nonces), self._num_workers))

transactions = []
transaction: Transaction
async for nonce, transaction in a_sync.as_completed( # type: ignore [assignment]
coros, aiter=True, tqdm=True, desc=f"Transactions {self.address}"
):
transaction: Optional[Transaction]
for _ in tqdm(nonces, desc=f"Transactions {self.address}"):
nonce, transaction = await self._ready.get()
if transaction:
if isinstance(transaction, Exception):
raise transaction
transactions.append(transaction)
yield transaction
elif nonce == 0 and self.cached_thru_nonce == -1:
Expand All @@ -444,6 +463,8 @@ async def _load_new_objects(self, _: Block, end_block: Block) -> AsyncIterator[T
# NOTE Are we sure this is the correct way to handle this scenario? Are we sure it will ever even occur with the new gnosis handling?
logger.warning("No transaction with nonce %s for %s", nonce, self.address)

self.__stop_workers()

if transactions:
self.objects.extend(transactions)
if self.objects:
Expand All @@ -455,6 +476,50 @@ async def _load_new_objects(self, _: Block, end_block: Block) -> AsyncIterator[T
if self.cached_thru is None or end_block > self.cached_thru:
self.cached_thru = end_block

def _ensure_workers(self, num_workers: int) -> None:
    """
    Ensures that at least `num_workers` worker tasks are tracked in `self._workers`.

    Tasks already present in `self._workers` count toward the target, so only the
    shortfall is spawned. Each new task runs the worker coroutine, which takes nonces
    from `self._queue` and pushes its results onto `self._ready`.

    Args:
        num_workers: The minimum number of worker tasks that should be tracked once
            this call returns.
    """
    len_workers = len(self._workers)
    if len_workers >= num_workers:
        # Enough workers are already tracked; nothing to spawn.
        return

    logger.info("ensuring %s workers for %s", num_workers, self)

    # Bind everything the generator below needs to locals once, instead of
    # repeating the attribute lookups for every spawned worker.
    create_task = asyncio.create_task
    worker_fn = self.__worker_fn
    address = self.address
    load_prices = self.load_prices
    queue_get = stuck_coro_debugger(self._queue.get)
    put_ready = self._ready.put_nowait

    self._workers.extend(
        create_task(worker_fn(address, load_prices, queue_get, put_ready))
        for _ in range(num_workers - len_workers)
    )
    # Lazy %-style arguments: the worker list can hold thousands of tasks, so its
    # repr should only be rendered when an INFO handler will actually emit it.
    logger.info("%s workers: %s", self, self._workers)

@staticmethod
async def __worker_fn(
    address: ChecksumAddress,
    load_prices: bool,
    queue_get: Callable[[], Nonce],
    put_ready: Callable[[Nonce, Optional[Transaction]], None],
) -> NoReturn:
    """
    Worker loop that loads transactions for `address`, one nonce at a time.

    The loop runs until the task is cancelled. On each iteration it awaits a nonce
    from `queue_get`, loads the matching transaction, and hands the outcome to
    `put_ready` as a single tuple:

    - ``(nonce, transaction)`` as returned by `load_transaction` on success, or
    - ``(nonce, exception)`` if `load_transaction` raised, so the consumer can
      decide how to surface the failure.

    Args:
        address: The address whose transactions are being loaded.
        load_prices: Forwarded to `load_transaction`.
        queue_get: Zero-argument callable that is awaited to obtain the next nonce.
        put_ready: Callable invoked with one ``(nonce, result)`` tuple per nonce.

    Raises:
        Exception: Any exception that escapes the per-nonce handling (for example
            one raised by `queue_get`) is logged and re-raised, ending this worker.
    """
    try:
        while True:
            nonce = await queue_get()
            try:
                put_ready(await load_transaction(address, nonce, load_prices))
            except Exception as e:
                # Deliver the failure to the consumer instead of killing the worker.
                put_ready((nonce, e))
    except Exception as e:
        # This is a staticmethod, so there is no `self` in scope; identify the
        # worker by the address it serves. (Referencing `self` here would raise
        # NameError and mask the real error.)
        logger.error("%s in %s __worker_coro", type(e), address)
        logger.exception(e)
        raise

def __stop_workers(self) -> None:
    """
    Cancels every tracked worker task and empties `self._workers` in place.

    Tasks are removed from the end of the list one at a time and cancelled as
    they are removed.
    """
    logger.info("stopping workers for %s", self)
    tasks = self._workers
    while tasks:
        tasks.pop().cancel()


class InternalTransfersList(PandableList[InternalTransfer]):
"""
Expand Down
158 changes: 84 additions & 74 deletions eth_portfolio/_loaders/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""

import asyncio
import itertools
import logging
from collections import defaultdict
from typing import DefaultDict, Dict, List, Optional, Tuple
Expand Down Expand Up @@ -39,8 +40,6 @@

nonces: DefaultDict[Address, Nonces] = defaultdict(lambda: defaultdict(int))

_semaphore = a_sync.Semaphore(10_000, name="load transaction db semaphore")


@eth_retry.auto_retry
@stuck_coro_debugger
Expand All @@ -66,12 +65,11 @@ async def load_transaction(
>>> print(await load_transaction(address="0x1234567890abcdef1234567890abcdef12345678", nonce=5, load_prices=True))
(5, Transaction(...))
"""
async with _semaphore:
if transaction := await db.get_transaction(address, nonce):
if load_prices and transaction.price is None:
await db.delete_transaction(transaction)
else:
return nonce, transaction
if transaction := await db.get_transaction(address, nonce):
if load_prices and transaction.price is None:
await db.delete_transaction(transaction)
else:
return nonce, transaction

block = await get_block_for_nonce(address, nonce)
tx = await get_transaction_by_nonce_and_block(address, nonce, block)
Expand All @@ -96,78 +94,90 @@ async def load_transaction(
return nonce, transaction


_nonce_semaphores: DefaultDict[Address, asyncio.Semaphore] = defaultdict(
lambda: asyncio.Semaphore(50_000)
)
_nonce_cache_semaphores: DefaultDict[Address, asyncio.Semaphore] = defaultdict(
lambda: asyncio.Semaphore(100)
)
@alru_cache(maxsize=None, ttl=5)
async def _get_block_number():
    """
    Returns the value obtained by awaiting `dank_mids.eth.block_number`.

    The coroutine is wrapped in `alru_cache` with no size limit and `ttl=5`
    (presumably seconds; confirm against the async_lru documentation), so repeated
    calls inside that window reuse the cached result.
    """
    current_block = await dank_mids.eth.block_number
    return current_block


# One `asyncio.Lock` per address, created on first access by the defaultdict factory.
_nonce_cache_locks: DefaultDict[Address, asyncio.Lock] = defaultdict(asyncio.Lock)


async def get_block_for_nonce(address: Address, nonce: Nonce) -> int:
hi = None
async with _nonce_cache_locks[address]:
highest_known_nonce_lower_than_query = None
lowest_known_nonce_greater_than_query = None

# it is impossible for n to == nonce
for less_than, ns in itertools.groupby(nonces[address], lambda n: n < nonce):
if less_than:
max_value = max(ns)
if (
highest_known_nonce_lower_than_query is None
or max_value > highest_known_nonce_lower_than_query
):
highest_known_nonce_lower_than_query = max_value

async with _nonce_semaphores[address]:

async with _nonce_cache_semaphores[address]:
if known_nonces_less_than_query := [n for n in nonces[address] if n < nonce]:
highest_known_nonce_lower_than_query = max(known_nonces_less_than_query)
block_at_known_nonce = nonces[address][highest_known_nonce_lower_than_query]
lo = block_at_known_nonce
del highest_known_nonce_lower_than_query, block_at_known_nonce
else:
lo = 0

if known_nonces_greater_than_query := [n for n in nonces[address] if n > nonce]:
lowest_known_nonce_greater_than_query = min(known_nonces_greater_than_query)
block_at_known_nonce = nonces[address][lowest_known_nonce_greater_than_query]
hi = block_at_known_nonce
del lowest_known_nonce_greater_than_query, block_at_known_nonce

del known_nonces_less_than_query, known_nonces_greater_than_query

# lets find the general area first before we proceed with our binary search
range_size = hi - lo + 1
if range_size > 4:
num_chunks = _get_num_chunks(range_size)
chunk_size = range_size // num_chunks
points: Dict[int, Nonce] = await a_sync.gather(
{
point: get_nonce_at_block(address, point)
for point in (lo + i * chunk_size for i in range(num_chunks))
}
)

for block, _nonce in points.items():
if _nonce >= nonce:
hi = block
break
lo = block

del num_chunks, chunk_size, points, block

del range_size

hi = hi or await dank_mids.eth.block_number

while True:
_nonce = await get_nonce_at_block(address, lo)

if _nonce < nonce:
old_lo = lo
lo += int((hi - lo) / 2) or 1
logger.debug("Nonce at %s is %s, checking higher block %s", old_lo, _nonce, lo)
continue

prev_block_nonce: int = await get_nonce_at_block(address, lo - 1)
if prev_block_nonce >= nonce:
hi = lo
lo = int(lo / 2)
logger.debug("Nonce at %s is %s, checking lower block %s", hi, _nonce, lo)
continue

logger.debug("Found nonce %s at block %s", nonce, lo)
return lo
min_value = min(ns)
if (
lowest_known_nonce_greater_than_query is None
or min_value < lowest_known_nonce_greater_than_query
):
lowest_known_nonce_greater_than_query = min_value

if highest_known_nonce_lower_than_query is not None:
lo = nonces[address][highest_known_nonce_lower_than_query]
else:
lo = 0

if lowest_known_nonce_greater_than_query is not None:
hi = nonces[address][lowest_known_nonce_greater_than_query]

del highest_known_nonce_lower_than_query, lowest_known_nonce_greater_than_query

# lets find the general area first before we proceed with our binary search
range_size = hi - lo + 1
if range_size > 4:
num_chunks = _get_num_chunks(range_size)
chunk_size = range_size // num_chunks
points: Dict[int, Nonce] = await a_sync.gather(
{
point: get_nonce_at_block(address, point)
for point in (lo + i * chunk_size for i in range(num_chunks))
}
)

for block, _nonce in points.items():
if _nonce >= nonce:
hi = block
break
lo = block

del num_chunks, chunk_size, points, block

del range_size

hi = hi or await _get_block_number()

while True:
_nonce = await get_nonce_at_block(address, lo)

if _nonce < nonce:
old_lo = lo
lo += int((hi - lo) / 2) or 1
logger.debug("Nonce at %s is %s, checking higher block %s", old_lo, _nonce, lo)
continue

prev_block_nonce: int = await get_nonce_at_block(address, lo - 1)
if prev_block_nonce >= nonce:
hi = lo
lo = int(lo / 2)
logger.debug("Nonce at %s is %s, checking lower block %s", hi, _nonce, lo)
continue

logger.debug("Found nonce %s at block %s", nonce, lo)
return lo


async def _insert_to_db(transaction: structs.Transaction, load_prices: bool) -> None:
Expand Down
15 changes: 11 additions & 4 deletions eth_portfolio/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,14 @@ class PortfolioAddress(_LedgeredBase[AddressLedgerBase]):
Represents a portfolio address within the eth-portfolio system.
"""

def __init__(self, address: Address, portfolio: "Portfolio", num_workers_transactions: int = 25_000, asynchronous: bool = False) -> None: # type: ignore
def __init__(
self,
address: Address,
start_block: Block,
load_prices: bool,
num_workers_transactions: int = 25_000,
asynchronous: bool = False,
) -> None: # type: ignore
"""
Initializes the PortfolioAddress instance.
Expand All @@ -76,14 +83,14 @@ def __init__(self, address: Address, portfolio: "Portfolio", num_workers_transac
"""
Flag indicating if the operations are asynchronous.
"""
self.load_prices = portfolio.load_prices
self.load_prices = load_prices
"""
Indicates if price loading is enabled.
"""

super().__init__(portfolio._start_block)
super().__init__(start_block)

self.transactions = AddressTransactionsLedger(self)
self.transactions = AddressTransactionsLedger(self, num_workers_transactions)
"""
Ledger for tracking transactions.
"""
Expand Down
Loading

0 comments on commit 548da7c

Please sign in to comment.