diff --git a/crates/erc20_payment_lib/src/service.rs b/crates/erc20_payment_lib/src/service.rs index 2f1e543a..a5030884 100644 --- a/crates/erc20_payment_lib/src/service.rs +++ b/crates/erc20_payment_lib/src/service.rs @@ -75,7 +75,7 @@ pub async fn transaction_from_chain_and_into_db( glm_address: Address, get_balances: bool, ) -> Result, PaymentError> { - println!("tx_hash: {tx_hash}"); + log::debug!("tx_hash: {tx_hash}"); let tx_hash = web3::types::H256::from_str(tx_hash) .map_err(|_err| ConversionError::from("Cannot parse tx_hash".to_string())) .map_err(err_from!())?; @@ -84,7 +84,7 @@ pub async fn transaction_from_chain_and_into_db( .await .map_err(err_from!())? { - log::info!("Transaction already in DB: {}, skipping...", chain_tx.id); + log::warn!("Transaction already in DB: {}, skipping...", chain_tx.id); return Ok(Some(chain_tx)); } @@ -125,7 +125,7 @@ pub async fn transaction_from_chain_and_into_db( tokio::time::sleep(std::time::Duration::from_millis(100)).await; }; - log::info!( + log::debug!( "Balance: {:.5} for block {}", balance.unwrap_or_default().to_eth().unwrap(), chain_tx_dao.block_number @@ -228,7 +228,7 @@ pub async fn transaction_from_chain_and_into_db( } db_transaction.commit().await.map_err(err_from!())?; - log::info!("Transaction found and parsed successfully: {}", tx.id); + log::debug!("Transaction found and parsed successfully: {}", tx.id); Ok(Some(tx)) } diff --git a/crates/erc20_payment_lib/src/transaction.rs b/crates/erc20_payment_lib/src/transaction.rs index 2176acf0..d136af36 100644 --- a/crates/erc20_payment_lib/src/transaction.rs +++ b/crates/erc20_payment_lib/src/transaction.rs @@ -1163,7 +1163,7 @@ pub struct ImportErc20TxsArgs { } pub async fn import_erc20_txs(import_args: ImportErc20TxsArgs) -> Result, PaymentError> { - let mut start_block = import_args.start_block; + let start_block = import_args.start_block; let option_address_to_option_h256 = |val: Option>| -> Option> { val.map(|accounts| { accounts @@ -1189,43 +1189,46 @@ pub async fn import_erc20_txs(import_args: ImportErc20TxsArgs) -> Result::new(); - loop { - let end_block = std::cmp::min( - std::cmp::min(start_block + 1000, current_block), - import_args.scan_end_block, + + let end_block = import_args.scan_end_block; + if start_block > end_block { + return Err(err_custom_create!("Start block is greater than end block")); + } + if end_block - start_block > import_args.blocks_at_once as i64 { + return Err(err_custom_create!("Too many blocks to scan")); + } + if start_block > current_block { + return Err(err_custom_create!( + "Start block is greater than current block" + )); + } + log::debug!("Scanning chain, blocks: {start_block} - {end_block}"); + let logs = get_erc20_logs( + import_args.web3.clone(), + import_args.erc20_address, + topic_senders.clone(), + topic_receivers.clone(), + start_block, + end_block, + ) + .await?; + for log in logs.into_iter() { + txs.insert( + log.transaction_hash + .ok_or(err_custom_create!("Log without transaction hash"))?, + log.block_number + .ok_or(err_custom_create!("Log without block number"))? + .as_u64(), + ); + log::info!( + "Found matching log entry in block: {}, tx: {}", + log.block_number.unwrap(), + log.block_number.unwrap() ); - if start_block > end_block { - break; - } - log::info!("Scanning chain, blocks: {start_block} - {end_block}"); - let logs = get_erc20_logs( - import_args.web3.clone(), - import_args.erc20_address, - topic_senders.clone(), - topic_receivers.clone(), - start_block, - end_block, - ) - .await?; - for log in logs.into_iter() { - txs.insert( - log.transaction_hash - .ok_or(err_custom_create!("Log without transaction hash"))?, - log.block_number - .ok_or(err_custom_create!("Log without block number"))? - .as_u64(), - ); - log::info!( - "Found matching log entry in block: {}, tx: {}", - log.block_number.unwrap(), - log.block_number.unwrap() - ); - } - start_block += import_args.blocks_at_once as i64; } if txs.is_empty() { - log::info!("No logs found"); + log::debug!("No logs found"); } else { log::info!("Found {} transactions", txs.len()); } diff --git a/src/actions/scan_chain.rs b/src/actions/scan_chain.rs index 041e2cc4..72edb311 100644 --- a/src/actions/scan_chain.rs +++ b/src/actions/scan_chain.rs @@ -1,5 +1,5 @@ use crate::options::ScanBlockchainOptions; -use erc20_payment_lib::config::Config; +use erc20_payment_lib::config::{Chain, Config}; use erc20_payment_lib::service::transaction_from_chain_and_into_db; use erc20_payment_lib::setup::PaymentSetup; use erc20_payment_lib::transaction::{import_erc20_txs, ImportErc20TxsArgs}; @@ -7,16 +7,184 @@ use erc20_payment_lib_common::error::ErrorBag; use erc20_payment_lib_common::error::PaymentError; use erc20_payment_lib_common::model::ScanDaoDbObj; use erc20_payment_lib_common::ops::{delete_scan_info, get_scan_info, upsert_scan_info}; +use erc20_payment_lib_common::{err_custom_create, err_from}; +use erc20_rpc_pool::Web3RpcPool; use sqlx::SqlitePool; use std::str::FromStr; +use std::sync::Arc; use web3::types::Address; +#[allow(clippy::too_many_arguments)] +async fn scan_int( + conn: SqlitePool, + scan_blockchain_options: &ScanBlockchainOptions, + chain_cfg: Chain, + web3: Arc, + start_block: i64, + end_block: i64, + current_block: i64, + sender: Option
, +) -> Result<(), PaymentError> { + let txs = import_erc20_txs(ImportErc20TxsArgs { + web3: web3.clone(), + erc20_address: chain_cfg.token.address, + chain_id: chain_cfg.chain_id, + filter_by_senders: sender.map(|sender| [sender].to_vec()), + filter_by_receivers: None, + start_block, + scan_end_block: end_block, + blocks_at_once: scan_blockchain_options.blocks_at_once, + }) + .await + .map_err(|e| { + log::error!("Error when importing txs: {}", e); + e + })?; + + let mut max_block_from_tx = None; + for tx in &txs { + match transaction_from_chain_and_into_db( + web3.clone(), + &conn.clone(), + chain_cfg.chain_id, + &format!("{tx:#x}"), + chain_cfg.token.address, + scan_blockchain_options.import_balances, + ) + .await + { + Ok(Some(chain_tx)) => { + if chain_tx.block_number > max_block_from_tx.unwrap_or(0) { + max_block_from_tx = Some(chain_tx.block_number); + } + } + Ok(None) => {} + Err(e) => { + log::error!("Error when getting transaction from chain: {}", e); + continue; + } + } + } + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +async fn scan_auto_step( + conn: SqlitePool, + scan_blockchain_options: &ScanBlockchainOptions, + chain_cfg: Chain, + web3: Arc, + sender: Option
, + scan_info: &mut ScanDaoDbObj, +) -> Result<(), PaymentError> { + let current_block = web3 + .clone() + .eth_block_number() + .await + .map_err(err_from!())? + .as_u64() as i64; + + let (start_block, end_block, is_forward) = if scan_info.start_block == -1 { + let start_block = std::cmp::max( + current_block as i64 - scan_blockchain_options.blocks_behind.unwrap_or(100) as i64, + 1, + ); + let end_block = current_block; + (start_block, end_block, true) + } else if current_block - scan_info.last_block + >= scan_blockchain_options.blocks_behind.unwrap_or(100) as i64 + + scan_blockchain_options.forward_scan_buffer as i64 + { + log::info!("Scan forward needed"); + let start_block = scan_info.last_block + 1; + if start_block > current_block { + log::warn!( + "Start block {} is higher than current block {}, no newer data on blockchain", + start_block, + current_block + ); + return Ok(()); + } + let end_block = start_block + scan_blockchain_options.blocks_at_once as i64; + let end_block = std::cmp::min(end_block, current_block); + (start_block, end_block, true) + } else { + if scan_info.start_block <= scan_blockchain_options.from_block.unwrap_or(1) as i64 { + return Ok(()); + } + + let end_block = scan_info.start_block; + let start_block = + std::cmp::max(end_block - scan_blockchain_options.blocks_at_once as i64, 1); + if end_block - start_block > 0 { + (start_block, end_block, false) + } else { + log::warn!( + "Start block {} is higher than end block {}, no newer data on blockchain", + start_block, + end_block + ); + return Ok(()); + } + + }; + + log::info!( + "Scanning from {} to {} - direction {}", + start_block, + end_block, + if is_forward { "forward" } else { "backward" } + ); + + scan_int( + conn.clone(), + scan_blockchain_options, + chain_cfg.clone(), + web3.clone(), + start_block, + end_block, + current_block, + sender, + ) + .await?; + + if scan_info.start_block == -1 { + scan_info.start_block = start_block; + } + + if is_forward { + //last blocks may be missing so we subtract 100 blocks from current to be sure + scan_info.last_block = std::cmp::min( + end_block, + current_block - scan_blockchain_options.blocks_behind.unwrap_or(100) as i64, + ); + log::debug!( + "Updating db scan entry {} - {}", + scan_info.start_block, + scan_info.last_block + ); + } else { + scan_info.start_block = start_block; + log::debug!( + "Updating db scan entry {} - {}", + scan_info.start_block, + scan_info.last_block + ); + } + + upsert_scan_info(&conn.clone(), scan_info) + .await + .map_err(err_from!())?; + + Ok(()) +} + pub async fn scan_blockchain_local( conn: SqlitePool, scan_blockchain_options: ScanBlockchainOptions, config: Config, ) -> Result<(), PaymentError> { - use erc20_payment_lib_common::{err_custom_create, err_from}; log::info!("Scanning blockchain {}", scan_blockchain_options.chain_name); let payment_setup = PaymentSetup::new_empty(&config)?; @@ -31,6 +199,7 @@ pub async fn scan_blockchain_local( let sender = scan_blockchain_options .sender + .clone() .map(|s| Address::from_str(&s).unwrap()); let scan_info = ScanDaoDbObj { @@ -46,7 +215,7 @@ pub async fn scan_blockchain_local( .await .map_err(err_from!())?; - let mut scan_info = if scan_blockchain_options.start_new_scan { + let scan_info = if scan_blockchain_options.start_new_scan { log::warn!("Starting new scan - removing old scan info from db"); delete_scan_info(&conn.clone(), scan_info.chain_id, &scan_info.filter) .await @@ -68,117 +237,111 @@ pub async fn scan_blockchain_local( .map_err(err_from!())? .as_u64() as i64; - //start around 30 days ago - let mut start_block = std::cmp::max(1, scan_blockchain_options.from_block as i64); + let from_block: i64 = scan_blockchain_options + .from_block + .map(|f| f as i64) + .unwrap_or(current_block - 100); + let mut start_block = std::cmp::max(1, from_block); - if scan_blockchain_options.from_block > current_block as u64 { - log::warn!( - "From block {} is higher than current block {}, no newer data on blockchain", - scan_blockchain_options.from_block, - current_block - ); - return Ok(()); + if let Some(from_block) = scan_blockchain_options.from_block { + if from_block > current_block as u64 { + log::warn!( + "From block {} is higher than current block {}, no newer data on blockchain", + from_block, + current_block + ); + return Ok(()); + } } - if current_block < scan_info.last_block { - log::warn!( - "Current block {} is lower than last block from db {}, no newer data on blockchain", - current_block, - scan_info.last_block - ); - return Ok(()); - } + if scan_blockchain_options.auto { + let mut scan_info = scan_info.clone(); + loop { + match scan_auto_step( + conn.clone(), + &scan_blockchain_options, + chain_cfg.clone(), + web3.clone(), + sender, + &mut scan_info, + ) + .await + { + Ok(_) => { + log::info!("Scan step done"); + } + Err(e) => { + log::info!("Scan step failed - trying again: {}", e); + } + } + //tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + } + } else { + if current_block < scan_info.last_block { + log::warn!( + "Current block {} is lower than last block from db {}, no newer data on blockchain", + current_block, + scan_info.last_block + ); + return Ok(()); + } - if scan_info.last_block > start_block { - log::info!( + if scan_info.last_block > start_block { + log::info!( "Start block from db is higher than start block from cli {}, using start block from db {}", start_block, scan_info.last_block ); - start_block = scan_info.last_block; - } else if scan_info.last_block != -1 { - log::error!( + start_block = scan_info.last_block; + } else if scan_info.last_block != -1 { + log::error!( "There is old entry in db, remove it to start new scan or give proper block range: start block: {}, last block {}", start_block, scan_info.last_block ); - return Err(err_custom_create!( + return Err(err_custom_create!( "There is old entry in db, remove it to start new scan or give proper block range: start block: {}, last block {}", start_block, scan_info.last_block )); - } + } - let mut end_block = if let Some(max_block_range) = scan_blockchain_options.max_block_range { - start_block + max_block_range as i64 - } else { - current_block - }; + let mut end_block = if let Some(max_block_range) = scan_blockchain_options.max_block_range { + start_block + max_block_range as i64 + } else { + current_block + }; - if let Some(blocks_behind) = scan_blockchain_options.blocks_behind { - if end_block > current_block - blocks_behind as i64 { - log::info!( + if let Some(blocks_behind) = scan_blockchain_options.blocks_behind { + if end_block > current_block - blocks_behind as i64 { + log::info!( "End block {} is too close to current block {}, using current block - blocks_behind: {}", end_block, current_block, current_block - blocks_behind as i64 ); - end_block = current_block - blocks_behind as i64; + end_block = current_block - blocks_behind as i64; + } } - } - let txs = import_erc20_txs(ImportErc20TxsArgs { - web3: web3.clone(), - erc20_address: chain_cfg.token.address, - chain_id: chain_cfg.chain_id, - filter_by_senders: sender.map(|sender| [sender].to_vec()), - filter_by_receivers: None, - start_block, - scan_end_block: end_block, - blocks_at_once: scan_blockchain_options.blocks_at_once, - }) - .await - .unwrap(); + let current_end_block = std::cmp::min( + end_block, + start_block + scan_blockchain_options.blocks_at_once as i64, + ); - let mut max_block_from_tx = None; - for tx in &txs { - match transaction_from_chain_and_into_db( + let mut scan_info = scan_info.clone(); + scan_int( + conn.clone(), + &scan_blockchain_options, + chain_cfg.clone(), web3.clone(), - &conn.clone(), - chain_cfg.chain_id, - &format!("{tx:#x}"), - chain_cfg.token.address, - scan_blockchain_options.import_balances, + start_block, + end_block, + current_end_block, + sender, ) - .await - { - Ok(Some(chain_tx)) => { - if chain_tx.block_number > max_block_from_tx.unwrap_or(0) { - max_block_from_tx = Some(chain_tx.block_number); - } - } - Ok(None) => {} - Err(e) => { - log::error!("Error when getting transaction from chain: {}", e); - continue; - } - } + .await?; } - if scan_info.start_block == -1 { - scan_info.start_block = start_block; - } - - //last blocks may be missing so we subtract 100 blocks from current to be sure - scan_info.last_block = std::cmp::min(end_block, current_block - 100); - log::info!( - "Updating db scan entry {} - {}", - scan_info.start_block, - scan_info.last_block - ); - upsert_scan_info(&conn.clone(), &scan_info) - .await - .map_err(err_from!())?; - Ok(()) } diff --git a/src/options.rs b/src/options.rs index ce26b604..e5794f95 100644 --- a/src/options.rs +++ b/src/options.rs @@ -231,7 +231,7 @@ pub struct ScanBlockchainOptions { pub chain_name: String, #[structopt(short = "b", long = "from-block")] - pub from_block: u64, + pub from_block: Option, #[structopt(long = "start-new-scan")] pub start_new_scan: bool, @@ -248,6 +248,13 @@ pub struct ScanBlockchainOptions { )] pub blocks_behind: Option, + #[structopt( + long = "forward-scan-buffer", + help = "How much blocks behind scanner should stop", + default_value = "40" + )] + pub forward_scan_buffer: u64, + #[structopt( long = "blocks-at-once", default_value = "1000", @@ -260,6 +267,9 @@ pub struct ScanBlockchainOptions { #[structopt(short = "a", long = "address")] pub sender: Option, + + #[structopt(long = "auto")] + pub auto: bool, } #[derive(StructOpt)]