From 6def4228141ed380a9b48386b46b99ca0758c4b2 Mon Sep 17 00:00:00 2001 From: BobTheBuidler Date: Tue, 17 Dec 2024 15:17:26 +0000 Subject: [PATCH] feat: optimize loading internal transfers at startup --- eth_portfolio/_ledgers/address.py | 51 ++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/eth_portfolio/_ledgers/address.py b/eth_portfolio/_ledgers/address.py index 66c210d1..258ab219 100644 --- a/eth_portfolio/_ledgers/address.py +++ b/eth_portfolio/_ledgers/address.py @@ -673,28 +673,45 @@ async def _load_new_objects( a_sync.as_completed, tqdm=True, desc=f"Trace Filters {self.address}" ) - if tasks := [ - create_task( - coro=InternalTransfer.from_trace(trace, self.load_prices), - name="InternalTransfer.from_trace", - ) - for traces in generator_function(trace_filter_coros) - for trace in await traces + if traces := [ + trace for traces in generator_function(trace_filter_coros) for trace in await traces ]: internal_transfers = [] append_transfer = internal_transfers.append + load = InternalTransfer.from_trace + tqdm_desc = f"Internal Transfers {self.address}" done = 0 - async for internal_transfer in a_sync.as_completed( - tasks, aiter=True, tqdm=True, desc=f"Internal Transfers {self.address}" - ): - if internal_transfer: - append_transfer(internal_transfer) - yield internal_transfer - - done += 1 - if done % 100 == 0: - await sleep(0) + if self.load_prices: + tasks = ( + create_task( + coro=load(trace, load_prices=True), name="InternalTransfer.from_trace" + ) + for trace in traces + ) + del traces + async for internal_transfer in a_sync.as_completed( + tasks, aiter=True, tqdm=True, desc=tqdm_desc + ): + if internal_transfer is not None: + append_transfer(internal_transfer) + yield internal_transfer + + done += 1 + if done % 100 == 0: + await sleep(0) + + else: + pop_next_trace = traces.pop + for _ in tqdm(tuple(range(len(traces))), desc=tqdm_desc): + internal_transfer = await load(pop_next_trace(), load_prices=False) + if internal_transfer is not None: + append_transfer(internal_transfer) + yield internal_transfer + + done += 1 + if done % 100 == 0: + await sleep(0) if internal_transfers: self.objects.extend(internal_transfers)