Skip to content

Commit

Permalink
Merge branch 'release' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
scx1332 authored Mar 27, 2024
2 parents ebe43d0 + 0304f7f commit 0d4d7a8
Show file tree
Hide file tree
Showing 14 changed files with 211 additions and 113 deletions.
130 changes: 83 additions & 47 deletions crates/erc20_payment_lib/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use chrono::{DateTime, Utc};
use erc20_payment_lib_common::model::TokenTransferDbObj;
use erc20_payment_lib_common::{
DriverEvent, DriverEventContent, FaucetData, SharedInfoTx, StatusProperty,
TransactionFailedReason, TransactionStuckReason, Web3RpcPoolContent,
TransactionStuckReason, Web3RpcPoolContent,
};
use erc20_rpc_pool::{Web3ExternalSources, Web3FullNodeData, Web3PoolType, Web3RpcPool};
use rust_decimal::prelude::FromPrimitive;
Expand Down Expand Up @@ -125,12 +125,6 @@ impl StatusTracker {
for old_property in status_props.iter_mut() {
use StatusProperty::*;
match (old_property, &new_property) {
(InvalidChainId { chain_id: id1 }, InvalidChainId { chain_id: id2 })
if id1 == id2 =>
{
return false;
}

(
CantSign {
chain_id: id1,
Expand Down Expand Up @@ -211,7 +205,6 @@ impl StatusTracker {

#[allow(clippy::match_like_matches_macro)]
status_props.retain(|s| match s {
StatusProperty::InvalidChainId { chain_id } if *chain_id == ok_chain_id => false,
StatusProperty::CantSign { chain_id, .. } if *chain_id == ok_chain_id => false,
StatusProperty::NoGas { chain_id, .. } if *chain_id == ok_chain_id => false,
StatusProperty::NoToken { chain_id, .. } if *chain_id == ok_chain_id => false,
Expand All @@ -236,14 +229,6 @@ impl StatusTracker {
while let Some(ev) = status_rx.recv().await {
let mut pass_raw_message = true;
let emit_changed = match &ev.content {
DriverEventContent::TransactionFailed(
TransactionFailedReason::InvalidChainId(chain_id),
) => Self::update(
status.lock().await.deref_mut(),
StatusProperty::InvalidChainId {
chain_id: *chain_id,
},
),
DriverEventContent::CantSign(details) => Self::update(
status.lock().await.deref_mut(),
StatusProperty::CantSign {
Expand Down Expand Up @@ -407,6 +392,7 @@ impl PaymentRuntime {
fn start_service_loop(
&self,
signer_address: Address,
chain_id: i64,
notify: Arc<Notify>,
extra_testing: Option<ExtraOptionsForTesting>,
options: AdditionalOptions,
Expand Down Expand Up @@ -461,6 +447,7 @@ impl PaymentRuntime {
} else {
service_loop(
shared_state_clone,
chain_id,
signer_address,
notify,
&conn,
Expand Down Expand Up @@ -568,31 +555,45 @@ impl PaymentRuntime {
}

fn get_and_remove_tasks(&self) -> Vec<JoinHandle<()>> {
self.shared_state
.lock()
.unwrap()
.accounts
.iter_mut()
.filter_map(|a| a.jh.lock().unwrap().take())
.collect()
let mut task_handles = Vec::new();
let mut lock_shared_state = self.shared_state.lock().unwrap();

//this shouldn't end in deadlock. It just extracts all handles and removes them from the lists
for account in lock_shared_state.accounts.iter_mut() {
for jh in account.jh.lock().unwrap().iter_mut() {
if let Some(jh) = jh.take() {
task_handles.push(jh);
}
}
}

task_handles
}

pub fn is_any_task_running(&self) -> bool {
self.shared_state.lock().unwrap().accounts.iter().any(|a| {
a.jh.lock()
.unwrap()
.as_ref()
.is_some_and(|jh| !jh.is_finished())
})
let lock_shared_state = self.shared_state.lock().unwrap();

for account in lock_shared_state.accounts.iter() {
for jh in account.jh.lock().unwrap().iter().flatten() {
if !jh.is_finished() {
return true;
}
}
}
false
}

pub fn is_any_task_finished(&self) -> bool {
self.shared_state.lock().unwrap().accounts.iter().any(|a| {
a.jh.lock()
.unwrap()
.as_ref()
.is_some_and(|jh| jh.is_finished())
})
let lock_shared_state = self.shared_state.lock().unwrap();

for account in lock_shared_state.accounts.iter() {
for jh in account.jh.lock().unwrap().iter().flatten() {
if jh.is_finished() {
return true;
}
}
}
false
}

pub async fn join_tasks(&self) -> Result<(), JoinError> {
Expand Down Expand Up @@ -622,7 +623,7 @@ impl PaymentRuntime {
extra_testing: Option<ExtraOptionsForTesting>,
options: AdditionalOptions,
) -> bool {
log::info!("Adding account: {}", payment_account);
log::debug!("Adding account: {}", payment_account);
let mut sh = self.shared_state.lock().unwrap();

if sh
Expand All @@ -633,13 +634,21 @@ impl PaymentRuntime {
log::error!("Account already added: {}", payment_account);
return false;
}
let jh = self.start_service_loop(
payment_account.address,
self.wake.clone(),
extra_testing,
options,
);
*payment_account.jh.lock().unwrap() = Some(jh);
for chain_id in self.chains() {
log::debug!(
"Starting service loop for account: {} and chain id: {}",
payment_account.address,
chain_id
);
let jh = self.start_service_loop(
payment_account.address,
chain_id,
self.wake.clone(),
extra_testing.clone(),
options.clone(),
);
payment_account.jh.lock().as_mut().unwrap().push(Some(jh));
}
sh.accounts.push(payment_account);

true
Expand Down Expand Up @@ -1079,9 +1088,16 @@ pub async fn mint_golem_token(
let mut db_transaction = conn.begin().await.map_err(err_from!())?;
let filter = "method=\"FAUCET.create\" AND fee_paid is NULL";

let tx_existing = get_transactions(&mut *db_transaction, Some(from), Some(filter), None, None)
.await
.map_err(err_from!())?;
let tx_existing = get_transactions(
&mut *db_transaction,
Some(from),
Some(filter),
None,
None,
Some(chain_id as i64),
)
.await
.map_err(err_from!())?;

if let Some(tx) = tx_existing.first() {
return Err(err_custom_create!(
Expand Down Expand Up @@ -1341,7 +1357,27 @@ pub async fn make_deposit(
};
}

let make_deposit_tx = create_make_deposit(
let mut db_transaction = conn.begin().await.map_err(err_from!())?;
let filter = "method=\"LOCK.deposit\" AND fee_paid is NULL";
let tx_existing = get_transactions(
&mut *db_transaction,
Some(from),
Some(filter),
None,
None,
Some(chain_id as i64),
)
.await
.map_err(err_from!())?;

if let Some(tx) = tx_existing.first() {
return Err(err_custom_create!(
"You already have a pending deposit transaction with id: {}",
tx.id
));
}

let deposit_tx = create_lock_deposit(
from,
opt.lock_contract_address,
chain_id,
Expand Down
15 changes: 4 additions & 11 deletions crates/erc20_payment_lib/src/sender/batching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ use crate::{err_create, err_custom_create, err_from};
use sqlx::SqlitePool;
use tokio::sync::mpsc;

use crate::runtime::send_driver_event;
use crate::signer::SignerAccount;
use erc20_payment_lib_common::model::TokenTransferDbObj;
use erc20_payment_lib_common::{DriverEvent, DriverEventContent, TransactionFailedReason};
use erc20_payment_lib_common::DriverEvent;
use web3::types::{Address, U256};

#[derive(Eq, Hash, PartialEq, Debug, Clone)]
Expand Down Expand Up @@ -50,13 +49,14 @@ pub struct TokenTransferMultiOrder {

pub async fn gather_transactions_pre(
account: &SignerAccount,
chain_id: i64,
conn: &SqlitePool,
payment_setup: &PaymentSetup,
process_tx_needed: &mut bool,
) -> Result<TokenTransferMap, PaymentError> {
let mut transfer_map = TokenTransferMap::new();

let mut token_transfers = get_pending_token_transfers(conn, account.address)
let mut token_transfers = get_pending_token_transfers(conn, account.address, chain_id)
.await
.map_err(err_from!())?;

Expand Down Expand Up @@ -382,7 +382,7 @@ pub async fn gather_transactions_batch_multi(
}

pub async fn gather_transactions_batch(
event_sender: Option<mpsc::Sender<DriverEvent>>,
_event_sender: Option<mpsc::Sender<DriverEvent>>,
conn: &SqlitePool,
payment_setup: &PaymentSetup,
token_transfers: &mut [TokenTransferDbObj],
Expand All @@ -394,13 +394,6 @@ pub async fn gather_transactions_batch(
}

let Some(chain_setup) = payment_setup.chain_setup.get(&token_transfer.chain_id) else {
send_driver_event(
&event_sender,
DriverEventContent::TransactionFailed(TransactionFailedReason::InvalidChainId(
token_transfer.chain_id,
)),
)
.await;
return Err(err_custom_create!(
"No setup found for chain id: {}",
token_transfer.chain_id
Expand Down
9 changes: 1 addition & 8 deletions crates/erc20_payment_lib/src/sender/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use erc20_payment_lib_common::ops::{
};
use erc20_payment_lib_common::{
CantSignContent, DriverEvent, DriverEventContent, GasLowInfo, NoGasDetails,
TransactionFailedReason, TransactionStuckReason,
TransactionStuckReason,
};
use rust_decimal::prelude::Zero;
use rust_decimal::Decimal;
Expand Down Expand Up @@ -62,13 +62,6 @@ pub async fn process_transaction(
) -> Result<(TxDbObj, ProcessTransactionResult), PaymentError> {
let chain_id = web3_tx_dao.chain_id;
let Some(chain_setup) = payment_setup.chain_setup.get(&chain_id) else {
send_driver_event(
&event_sender,
DriverEventContent::TransactionFailed(TransactionFailedReason::InvalidChainId(
chain_id,
)),
)
.await;
return Ok((web3_tx_dao.clone(), ProcessTransactionResult::DoNotSave));
};

Expand Down
23 changes: 15 additions & 8 deletions crates/erc20_payment_lib/src/sender/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ pub async fn update_tx_result(

pub async fn process_transactions(
signer_account: &SignerAccount,
chain_id: i64,
event_sender: Option<tokio::sync::mpsc::Sender<DriverEvent>>,
shared_state: Arc<std::sync::Mutex<SharedState>>,
conn: &SqlitePool,
Expand All @@ -266,7 +267,7 @@ pub async fn process_transactions(
let mut current_wait_time_no_gas_token: f64 = 0.0;
loop {
let mut transactions =
get_next_transactions_to_process(conn, Some(signer_account.address), 1)
get_next_transactions_to_process(conn, Some(signer_account.address), 1, chain_id)
.await
.map_err(err_from!())?;

Expand Down Expand Up @@ -469,6 +470,7 @@ async fn sleep_for_gather_time_or_report_alive(

pub async fn service_loop(
shared_state: Arc<std::sync::Mutex<SharedState>>,
chain_id: i64,
account: Address,
wake: Arc<tokio::sync::Notify>,
conn: &SqlitePool,
Expand All @@ -491,17 +493,21 @@ pub async fn service_loop(
let metric_label_gather_post = "erc20_payment_lib.service_loop.gather_post";
let metric_label_gather_post_error = "erc20_payment_lib.service_loop.gather_post_error";
//let metric_label_loop_duration = "erc20_payment_lib.service_loop.loop_duration";
metrics::counter!(metric_label_start, 0);
metrics::counter!(metric_label_process_allowance, 0);
metrics::counter!(metric_label_gather_pre, 0);
metrics::counter!(metric_label_gather_pre_error, 0);
metrics::counter!(metric_label_gather_post, 0);
metrics::counter!(metric_label_gather_post_error, 0);
metrics::counter!(metric_label_start, 0, "chain_id" => chain_id.to_string());
metrics::counter!(metric_label_process_allowance, 0, "chain_id" => chain_id.to_string());
metrics::counter!(metric_label_gather_pre, 0, "chain_id" => chain_id.to_string());
metrics::counter!(metric_label_gather_pre_error, 0, "chain_id" => chain_id.to_string());
metrics::counter!(metric_label_gather_post, 0, "chain_id" => chain_id.to_string());
metrics::counter!(metric_label_gather_post_error, 0, "chain_id" => chain_id.to_string());

let mut process_tx_needed;
let mut last_stats_time: Option<Instant> = None;
loop {
log::debug!("Sender service loop - start loop");
log::debug!(
"Sender service loop - start loop chain id: {} - account: {:#x}",
chain_id,
account
);
metrics::counter!(metric_label_start, 1);
let signer_account = match shared_state
.lock()
Expand Down Expand Up @@ -529,6 +535,7 @@ pub async fn service_loop(
log::warn!("Skipping processing transactions...");
} else if let Err(e) = process_transactions(
&signer_account,
chain_id,
event_sender.clone(),
shared_state.clone(),
conn,
Expand Down
Loading

0 comments on commit 0d4d7a8

Please sign in to comment.