From 1add5a776decbdc1c15a57b400252b91e63cefa4 Mon Sep 17 00:00:00 2001 From: Francesco Cosentino Date: Sat, 6 Jan 2024 18:38:22 +0100 Subject: [PATCH] wallet tracking optimization to reduce the dynamodb load --- release.py | 2 +- tracker.py | 4 +- tracker/ethereum/wallet_tracker.py | 287 +++++++++++---------- tracker/ethereum/wallet_tracker_storage.py | 17 ++ web3_wrapper.py | 5 +- 5 files changed, 179 insertions(+), 136 deletions(-) diff --git a/release.py b/release.py index 22e06a2..129c408 100644 --- a/release.py +++ b/release.py @@ -9,5 +9,5 @@ __metaclass__ = type # pylint: disable=invalid-name -__version__ = "0.2.3" +__version__ = "v0.2.3" __author__ = "F." diff --git a/tracker.py b/tracker.py index 1cbe636..fcdf809 100644 --- a/tracker.py +++ b/tracker.py @@ -189,9 +189,7 @@ async def main(self): await self.application.initialize() gas_tracker = GasTracker(application=self.application, logger=self.logger) - wallet_tracker = WalletTracker( - application=self.application, logger=self.logger - ) + wallet_tracker = WalletTracker(application=self.application) # Define conversation handler for '/track' command track_conv_handler = ConversationHandler( diff --git a/tracker/ethereum/wallet_tracker.py b/tracker/ethereum/wallet_tracker.py index 39e096c..f8b6ebe 100644 --- a/tracker/ethereum/wallet_tracker.py +++ b/tracker/ethereum/wallet_tracker.py @@ -2,9 +2,10 @@ import asyncio import re +import time import aiohttp -from aiohttp import ClientOSError, ClientSession, ClientSSLError, HttpVersion11 +from aiohttp import ClientOSError, ClientSession, ClientSSLError from botocore.exceptions import ClientError from telegram import MessageEntity, Update from telegram.error import BadRequest, NetworkError, TelegramError @@ -18,20 +19,19 @@ from web3_wrapper import Web3Wrapper config = ConfigHandler() +logger = Logger.get_logger("tracker") class WalletTracker(metaclass=SingletonMeta): """Tracker class.""" - def __init__(self, application: Application, logger: Logger): + def __init__(self, application: Application): """Initialize the Tracker class.""" - self.logger = logger - # The Telegram Application self.application: Application = application - self.web3_wrapper = Web3Wrapper(self.logger) + self.web3_wrapper = Web3Wrapper(logger) self.storage = WalletTrackerStorage() @@ -39,8 +39,10 @@ async def update_last_checked_block( self, chat_id, wallet_address, new_last_checked_block ): """Update the last checked block for a wallet.""" - self.logger.debug( - f"Updating last checked block for {wallet_address} to {new_last_checked_block}" + logger.debug( + "Updating last checked block for %s to %s", + wallet_address, + new_last_checked_block, ) try: # Fetch the current list of wallets for the user @@ -54,29 +56,44 @@ async def update_last_checked_block( # Update the tracked_wallets attribute in the database await self.storage.update_wallet(chat_id, tracked_wallets) - self.logger.info( - f"Updated last_checked_block for {wallet_address} to {new_last_checked_block}" + logger.info( + "Updated last_checked_block for %s to %s", + wallet_address, + new_last_checked_block, ) except ClientError as e: - self.logger.error( - f"Failed to update last checked block for wallet {wallet_address}: {e}" + logger.error( + "Failed to update last checked block for wallet %s: %s", + wallet_address, + e, ) async def monitor_wallet_transactions(self): """Regularly check tracked wallets for new transactions.""" - self.logger.info("Starting monitor_wallet_transactions task") + logger.info("Starting monitor_wallet_transactions task") while True: try: # Fetch all tracked wallets from the storage items = await self.storage.get_all() + logger.debug("Tracked wallets: %s", items) for item in items: - chat_id = int(item["chat_id"]) + try: + logger.debug("Checking item: %s", item) + logger.debug("Item type: %s", type(item)) + chat_id = int(item["chat_id"]) + except (KeyError, ValueError, TypeError) as e: + logger.error( + "Invalid chat_id for item %s: %s", + item, + e, + ) + continue # Skip this user if the chat_id is missing or invalid tracked_wallets = item.get("tracked_wallets", []) for wallet in tracked_wallets: is_paused = wallet.get("paused", False) - self.logger.debug( + logger.debug( "Checking wallet %s for paused state: %s", wallet["wallet_address"], is_paused, @@ -89,7 +106,7 @@ async def monitor_wallet_transactions(self): try: last_checked_block = int(wallet["last_checked_block"]) except (KeyError, ValueError): - self.logger.error( + logger.error( "Invalid last_checked_block for wallet %s: %s", wallet_address, wallet["last_checked_block"], @@ -105,7 +122,7 @@ async def monitor_wallet_transactions(self): ) continue # Skip this wallet if the last_checked_block is missing or invalid - self.logger.debug( + logger.debug( "Checking wallet %s for new transactions since block %s", wallet_address, last_checked_block, @@ -118,14 +135,14 @@ async def monitor_wallet_transactions(self): ) ) except Exception as e: # pylint: disable=broad-except - self.logger.exception( + logger.exception( "Failed to check wallet %s for new transactions: %s", wallet_address, e, ) if new_last_checked_block: - self.logger.debug( + logger.debug( "Updating last checked block for wallet %s to %s", wallet_address, new_last_checked_block, @@ -134,15 +151,12 @@ async def monitor_wallet_transactions(self): chat_id, wallet_address, new_last_checked_block ) - await asyncio.sleep(15) # Check every 15 seconds, adjust as needed. - except asyncio.CancelledError: # Handle the cancellation - self.logger.warning("Wallet Tracker monitor task was cancelled") + logger.warning("Wallet Tracker monitor task was cancelled") return # Ensure immediate exit except ClientError as e: - self.logger.error("Failed to fetch tracked wallets: %s", e) - await asyncio.sleep(15) + logger.error("Failed to fetch tracked wallets: %s", e) async def __send_transaction_details( self, message_text, chat_id, wallet_address, new_last_checked_block @@ -150,24 +164,13 @@ async def __send_transaction_details( """Send the transaction details to the user.""" try: chat_id = int(chat_id) - self.logger.debug(message_text) + logger.debug(message_text) await self.application.bot.send_message( chat_id=chat_id, text=message_text, parse_mode="Markdown", + disable_web_page_preview=True, ) - - if new_last_checked_block: - try: - await self.update_last_checked_block( - chat_id, - wallet_address, - new_last_checked_block, - ) - except ClientError as e: - self.logger.error("Failed to update last checked block: %s", e) - - await asyncio.sleep(15) # Wait 15 second between messages to avoid flooding except ( aiohttp.ClientError, ClientSSLError, @@ -176,7 +179,7 @@ async def __send_transaction_details( TelegramError, BadRequest, ) as ex: - self.logger.error( + logger.error( "Failed to send message to %s: %s", chat_id, ex, @@ -202,13 +205,17 @@ async def __check_wallet_transactions( "sort": "asc", } async with ClientSession() as session: + start_time = time.time() + total_transactions = 0 async with session.get(config.etherscan_api_url, params=params) as response: if response.status == 200: data = await response.json() - self.logger.debug("Response from Etherscan: %s", data) + logger.debug("Response from Etherscan: %s", data) transactions = data.get("result", []) if transactions: + total_transactions = len(transactions) + new_last_checked_block = int(transactions[-1]["blockNumber"]) for tx in transactions: # Translate the contract address to a ticker or name and format the message @@ -217,6 +224,14 @@ async def __check_wallet_transactions( message_text = await self.__process_transaction( tx, wallet_address, chat_id ) + + # processing_time = time.time() - start_time + sleep_time = self.__calculate_sleep_time( + total_transactions, + time.time() - start_time, + max_sleep_time=15, # Set max sleep time to 60 seconds + ) + await self.__send_transaction_details( message_text, chat_id, @@ -224,6 +239,12 @@ async def __check_wallet_transactions( new_last_checked_block, ) + logger.debug( + "Transaction details sent. Sleeping for %s", sleep_time + ) + + await asyncio.sleep(sleep_time) + return new_last_checked_block return None @@ -239,6 +260,7 @@ async def __process_transaction(self, tx, wallet_address, chat_id) -> str: gas_price = int(tx["gasPrice"]) gas_paid = gas_used * gas_price / 10**18 # Convert from wei to ETH block_number = int(tx["blockNumber"]) + tx_hash = tx.get("hash", "") # Determine the direction of the transaction direction = ( @@ -279,74 +301,46 @@ async def __process_transaction(self, tx, wallet_address, chat_id) -> str: if wallet_tag: wallet_tag_section = f"Tag: {wallet_tag}" + # try to resolve to ENS name + __ens_addr = self.web3_wrapper.get_ens_name(from_address) + if __ens_addr: + from_address = __ens_addr + __ens_addr = None + __ens_addr = self.web3_wrapper.get_ens_name(to_address) + if __ens_addr: + to_address = __ens_addr + # Construct the alert message message_text = ( f"*{direction} Transaction Alert*\n" f"*{wallet_tag_section}*\n" - f"{'From' if direction == 'Outgoing' else 'To'}: `{from_address}`\n" + f"From `{from_address}`\n" + f"To: `{to_address}`\n" f"Asset: {asset_description}\n" f"Value: {str(value_eth)} ETH{str(value_usd_text)}\n" f"Gas Paid: {str(gas_paid)} ETH{str(gas_paid_usd_text)}\n" - f"Block: {str(block_number)}" + f"Block: {str(block_number)}\n\n" + f"[View on Etherscan](https://etherscan.io/tx/{tx_hash})" ) return message_text - # async def __get_token_details(self, token_address): - # """Get the token name and symbol from the Etherscan API.""" - # url = f"{config.etherscan_api_url}&module=token&action=tokeninfo&contractaddress={token_address}" - - # async with ClientSession() as session: - # try: - # async with session.get(url) as response: - # if response.status == 200: - # data = await response.json() - # # Check if the response is successful and has the necessary information - # if ( - # data["status"] == "1" - # and "result" in data - # and len(data["result"]) > 0 - # ): - # token_info = data["result"][0] - # token_name = token_info.get("tokenName", "Unknown Token") - # token_symbol = token_info.get("tokenSymbol", "Unidentified") - # return token_name, token_symbol - - # self.logger.error( - # "Failed to retrieve token details or no details available for address: %s", - # token_address, - # ) - # else: - # self.logger.error( - # "Etherscan API response error, status: %s", response.status - # ) - # except (ClientError, ClientSSLError, ClientOSError) as ex: - # self.logger.error( - # "Exception occurred while retrieving token details: %s", str(ex) - # ) - # return "Unknown Token", "Unidentified" - - async def __get_eth_price_usd(self): - """Get the current price of Ethereum in USD.""" - url = "https://api.coingecko.com/api/v3/simple/price?ids=ethereum&vs_currencies=usd" + async def list_tracked_wallets(self, update: Update, context: CallbackContext): + """List all tracked wallets.""" + chat_id = update.message.chat_id + message = "🔍 *Currently tracking the following wallets:*\n" + tracked_wallets = await self.storage.get_tracked_wallets(chat_id) - async with ClientSession(version=HttpVersion11) as session: - try: - async with session.get(url) as response: - if response.status == 200: - data = await response.json() - # Extract the price of Ethereum in USD - return data["ethereum"]["usd"] - - self.logger.error( - "Failed to retrieve ETH price, status: %s", response.status - ) + for wallet in tracked_wallets: + wallet_address = wallet["wallet_address"] + last_checked_block = wallet["last_checked_block"] + wallet_tag = wallet.get("wallet_tag", "No wallet_tag provided") + status = "Paused" if wallet.get("paused", False) else "Active" + message += f"- `{wallet_address}`\nTag: `{wallet_tag}`\nfrom block {last_checked_block}\nstatus: {status}\n" - return None # Return None if there was an error - except (ClientSSLError, ClientOSError) as ex: - self.logger.error( - "Exception occurred while retrieving ETH price: %s", str(ex) - ) + if not tracked_wallets: + message = "🔍 You are not currently tracking any wallets." + await update.message.reply_text(message, parse_mode="Markdown") async def ask_for_wallet_to_resolve(self, update: Update, context: CallbackContext): """Ask the user for the wallet address to resolve.""" @@ -361,26 +355,26 @@ async def received_wallet_to_resolve( """Handle the received wallet address and resolve it.""" wallet_address = update.message.text.strip() # chat_id = update.message.chat_id - self.logger.debug("Received wallet address to resolve: %s", wallet_address) + logger.debug("Received wallet address to resolve: %s", wallet_address) if self.__is_valid_wallet(wallet_address): resolved_address = self.web3_wrapper.resolve_ens(wallet_address) if resolved_address: message_text = f"✅ Resolved wallet address: `{wallet_address}` to `{resolved_address}`" - self.logger.debug(message_text) + logger.debug(message_text) await update.message.reply_text(message_text, parse_mode="Markdown") else: await update.message.reply_text( "❌ Failed to resolve the wallet. Please check the address and try again." ) - self.logger.debug( + logger.debug( "Failed to resolve the wallet. Please check the address and try again." ) else: await update.message.reply_text( "❌ Invalid wallet address. Please try again." ) - self.logger.debug("Invalid wallet address. Please try again.") + logger.debug("Invalid wallet address. Please try again.") return ConversationHandler.END @@ -423,7 +417,7 @@ async def received_wallet(self, update: Update, context: CallbackContext): # Proceed if the wallet address and tag are valid current_block = self.web3_wrapper.get_block_number() - self.logger.debug("Current block: %s", current_block) + logger.debug("Current block: %s", current_block) if current_block: wallet_address_resolved = await self.storage.add_wallet( @@ -440,7 +434,7 @@ async def received_wallet(self, update: Update, context: CallbackContext): message += f" with tag {wallet_tag}" message += f" from block {current_block}" - self.logger.debug(message) + logger.debug(message) # Define the position and length of the hashtag entity tag_position = message.find(wallet_tag) @@ -462,23 +456,6 @@ async def received_wallet(self, update: Update, context: CallbackContext): return ConversationHandler.END - def __is_valid_tag(self, tag): - """Check if the tag is valid.""" - # Define a regular expression for a valid Telegram hashtag (letters, numbers, and underscores only) - tag_pattern = r"^#[A-Za-z0-9_]+$" - return re.match(tag_pattern, tag) is not None - - def __is_valid_wallet(self, wallet_address: str): - """Check if the wallet address is valid.""" - if len(wallet_address) == 42 and wallet_address.startswith("0x"): - # Check if the wallet address is a valid Ethereum address - return self.web3_wrapper.is_address(wallet_address) - # validate ENS name - if wallet_address.endswith(".eth"): - # Validate ENS domain names - return self.web3_wrapper.is_valid_ens_domain(wallet_address) - return False - async def ask_for_wallet_untrack(self, update: Update, context: CallbackContext): """Ask the user for the wallet address to stop tracking.""" await update.message.reply_text( @@ -566,19 +543,69 @@ async def received_wallet_to_resume(self, update: Update, context: CallbackConte parse_mode="Markdown", ) - async def list_tracked_wallets(self, update: Update, context: CallbackContext): - """List all tracked wallets.""" - chat_id = update.message.chat_id - message = "🔍 *Currently tracking the following wallets:*\n" - tracked_wallets = await self.storage.get_tracked_wallets(chat_id) + async def __get_eth_price_usd(self): + """Get the current price of ETH in USD from a provider.""" + providers = [ + ( + "CoinGecko", + "https://api.coingecko.com/api/v3/simple/price?ids=ethereum&vs_currencies=usd", + ), + ( + "CryptoCompare", + "https://min-api.cryptocompare.com/data/price?fsym=ETH&tsyms=USD", + ), + ] - for wallet in tracked_wallets: - wallet_address = wallet["wallet_address"] - last_checked_block = wallet["last_checked_block"] - wallet_tag = wallet.get("wallet_tag", "No wallet_tag provided") - status = "Paused" if wallet.get("paused", False) else "Active" - message += f"- `{wallet_address}`\nTag: `{wallet_tag}`\nfrom block {last_checked_block}\nstatus: {status}\n" + async with ClientSession() as session: + for name, url in providers: + try: + async with session.get(url) as response: + if response.status == 200: + data = await response.json() + # Extract and return price depending on the provider's response structure + return ( + data["ethereum"]["usd"] + if name == "CoinGecko" + else data["USD"] + ) + except (ClientSSLError, ClientOSError) as ex: + logger.error( + "Exception occurred while retrieving ETH price: %s", str(ex) + ) + return None # Return None if all providers fail - if not tracked_wallets: - message = "🔍 You are not currently tracking any wallets." - await update.message.reply_text(message, parse_mode="Markdown") + def __is_valid_tag(self, tag): + """Check if the tag is valid.""" + # Define a regular expression for a valid Telegram hashtag (letters, numbers, and underscores only) + tag_pattern = r"^#[A-Za-z0-9_]+$" + return re.match(tag_pattern, tag) is not None + + def __is_valid_wallet(self, wallet_address: str): + """Check if the wallet address is valid.""" + if len(wallet_address) == 42 and wallet_address.startswith("0x"): + # Check if the wallet address is a valid Ethereum address + return self.web3_wrapper.is_address(wallet_address) + # validate ENS name + if wallet_address.endswith(".eth"): + # Validate ENS domain names + return self.web3_wrapper.is_valid_ens_domain(wallet_address) + return False + + @staticmethod + def __calculate_sleep_time(total_transactions, processing_time, max_sleep_time=60): + """Dynamically calculate sleep time based on activity and processing time.""" + # Base sleep time on a minimum threshold to prevent zero or negative values + base_sleep_time = max( + 7, processing_time * 0.5 + ) # 50% of the processing time or 7 seconds, whichever is higher + + # Adjust sleep time based on transaction volume + if total_transactions > 100: # High number of transactions + sleep_time = base_sleep_time * 2 + elif total_transactions > 50: # Moderate number of transactions + sleep_time = base_sleep_time * 1.5 + else: + sleep_time = base_sleep_time + + # Ensure the sleep time does not exceed the maximum allowed + return min(sleep_time, max_sleep_time) diff --git a/tracker/ethereum/wallet_tracker_storage.py b/tracker/ethereum/wallet_tracker_storage.py index 46e8a53..69faf4e 100644 --- a/tracker/ethereum/wallet_tracker_storage.py +++ b/tracker/ethereum/wallet_tracker_storage.py @@ -145,10 +145,19 @@ async def update_wallet(self, chat_id, tracked_wallets): async def get_all(self): """Get all the items in the table.""" try: + # fetch all items from cache first + if self.tracked_wallets_cache: + logger.debug("Returning all items from cache") + logger.debug(self.tracked_wallets_cache) + return self.__format_wallet_data(self.tracked_wallets_cache) + + logger.warning("Returning all items from database") response = self.table.scan() + if "Items" not in response: return [] return response.get("Items", []) + except ClientError as e: logger.error("Error getting all items: %s", e) return [] @@ -198,3 +207,11 @@ async def handle_wallet_state(self, chat_id, wallet_address, paused: bool = True logger.error( "Failed to set paused state for wallet %s: %s", wallet_address, e ) + + @staticmethod + def __format_wallet_data(wallet_data): + """Format the wallet data to ensure consistency.""" + formatted_data = [] + for chat_id, wallets in wallet_data.items(): + formatted_data.append({"chat_id": chat_id, "tracked_wallets": wallets}) + return formatted_data diff --git a/web3_wrapper.py b/web3_wrapper.py index dc5c029..186e2d7 100644 --- a/web3_wrapper.py +++ b/web3_wrapper.py @@ -4,6 +4,7 @@ from ens import ENS # pylint: disable=import-error from ens.exceptions import InvalidName, ResolverNotFound, UnauthorizedError from web3 import HTTPProvider, Web3 +from web3.types import ChecksumAddress from core import SingletonMeta from tracker.config import ConfigHandler @@ -24,7 +25,7 @@ def __init__(self, logger: Logger) -> None: self.web3 = Web3(self.provider) self.logger = logger - def resolve_ens(self, ens_name): + def resolve_ens(self, ens_name) -> ChecksumAddress | None: """Resolve an ENS name to an address.""" self.logger.info("Resolving ENS name: %s", ens_name) @@ -36,7 +37,7 @@ def resolve_ens(self, ens_name): self.logger.error("Error resolving ENS name: %s", e) return None - def get_ens_name(self, eth_address): + def get_ens_name(self, eth_address) -> str | None: """Get the ENS name of an address.""" self.logger.info("Getting ENS name of address: %s", eth_address)