Skip to content

Commit

Permalink
Separate tasks for multiple networks
Browse files Browse the repository at this point in the history
  • Loading branch information
scx1332 authored Mar 20, 2024
1 parent 9997111 commit 0304f7f
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 190 deletions.
80 changes: 0 additions & 80 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -187,86 +187,6 @@ jobs:
- name: Run tests (docker_03_problems)
run: cargo test --test docker_03_problems --profile=release-fast -- --test-threads=10

test_faucet:
name: Test Goerli faucet
timeout-minutes: 20

runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v3

- name: Cache dependencies
uses: Swatinem/rust-cache@v2
with:
shared-key: "dev-build-cache"

- name: Build
run: cargo build

- name: Run tests (faucet)
run: cargo run -- generate-key -n 1 > .env

- name: Check if balance is 0
run: |
[ $(cargo run -- balance -c goerli | jq -r '.[] | .gasDecimal') == "0" ]
[ $(cargo run -- balance -c goerli | jq -r '.[] | .tokenDecimal') == "0" ]
env:
GOERLI_GETH_ADDR: ${{ secrets.GOERLI_RPC_ADDRESS }}

- name: Get ETH from faucet
run: cargo run -- get-dev-eth -c goerli
env:
GOERLI_GETH_ADDR: ${{ secrets.GOERLI_RPC_ADDRESS }}

- name: Check ETH balance after getting funds from faucet (should be 0.01)
run: |
sleep 60 # give time for the blockchain to propagate info about the transaction
[ $(cargo run -- balance -c goerli | jq -r '.[] | .gasDecimal') == "0.01" ]
env:
GOERLI_GETH_ADDR: ${{ secrets.GOERLI_RPC_ADDRESS }}

- name: Mint tokens
run: |
cargo run -- mint-test-tokens -c goerli
cargo run -- run
env:
GOERLI_GETH_ADDR: ${{ secrets.GOERLI_RPC_ADDRESS }}

- name: Check token balance
run: |
[ $(cargo run -- balance -c goerli | jq -r '.[] | .tokenDecimal') == "1000" ]
env:
GOERLI_GETH_ADDR: ${{ secrets.GOERLI_RPC_ADDRESS }}

- name: Transfer 166.6 GLM tokens
run: |
cargo run -- transfer -c goerli --recipient 0x5b984629E2Cc7570cBa7dD745b83c3dD23Ba6d0f --token glm --amount 166.6
cargo run -- run
env:
GOERLI_GETH_ADDR: ${{ secrets.GOERLI_RPC_ADDRESS }}

- name: Transfer all GLM tokens
run: |
cargo run -- transfer -c goerli --recipient 0x5b984629E2Cc7570cBa7dD745b83c3dD23Ba6d0f --token glm --all
cargo run -- run
env:
GOERLI_GETH_ADDR: ${{ secrets.GOERLI_RPC_ADDRESS }}

- name: Check token balance zero
run: |
[ $(cargo run -- balance -c goerli | jq -r '.[] | .tokenDecimal') == "0" ]
env:
GOERLI_GETH_ADDR: ${{ secrets.GOERLI_RPC_ADDRESS }}

- name: Transfer all left ETH tokens
run: |
cargo run -- transfer -c goerli --recipient 0x5b984629E2Cc7570cBa7dD745b83c3dD23Ba6d0f --token eth --all
cargo run -- run
env:
GOERLI_GETH_ADDR: ${{ secrets.GOERLI_RPC_ADDRESS }}

