Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use queue instead of semaphore for transaction loading #142

Merged
merged 51 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
4abaa55
feat: use queue instead of semaphore for transaction loading
BobTheBuidler Dec 16, 2024
24aebdf
chore: `black .`
github-actions[bot] Dec 16, 2024
3e64591
Update address.py
BobTheBuidler Dec 16, 2024
335f96d
Update address.py
BobTheBuidler Dec 16, 2024
273d332
chore: `black .`
github-actions[bot] Dec 16, 2024
6ba3f4a
Update address.py
BobTheBuidler Dec 16, 2024
d69f8e1
Update transaction.py
BobTheBuidler Dec 16, 2024
7df6adf
Update transaction.py
BobTheBuidler Dec 16, 2024
1432e25
chore: reduce num workers 50k -> 25k
BobTheBuidler Dec 16, 2024
006ff87
Update portfolio.py
BobTheBuidler Dec 16, 2024
65c12ae
chore: `black .`
github-actions[bot] Dec 16, 2024
388d1ca
Update transaction.py
BobTheBuidler Dec 16, 2024
a83874d
Update transaction.py
BobTheBuidler Dec 16, 2024
fe51c2f
Update transaction.py
BobTheBuidler Dec 16, 2024
5cf4378
chore: `black .`
github-actions[bot] Dec 16, 2024
775b4e8
Update transaction.py
BobTheBuidler Dec 16, 2024
0bec571
Update transaction.py
BobTheBuidler Dec 16, 2024
3479a8b
Update transaction.py
BobTheBuidler Dec 16, 2024
1bcb5e3
Update address.py
BobTheBuidler Dec 16, 2024
2a9008b
chore: `black .`
github-actions[bot] Dec 16, 2024
b8d7d69
Update portfolio.py
BobTheBuidler Dec 16, 2024
2ee5776
chore: `black .`
github-actions[bot] Dec 16, 2024
86dc287
Update portfolio.py
BobTheBuidler Dec 16, 2024
80d8480
Update address.py
BobTheBuidler Dec 16, 2024
569b895
chore: `black .`
github-actions[bot] Dec 16, 2024
bce5c5f
Update address.py
BobTheBuidler Dec 16, 2024
4fff4fe
chore: `black .`
github-actions[bot] Dec 16, 2024
6d3e703
Update address.py
BobTheBuidler Dec 16, 2024
1c857f3
Update address.py
BobTheBuidler Dec 16, 2024
8b38b20
Update address.py
BobTheBuidler Dec 16, 2024
af85451
chore: `black .`
github-actions[bot] Dec 16, 2024
0912a7e
Update address.py
BobTheBuidler Dec 16, 2024
a87bcc0
chore: `black .`
github-actions[bot] Dec 16, 2024
7cf0d8b
Update portfolio.py
BobTheBuidler Dec 16, 2024
c9a8c25
chore: `black .`
github-actions[bot] Dec 16, 2024
3f8d30d
Update portfolio.py
BobTheBuidler Dec 16, 2024
8ddebea
fix: missing import
BobTheBuidler Dec 16, 2024
35a9526
Update address.py
BobTheBuidler Dec 16, 2024
21c142b
Update address.py
BobTheBuidler Dec 16, 2024
d754823
Update address.py
BobTheBuidler Dec 16, 2024
4bb9f22
Update address.py
BobTheBuidler Dec 16, 2024
d1353ce
feat: AddressLedgerBase repr
BobTheBuidler Dec 16, 2024
d2f5ceb
chore: `black .`
github-actions[bot] Dec 16, 2024
58f81e2
Update address.py
BobTheBuidler Dec 16, 2024
0e1e031
Update address.py
BobTheBuidler Dec 16, 2024
63f17b8
chore: `black .`
github-actions[bot] Dec 16, 2024
455653f
Update address.py
BobTheBuidler Dec 16, 2024
e57f6a0
chore: `black .`
github-actions[bot] Dec 16, 2024
de8ea92
fix: missing import
BobTheBuidler Dec 16, 2024
1b8fd46
fix: missing import
BobTheBuidler Dec 16, 2024
493be4e
fix: AddressTransactionLedger.__stop_workers
BobTheBuidler Dec 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 75 additions & 10 deletions eth_portfolio/_ledgers/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
TYPE_CHECKING,
AsyncGenerator,
AsyncIterator,
Callable,
Generic,
List,
NoReturn,
Optional,
Tuple,
Type,
Expand All @@ -34,10 +36,12 @@
from aiohttp import ClientResponseError
from async_lru import alru_cache
from dank_mids.eth import TraceFilterParams
from eth_typing import ChecksumAddress
from evmspec import FilterTrace
from evmspec.structs.receipt import Status
from evmspec.structs.trace import call, reward
from pandas import DataFrame # type: ignore
from tqdm import tqdm
from y import ERC20
from y._decorators import stuck_coro_debugger
from y.datatypes import Block
Expand All @@ -46,7 +50,7 @@
from eth_portfolio import _exceptions, _loaders
from eth_portfolio._cache import cache_to_disk
from eth_portfolio._decorators import set_end_block_if_none
from eth_portfolio._loaders.transaction import get_nonce_at_block
from eth_portfolio._loaders.transaction import get_nonce_at_block, load_transaction
from eth_portfolio._utils import PandableList, _AiterMixin, get_buffered_chain_height
from eth_portfolio._ydb.token_transfers import TokenTransfers
from eth_portfolio.structs import InternalTransfer, TokenTransfer, Transaction
Expand Down Expand Up @@ -146,6 +150,9 @@ def __hash__(self) -> int:
"""
return hash(self.address)

def __repr__(self) -> str:
    """
    Build a debugging representation of this ledger.

    Returns:
        A string of the form ``<ClassName for ADDRESS at 0xID>`` built from the
        runtime class name, ``self.address`` and the hex value of ``id(self)``.
    """
    cls_name = type(self).__name__
    address = self.address
    location = hex(id(self))
    return f"<{cls_name} for {address} at {location}>"

@abc.abstractproperty
def _list_type(self) -> Type[_LedgerEntryList]:
"""
Expand Down Expand Up @@ -387,15 +394,18 @@ def _df(self) -> DataFrame:
return df


Nonce = int


class AddressTransactionsLedger(AddressLedgerBase[TransactionsList, Transaction]):
"""
A ledger for managing transaction entries.
"""

_list_type = TransactionsList
__slots__ = ("cached_thru_nonce",)
__slots__ = ("cached_thru_nonce", "_queue", "_ready", "_num_workers", "_workers")

def __init__(self, portfolio_address: "PortfolioAddress"):
def __init__(self, portfolio_address: "PortfolioAddress", num_workers: int = 25_000):
"""
Initializes the AddressTransactionsLedger instance.

Expand All @@ -407,6 +417,13 @@ def __init__(self, portfolio_address: "PortfolioAddress"):
"""
The nonce through which all transactions have been loaded into memory.
"""
self._queue = asyncio.Queue()
self._ready = asyncio.Queue()
self._num_workers = num_workers
self._workers = []

def __del__(self) -> None:
    """Finalizer: cancel whatever worker tasks this ledger is still tracking."""
    stop_workers = self.__stop_workers
    stop_workers()

@set_end_block_if_none
@stuck_coro_debugger
Expand All @@ -425,16 +442,18 @@ async def _load_new_objects(self, _: Block, end_block: Block) -> AsyncIterator[T
return
end_block_nonce: int = await get_nonce_at_block(self.address, end_block)
if nonces := list(range(self.cached_thru_nonce + 1, end_block_nonce + 1)):
coros = [
_loaders.load_transaction(self.address, nonce, self.load_prices) for nonce in nonces
]
for nonce in nonces:
self._queue.put_nowait(nonce)

self._ensure_workers(min(len(nonces), self._num_workers))

transactions = []
transaction: Transaction
async for nonce, transaction in a_sync.as_completed( # type: ignore [assignment]
coros, aiter=True, tqdm=True, desc=f"Transactions {self.address}"
):
transaction: Optional[Transaction]
for _ in tqdm(nonces, desc=f"Transactions {self.address}"):
nonce, transaction = await self._ready.get()
if transaction:
if isinstance(transaction, Exception):
raise transaction
transactions.append(transaction)
yield transaction
elif nonce == 0 and self.cached_thru_nonce == -1:
Expand All @@ -444,6 +463,8 @@ async def _load_new_objects(self, _: Block, end_block: Block) -> AsyncIterator[T
# NOTE Are we sure this is the correct way to handle this scenario? Are we sure it will ever even occur with the new gnosis handling?
logger.warning("No transaction with nonce %s for %s", nonce, self.address)

self.__stop_workers()

if transactions:
self.objects.extend(transactions)
if self.objects:
Expand All @@ -455,6 +476,50 @@ async def _load_new_objects(self, _: Block, end_block: Block) -> AsyncIterator[T
if self.cached_thru is None or end_block > self.cached_thru:
self.cached_thru = end_block

def _ensure_workers(self, num_workers: int) -> None:
    """
    Make sure at least ``num_workers`` worker tasks are tracked in ``self._workers``.

    Only the missing tasks are created; tasks that are already tracked are left
    alone. New tasks are created with ``asyncio.create_task``, which requires a
    running event loop.

    Args:
        num_workers: The minimum number of worker tasks that should exist
            once this call returns.
    """
    len_workers = len(self._workers)
    if len_workers >= num_workers:
        # Enough workers are already tracked; nothing to start.
        return

    logger.info("ensuring %s workers for %s", num_workers, self)

    # Resolve every attribute once up front: the generator below may create
    # many tasks and each of them receives the same arguments.
    create_task = asyncio.create_task
    worker_fn = self.__worker_fn
    address = self.address
    load_prices = self.load_prices
    queue_get = stuck_coro_debugger(self._queue.get)
    put_ready = self._ready.put_nowait

    self._workers.extend(
        create_task(worker_fn(address, load_prices, queue_get, put_ready))
        for _ in range(num_workers - len_workers)
    )
    # Lazy %-style arguments: the (potentially very long) task list is only
    # rendered when an INFO handler will actually emit the record.
    logger.info("%s workers: %s", self, self._workers)

@staticmethod
async def __worker_fn(
    address: ChecksumAddress,
    load_prices: bool,
    queue_get: Callable[[], Nonce],
    put_ready: Callable[[Tuple[Nonce, Optional[Transaction]]], None],
) -> NoReturn:
    """
    Worker loop: pull nonces from the queue and publish the loaded result.

    For every nonce obtained from ``queue_get`` the worker awaits
    ``load_transaction(address, nonce, load_prices)`` and hands its return
    value to ``put_ready``. If loading raises, ``(nonce, exception)`` is
    handed to ``put_ready`` instead, so the consumer decides what to do with
    the failure.

    Args:
        address: The address whose transactions are being loaded.
        load_prices: Forwarded unchanged to ``load_transaction``.
        queue_get: Called with no arguments and awaited to obtain the next
            nonce to load.
        put_ready: Called with a single tuple for every processed nonce.

    Raises:
        Exception: Anything raised outside the per-nonce handler (for example
            by ``queue_get``) is logged and re-raised, which ends this worker.
    """
    try:
        while True:
            nonce = await queue_get()
            try:
                put_ready(await load_transaction(address, nonce, load_prices))
            except Exception as e:
                # Per-nonce failures are reported to the consumer, not raised here,
                # so one bad nonce does not kill the worker.
                put_ready((nonce, e))
    except Exception as e:
        # This is a staticmethod, so there is no `self` to log; identify the
        # worker by the address it serves instead.
        logger.error("%s in %s __worker_fn", type(e), address)
        logger.exception(e)
        raise

def __stop_workers(self) -> None:
    """
    Cancel every worker task tracked in ``self._workers``.

    Tasks are removed from the list as they are cancelled, so the list is
    empty once this method returns.
    """
    logger.info("stopping workers for %s", self)
    tasks = self._workers
    # Pop from the end until nothing is left; each popped task is cancelled
    # immediately and no longer referenced by the ledger.
    while tasks:
        tasks.pop().cancel()


class InternalTransfersList(PandableList[InternalTransfer]):
"""
Expand Down
158 changes: 84 additions & 74 deletions eth_portfolio/_loaders/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""