test_faucet_holesky:
name: Test Holesky faucet
timeout-minutes: 20
Expand Down
121 changes: 72 additions & 49 deletions crates/erc20_payment_lib/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::utils::{DecimalConvExt, StringConvExt, U256ConvExt};
use chrono::{DateTime, Utc};
use erc20_payment_lib_common::{
DriverEvent, DriverEventContent, FaucetData, SharedInfoTx, StatusProperty,
TransactionFailedReason, TransactionStuckReason, Web3RpcPoolContent,
TransactionStuckReason, Web3RpcPoolContent,
};
use erc20_rpc_pool::{Web3ExternalSources, Web3FullNodeData, Web3PoolType, Web3RpcPool};
use rust_decimal::prelude::FromPrimitive;
Expand Down Expand Up @@ -125,12 +125,6 @@ impl StatusTracker {
for old_property in status_props.iter_mut() {
use StatusProperty::*;
match (old_property, &new_property) {
(InvalidChainId { chain_id: id1 }, InvalidChainId { chain_id: id2 })
if id1 == id2 =>
{
return false;
}

(
CantSign {
chain_id: id1,
Expand Down Expand Up @@ -211,7 +205,6 @@ impl StatusTracker {

#[allow(clippy::match_like_matches_macro)]
status_props.retain(|s| match s {
StatusProperty::InvalidChainId { chain_id } if *chain_id == ok_chain_id => false,
StatusProperty::CantSign { chain_id, .. } if *chain_id == ok_chain_id => false,
StatusProperty::NoGas { chain_id, .. } if *chain_id == ok_chain_id => false,
StatusProperty::NoToken { chain_id, .. } if *chain_id == ok_chain_id => false,
Expand All @@ -236,14 +229,6 @@ impl StatusTracker {
while let Some(ev) = status_rx.recv().await {
let mut pass_raw_message = true;
let emit_changed = match &ev.content {
DriverEventContent::TransactionFailed(
TransactionFailedReason::InvalidChainId(chain_id),
) => Self::update(
status.lock().await.deref_mut(),
StatusProperty::InvalidChainId {
chain_id: *chain_id,
},
),
DriverEventContent::CantSign(details) => Self::update(
status.lock().await.deref_mut(),
StatusProperty::CantSign {
Expand Down Expand Up @@ -408,6 +393,7 @@ impl PaymentRuntime {
fn start_service_loop(
&self,
signer_address: Address,
chain_id: i64,
notify: Arc<Notify>,
extra_testing: Option<ExtraOptionsForTesting>,
options: AdditionalOptions,
Expand Down Expand Up @@ -462,6 +448,7 @@ impl PaymentRuntime {
} else {
service_loop(
shared_state_clone,
chain_id,
signer_address,
notify,
&conn,
Expand Down Expand Up @@ -569,31 +556,45 @@ impl PaymentRuntime {
}

fn get_and_remove_tasks(&self) -> Vec<JoinHandle<()>> {
self.shared_state
.lock()
.unwrap()
.accounts
.iter_mut()
.filter_map(|a| a.jh.lock().unwrap().take())
.collect()
let mut task_handles = Vec::new();
let mut lock_shared_state = self.shared_state.lock().unwrap();

//this shouldn't end in deadlock. It just extracts all handles and removes them from the lists
for account in lock_shared_state.accounts.iter_mut() {
for jh in account.jh.lock().unwrap().iter_mut() {
if let Some(jh) = jh.take() {
task_handles.push(jh);
}
}
}

task_handles
}

pub fn is_any_task_running(&self) -> bool {
self.shared_state.lock().unwrap().accounts.iter().any(|a| {
a.jh.lock()
.unwrap()
.as_ref()
.is_some_and(|jh| !jh.is_finished())
})
let lock_shared_state = self.shared_state.lock().unwrap();

for account in lock_shared_state.accounts.iter() {
for jh in account.jh.lock().unwrap().iter().flatten() {
if !jh.is_finished() {
return true;
}
}
}
false
}

pub fn is_any_task_finished(&self) -> bool {
self.shared_state.lock().unwrap().accounts.iter().any(|a| {
a.jh.lock()
.unwrap()
.as_ref()
.is_some_and(|jh| jh.is_finished())
})
let lock_shared_state = self.shared_state.lock().unwrap();

for account in lock_shared_state.accounts.iter() {
for jh in account.jh.lock().unwrap().iter().flatten() {
if jh.is_finished() {
return true;
}
}
}
false
}

pub async fn join_tasks(&self) -> Result<(), JoinError> {
Expand Down Expand Up @@ -623,7 +624,7 @@ impl PaymentRuntime {
extra_testing: Option<ExtraOptionsForTesting>,
options: AdditionalOptions,
) -> bool {
log::info!("Adding account: {}", payment_account);
log::debug!("Adding account: {}", payment_account);
let mut sh = self.shared_state.lock().unwrap();

if sh
Expand All @@ -634,13 +635,21 @@ impl PaymentRuntime {
log::error!("Account already added: {}", payment_account);
return false;
}
let jh = self.start_service_loop(
payment_account.address,
self.wake.clone(),
extra_testing,
options,
);
*payment_account.jh.lock().unwrap() = Some(jh);
for chain_id in self.chains() {
log::debug!(
"Starting service loop for account: {} and chain id: {}",
payment_account.address,
chain_id
);
let jh = self.start_service_loop(
payment_account.address,
chain_id,
self.wake.clone(),
extra_testing.clone(),
options.clone(),
);
payment_account.jh.lock().as_mut().unwrap().push(Some(jh));
}
sh.accounts.push(payment_account);

true
Expand Down Expand Up @@ -1063,9 +1072,16 @@ pub async fn mint_golem_token(
let mut db_transaction = conn.begin().await.map_err(err_from!())?;
let filter = "method=\"FAUCET.create\" AND fee_paid is NULL";

let tx_existing = get_transactions(&mut *db_transaction, Some(from), Some(filter), None, None)
.await
.map_err(err_from!())?;
let tx_existing = get_transactions(
&mut *db_transaction,
Some(from),
Some(filter),
None,
None,
Some(chain_id as i64),
)
.await
.map_err(err_from!())?;

if let Some(tx) = tx_existing.first() {
return Err(err_custom_create!(
Expand Down Expand Up @@ -1449,9 +1465,16 @@ pub async fn deposit_funds(

let mut db_transaction = conn.begin().await.map_err(err_from!())?;
let filter = "method=\"LOCK.deposit\" AND fee_paid is NULL";
let tx_existing = get_transactions(&mut *db_transaction, Some(from), Some(filter), None, None)
.await
.map_err(err_from!())?;
let tx_existing = get_transactions(
&mut *db_transaction,
Some(from),
Some(filter),
None,
None,
Some(chain_id as i64),
)
.await
.map_err(err_from!())?;

if let Some(tx) = tx_existing.first() {
return Err(err_custom_create!(
Expand Down
15 changes: 4 additions & 11 deletions crates/erc20_payment_lib/src/sender/batching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ use crate::{err_create, err_custom_create, err_from};
use sqlx::SqlitePool;
use tokio::sync::mpsc;

use crate::runtime::send_driver_event;
use crate::signer::SignerAccount;
use erc20_payment_lib_common::model::TokenTransferDbObj;
use erc20_payment_lib_common::{DriverEvent, DriverEventContent, TransactionFailedReason};
use erc20_payment_lib_common::DriverEvent;
use web3::types::{Address, U256};

#[derive(Eq, Hash, PartialEq, Debug, Clone)]
Expand Down Expand Up @@ -52,12 +51,13 @@ pub struct TokenTransferMultiOrder {

pub async fn gather_transactions_pre(
account: &SignerAccount,
chain_id: i64,
conn: &SqlitePool,
_payment_setup: &PaymentSetup,
) -> Result<TokenTransferMap, PaymentError> {
let mut transfer_map = TokenTransferMap::new();

let mut token_transfers = get_pending_token_transfers(conn, account.address)
let mut token_transfers = get_pending_token_transfers(conn, account.address, chain_id)
.await
.map_err(err_from!())?;

Expand Down Expand Up @@ -321,7 +321,7 @@ pub async fn gather_transactions_batch_multi(
}

pub async fn gather_transactions_batch(
event_sender: Option<mpsc::Sender<DriverEvent>>,
_event_sender: Option<mpsc::Sender<DriverEvent>>,
conn: &SqlitePool,
payment_setup: &PaymentSetup,
token_transfers: &mut [TokenTransferDbObj],
Expand All @@ -333,13 +333,6 @@ pub async fn gather_transactions_batch(
}

let Some(chain_setup) = payment_setup.chain_setup.get(&token_transfer.chain_id) else {
send_driver_event(
&event_sender,
DriverEventContent::TransactionFailed(TransactionFailedReason::InvalidChainId(
token_transfer.chain_id,
)),
)
.await;
return Err(err_custom_create!(
"No setup found for chain id: {}",
token_transfer.chain_id
Expand Down
9 changes: 1 addition & 8 deletions crates/erc20_payment_lib/src/sender/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use erc20_payment_lib_common::ops::{
};
use erc20_payment_lib_common::{
CantSignContent, DriverEvent, DriverEventContent, GasLowInfo, NoGasDetails,
TransactionFailedReason, TransactionStuckReason,
TransactionStuckReason,
};
use rust_decimal::prelude::Zero;
use rust_decimal::Decimal;
Expand Down Expand Up @@ -62,13 +62,6 @@ pub async fn process_transaction(
) -> Result<(TxDbObj, ProcessTransactionResult), PaymentError> {
let chain_id = web3_tx_dao.chain_id;
let Some(chain_setup) = payment_setup.chain_setup.get(&chain_id) else {
send_driver_event(
&event_sender,
DriverEventContent::TransactionFailed(TransactionFailedReason::InvalidChainId(
chain_id,
)),
)
.await;
return Ok((web3_tx_dao.clone(), ProcessTransactionResult::DoNotSave));
};

Expand Down
Loading

0 comments on commit 0304f7f

Please sign in to comment.