import asyncio
import itertools
import logging
from collections import defaultdict
from typing import DefaultDict, Dict, List, Optional, Tuple
Expand Down Expand Up @@ -39,8 +40,6 @@

nonces: DefaultDict[Address, Nonces] = defaultdict(lambda: defaultdict(int))

_semaphore = a_sync.Semaphore(10_000, name="load transaction db semaphore")


@eth_retry.auto_retry
@stuck_coro_debugger
Expand All @@ -66,12 +65,11 @@ async def load_transaction(
>>> print(await load_transaction(address="0x1234567890abcdef1234567890abcdef12345678", nonce=5, load_prices=True))
(5, Transaction(...))
"""
async with _semaphore:
if transaction := await db.get_transaction(address, nonce):
if load_prices and transaction.price is None:
await db.delete_transaction(transaction)
else:
return nonce, transaction
if transaction := await db.get_transaction(address, nonce):
if load_prices and transaction.price is None:
await db.delete_transaction(transaction)
else:
return nonce, transaction

block = await get_block_for_nonce(address, nonce)
tx = await get_transaction_by_nonce_and_block(address, nonce, block)
Expand All @@ -96,78 +94,90 @@ async def load_transaction(
return nonce, transaction


_nonce_semaphores: DefaultDict[Address, asyncio.Semaphore] = defaultdict(
lambda: asyncio.Semaphore(50_000)
)
_nonce_cache_semaphores: DefaultDict[Address, asyncio.Semaphore] = defaultdict(
lambda: asyncio.Semaphore(100)
)
@alru_cache(maxsize=None, ttl=5)
async def _get_block_number():
    """
    Await and return ``dank_mids.eth.block_number``.

    The coroutine is wrapped in ``alru_cache`` with ``ttl=5``, so repeated
    callers within the TTL window share one result instead of each awaiting
    the value again.
    """
    block_number = await dank_mids.eth.block_number
    return block_number


_nonce_cache_locks: DefaultDict[Address, asyncio.Lock] = defaultdict(asyncio.Lock)


async def get_block_for_nonce(address: Address, nonce: Nonce) -> int:
hi = None
async with _nonce_cache_locks[address]:
highest_known_nonce_lower_than_query = None
lowest_known_nonce_greater_than_query = None

# NOTE: it is impossible for any known nonce `n` to equal `nonce` here
for less_than, ns in itertools.groupby(nonces[address], lambda n: n < nonce):
if less_than:
max_value = max(ns)
if (
highest_known_nonce_lower_than_query is None
or max_value > highest_known_nonce_lower_than_query
):
highest_known_nonce_lower_than_query = max_value

async with _nonce_semaphores[address]:

async with _nonce_cache_semaphores[address]:
if known_nonces_less_than_query := [n for n in nonces[address] if n < nonce]:
highest_known_nonce_lower_than_query = max(known_nonces_less_than_query)
block_at_known_nonce = nonces[address][highest_known_nonce_lower_than_query]
lo = block_at_known_nonce
del highest_known_nonce_lower_than_query, block_at_known_nonce
else:
lo = 0

if known_nonces_greater_than_query := [n for n in nonces[address] if n > nonce]:
lowest_known_nonce_greater_than_query = min(known_nonces_greater_than_query)
block_at_known_nonce = nonces[address][lowest_known_nonce_greater_than_query]
hi = block_at_known_nonce
del lowest_known_nonce_greater_than_query, block_at_known_nonce

del known_nonces_less_than_query, known_nonces_greater_than_query

# let's find the general area first, before we proceed with our binary search
range_size = hi - lo + 1
if range_size > 4:
num_chunks = _get_num_chunks(range_size)
chunk_size = range_size // num_chunks
points: Dict[int, Nonce] = await a_sync.gather(
{
point: get_nonce_at_block(address, point)
for point in (lo + i * chunk_size for i in range(num_chunks))
}
)

for block, _nonce in points.items():
if _nonce >= nonce:
hi = block
break
lo = block

del num_chunks, chunk_size, points, block

del range_size

hi = hi or await dank_mids.eth.block_number

while True:
_nonce = await get_nonce_at_block(address, lo)

if _nonce < nonce:
old_lo = lo
lo += int((hi - lo) / 2) or 1
logger.debug("Nonce at %s is %s, checking higher block %s", old_lo, _nonce, lo)
continue

prev_block_nonce: int = await get_nonce_at_block(address, lo - 1)
if prev_block_nonce >= nonce:
hi = lo
lo = int(lo / 2)
logger.debug("Nonce at %s is %s, checking lower block %s", hi, _nonce, lo)
continue

logger.debug("Found nonce %s at block %s", nonce, lo)
return lo
min_value = min(ns)
if (
lowest_known_nonce_greater_than_query is None
or min_value < lowest_known_nonce_greater_than_query
):
lowest_known_nonce_greater_than_query = min_value

if highest_known_nonce_lower_than_query is not None:
lo = nonces[address][highest_known_nonce_lower_than_query]
else:
lo = 0

if lowest_known_nonce_greater_than_query is not None:
hi = nonces[address][lowest_known_nonce_greater_than_query]

del highest_known_nonce_lower_than_query, lowest_known_nonce_greater_than_query

# let's find the general area first, before we proceed with our binary search
range_size = hi - lo + 1
if range_size > 4:
num_chunks = _get_num_chunks(range_size)
chunk_size = range_size // num_chunks
points: Dict[int, Nonce] = await a_sync.gather(
{
point: get_nonce_at_block(address, point)
for point in (lo + i * chunk_size for i in range(num_chunks))
}
)

for block, _nonce in points.items():
if _nonce >= nonce:
hi = block
break
lo = block

del num_chunks, chunk_size, points, block

del range_size

hi = hi or await _get_block_number()

while True:
_nonce = await get_nonce_at_block(address, lo)

if _nonce < nonce:
old_lo = lo
lo += int((hi - lo) / 2) or 1
logger.debug("Nonce at %s is %s, checking higher block %s", old_lo, _nonce, lo)
continue

prev_block_nonce: int = await get_nonce_at_block(address, lo - 1)
if prev_block_nonce >= nonce:
hi = lo
lo = int(lo / 2)
logger.debug("Nonce at %s is %s, checking lower block %s", hi, _nonce, lo)
continue

logger.debug("Found nonce %s at block %s", nonce, lo)
return lo


async def _insert_to_db(transaction: structs.Transaction, load_prices: bool) -> None:
Expand Down
15 changes: 11 additions & 4 deletions eth_portfolio/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,14 @@ class PortfolioAddress(_LedgeredBase[AddressLedgerBase]):
Represents a portfolio address within the eth-portfolio system.
"""

def __init__(self, address: Address, portfolio: "Portfolio", num_workers_transactions: int = 25_000, asynchronous: bool = False) -> None: # type: ignore
def __init__(
self,
address: Address,
start_block: Block,
load_prices: bool,
num_workers_transactions: int = 25_000,
asynchronous: bool = False,
) -> None: # type: ignore
"""
Initializes the PortfolioAddress instance.

Expand All @@ -76,14 +83,14 @@ def __init__(self, address: Address, portfolio: "Portfolio", num_workers_transac
"""
Flag indicating if the operations are asynchronous.
"""
self.load_prices = portfolio.load_prices
self.load_prices = load_prices
"""
Indicates if price loading is enabled.
"""

super().__init__(portfolio._start_block)
super().__init__(start_block)

self.transactions = AddressTransactionsLedger(self)
self.transactions = AddressTransactionsLedger(self, num_workers_transactions)
"""
Ledger for tracking transactions.
"""
Expand Down
Loading
Loading