From 9dd96284d7d245481e8a6c2d64c34a39fa0b6108 Mon Sep 17 00:00:00 2001 From: lilyjjo Date: Thu, 26 Sep 2024 08:56:33 -0400 Subject: [PATCH 01/11] rebase work on main --- crates/astria-sequencer/src/mempool/mod.rs | 99 +++++- .../src/mempool/transactions_container.rs | 4 + crates/astria-sequencer/src/metrics.rs | 52 ++-- .../service/{mempool.rs => mempool/mod.rs} | 292 ++++++++++++------ .../src/service/mempool/tests.rs | 178 +++++++++++ .../src/transaction/checks.rs | 21 -- .../astria-sequencer/src/transaction/mod.rs | 1 - 7 files changed, 490 insertions(+), 157 deletions(-) rename crates/astria-sequencer/src/service/{mempool.rs => mempool/mod.rs} (67%) create mode 100644 crates/astria-sequencer/src/service/mempool/tests.rs diff --git a/crates/astria-sequencer/src/mempool/mod.rs b/crates/astria-sequencer/src/mempool/mod.rs index b47f2482a3..dbb18088c0 100644 --- a/crates/astria-sequencer/src/mempool/mod.rs +++ b/crates/astria-sequencer/src/mempool/mod.rs @@ -20,7 +20,6 @@ use astria_core::{ use astria_eyre::eyre::Result; pub(crate) use mempool_state::get_account_balances; use tokio::{ - join, sync::{ RwLock, RwLockWriteGuard, @@ -133,6 +132,7 @@ pub(crate) struct Mempool { pending: Arc>, parked: Arc>>, comet_bft_removal_cache: Arc>, + contained_txs: Arc>>, } impl Mempool { @@ -145,6 +145,7 @@ impl Mempool { NonZeroUsize::try_from(REMOVAL_CACHE_SIZE) .expect("Removal cache cannot be zero sized"), ))), + contained_txs: Arc::new(RwLock::new(HashSet::new())), } } @@ -152,12 +153,7 @@ impl Mempool { #[must_use] #[instrument(skip_all)] pub(crate) async fn len(&self) -> usize { - #[rustfmt::skip] - let (pending_len, parked_len) = join!( - async { self.pending.read().await.len() }, - async { self.parked.read().await.len() } - ); - pending_len.saturating_add(parked_len) + self.contained_txs.read().await.len() } /// Inserts a transaction into the mempool and does not allow for transaction replacement. @@ -184,11 +180,18 @@ impl Mempool { // Release the lock asap. drop(pending); // try to add to parked queue - parked.add( - timemarked_tx, + match parked.add( + timemarked_tx.clone(), current_account_nonce, ¤t_account_balances, - ) + ) { + Ok(()) => { + // track in contained txs + self.contained_txs.write().await.insert(timemarked_tx.id()); + Ok(()) + } + Err(err) => Err(err), + } } error @ Err( InsertionError::AlreadyPresent @@ -220,6 +223,10 @@ impl Mempool { ); } } + + // track in contained txs + self.contained_txs.write().await.insert(timemarked_tx.id()); + Ok(()) } } @@ -266,10 +273,12 @@ impl Mempool { // Add all removed to removal cache for cometbft. let mut removal_cache = self.comet_bft_removal_cache.write().await; - // Add the original tx first, since it will also be listed in `removed_txs`. The second + + // Add the original tx first to preserve its reason for removal. The second // attempt to add it inside the loop below will be a no-op. removal_cache.add(tx_hash, reason); for removed_tx in removed_txs { + self.contained_txs.write().await.remove(&removed_tx); removal_cache.add(removed_tx, RemovalReason::LowerNonceInvalidated); } } @@ -281,6 +290,12 @@ impl Mempool { self.comet_bft_removal_cache.write().await.remove(tx_hash) } + /// Returns true if the transaction is tracked as inserted. + #[instrument(skip_all)] + pub(crate) async fn tracked(&self, tx_hash: [u8; 32]) -> bool { + self.contained_txs.read().await.contains(&tx_hash) + } + /// Updates stored transactions to reflect current blockchain state. Will remove transactions /// that have stale nonces or are expired. Will also shift transation between pending and /// parked to relfect changes in account balances. @@ -380,10 +395,12 @@ impl Mempool { drop(parked); drop(pending); - // add to removal cache for cometbft + // add to removal cache for cometbft and remove from the tracked set let mut removal_cache = self.comet_bft_removal_cache.write().await; + let mut tracked_txs = self.contained_txs.write().await; for (tx_hash, reason) in removed_txs { removal_cache.add(tx_hash, reason); + tracked_txs.remove(&tx_hash); } } @@ -947,4 +964,62 @@ mod tests { "first removal reason should be presenved" ); } + + #[tokio::test] + async fn tx_tracked_set() { + let mempool = Mempool::new(); + let signing_key = SigningKey::from([1; 32]); + let signing_address = signing_key.verification_key().address_bytes(); + let account_balances = mock_balances(100, 100); + let tx_cost = mock_tx_cost(10, 10, 0); + + let tx0 = mock_tx(0, &signing_key, "test"); + let tx1 = mock_tx(1, &signing_key, "test"); + + // check that the parked transaction is in the tracked set + mempool + .insert(tx1.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap(); + assert!(mempool.tracked(tx1.id().get()).await); + + // check that the pending transaction is in the tracked set + mempool + .insert(tx0.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap(); + assert!(mempool.tracked(tx0.id().get()).await); + + // remove the transactions from the mempool + mempool + .remove_tx_invalid(tx0.clone(), RemovalReason::Expired) + .await; + + // check that the transactions are not in the tracked set + assert!(!mempool.tracked(tx0.id().get()).await); + assert!(!mempool.tracked(tx1.id().get()).await); + + // re-insert the transactions into the mempool + mempool + .insert(tx0.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap(); + mempool + .insert(tx1.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap(); + + // check that the transactions are in the tracked set + assert!(mempool.tracked(tx0.id().get()).await); + assert!(mempool.tracked(tx1.id().get()).await); + + // remove the transacitons from the mempool via maintenance + let mut mock_state = mock_state_getter().await; + mock_state_put_account_nonce(&mut mock_state, signing_address, 2); + mempool.run_maintenance(&mock_state, false).await; + + // check that the transactions are not in the tracked set + assert!(!mempool.tracked(tx0.id().get()).await); + assert!(!mempool.tracked(tx1.id().get()).await); + } } diff --git a/crates/astria-sequencer/src/mempool/transactions_container.rs b/crates/astria-sequencer/src/mempool/transactions_container.rs index 62cd0d2fbd..b9bc032a18 100644 --- a/crates/astria-sequencer/src/mempool/transactions_container.rs +++ b/crates/astria-sequencer/src/mempool/transactions_container.rs @@ -110,6 +110,10 @@ impl TimemarkedTransaction { pub(super) fn cost(&self) -> &HashMap { &self.cost } + + pub(super) fn id(&self) -> [u8; 32] { + self.tx_hash + } } impl fmt::Display for TimemarkedTransaction { diff --git a/crates/astria-sequencer/src/metrics.rs b/crates/astria-sequencer/src/metrics.rs index d051c68cfc..413196fe9d 100644 --- a/crates/astria-sequencer/src/metrics.rs +++ b/crates/astria-sequencer/src/metrics.rs @@ -24,10 +24,10 @@ pub struct Metrics { check_tx_removed_expired: Counter, check_tx_removed_failed_execution: Counter, check_tx_removed_failed_stateless: Counter, - check_tx_removed_stale_nonce: Counter, check_tx_duration_seconds_parse_tx: Histogram, check_tx_duration_seconds_check_stateless: Histogram, - check_tx_duration_seconds_check_nonce: Histogram, + check_tx_duration_seconds_fetch_nonce: Histogram, + check_tx_duration_seconds_check_tracked: Histogram, check_tx_duration_seconds_check_chain_id: Histogram, check_tx_duration_seconds_check_removed: Histogram, check_tx_duration_seconds_convert_address: Histogram, @@ -88,10 +88,6 @@ impl Metrics { self.check_tx_removed_failed_stateless.increment(1); } - pub(crate) fn increment_check_tx_removed_stale_nonce(&self) { - self.check_tx_removed_stale_nonce.increment(1); - } - pub(crate) fn record_check_tx_duration_seconds_parse_tx(&self, duration: Duration) { self.check_tx_duration_seconds_parse_tx.record(duration); } @@ -101,8 +97,13 @@ impl Metrics { .record(duration); } - pub(crate) fn record_check_tx_duration_seconds_check_nonce(&self, duration: Duration) { - self.check_tx_duration_seconds_check_nonce.record(duration); + pub(crate) fn record_check_tx_duration_seconds_fetch_nonce(&self, duration: Duration) { + self.check_tx_duration_seconds_fetch_nonce.record(duration); + } + + pub(crate) fn record_check_tx_duration_seconds_check_tracked(&self, duration: Duration) { + self.check_tx_duration_seconds_check_tracked + .record(duration); } pub(crate) fn record_check_tx_duration_seconds_check_chain_id(&self, duration: Duration) { @@ -260,6 +261,21 @@ impl telemetry::Metrics for Metrics { )? .register()?; + let check_tx_duration_seconds_fetch_nonce = builder + .new_histogram_factory( + CHECK_TX_DURATION_SECONDS_FETCH_NONCE, + "The amount of time taken in seconds to fetch an account's nonce", + )? + .register()?; + + let check_tx_duration_seconds_check_tracked = builder + .new_histogram_factory( + CHECK_TX_DURATION_SECONDS_CHECK_TRACKED, + "The amount of time taken in seconds to check if the transaction is already in \ + the mempool", + )? + .register()?; + let check_tx_removed_failed_stateless = builder .new_counter_factory( CHECK_TX_REMOVED_FAILED_STATELESS, @@ -267,15 +283,6 @@ impl telemetry::Metrics for Metrics { failing the stateless check", )? .register()?; - - let check_tx_removed_stale_nonce = builder - .new_counter_factory( - CHECK_TX_REMOVED_STALE_NONCE, - "The number of transactions that have been removed from the mempool due to having \ - a stale nonce", - )? - .register()?; - let mut check_tx_duration_factory = builder.new_histogram_factory( CHECK_TX_DURATION_SECONDS, "The amount of time taken in seconds to successfully complete the various stages of \ @@ -286,8 +293,6 @@ impl telemetry::Metrics for Metrics { )?; let check_tx_duration_seconds_check_stateless = check_tx_duration_factory .register_with_labels(&[(CHECK_TX_STAGE, "stateless check".to_string())])?; - let check_tx_duration_seconds_check_nonce = check_tx_duration_factory - .register_with_labels(&[(CHECK_TX_STAGE, "nonce check".to_string())])?; let check_tx_duration_seconds_check_chain_id = check_tx_duration_factory .register_with_labels(&[(CHECK_TX_STAGE, "chain id check".to_string())])?; let check_tx_duration_seconds_check_removed = check_tx_duration_factory @@ -335,10 +340,10 @@ impl telemetry::Metrics for Metrics { check_tx_removed_expired, check_tx_removed_failed_execution, check_tx_removed_failed_stateless, - check_tx_removed_stale_nonce, check_tx_duration_seconds_parse_tx, check_tx_duration_seconds_check_stateless, - check_tx_duration_seconds_check_nonce, + check_tx_duration_seconds_fetch_nonce, + check_tx_duration_seconds_check_tracked, check_tx_duration_seconds_check_chain_id, check_tx_duration_seconds_check_removed, check_tx_duration_seconds_convert_address, @@ -365,12 +370,13 @@ metric_names!(const METRICS_NAMES: CHECK_TX_REMOVED_EXPIRED, CHECK_TX_REMOVED_FAILED_EXECUTION, CHECK_TX_REMOVED_FAILED_STATELESS, - CHECK_TX_REMOVED_STALE_NONCE, CHECK_TX_REMOVED_ACCOUNT_BALANCE, CHECK_TX_DURATION_SECONDS, CHECK_TX_DURATION_SECONDS_CONVERT_ADDRESS, CHECK_TX_DURATION_SECONDS_FETCH_BALANCES, + CHECK_TX_DURATION_SECONDS_FETCH_NONCE, CHECK_TX_DURATION_SECONDS_FETCH_TX_COST, + CHECK_TX_DURATION_SECONDS_CHECK_TRACKED, ACTIONS_PER_TRANSACTION_IN_MEMPOOL, TRANSACTION_IN_MEMPOOL_SIZE_BYTES, TRANSACTIONS_IN_MEMPOOL_TOTAL, @@ -386,7 +392,6 @@ mod tests { CHECK_TX_REMOVED_EXPIRED, CHECK_TX_REMOVED_FAILED_EXECUTION, CHECK_TX_REMOVED_FAILED_STATELESS, - CHECK_TX_REMOVED_STALE_NONCE, CHECK_TX_REMOVED_TOO_LARGE, MEMPOOL_RECOSTED, PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS, @@ -442,7 +447,6 @@ mod tests { CHECK_TX_REMOVED_FAILED_STATELESS, "check_tx_removed_failed_stateless", ); - assert_const(CHECK_TX_REMOVED_STALE_NONCE, "check_tx_removed_stale_nonce"); assert_const( CHECK_TX_REMOVED_ACCOUNT_BALANCE, "check_tx_removed_account_balance", diff --git a/crates/astria-sequencer/src/service/mempool.rs b/crates/astria-sequencer/src/service/mempool/mod.rs similarity index 67% rename from crates/astria-sequencer/src/service/mempool.rs rename to crates/astria-sequencer/src/service/mempool/mod.rs index 73f79f8ffc..230f28bf21 100644 --- a/crates/astria-sequencer/src/service/mempool.rs +++ b/crates/astria-sequencer/src/service/mempool/mod.rs @@ -1,3 +1,6 @@ +#[cfg(test)] +mod tests; + use std::{ collections::HashMap, pin::Pin, @@ -18,6 +21,7 @@ use astria_core::{ }, }; use astria_eyre::eyre::WrapErr as _; +use bytes::Bytes; use cnidarium::Storage; use futures::{ Future, @@ -106,11 +110,14 @@ impl Service for Mempool { /// Handles a [`request::CheckTx`] request. /// -/// Performs stateless checks (decoding and signature check), -/// as well as stateful checks (nonce and balance checks). +/// This function will error if: +/// - the transaction has been removed from the app's mempool (will throw error once) +/// - the transaction fails stateless checks +/// - the transaction fails insertion into the mempool /// -/// If the tx passes all checks, status code 0 is returned. -#[allow(clippy::too_many_lines)] +/// The function will return a [`response::CheckTx`] with a status code of 0 if the transaction: +/// - Is already in the appside mempool +/// - Passes stateless checks and insertion into the mempool is successful #[instrument(skip_all)] async fn handle_check_tx( req: request::CheckTx, @@ -120,18 +127,149 @@ async fn handle_check_tx response::CheckTx { use sha2::Digest as _; - let start_parsing = Instant::now(); - let request::CheckTx { tx, .. } = req; let tx_hash = sha2::Sha256::digest(&tx).into(); + + // check if the transaction has been removed from the appside mempool + if let Some(rsp) = check_removed_comet_bft(tx_hash, mempool, metrics).await { + return rsp; + } + + // check if the transaction is already in the mempool + if let Some(rsp) = check_tracked(tx_hash, mempool, metrics).await { + return rsp; + } + + // perform stateless checks + let signed_tx = match stateless_checks(tx, &state, metrics).await { + StatelessReturn::Response(rsp) => return rsp, + StatelessReturn::Tx(signed_tx) => signed_tx, + }; + + // attempt to insert the transaction into the mempool + if let Some(rsp) = insert_into_mempool(mempool, &state, signed_tx, metrics).await { + return rsp; + } + + // insertion successful + metrics.set_transactions_in_mempool_total(mempool.len().await); + + response::CheckTx::default() +} + +/// Checks if the transaction is already in the mempool. +/// +/// Returns a [`response::CheckTx`] with a status code of 0 if the transaction is already in the +/// mempool. +async fn check_tracked( + tx_hash: [u8; 32], + mempool: &AppMempool, + metrics: &Metrics, +) -> Option { + let start_tracked_check = Instant::now(); + + if mempool.tracked(tx_hash).await { + return Some(response::CheckTx::default()); + } + + let finished_check_tracked = Instant::now(); + metrics.record_check_tx_duration_seconds_check_tracked( + finished_check_tracked.saturating_duration_since(start_tracked_check), + ); + + None +} + +/// Checks if the transaction has been removed from the appside mempool. +/// +/// Returns a [`response::CheckTx`] with an error code and message if the transaction has been +/// removed from the appside mempool. +async fn check_removed_comet_bft( + tx_hash: [u8; 32], + mempool: &AppMempool, + metrics: &Metrics, +) -> Option { + let start_removal_check = Instant::now(); + + // check if the transaction has been removed from the appside mempool and handle + // the removal reason + if let Some(removal_reason) = mempool.check_removed_comet_bft(tx_hash).await { + match removal_reason { + RemovalReason::Expired => { + metrics.increment_check_tx_removed_expired(); + return Some(response::CheckTx { + code: Code::Err(AbciErrorCode::TRANSACTION_EXPIRED.value()), + info: "transaction expired in app's mempool".into(), + log: "Transaction expired in the app's mempool".into(), + ..response::CheckTx::default() + }); + } + RemovalReason::FailedPrepareProposal(err) => { + metrics.increment_check_tx_removed_failed_execution(); + return Some(response::CheckTx { + code: Code::Err(AbciErrorCode::TRANSACTION_FAILED.value()), + info: "transaction failed execution in prepare_proposal()".into(), + log: format!("transaction failed execution because: {err}"), + ..response::CheckTx::default() + }); + } + RemovalReason::NonceStale => { + return Some(response::CheckTx { + code: Code::Err(AbciErrorCode::INVALID_NONCE.value()), + info: "transaction removed from app mempool due to stale nonce".into(), + log: "Transaction from app mempool due to stale nonce".into(), + ..response::CheckTx::default() + }); + } + RemovalReason::LowerNonceInvalidated => { + return Some(response::CheckTx { + code: Code::Err(AbciErrorCode::LOWER_NONCE_INVALIDATED.value()), + info: "transaction removed from app mempool due to lower nonce being \ + invalidated" + .into(), + log: "Transaction removed from app mempool due to lower nonce being \ + invalidated" + .into(), + ..response::CheckTx::default() + }); + } + } + }; + + let finished_removal_check = Instant::now(); + metrics.record_check_tx_duration_seconds_check_removed( + finished_removal_check.saturating_duration_since(start_removal_check), + ); + + None +} + +/// Stateless checks return a [`response::CheckTx`] if the transaction fails any of the checks. +/// Otherwise, it returns the [`SignedTransaction`] to be inserted into the mempool. +enum StatelessReturn { + Response(response::CheckTx), + Tx(SignedTransaction), +} + +/// Performs stateless checks on the transaction. +/// +/// Returns a [`response::CheckTx`] if the transaction fails any of the checks. +/// Otherwise, it returns the [`SignedTransaction`] to be inserted into the mempool. +async fn stateless_checks( + tx: Bytes, + state: &S, + metrics: &'static Metrics, +) -> StatelessReturn { + let start_parsing = Instant::now(); + let tx_len = tx.len(); if tx_len > MAX_TX_SIZE { metrics.increment_check_tx_removed_too_large(); - return response::CheckTx { + return StatelessReturn::Response(response::CheckTx { code: Code::Err(AbciErrorCode::TRANSACTION_TOO_LARGE.value()), log: format!( "transaction size too large; allowed: {MAX_TX_SIZE} bytes, got {}", @@ -139,31 +277,31 @@ async fn handle_check_tx tx, Err(e) => { - return response::CheckTx { + return StatelessReturn::Response(response::CheckTx { code: Code::Err(AbciErrorCode::INVALID_PARAMETER.value()), log: format!("{e:#}"), info: "failed decoding bytes as a protobuf SignedTransaction".into(), ..response::CheckTx::default() - }; + }); } }; let signed_tx = match SignedTransaction::try_from_raw(raw_signed_tx) { Ok(tx) => tx, Err(e) => { - return response::CheckTx { + return StatelessReturn::Response(response::CheckTx { code: Code::Err(AbciErrorCode::INVALID_PARAMETER.value()), info: "the provided bytes was not a valid protobuf-encoded SignedTransaction, or \ the signature was invalid" .into(), log: format!("{e:#}"), ..response::CheckTx::default() - }; + }); } }; @@ -174,12 +312,12 @@ async fn handle_check_tx { - metrics.increment_check_tx_removed_expired(); - return response::CheckTx { - code: Code::Err(AbciErrorCode::TRANSACTION_EXPIRED.value()), - info: "transaction expired in app's mempool".into(), - log: "Transaction expired in the app's mempool".into(), - ..response::CheckTx::default() - }; - } - RemovalReason::FailedPrepareProposal(err) => { - metrics.increment_check_tx_removed_failed_execution(); - return response::CheckTx { - code: Code::Err(AbciErrorCode::TRANSACTION_FAILED.value()), - info: "transaction failed execution in prepare_proposal()".into(), - log: format!("transaction failed execution because: {err}"), - ..response::CheckTx::default() - }; - } - RemovalReason::NonceStale => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::INVALID_NONCE.value()), - info: "transaction removed from app mempool due to stale nonce".into(), - log: "Transaction from app mempool due to stale nonce".into(), - ..response::CheckTx::default() - }; - } - RemovalReason::LowerNonceInvalidated => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::LOWER_NONCE_INVALIDATED.value()), - info: "transaction removed from app mempool due to lower nonce being \ - invalidated" - .into(), - log: "Transaction removed from app mempool due to lower nonce being \ - invalidated" - .into(), - ..response::CheckTx::default() - }; - } - } - }; + // note: decide if worth moving to post-insertion, would have to recalculate cost + metrics.record_transaction_in_mempool_size_bytes(tx_len); - let finished_check_removed = Instant::now(); - metrics.record_check_tx_duration_seconds_check_removed( - finished_check_removed.saturating_duration_since(finished_check_chain_id), - ); + StatelessReturn::Tx(signed_tx) +} - // tx is valid, push to mempool with current state +/// Attempts to insert the transaction into the mempool. +/// +/// Returns a [`response::CheckTx`] with an error code and message if the transaction fails to be +/// inserted into the mempool. +async fn insert_into_mempool( + mempool: &AppMempool, + state: &S, + signed_tx: SignedTransaction, + metrics: &'static Metrics, +) -> Option { + let start_convert_address = Instant::now(); + + // generate address for the signed transaction let address = match state .try_base_prefixed(&signed_tx.verification_key().address_bytes()) .await .context("failed to generate address for signed transaction") { Err(err) => { - return response::CheckTx { + return Some(response::CheckTx { code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), info: AbciErrorCode::INTERNAL_ERROR.info(), log: format!("failed to generate address because: {err:#}"), ..response::CheckTx::default() - }; + }); } Ok(address) => address, }; - // fetch current account + let finished_convert_address = Instant::now(); + metrics.record_check_tx_duration_seconds_convert_address( + finished_convert_address.saturating_duration_since(start_convert_address), + ); + + // fetch current account nonce let current_account_nonce = match state .get_account_nonce(address) .await .wrap_err("failed fetching nonce for account") { Err(err) => { - return response::CheckTx { + return Some(response::CheckTx { code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), info: AbciErrorCode::INTERNAL_ERROR.info(), log: format!("failed to fetch account nonce because: {err:#}"), ..response::CheckTx::default() - }; + }); } Ok(nonce) => nonce, }; - let finished_convert_address = Instant::now(); - metrics.record_check_tx_duration_seconds_convert_address( - finished_convert_address.saturating_duration_since(finished_check_removed), + let finished_fetch_nonce = Instant::now(); + metrics.record_check_tx_duration_seconds_fetch_nonce( + finished_fetch_nonce.saturating_duration_since(finished_convert_address), ); // grab cost of transaction @@ -309,19 +407,19 @@ async fn handle_check_tx { - return response::CheckTx { + return Some(response::CheckTx { code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), info: AbciErrorCode::INTERNAL_ERROR.info(), log: format!("failed to fetch cost of the transaction because: {err:#}"), ..response::CheckTx::default() - }; + }); } Ok(transaction_cost) => transaction_cost, }; let finished_fetch_tx_cost = Instant::now(); metrics.record_check_tx_duration_seconds_fetch_tx_cost( - finished_fetch_tx_cost.saturating_duration_since(finished_convert_address), + finished_fetch_tx_cost.saturating_duration_since(finished_fetch_nonce), ); // grab current account's balances @@ -331,12 +429,12 @@ async fn handle_check_tx { - return response::CheckTx { + return Some(response::CheckTx { code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), info: AbciErrorCode::INTERNAL_ERROR.info(), log: format!("failed to fetch account balances because: {err:#}"), ..response::CheckTx::default() - }; + }); } Ok(account_balance) => account_balance, }; @@ -357,21 +455,17 @@ async fn handle_check_tx( - tx: &SignedTransaction, - state: &S, -) -> Result<()> { - let signer_address = state - .try_base_prefixed(&tx.verification_key().address_bytes()) - .await - .wrap_err( - "failed constructing the signer address from signed transaction verification and \ - prefix provided by app state", - )?; - let curr_nonce = state - .get_account_nonce(signer_address) - .await - .wrap_err("failed to get account nonce")?; - ensure!(tx.nonce() >= curr_nonce, "nonce already used by account"); - Ok(()) -} - #[instrument(skip_all)] pub(crate) async fn check_chain_id_mempool( tx: &SignedTransaction, diff --git a/crates/astria-sequencer/src/transaction/mod.rs b/crates/astria-sequencer/src/transaction/mod.rs index 9dc97c7ee1..90e4ea7fca 100644 --- a/crates/astria-sequencer/src/transaction/mod.rs +++ b/crates/astria-sequencer/src/transaction/mod.rs @@ -20,7 +20,6 @@ use astria_eyre::{ pub(crate) use checks::{ check_balance_for_total_fees_and_transfers, check_chain_id_mempool, - check_nonce_mempool, get_total_transaction_cost, }; use cnidarium::StateWrite; From 83e4718d8bae67f2f85439677279df78cdae1c4f Mon Sep 17 00:00:00 2001 From: lilyjjo Date: Thu, 26 Sep 2024 18:26:45 -0400 Subject: [PATCH 02/11] respond to comments from @noot --- crates/astria-sequencer/src/mempool/mod.rs | 6 +- .../src/service/mempool/mod.rs | 82 ++++++++----------- 2 files changed, 36 insertions(+), 52 deletions(-) diff --git a/crates/astria-sequencer/src/mempool/mod.rs b/crates/astria-sequencer/src/mempool/mod.rs index dbb18088c0..ba64cbdc8b 100644 --- a/crates/astria-sequencer/src/mempool/mod.rs +++ b/crates/astria-sequencer/src/mempool/mod.rs @@ -167,7 +167,7 @@ impl Mempool { transaction_cost: HashMap, ) -> Result<(), InsertionError> { let timemarked_tx = TimemarkedTransaction::new(tx, transaction_cost); - + let id = timemarked_tx.id(); let (mut pending, mut parked) = self.acquire_both_locks().await; // try insert into pending @@ -181,13 +181,13 @@ impl Mempool { drop(pending); // try to add to parked queue match parked.add( - timemarked_tx.clone(), + timemarked_tx, current_account_nonce, ¤t_account_balances, ) { Ok(()) => { // track in contained txs - self.contained_txs.write().await.insert(timemarked_tx.id()); + self.contained_txs.write().await.insert(id); Ok(()) } Err(err) => Err(err), diff --git a/crates/astria-sequencer/src/service/mempool/mod.rs b/crates/astria-sequencer/src/service/mempool/mod.rs index 230f28bf21..ddcfe2e499 100644 --- a/crates/astria-sequencer/src/service/mempool/mod.rs +++ b/crates/astria-sequencer/src/service/mempool/mod.rs @@ -134,23 +134,23 @@ async fn handle_check_tx return rsp, - StatelessReturn::Tx(signed_tx) => signed_tx, + Ok(signed_tx) => signed_tx, + Err(rsp) => return rsp, }; // attempt to insert the transaction into the mempool - if let Some(rsp) = insert_into_mempool(mempool, &state, signed_tx, metrics).await { + if let Err(rsp) = insert_into_mempool(mempool, &state, signed_tx, metrics).await { return rsp; } @@ -161,37 +161,28 @@ async fn handle_check_tx Option { +async fn is_tracked(tx_hash: [u8; 32], mempool: &AppMempool, metrics: &Metrics) -> bool { let start_tracked_check = Instant::now(); - if mempool.tracked(tx_hash).await { - return Some(response::CheckTx::default()); - } + let result = mempool.tracked(tx_hash).await; let finished_check_tracked = Instant::now(); metrics.record_check_tx_duration_seconds_check_tracked( finished_check_tracked.saturating_duration_since(start_tracked_check), ); - None + result } /// Checks if the transaction has been removed from the appside mempool. /// -/// Returns a [`response::CheckTx`] with an error code and message if the transaction has been +/// Returns an Err([`response::CheckTx`]) with an error code and message if the transaction has been /// removed from the appside mempool. async fn check_removed_comet_bft( tx_hash: [u8; 32], mempool: &AppMempool, metrics: &Metrics, -) -> Option { +) -> Result<(), response::CheckTx> { let start_removal_check = Instant::now(); // check if the transaction has been removed from the appside mempool and handle @@ -200,7 +191,7 @@ async fn check_removed_comet_bft( match removal_reason { RemovalReason::Expired => { metrics.increment_check_tx_removed_expired(); - return Some(response::CheckTx { + return Err(response::CheckTx { code: Code::Err(AbciErrorCode::TRANSACTION_EXPIRED.value()), info: "transaction expired in app's mempool".into(), log: "Transaction expired in the app's mempool".into(), @@ -209,7 +200,7 @@ async fn check_removed_comet_bft( } RemovalReason::FailedPrepareProposal(err) => { metrics.increment_check_tx_removed_failed_execution(); - return Some(response::CheckTx { + return Err(response::CheckTx { code: Code::Err(AbciErrorCode::TRANSACTION_FAILED.value()), info: "transaction failed execution in prepare_proposal()".into(), log: format!("transaction failed execution because: {err}"), @@ -217,7 +208,7 @@ async fn check_removed_comet_bft( }); } RemovalReason::NonceStale => { - return Some(response::CheckTx { + return Err(response::CheckTx { code: Code::Err(AbciErrorCode::INVALID_NONCE.value()), info: "transaction removed from app mempool due to stale nonce".into(), log: "Transaction from app mempool due to stale nonce".into(), @@ -225,7 +216,7 @@ async fn check_removed_comet_bft( }); } RemovalReason::LowerNonceInvalidated => { - return Some(response::CheckTx { + return Err(response::CheckTx { code: Code::Err(AbciErrorCode::LOWER_NONCE_INVALIDATED.value()), info: "transaction removed from app mempool due to lower nonce being \ invalidated" @@ -244,32 +235,25 @@ async fn check_removed_comet_bft( finished_removal_check.saturating_duration_since(start_removal_check), ); - None -} - -/// Stateless checks return a [`response::CheckTx`] if the transaction fails any of the checks. -/// Otherwise, it returns the [`SignedTransaction`] to be inserted into the mempool. -enum StatelessReturn { - Response(response::CheckTx), - Tx(SignedTransaction), + Ok(()) } /// Performs stateless checks on the transaction. /// -/// Returns a [`response::CheckTx`] if the transaction fails any of the checks. +/// Returns an Err([`response::CheckTx`]) if the transaction fails any of the checks. /// Otherwise, it returns the [`SignedTransaction`] to be inserted into the mempool. async fn stateless_checks( tx: Bytes, state: &S, metrics: &'static Metrics, -) -> StatelessReturn { +) -> Result { let start_parsing = Instant::now(); let tx_len = tx.len(); if tx_len > MAX_TX_SIZE { metrics.increment_check_tx_removed_too_large(); - return StatelessReturn::Response(response::CheckTx { + return Err(response::CheckTx { code: Code::Err(AbciErrorCode::TRANSACTION_TOO_LARGE.value()), log: format!( "transaction size too large; allowed: {MAX_TX_SIZE} bytes, got {}", @@ -283,7 +267,7 @@ async fn stateless_checks tx, Err(e) => { - return StatelessReturn::Response(response::CheckTx { + return Err(response::CheckTx { code: Code::Err(AbciErrorCode::INVALID_PARAMETER.value()), log: format!("{e:#}"), info: "failed decoding bytes as a protobuf SignedTransaction".into(), @@ -294,7 +278,7 @@ async fn stateless_checks tx, Err(e) => { - return StatelessReturn::Response(response::CheckTx { + return Err(response::CheckTx { code: Code::Err(AbciErrorCode::INVALID_PARAMETER.value()), info: "the provided bytes was not a valid protobuf-encoded SignedTransaction, or \ the signature was invalid" @@ -312,7 +296,7 @@ async fn stateless_checks( mempool: &AppMempool, state: &S, signed_tx: SignedTransaction, metrics: &'static Metrics, -) -> Option { +) -> Result<(), response::CheckTx> { let start_convert_address = Instant::now(); // generate address for the signed transaction @@ -364,7 +348,7 @@ async fn insert_into_mempool { - return Some(response::CheckTx { + return Err(response::CheckTx { code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), info: AbciErrorCode::INTERNAL_ERROR.info(), log: format!("failed to generate address because: {err:#}"), @@ -386,7 +370,7 @@ async fn insert_into_mempool { - return Some(response::CheckTx { + return Err(response::CheckTx { code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), info: AbciErrorCode::INTERNAL_ERROR.info(), log: format!("failed to fetch account nonce because: {err:#}"), @@ -407,7 +391,7 @@ async fn insert_into_mempool { - return Some(response::CheckTx { + return Err(response::CheckTx { code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), info: AbciErrorCode::INTERNAL_ERROR.info(), log: format!("failed to fetch cost of the transaction because: {err:#}"), @@ -429,7 +413,7 @@ async fn insert_into_mempool { - return Some(response::CheckTx { + return Err(response::CheckTx { code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), info: AbciErrorCode::INTERNAL_ERROR.info(), log: format!("failed to fetch account balances because: {err:#}"), @@ -455,7 +439,7 @@ async fn insert_into_mempool Date: Sat, 28 Sep 2024 12:52:10 -0400 Subject: [PATCH 03/11] respond to comment from @Fraser999 --- crates/astria-sequencer/src/app/test_utils.rs | 2 +- crates/astria-sequencer/src/grpc/sequencer.rs | 10 +- .../src/mempool/benchmarks.rs | 4 +- crates/astria-sequencer/src/mempool/mod.rs | 98 ++++++++++++++----- crates/astria-sequencer/src/metrics.rs | 19 +++- crates/astria-sequencer/src/sequencer.rs | 2 +- .../astria-sequencer/src/service/consensus.rs | 2 +- .../src/service/mempool/mod.rs | 23 ++--- .../src/service/mempool/tests.rs | 8 +- 9 files changed, 118 insertions(+), 50 deletions(-) diff --git a/crates/astria-sequencer/src/app/test_utils.rs b/crates/astria-sequencer/src/app/test_utils.rs index 0290b3ccf0..82584ce591 100644 --- a/crates/astria-sequencer/src/app/test_utils.rs +++ b/crates/astria-sequencer/src/app/test_utils.rs @@ -189,8 +189,8 @@ pub(crate) async fn initialize_app_with_storage( .await .expect("failed to create temp storage backing chain state"); let snapshot = storage.latest_snapshot(); - let mempool = Mempool::new(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let mut app = App::new(snapshot, mempool, metrics).await.unwrap(); let genesis_state = genesis_state.unwrap_or_else(self::genesis_state); diff --git a/crates/astria-sequencer/src/grpc/sequencer.rs b/crates/astria-sequencer/src/grpc/sequencer.rs index f7ddf660c4..7a3c1cd170 100644 --- a/crates/astria-sequencer/src/grpc/sequencer.rs +++ b/crates/astria-sequencer/src/grpc/sequencer.rs @@ -221,6 +221,7 @@ mod tests { sequencerblock::v1alpha1::SequencerBlock, }; use cnidarium::StateDelta; + use telemetry::Metrics; use super::*; use crate::{ @@ -246,7 +247,8 @@ mod tests { async fn test_get_sequencer_block() { let block = make_test_sequencer_block(1); let storage = cnidarium::TempStorage::new().await.unwrap(); - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let mut state_tx = StateDelta::new(storage.latest_snapshot()); state_tx.put_block_height(1); state_tx.put_sequencer_block(block.clone()).unwrap(); @@ -264,7 +266,8 @@ mod tests { #[tokio::test] async fn get_pending_nonce_in_mempool() { let storage = cnidarium::TempStorage::new().await.unwrap(); - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let alice = get_alice_signing_key(); let alice_address = astria_address(&alice.address_bytes()); @@ -307,7 +310,8 @@ mod tests { use crate::accounts::StateWriteExt as _; let storage = cnidarium::TempStorage::new().await.unwrap(); - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let mut state_tx = StateDelta::new(storage.latest_snapshot()); let alice = get_alice_signing_key(); let alice_address = astria_address(&alice.address_bytes()); diff --git a/crates/astria-sequencer/src/mempool/benchmarks.rs b/crates/astria-sequencer/src/mempool/benchmarks.rs index d8df9dc718..7173478b38 100644 --- a/crates/astria-sequencer/src/mempool/benchmarks.rs +++ b/crates/astria-sequencer/src/mempool/benchmarks.rs @@ -17,6 +17,7 @@ use sha2::{ Digest as _, Sha256, }; +use telemetry::Metrics; use crate::{ app::test_utils::{ @@ -103,7 +104,8 @@ fn init_mempool() -> Mempool { .enable_all() .build() .unwrap(); - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let account_mock_balance = mock_balances(0, 0); let tx_mock_cost = mock_tx_cost(0, 0, 0); runtime.block_on(async { diff --git a/crates/astria-sequencer/src/mempool/mod.rs b/crates/astria-sequencer/src/mempool/mod.rs index ba64cbdc8b..21dac98530 100644 --- a/crates/astria-sequencer/src/mempool/mod.rs +++ b/crates/astria-sequencer/src/mempool/mod.rs @@ -37,7 +37,10 @@ use transactions_container::{ TimemarkedTransaction, }; -use crate::accounts; +use crate::{ + accounts, + Metrics, +}; #[derive(Debug, Eq, PartialEq, Clone)] pub(crate) enum RemovalReason { @@ -133,11 +136,12 @@ pub(crate) struct Mempool { parked: Arc>>, comet_bft_removal_cache: Arc>, contained_txs: Arc>>, + metrics: &'static Metrics, } impl Mempool { #[must_use] - pub(crate) fn new() -> Self { + pub(crate) fn new(metrics: &'static Metrics) -> Self { Self { pending: Arc::new(RwLock::new(PendingTransactions::new(TX_TTL))), parked: Arc::new(RwLock::new(ParkedTransactions::new(TX_TTL))), @@ -146,6 +150,7 @@ impl Mempool { .expect("Removal cache cannot be zero sized"), ))), contained_txs: Arc::new(RwLock::new(HashSet::new())), + metrics, } } @@ -156,6 +161,30 @@ impl Mempool { self.contained_txs.read().await.len() } + /// Adds a transaction to the mempool's tracked transactions. + /// Will increment logic error metrics and log error if transaction is already present. + fn add_to_contained_txs(&self, tx_hash: [u8; 32], contained_txs: &mut HashSet<[u8; 32]>) { + if !contained_txs.insert(tx_hash) { + self.metrics.increment_mempool_tx_logic_error(); + error!( + "attempted to re-add transaction {tx_hash:?} to mempool's tracked container, is \ + logic error" + ); + } + } + + /// Removes a transaction from the mempool's tracked transactions. + /// Will increment logic error metrics and log error if transaction is not present. + fn remove_from_contained_txs(&self, tx_hash: [u8; 32], contained_txs: &mut HashSet<[u8; 32]>) { + if !contained_txs.remove(&tx_hash) { + self.metrics.increment_mempool_tx_logic_error(); + error!( + "attempted to remove transaction non present {tx_hash:?} from mempool's tracked \ + container, is logic error" + ); + } + } + /// Inserts a transaction into the mempool and does not allow for transaction replacement. /// Will return the reason for insertion failure if failure occurs. #[instrument(skip_all)] @@ -214,18 +243,23 @@ impl Mempool { ); // promote the transactions for ttx in to_promote { + let tx_id = ttx.id(); if let Err(error) = pending.add(ttx, current_account_nonce, ¤t_account_balances) { + // remove from tracked + let mut contained_txs = self.contained_txs.write().await; + self.remove_from_contained_txs(timemarked_tx.id(), &mut contained_txs); error!( current_account_nonce, - "failed to promote transaction during insertion: {error:#}" + "failed to promote transaction {tx_id:?} during insertion: {error:#}" ); } } // track in contained txs - self.contained_txs.write().await.insert(timemarked_tx.id()); + let mut contained_txs = self.contained_txs.write().await; + self.add_to_contained_txs(timemarked_tx.id(), &mut contained_txs); Ok(()) } @@ -277,8 +311,9 @@ impl Mempool { // Add the original tx first to preserve its reason for removal. The second // attempt to add it inside the loop below will be a no-op. removal_cache.add(tx_hash, reason); + let mut contained_txs = self.contained_txs.write().await; for removed_tx in removed_txs { - self.contained_txs.write().await.remove(&removed_tx); + self.remove_from_contained_txs(removed_tx, &mut contained_txs); removal_cache.add(removed_tx, RemovalReason::LowerNonceInvalidated); } } @@ -292,7 +327,7 @@ impl Mempool { /// Returns true if the transaction is tracked as inserted. #[instrument(skip_all)] - pub(crate) async fn tracked(&self, tx_hash: [u8; 32]) -> bool { + pub(crate) async fn is_tracked(&self, tx_hash: [u8; 32]) -> bool { self.contained_txs.read().await.contains(&tx_hash) } @@ -371,18 +406,29 @@ impl Mempool { parked.find_promotables(&address, highest_pending_nonce, &remaining_balances); for tx in promtion_txs { + let tx_id = tx.id(); if let Err(error) = pending.add(tx, current_nonce, ¤t_balances) { + // remove from tracked + let mut contained_txs = self.contained_txs.write().await; + self.remove_from_contained_txs(tx_id, &mut contained_txs); + // this shouldn't happen + self.metrics.increment_mempool_tx_logic_error(); error!( current_nonce, - "failed to promote transaction during maintenance: {error:#}" + "failed to promote transaction {tx_id:?} during maintenance: {error:#}" ); } } } else { // add demoted transactions to parked for tx in demotion_txs { + let tx_id = tx.id(); if let Err(err) = parked.add(tx, current_nonce, ¤t_balances) { + // remove from tracked + let mut contained_txs = self.contained_txs.write().await; + self.remove_from_contained_txs(tx_id, &mut contained_txs); // this shouldn't happen + self.metrics.increment_mempool_tx_logic_error(); error!( address = %telemetry::display::base64(&address), "failed to demote transaction during maintenance: {err:#}" @@ -428,6 +474,7 @@ impl Mempool { #[cfg(test)] mod tests { use astria_core::crypto::SigningKey; + use telemetry::Metrics; use super::*; use crate::app::test_utils::{ @@ -441,7 +488,8 @@ mod tests { #[tokio::test] async fn insert() { - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let signing_key = SigningKey::from([1; 32]); let account_balances = mock_balances(100, 100); let tx_cost = mock_tx_cost(10, 10, 0); @@ -502,7 +550,8 @@ mod tests { // odder edge cases that can be hit if a node goes offline or fails to see // some transactions that other nodes include into their proposed blocks. - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let signing_key = SigningKey::from([1; 32]); let signing_address = signing_key.verification_key().address_bytes(); let account_balances = mock_balances(100, 100); @@ -593,7 +642,8 @@ mod tests { #[tokio::test] async fn run_maintenance_promotion() { - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let signing_key = SigningKey::from([1; 32]); let signing_address = signing_key.verification_key().address_bytes(); @@ -660,7 +710,8 @@ mod tests { #[allow(clippy::too_many_lines)] #[tokio::test] async fn run_maintenance_demotion() { - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let signing_key = SigningKey::from([1; 32]); let signing_address = signing_key.verification_key().address_bytes(); @@ -740,7 +791,8 @@ mod tests { #[tokio::test] async fn remove_invalid() { - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let signing_key = SigningKey::from([1; 32]); let account_balances = mock_balances(100, 100); let tx_cost = mock_tx_cost(10, 10, 10); @@ -843,7 +895,8 @@ mod tests { #[tokio::test] async fn should_get_pending_nonce() { - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let signing_key_0 = SigningKey::from([1; 32]); let signing_key_1 = SigningKey::from([2; 32]); let signing_key_2 = SigningKey::from([3; 32]); @@ -967,7 +1020,8 @@ mod tests { #[tokio::test] async fn tx_tracked_set() { - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let signing_key = SigningKey::from([1; 32]); let signing_address = signing_key.verification_key().address_bytes(); let account_balances = mock_balances(100, 100); @@ -981,14 +1035,14 @@ mod tests { .insert(tx1.clone(), 0, account_balances.clone(), tx_cost.clone()) .await .unwrap(); - assert!(mempool.tracked(tx1.id().get()).await); + assert!(mempool.is_tracked(tx1.id().get()).await); // check that the pending transaction is in the tracked set mempool .insert(tx0.clone(), 0, account_balances.clone(), tx_cost.clone()) .await .unwrap(); - assert!(mempool.tracked(tx0.id().get()).await); + assert!(mempool.is_tracked(tx0.id().get()).await); // remove the transactions from the mempool mempool @@ -996,8 +1050,8 @@ mod tests { .await; // check that the transactions are not in the tracked set - assert!(!mempool.tracked(tx0.id().get()).await); - assert!(!mempool.tracked(tx1.id().get()).await); + assert!(!mempool.is_tracked(tx0.id().get()).await); + assert!(!mempool.is_tracked(tx1.id().get()).await); // re-insert the transactions into the mempool mempool @@ -1010,8 +1064,8 @@ mod tests { .unwrap(); // check that the transactions are in the tracked set - assert!(mempool.tracked(tx0.id().get()).await); - assert!(mempool.tracked(tx1.id().get()).await); + assert!(mempool.is_tracked(tx0.id().get()).await); + assert!(mempool.is_tracked(tx1.id().get()).await); // remove the transacitons from the mempool via maintenance let mut mock_state = mock_state_getter().await; @@ -1019,7 +1073,7 @@ mod tests { mempool.run_maintenance(&mock_state, false).await; // check that the transactions are not in the tracked set - assert!(!mempool.tracked(tx0.id().get()).await); - assert!(!mempool.tracked(tx1.id().get()).await); + assert!(!mempool.is_tracked(tx0.id().get()).await); + assert!(!mempool.is_tracked(tx1.id().get()).await); } } diff --git a/crates/astria-sequencer/src/metrics.rs b/crates/astria-sequencer/src/metrics.rs index 413196fe9d..3e961e3b41 100644 --- a/crates/astria-sequencer/src/metrics.rs +++ b/crates/astria-sequencer/src/metrics.rs @@ -38,6 +38,7 @@ pub struct Metrics { transaction_in_mempool_size_bytes: Histogram, transactions_in_mempool_total: Gauge, mempool_recosted: Counter, + mempool_tx_logic_error: Counter, } impl Metrics { @@ -154,6 +155,10 @@ impl Metrics { pub(crate) fn increment_mempool_recosted(&self) { self.mempool_recosted.increment(1); } + + pub(crate) fn increment_mempool_tx_logic_error(&self) { + self.mempool_tx_logic_error.increment(1); + } } impl telemetry::Metrics for Metrics { @@ -328,6 +333,14 @@ impl telemetry::Metrics for Metrics { )? .register()?; + let mempool_tx_logic_error = builder + .new_counter_factory( + MEMPOOL_TX_LOGIC_ERROR, + "The number of times a transaction has been rejected due to logic errors in the \ + mempool", + )? + .register()?; + Ok(Self { prepare_proposal_excluded_transactions_cometbft_space, prepare_proposal_excluded_transactions_sequencer_space, @@ -354,6 +367,7 @@ impl telemetry::Metrics for Metrics { transaction_in_mempool_size_bytes, transactions_in_mempool_total, mempool_recosted, + mempool_tx_logic_error, }) } } @@ -380,7 +394,8 @@ metric_names!(const METRICS_NAMES: ACTIONS_PER_TRANSACTION_IN_MEMPOOL, TRANSACTION_IN_MEMPOOL_SIZE_BYTES, TRANSACTIONS_IN_MEMPOOL_TOTAL, - MEMPOOL_RECOSTED + MEMPOOL_RECOSTED, + MEMPOOL_TX_LOGIC_ERROR ); #[cfg(test)] @@ -394,6 +409,7 @@ mod tests { CHECK_TX_REMOVED_FAILED_STATELESS, CHECK_TX_REMOVED_TOO_LARGE, MEMPOOL_RECOSTED, + MEMPOOL_TX_LOGIC_ERROR, PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS, PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_COMETBFT_SPACE, PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_FAILED_EXECUTION, @@ -465,5 +481,6 @@ mod tests { "transactions_in_mempool_total", ); assert_const(MEMPOOL_RECOSTED, "mempool_recosted"); + assert_const(MEMPOOL_TX_LOGIC_ERROR, "mempool_tx_logic_error"); } } diff --git a/crates/astria-sequencer/src/sequencer.rs b/crates/astria-sequencer/src/sequencer.rs index 3bd1bfd64c..a6df51421b 100644 --- a/crates/astria-sequencer/src/sequencer.rs +++ b/crates/astria-sequencer/src/sequencer.rs @@ -84,7 +84,7 @@ impl Sequencer { .wrap_err("failed to load storage backing chain state")?; let snapshot = storage.latest_snapshot(); - let mempool = Mempool::new(); + let mempool = Mempool::new(metrics); let app = App::new(snapshot, mempool.clone(), metrics) .await .wrap_err("failed to initialize app")?; diff --git a/crates/astria-sequencer/src/service/consensus.rs b/crates/astria-sequencer/src/service/consensus.rs index 001d7aceae..3fd07c26d6 100644 --- a/crates/astria-sequencer/src/service/consensus.rs +++ b/crates/astria-sequencer/src/service/consensus.rs @@ -473,8 +473,8 @@ mod tests { let storage = cnidarium::TempStorage::new().await.unwrap(); let snapshot = storage.latest_snapshot(); - let mempool = Mempool::new(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); app.init_chain(storage.clone(), genesis_state, vec![], "test".to_string()) .await diff --git a/crates/astria-sequencer/src/service/mempool/mod.rs b/crates/astria-sequencer/src/service/mempool/mod.rs index ddcfe2e499..212d823654 100644 --- a/crates/astria-sequencer/src/service/mempool/mod.rs +++ b/crates/astria-sequencer/src/service/mempool/mod.rs @@ -164,19 +164,16 @@ async fn handle_check_tx bool { let start_tracked_check = Instant::now(); - let result = mempool.tracked(tx_hash).await; + let result = mempool.is_tracked(tx_hash).await; - let finished_check_tracked = Instant::now(); - metrics.record_check_tx_duration_seconds_check_tracked( - finished_check_tracked.saturating_duration_since(start_tracked_check), - ); + metrics.record_check_tx_duration_seconds_check_tracked(start_tracked_check.elapsed()); result } /// Checks if the transaction has been removed from the appside mempool. /// -/// Returns an Err([`response::CheckTx`]) with an error code and message if the transaction has been +/// Returns an `Err(response::CheckTx)` with an error code and message if the transaction has been /// removed from the appside mempool. async fn check_removed_comet_bft( tx_hash: [u8; 32], @@ -230,17 +227,14 @@ async fn check_removed_comet_bft( } }; - let finished_removal_check = Instant::now(); - metrics.record_check_tx_duration_seconds_check_removed( - finished_removal_check.saturating_duration_since(start_removal_check), - ); + metrics.record_check_tx_duration_seconds_check_removed(start_removal_check.elapsed()); Ok(()) } /// Performs stateless checks on the transaction. /// -/// Returns an Err([`response::CheckTx`]) if the transaction fails any of the checks. +/// Returns an `Err(response::CheckTx)` if the transaction fails any of the checks. /// Otherwise, it returns the [`SignedTransaction`] to be inserted into the mempool. async fn stateless_checks( tx: Bytes, @@ -318,10 +312,7 @@ async fn stateless_checks( mempool: &AppMempool, diff --git a/crates/astria-sequencer/src/service/mempool/tests.rs b/crates/astria-sequencer/src/service/mempool/tests.rs index d9c22ad1f8..a31b6866d6 100644 --- a/crates/astria-sequencer/src/service/mempool/tests.rs +++ b/crates/astria-sequencer/src/service/mempool/tests.rs @@ -30,8 +30,8 @@ async fn future_nonce_ok() { let storage = cnidarium::TempStorage::new().await.unwrap(); let snapshot = storage.latest_snapshot(); - let mut mempool = Mempool::new(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mut mempool = Mempool::new(metrics); let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); let genesis_state = crate::app::test_utils::genesis_state(); @@ -64,8 +64,8 @@ async fn rechecks_pass() { let storage = cnidarium::TempStorage::new().await.unwrap(); let snapshot = storage.latest_snapshot(); - let mut mempool = Mempool::new(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mut mempool = Mempool::new(metrics); let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); let genesis_state = crate::app::test_utils::genesis_state(); @@ -103,8 +103,8 @@ async fn can_reinsert_after_recheck_fail() { let storage = cnidarium::TempStorage::new().await.unwrap(); let snapshot = storage.latest_snapshot(); - let mut mempool = Mempool::new(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mut mempool = Mempool::new(metrics); let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); let genesis_state = crate::app::test_utils::genesis_state(); @@ -152,8 +152,8 @@ async fn receck_adds_non_tracked_tx() { let storage = cnidarium::TempStorage::new().await.unwrap(); let snapshot = storage.latest_snapshot(); - let mut mempool = Mempool::new(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mut mempool = Mempool::new(metrics); let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); let genesis_state = crate::app::test_utils::genesis_state(); From ddffdd86947c31d9efea57b18590d520611334ff Mon Sep 17 00:00:00 2001 From: lilyjjo Date: Tue, 1 Oct 2024 10:59:16 -0400 Subject: [PATCH 04/11] update error logs and rename alert --- crates/astria-sequencer/src/mempool/mod.rs | 36 ++++++++++++++-------- crates/astria-sequencer/src/metrics.rs | 18 +++++------ 2 files changed, 32 insertions(+), 22 deletions(-) diff --git a/crates/astria-sequencer/src/mempool/mod.rs b/crates/astria-sequencer/src/mempool/mod.rs index 21dac98530..92119b2411 100644 --- a/crates/astria-sequencer/src/mempool/mod.rs +++ b/crates/astria-sequencer/src/mempool/mod.rs @@ -165,10 +165,11 @@ impl Mempool { /// Will increment logic error metrics and log error if transaction is already present. fn add_to_contained_txs(&self, tx_hash: [u8; 32], contained_txs: &mut HashSet<[u8; 32]>) { if !contained_txs.insert(tx_hash) { - self.metrics.increment_mempool_tx_logic_error(); + self.metrics.increment_mempool_logic_error(); error!( - "attempted to re-add transaction {tx_hash:?} to mempool's tracked container, is \ - logic error" + tx_hash = %telemetry::display::hex(&tx_hash), + "attempted to add transaction already tracked in mempool's tracked container, is logic \ + error" ); } } @@ -177,10 +178,11 @@ impl Mempool { /// Will increment logic error metrics and log error if transaction is not present. fn remove_from_contained_txs(&self, tx_hash: [u8; 32], contained_txs: &mut HashSet<[u8; 32]>) { if !contained_txs.remove(&tx_hash) { - self.metrics.increment_mempool_tx_logic_error(); + self.metrics.increment_mempool_logic_error(); error!( - "attempted to remove transaction non present {tx_hash:?} from mempool's tracked \ - container, is logic error" + tx_hash = %telemetry::display::hex(&tx_hash), + "attempted to remove transaction absent from mempool's tracked container, is logic \ + error" ); } } @@ -252,7 +254,9 @@ impl Mempool { self.remove_from_contained_txs(timemarked_tx.id(), &mut contained_txs); error!( current_account_nonce, - "failed to promote transaction {tx_id:?} during insertion: {error:#}" + tx_hash = %telemetry::display::hex(&tx_id), + %error, + "failed to promote transaction during insertion" ); } } @@ -412,10 +416,13 @@ impl Mempool { let mut contained_txs = self.contained_txs.write().await; self.remove_from_contained_txs(tx_id, &mut contained_txs); // this shouldn't happen - self.metrics.increment_mempool_tx_logic_error(); + self.metrics.increment_mempool_logic_error(); error!( + address = %telemetry::display::base64(&address), current_nonce, - "failed to promote transaction {tx_id:?} during maintenance: {error:#}" + tx_hash = %telemetry::display::hex(&tx_id), + %error, + "failed to promote transaction during maintenance" ); } } @@ -423,15 +430,18 @@ impl Mempool { // add demoted transactions to parked for tx in demotion_txs { let tx_id = tx.id(); - if let Err(err) = parked.add(tx, current_nonce, ¤t_balances) { + if let Err(error) = parked.add(tx, current_nonce, ¤t_balances) { // remove from tracked let mut contained_txs = self.contained_txs.write().await; self.remove_from_contained_txs(tx_id, &mut contained_txs); // this shouldn't happen - self.metrics.increment_mempool_tx_logic_error(); + self.metrics.increment_mempool_logic_error(); error!( - address = %telemetry::display::base64(&address), - "failed to demote transaction during maintenance: {err:#}" + address = %telemetry::display::base64(&address), + current_nonce, + tx_hash = %telemetry::display::hex(&tx_id), + %error, + "failed to demote transaction during maintenance" ); } } diff --git a/crates/astria-sequencer/src/metrics.rs b/crates/astria-sequencer/src/metrics.rs index 3e961e3b41..982f756934 100644 --- a/crates/astria-sequencer/src/metrics.rs +++ b/crates/astria-sequencer/src/metrics.rs @@ -38,7 +38,7 @@ pub struct Metrics { transaction_in_mempool_size_bytes: Histogram, transactions_in_mempool_total: Gauge, mempool_recosted: Counter, - mempool_tx_logic_error: Counter, + mempool_logic_error: Counter, } impl Metrics { @@ -156,8 +156,8 @@ impl Metrics { self.mempool_recosted.increment(1); } - pub(crate) fn increment_mempool_tx_logic_error(&self) { - self.mempool_tx_logic_error.increment(1); + pub(crate) fn increment_mempool_logic_error(&self) { + self.mempool_logic_error.increment(1); } } @@ -333,9 +333,9 @@ impl telemetry::Metrics for Metrics { )? .register()?; - let mempool_tx_logic_error = builder + let mempool_logic_error = builder .new_counter_factory( - MEMPOOL_TX_LOGIC_ERROR, + MEMPOOL_LOGIC_ERROR, "The number of times a transaction has been rejected due to logic errors in the \ mempool", )? @@ -367,7 +367,7 @@ impl telemetry::Metrics for Metrics { transaction_in_mempool_size_bytes, transactions_in_mempool_total, mempool_recosted, - mempool_tx_logic_error, + mempool_logic_error, }) } } @@ -395,7 +395,7 @@ metric_names!(const METRICS_NAMES: TRANSACTION_IN_MEMPOOL_SIZE_BYTES, TRANSACTIONS_IN_MEMPOOL_TOTAL, MEMPOOL_RECOSTED, - MEMPOOL_TX_LOGIC_ERROR + MEMPOOL_LOGIC_ERROR ); #[cfg(test)] @@ -408,8 +408,8 @@ mod tests { CHECK_TX_REMOVED_FAILED_EXECUTION, CHECK_TX_REMOVED_FAILED_STATELESS, CHECK_TX_REMOVED_TOO_LARGE, + MEMPOOL_LOGIC_ERROR, MEMPOOL_RECOSTED, - MEMPOOL_TX_LOGIC_ERROR, PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS, PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_COMETBFT_SPACE, PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_FAILED_EXECUTION, @@ -481,6 +481,6 @@ mod tests { "transactions_in_mempool_total", ); assert_const(MEMPOOL_RECOSTED, "mempool_recosted"); - assert_const(MEMPOOL_TX_LOGIC_ERROR, "mempool_tx_logic_error"); + assert_const(MEMPOOL_LOGIC_ERROR, "mempool_logic_error"); } } From 8812a4e0498defb4cd04721e2a8a989ca027bf41 Mon Sep 17 00:00:00 2001 From: lilyjjo Date: Tue, 1 Oct 2024 11:21:37 -0400 Subject: [PATCH 05/11] remove non existant import --- crates/astria-sequencer/src/service/mempool/tests.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/astria-sequencer/src/service/mempool/tests.rs b/crates/astria-sequencer/src/service/mempool/tests.rs index a31b6866d6..13758bf5e9 100644 --- a/crates/astria-sequencer/src/service/mempool/tests.rs +++ b/crates/astria-sequencer/src/service/mempool/tests.rs @@ -12,10 +12,7 @@ use tendermint::{ use crate::{ app::{ - test_utils::{ - get_alice_signing_key, - mock_tx, - }, + test_utils::get_alice_signing_key, App, }, mempool::{ From cc3abb889ce52500a9b80c3bd190cd8fe31e8e13 Mon Sep 17 00:00:00 2001 From: lilyjjo Date: Tue, 1 Oct 2024 11:24:34 -0400 Subject: [PATCH 06/11] fix tests --- crates/astria-sequencer/src/mempool/mod.rs | 12 ++++++++---- .../src/service/mempool/tests.rs | 17 +++++------------ 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/crates/astria-sequencer/src/mempool/mod.rs b/crates/astria-sequencer/src/mempool/mod.rs index 47b013eb0d..9946ccdf09 100644 --- a/crates/astria-sequencer/src/mempool/mod.rs +++ b/crates/astria-sequencer/src/mempool/mod.rs @@ -669,7 +669,7 @@ mod tests { #[tokio::test] async fn run_maintenance_promotion() { - let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); let mempool = Mempool::new(metrics); // create transaction setup to trigger promotions @@ -742,7 +742,7 @@ mod tests { #[tokio::test] async fn run_maintenance_demotion() { - let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); let mempool = Mempool::new(metrics); // create transaction setup to trigger demotions @@ -837,7 +837,7 @@ mod tests { #[tokio::test] async fn remove_invalid() { - let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); let mempool = Mempool::new(metrics); let account_balances = mock_balances(100, 100); let tx_cost = mock_tx_cost(10, 10, 10); @@ -1130,7 +1130,11 @@ mod tests { // remove the transacitons from the mempool via maintenance let mut mock_state = mock_state_getter().await; - mock_state_put_account_nonce(&mut mock_state, astria_address_from_hex_string(ALICE_ADDRESS).bytes(), 2); + mock_state_put_account_nonce( + &mut mock_state, + astria_address_from_hex_string(ALICE_ADDRESS).bytes(), + 2, + ); mempool.run_maintenance(&mock_state, false).await; // check that the transactions are not in the tracked set diff --git a/crates/astria-sequencer/src/service/mempool/tests.rs b/crates/astria-sequencer/src/service/mempool/tests.rs index 13758bf5e9..bb9b6ea0d8 100644 --- a/crates/astria-sequencer/src/service/mempool/tests.rs +++ b/crates/astria-sequencer/src/service/mempool/tests.rs @@ -12,7 +12,7 @@ use tendermint::{ use crate::{ app::{ - test_utils::get_alice_signing_key, + test_utils::MockTxBuilder, App, }, mempool::{ @@ -38,11 +38,7 @@ async fn future_nonce_ok() { app.commit(storage.clone()).await; let the_future_nonce = 10; - let tx = mock_tx( - the_future_nonce, - &get_alice_signing_key(), - "target_rollup_id", - ); + let tx = MockTxBuilder::new().nonce(the_future_nonce).build(); let req = CheckTx { tx: tx.to_raw().encode_to_vec().into(), kind: CheckTxKind::New, @@ -71,8 +67,7 @@ async fn rechecks_pass() { .unwrap(); app.commit(storage.clone()).await; - let nonce = 0; - let tx = mock_tx(nonce, &get_alice_signing_key(), "target_rollup_id"); + let tx = MockTxBuilder::new().nonce(0).build(); let req = CheckTx { tx: tx.to_raw().encode_to_vec().into(), kind: CheckTxKind::New, @@ -110,8 +105,7 @@ async fn can_reinsert_after_recheck_fail() { .unwrap(); app.commit(storage.clone()).await; - let nonce = 0; - let tx = mock_tx(nonce, &get_alice_signing_key(), "target_rollup_id"); + let tx = MockTxBuilder::new().nonce(0).build(); let req = CheckTx { tx: tx.to_raw().encode_to_vec().into(), kind: CheckTxKind::New, @@ -159,8 +153,7 @@ async fn receck_adds_non_tracked_tx() { .unwrap(); app.commit(storage.clone()).await; - let nonce = 0; - let tx = mock_tx(nonce, &get_alice_signing_key(), "target_rollup_id"); + let tx = MockTxBuilder::new().nonce(0).build(); let req = CheckTx { tx: tx.to_raw().encode_to_vec().into(), kind: CheckTxKind::Recheck, From 98df89c381b1d65978f9cc1ced67d0d35365deec Mon Sep 17 00:00:00 2001 From: lilyjjo Date: Tue, 1 Oct 2024 14:42:38 -0400 Subject: [PATCH 07/11] update metric name --- crates/astria-sequencer/src/mempool/mod.rs | 8 ++++---- crates/astria-sequencer/src/metrics.rs | 18 +++++++++--------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/crates/astria-sequencer/src/mempool/mod.rs b/crates/astria-sequencer/src/mempool/mod.rs index 9946ccdf09..69bdc0c7ea 100644 --- a/crates/astria-sequencer/src/mempool/mod.rs +++ b/crates/astria-sequencer/src/mempool/mod.rs @@ -165,7 +165,7 @@ impl Mempool { /// Will increment logic error metrics and log error if transaction is already present. fn add_to_contained_txs(&self, tx_hash: [u8; 32], contained_txs: &mut HashSet<[u8; 32]>) { if !contained_txs.insert(tx_hash) { - self.metrics.increment_mempool_logic_error(); + self.metrics.increment_internal_logic_error(); error!( tx_hash = %telemetry::display::hex(&tx_hash), "attempted to add transaction already tracked in mempool's tracked container, is logic \ @@ -178,7 +178,7 @@ impl Mempool { /// Will increment logic error metrics and log error if transaction is not present. fn remove_from_contained_txs(&self, tx_hash: [u8; 32], contained_txs: &mut HashSet<[u8; 32]>) { if !contained_txs.remove(&tx_hash) { - self.metrics.increment_mempool_logic_error(); + self.metrics.increment_internal_logic_error(); error!( tx_hash = %telemetry::display::hex(&tx_hash), "attempted to remove transaction absent from mempool's tracked container, is logic \ @@ -416,7 +416,7 @@ impl Mempool { let mut contained_txs = self.contained_txs.write().await; self.remove_from_contained_txs(tx_id, &mut contained_txs); // this shouldn't happen - self.metrics.increment_mempool_logic_error(); + self.metrics.increment_internal_logic_error(); error!( address = %telemetry::display::base64(&address), current_nonce, @@ -435,7 +435,7 @@ impl Mempool { let mut contained_txs = self.contained_txs.write().await; self.remove_from_contained_txs(tx_id, &mut contained_txs); // this shouldn't happen - self.metrics.increment_mempool_logic_error(); + self.metrics.increment_internal_logic_error(); error!( address = %telemetry::display::base64(&address), current_nonce, diff --git a/crates/astria-sequencer/src/metrics.rs b/crates/astria-sequencer/src/metrics.rs index 282e65ad71..dbb3858ef7 100644 --- a/crates/astria-sequencer/src/metrics.rs +++ b/crates/astria-sequencer/src/metrics.rs @@ -38,7 +38,7 @@ pub struct Metrics { transaction_in_mempool_size_bytes: Histogram, transactions_in_mempool_total: Gauge, mempool_recosted: Counter, - mempool_logic_error: Counter, + internal_logic_error: Counter, } impl Metrics { @@ -156,8 +156,8 @@ impl Metrics { self.mempool_recosted.increment(1); } - pub(crate) fn increment_mempool_logic_error(&self) { - self.mempool_logic_error.increment(1); + pub(crate) fn increment_internal_logic_error(&self) { + self.internal_logic_error.increment(1); } } @@ -335,9 +335,9 @@ impl telemetry::Metrics for Metrics { )? .register()?; - let mempool_logic_error = builder + let internal_logic_error = builder .new_counter_factory( - MEMPOOL_LOGIC_ERROR, + INTERNAL_LOGIC_ERROR, "The number of times a transaction has been rejected due to logic errors in the \ mempool", )? @@ -369,7 +369,7 @@ impl telemetry::Metrics for Metrics { transaction_in_mempool_size_bytes, transactions_in_mempool_total, mempool_recosted, - mempool_logic_error, + internal_logic_error, }) } } @@ -397,7 +397,7 @@ metric_names!(const METRICS_NAMES: TRANSACTION_IN_MEMPOOL_SIZE_BYTES, TRANSACTIONS_IN_MEMPOOL_TOTAL, MEMPOOL_RECOSTED, - MEMPOOL_LOGIC_ERROR + INTERNAL_LOGIC_ERROR ); #[cfg(test)] @@ -410,7 +410,7 @@ mod tests { CHECK_TX_REMOVED_FAILED_EXECUTION, CHECK_TX_REMOVED_FAILED_STATELESS, CHECK_TX_REMOVED_TOO_LARGE, - MEMPOOL_LOGIC_ERROR, + INTERNAL_LOGIC_ERROR, MEMPOOL_RECOSTED, PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS, PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_COMETBFT_SPACE, @@ -483,6 +483,6 @@ mod tests { "transactions_in_mempool_total", ); assert_const(MEMPOOL_RECOSTED, "mempool_recosted"); - assert_const(MEMPOOL_LOGIC_ERROR, "mempool_logic_error"); + assert_const(INTERNAL_LOGIC_ERROR, "internal_logic_error"); } } From 021faa2c2c9ffcb1c94acc4ca5e047e3bf294fca Mon Sep 17 00:00:00 2001 From: lilyjjo Date: Wed, 2 Oct 2024 13:39:21 -0400 Subject: [PATCH 08/11] add comment --- crates/astria-sequencer/src/mempool/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/astria-sequencer/src/mempool/mod.rs b/crates/astria-sequencer/src/mempool/mod.rs index efdb26bf10..fcd3a9fc18 100644 --- a/crates/astria-sequencer/src/mempool/mod.rs +++ b/crates/astria-sequencer/src/mempool/mod.rs @@ -250,6 +250,8 @@ impl Mempool { pending.add(ttx, current_account_nonce, ¤t_account_balances) { // remove from tracked + // note: this branch is not expected to be hit so grabbing the lock inside + // of the loop is more performant. let mut contained_txs = self.contained_txs.write().await; self.remove_from_contained_txs(timemarked_tx.id(), &mut contained_txs); error!( From 890acfba68dd4cbd97cfb85b34fe016a6acd4f45 Mon Sep 17 00:00:00 2001 From: lilyjjo Date: Wed, 2 Oct 2024 16:04:47 -0400 Subject: [PATCH 09/11] fix error message --- .../src/service/mempool/mod.rs | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/crates/astria-sequencer/src/service/mempool/mod.rs b/crates/astria-sequencer/src/service/mempool/mod.rs index 796696e97c..daf9579956 100644 --- a/crates/astria-sequencer/src/service/mempool/mod.rs +++ b/crates/astria-sequencer/src/service/mempool/mod.rs @@ -50,6 +50,7 @@ use crate::{ app::ActionHandler as _, mempool::{ get_account_balances, + InsertionError, Mempool as AppMempool, RemovalReason, }, @@ -324,6 +325,7 @@ async fn stateless_checks( mempool: &AppMempool, state: &S, @@ -430,12 +432,24 @@ async fn insert_into_mempool { + return Err(response::CheckTx { + code: Code::Err(AbciErrorCode::INVALID_NONCE.value()), + info: "transaction failed because account nonce is too low".into(), + log: format!("transaction failed because account nonce is too low: {err:#}"), + ..response::CheckTx::default() + }); + } + _ => { + return Err(response::CheckTx { + code: Code::Err(AbciErrorCode::TRANSACTION_INSERTION_FAILED.value()), + info: "transaction insertion failed".into(), + log: format!("transaction insertion failed because: {err:#}"), + ..response::CheckTx::default() + }); + } + } } metrics From 4501fe484846d33e621251d6efe13b5d0a9ce6df Mon Sep 17 00:00:00 2001 From: lilyjjo Date: Fri, 4 Oct 2024 09:44:20 -0400 Subject: [PATCH 10/11] respond to comments by @SuperFluffy --- crates/astria-core/src/protocol/abci.rs | 10 + crates/astria-sequencer/src/mempool/mod.rs | 92 +++--- .../src/service/mempool/mod.rs | 263 ++++++++++-------- .../src/service/mempool/tests.rs | 4 +- 4 files changed, 207 insertions(+), 162 deletions(-) diff --git a/crates/astria-core/src/protocol/abci.rs b/crates/astria-core/src/protocol/abci.rs index 5caf357459..7cacc6b780 100644 --- a/crates/astria-core/src/protocol/abci.rs +++ b/crates/astria-core/src/protocol/abci.rs @@ -22,6 +22,9 @@ impl AbciErrorCode { pub const TRANSACTION_INSERTION_FAILED: Self = Self(unsafe { NonZeroU32::new_unchecked(11) }); pub const LOWER_NONCE_INVALIDATED: Self = Self(unsafe { NonZeroU32::new_unchecked(12) }); pub const BAD_REQUEST: Self = Self(unsafe { NonZeroU32::new_unchecked(13) }); + pub const ALREADY_PRESENT: Self = Self(unsafe { NonZeroU32::new_unchecked(14) }); + pub const NONCE_TAKEN: Self = Self(unsafe { NonZeroU32::new_unchecked(15) }); + pub const ACCOUNT_SIZE_LIMIT: Self = Self(unsafe { NonZeroU32::new_unchecked(16) }); } impl AbciErrorCode { @@ -52,6 +55,13 @@ impl AbciErrorCode { } Self::LOWER_NONCE_INVALIDATED => "lower nonce was invalidated in mempool".into(), Self::BAD_REQUEST => "the request payload was malformed".into(), + Self::ALREADY_PRESENT => "the transaction is already present in the mempool".into(), + Self::NONCE_TAKEN => "there is already a transaction with the same nonce for the \ + account in the mempool" + .into(), + Self::ACCOUNT_SIZE_LIMIT => { + "the account has reached the maximum number of parked transactions".into() + } Self(other) => { format!("invalid error code {other}: should be unreachable (this is a bug)") } diff --git a/crates/astria-sequencer/src/mempool/mod.rs b/crates/astria-sequencer/src/mempool/mod.rs index fcd3a9fc18..c170b84073 100644 --- a/crates/astria-sequencer/src/mempool/mod.rs +++ b/crates/astria-sequencer/src/mempool/mod.rs @@ -106,6 +106,35 @@ impl RemovalCache { } } +struct ContainedTxLock<'a> { + mempool: &'a Mempool, + txs: RwLockWriteGuard<'a, HashSet<[u8; 32]>>, +} + +impl<'a> ContainedTxLock<'a> { + fn add(&mut self, id: [u8; 32]) { + if !self.txs.insert(id) { + self.mempool.metrics.increment_internal_logic_error(); + error!( + tx_hash = %telemetry::display::hex(&id), + "attempted to add transaction already tracked in mempool's tracked container, is logic \ + error" + ); + } + } + + fn remove(&mut self, id: [u8; 32]) { + if !self.txs.remove(&id) { + self.mempool.metrics.increment_internal_logic_error(); + error!( + tx_hash = %telemetry::display::hex(&id), + "attempted to remove transaction absent from mempool's tracked container, is logic \ + error" + ); + } + } +} + /// [`Mempool`] is an account-based structure for maintaining transactions for execution. /// /// The transactions are split between pending and parked, where pending transactions are ready for @@ -161,29 +190,10 @@ impl Mempool { self.contained_txs.read().await.len() } - /// Adds a transaction to the mempool's tracked transactions. - /// Will increment logic error metrics and log error if transaction is already present. - fn add_to_contained_txs(&self, tx_hash: [u8; 32], contained_txs: &mut HashSet<[u8; 32]>) { - if !contained_txs.insert(tx_hash) { - self.metrics.increment_internal_logic_error(); - error!( - tx_hash = %telemetry::display::hex(&tx_hash), - "attempted to add transaction already tracked in mempool's tracked container, is logic \ - error" - ); - } - } - - /// Removes a transaction from the mempool's tracked transactions. - /// Will increment logic error metrics and log error if transaction is not present. - fn remove_from_contained_txs(&self, tx_hash: [u8; 32], contained_txs: &mut HashSet<[u8; 32]>) { - if !contained_txs.remove(&tx_hash) { - self.metrics.increment_internal_logic_error(); - error!( - tx_hash = %telemetry::display::hex(&tx_hash), - "attempted to remove transaction absent from mempool's tracked container, is logic \ - error" - ); + async fn lock_contained_txs(&self) -> ContainedTxLock<'_> { + ContainedTxLock { + mempool: self, + txs: self.contained_txs.write().await, } } @@ -249,11 +259,10 @@ impl Mempool { if let Err(error) = pending.add(ttx, current_account_nonce, ¤t_account_balances) { - // remove from tracked - // note: this branch is not expected to be hit so grabbing the lock inside + // NOTE: this branch is not expected to be hit so grabbing the lock inside // of the loop is more performant. - let mut contained_txs = self.contained_txs.write().await; - self.remove_from_contained_txs(timemarked_tx.id(), &mut contained_txs); + let mut contained_lock = self.lock_contained_txs().await; + contained_lock.remove(timemarked_tx.id()); error!( current_account_nonce, tx_hash = %telemetry::display::hex(&tx_id), @@ -264,8 +273,8 @@ impl Mempool { } // track in contained txs - let mut contained_txs = self.contained_txs.write().await; - self.add_to_contained_txs(timemarked_tx.id(), &mut contained_txs); + let mut contained_lock = self.lock_contained_txs().await; + contained_lock.add(timemarked_tx.id()); Ok(()) } @@ -317,9 +326,9 @@ impl Mempool { // Add the original tx first to preserve its reason for removal. The second // attempt to add it inside the loop below will be a no-op. removal_cache.add(tx_hash, reason); - let mut contained_txs = self.contained_txs.write().await; + let mut contained_lock = self.lock_contained_txs().await; for removed_tx in removed_txs { - self.remove_from_contained_txs(removed_tx, &mut contained_txs); + contained_lock.remove(removed_tx); removal_cache.add(removed_tx, RemovalReason::LowerNonceInvalidated); } } @@ -414,10 +423,9 @@ impl Mempool { for tx in promtion_txs { let tx_id = tx.id(); if let Err(error) = pending.add(tx, current_nonce, ¤t_balances) { - // remove from tracked - let mut contained_txs = self.contained_txs.write().await; - self.remove_from_contained_txs(tx_id, &mut contained_txs); - // this shouldn't happen + // NOTE: this shouldn't happen. Promotions should never fail. + let mut contained_lock = self.lock_contained_txs().await; + contained_lock.remove(tx_id); self.metrics.increment_internal_logic_error(); error!( address = %telemetry::display::base64(&address), @@ -433,10 +441,10 @@ impl Mempool { for tx in demotion_txs { let tx_id = tx.id(); if let Err(error) = parked.add(tx, current_nonce, ¤t_balances) { - // remove from tracked - let mut contained_txs = self.contained_txs.write().await; - self.remove_from_contained_txs(tx_id, &mut contained_txs); - // this shouldn't happen + // NOTE: this shouldn't happen normally but could on the edge case of + // the parked queue being full for the account. + let mut contained_lock = self.lock_contained_txs().await; + contained_lock.remove(tx_id); self.metrics.increment_internal_logic_error(); error!( address = %telemetry::display::base64(&address), @@ -1084,7 +1092,7 @@ mod tests { } #[tokio::test] - async fn tx_tracked_set() { + async fn tx_tracked_edge_cases() { let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); let mempool = Mempool::new(metrics); let account_balances = mock_balances(100, 100); @@ -1107,7 +1115,7 @@ mod tests { .unwrap(); assert!(mempool.is_tracked(tx0.id().get()).await); - // remove the transactions from the mempool + // remove the transactions from the mempool, should remove both mempool .remove_tx_invalid(tx0.clone(), RemovalReason::Expired) .await; @@ -1126,7 +1134,7 @@ mod tests { .await .unwrap(); - // check that the transactions are in the tracked set + // check that the transactions are in the tracked set on re-insertion assert!(mempool.is_tracked(tx0.id().get()).await); assert!(mempool.is_tracked(tx1.id().get()).await); diff --git a/crates/astria-sequencer/src/service/mempool/mod.rs b/crates/astria-sequencer/src/service/mempool/mod.rs index daf9579956..425d199798 100644 --- a/crates/astria-sequencer/src/service/mempool/mod.rs +++ b/crates/astria-sequencer/src/service/mempool/mod.rs @@ -1,6 +1,3 @@ -#[cfg(test)] -mod tests; - use std::{ collections::HashMap, pin::Pin, @@ -22,7 +19,10 @@ use astria_core::{ }; use astria_eyre::eyre::WrapErr as _; use bytes::Bytes; -use cnidarium::Storage; +use cnidarium::{ + StateRead, + Storage, +}; use futures::{ Future, FutureExt, @@ -45,8 +45,8 @@ use tracing::{ }; use crate::{ - accounts, - address, + accounts::StateReadExt as _, + address::StateReadExt as _, app::ActionHandler as _, mempool::{ get_account_balances, @@ -58,8 +58,98 @@ use crate::{ transaction, }; +#[cfg(test)] +mod tests; + const MAX_TX_SIZE: usize = 256_000; // 256 KB +pub(crate) trait IntoCheckTxResponse { + fn into_check_tx_response(self) -> response::CheckTx; +} + +impl IntoCheckTxResponse for RemovalReason { + fn into_check_tx_response(self) -> response::CheckTx { + match self { + RemovalReason::Expired => response::CheckTx { + code: Code::Err(AbciErrorCode::TRANSACTION_EXPIRED.value()), + info: AbciErrorCode::TRANSACTION_EXPIRED.to_string(), + log: "transaction expired in the app's mempool".into(), + ..response::CheckTx::default() + }, + RemovalReason::FailedPrepareProposal(err) => response::CheckTx { + code: Code::Err(AbciErrorCode::TRANSACTION_FAILED.value()), + info: AbciErrorCode::TRANSACTION_FAILED.to_string(), + log: format!("transaction failed execution because: {err}"), + ..response::CheckTx::default() + }, + RemovalReason::NonceStale => response::CheckTx { + code: Code::Err(AbciErrorCode::INVALID_NONCE.value()), + info: "transaction removed from app mempool due to stale nonce".into(), + log: "transaction from app mempool due to stale nonce".into(), + ..response::CheckTx::default() + }, + RemovalReason::LowerNonceInvalidated => response::CheckTx { + code: Code::Err(AbciErrorCode::LOWER_NONCE_INVALIDATED.value()), + info: AbciErrorCode::LOWER_NONCE_INVALIDATED.to_string(), + log: "transaction removed from app mempool due to lower nonce being invalidated" + .into(), + ..response::CheckTx::default() + }, + } + } +} + +impl IntoCheckTxResponse for InsertionError { + fn into_check_tx_response(self) -> response::CheckTx { + match self { + InsertionError::AlreadyPresent => response::CheckTx { + code: Code::Err(AbciErrorCode::ALREADY_PRESENT.value()), + info: AbciErrorCode::ALREADY_PRESENT.to_string(), + log: InsertionError::AlreadyPresent.to_string(), + ..response::CheckTx::default() + }, + InsertionError::NonceTooLow => response::CheckTx { + code: Code::Err(AbciErrorCode::INVALID_NONCE.value()), + info: AbciErrorCode::INVALID_NONCE.to_string(), + log: InsertionError::NonceTooLow.to_string(), + ..response::CheckTx::default() + }, + InsertionError::NonceTaken => response::CheckTx { + code: Code::Err(AbciErrorCode::NONCE_TAKEN.value()), + info: AbciErrorCode::NONCE_TAKEN.to_string(), + log: InsertionError::NonceTaken.to_string(), + ..response::CheckTx::default() + }, + InsertionError::AccountSizeLimit => response::CheckTx { + code: Code::Err(AbciErrorCode::ACCOUNT_SIZE_LIMIT.value()), + info: AbciErrorCode::ACCOUNT_SIZE_LIMIT.to_string(), + log: InsertionError::AccountSizeLimit.to_string(), + ..response::CheckTx::default() + }, + InsertionError::AccountBalanceTooLow | InsertionError::NonceGap => { + // NOTE: these are handled interally by the mempool and don't + // block transaction inclusion in the mempool. they shouldn't + // be bubbled up to the client. + response::CheckTx { + code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), + info: AbciErrorCode::INTERNAL_ERROR.info(), + log: "transaction failed insertion because of an internal error".into(), + ..response::CheckTx::default() + } + } + } + } +} + +fn error_response(abci_error_code: AbciErrorCode, log: String) -> response::CheckTx { + response::CheckTx { + code: Code::Err(abci_error_code.value()), + info: abci_error_code.info(), + log, + ..response::CheckTx::default() + } +} + /// Mempool handles [`request::CheckTx`] abci requests. // /// It performs a stateless check of the given transaction, @@ -120,7 +210,7 @@ impl Service for Mempool { /// - Is already in the appside mempool /// - Passes stateless checks and insertion into the mempool is successful #[instrument(skip_all)] -async fn handle_check_tx( +async fn handle_check_tx( req: request::CheckTx, state: S, mempool: &mut AppMempool, @@ -189,42 +279,13 @@ async fn check_removed_comet_bft( match removal_reason { RemovalReason::Expired => { metrics.increment_check_tx_removed_expired(); - return Err(response::CheckTx { - code: Code::Err(AbciErrorCode::TRANSACTION_EXPIRED.value()), - info: "transaction expired in app's mempool".into(), - log: "Transaction expired in the app's mempool".into(), - ..response::CheckTx::default() - }); + return Err(removal_reason.into_check_tx_response()); } - RemovalReason::FailedPrepareProposal(err) => { + RemovalReason::FailedPrepareProposal(_) => { metrics.increment_check_tx_removed_failed_execution(); - return Err(response::CheckTx { - code: Code::Err(AbciErrorCode::TRANSACTION_FAILED.value()), - info: "transaction failed execution in prepare_proposal()".into(), - log: format!("transaction failed execution because: {err}"), - ..response::CheckTx::default() - }); - } - RemovalReason::NonceStale => { - return Err(response::CheckTx { - code: Code::Err(AbciErrorCode::INVALID_NONCE.value()), - info: "transaction removed from app mempool due to stale nonce".into(), - log: "Transaction from app mempool due to stale nonce".into(), - ..response::CheckTx::default() - }); - } - RemovalReason::LowerNonceInvalidated => { - return Err(response::CheckTx { - code: Code::Err(AbciErrorCode::LOWER_NONCE_INVALIDATED.value()), - info: "transaction removed from app mempool due to lower nonce being \ - invalidated" - .into(), - log: "Transaction removed from app mempool due to lower nonce being \ - invalidated" - .into(), - ..response::CheckTx::default() - }); + return Err(removal_reason.into_check_tx_response()); } + _ => return Err(removal_reason.into_check_tx_response()), } }; @@ -237,7 +298,7 @@ async fn check_removed_comet_bft( /// /// Returns an `Err(response::CheckTx)` if the transaction fails any of the checks. /// Otherwise, it returns the [`SignedTransaction`] to be inserted into the mempool. -async fn stateless_checks( +async fn stateless_checks( tx: Bytes, state: &S, metrics: &'static Metrics, @@ -248,39 +309,34 @@ async fn stateless_checks MAX_TX_SIZE { metrics.increment_check_tx_removed_too_large(); - return Err(response::CheckTx { - code: Code::Err(AbciErrorCode::TRANSACTION_TOO_LARGE.value()), - log: format!( + return Err(error_response( + AbciErrorCode::TRANSACTION_TOO_LARGE, + format!( "transaction size too large; allowed: {MAX_TX_SIZE} bytes, got {}", tx.len() ), - info: AbciErrorCode::TRANSACTION_TOO_LARGE.info(), - ..response::CheckTx::default() - }); + )); } let raw_signed_tx = match raw::SignedTransaction::decode(tx) { Ok(tx) => tx, Err(e) => { - return Err(response::CheckTx { - code: Code::Err(AbciErrorCode::INVALID_PARAMETER.value()), - log: format!("{e:#}"), - info: "failed decoding bytes as a protobuf SignedTransaction".into(), - ..response::CheckTx::default() - }); + return Err(error_response( + AbciErrorCode::INVALID_PARAMETER, + format!("failed decoding bytes as a protobuf SignedTransaction: {e:#}"), + )); } }; let signed_tx = match SignedTransaction::try_from_raw(raw_signed_tx) { Ok(tx) => tx, Err(e) => { - return Err(response::CheckTx { - code: Code::Err(AbciErrorCode::INVALID_PARAMETER.value()), - info: "the provided bytes was not a valid protobuf-encoded SignedTransaction, or \ - the signature was invalid" - .into(), - log: format!("{e:#}"), - ..response::CheckTx::default() - }); + return Err(error_response( + AbciErrorCode::INVALID_PARAMETER, + format!( + "the provided bytes was not a valid protobuf-encoded SignedTransaction, or \ + the signature was invalid: {e:#}" + ), + )); } }; @@ -291,12 +347,10 @@ async fn stateless_checks( +async fn insert_into_mempool( mempool: &AppMempool, state: &S, signed_tx: SignedTransaction, @@ -334,6 +385,7 @@ async fn insert_into_mempool Result<(), response::CheckTx> { let start_convert_address = Instant::now(); + // TODO: just use address bytes directly https://github.com/astriaorg/astria/issues/1620 // generate address for the signed transaction let address = match state .try_base_prefixed(signed_tx.verification_key().address_bytes()) @@ -341,12 +393,10 @@ async fn insert_into_mempool { - return Err(response::CheckTx { - code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), - info: AbciErrorCode::INTERNAL_ERROR.info(), - log: format!("failed to generate address because: {err:#}"), - ..response::CheckTx::default() - }); + return Err(error_response( + AbciErrorCode::INTERNAL_ERROR, + format!("failed to generate address because: {err:#}"), + )); } Ok(address) => address, }; @@ -363,12 +413,10 @@ async fn insert_into_mempool { - return Err(response::CheckTx { - code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), - info: AbciErrorCode::INTERNAL_ERROR.info(), - log: format!("failed to fetch account nonce because: {err:#}"), - ..response::CheckTx::default() - }); + return Err(error_response( + AbciErrorCode::INTERNAL_ERROR, + format!("failed to fetch account nonce because: {err:#}"), + )); } Ok(nonce) => nonce, }; @@ -384,12 +432,10 @@ async fn insert_into_mempool { - return Err(response::CheckTx { - code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), - info: AbciErrorCode::INTERNAL_ERROR.info(), - log: format!("failed to fetch cost of the transaction because: {err:#}"), - ..response::CheckTx::default() - }); + return Err(error_response( + AbciErrorCode::INTERNAL_ERROR, + format!("failed to fetch cost of the transaction because: {err:#}"), + )); } Ok(transaction_cost) => transaction_cost, }; @@ -406,12 +452,10 @@ async fn insert_into_mempool { - return Err(response::CheckTx { - code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), - info: AbciErrorCode::INTERNAL_ERROR.info(), - log: format!("failed to fetch account balances because: {err:#}"), - ..response::CheckTx::default() - }); + return Err(error_response( + AbciErrorCode::INTERNAL_ERROR, + format!("failed to fetch account balances because: {err:#}"), + )); } Ok(account_balance) => account_balance, }; @@ -432,24 +476,7 @@ async fn insert_into_mempool { - return Err(response::CheckTx { - code: Code::Err(AbciErrorCode::INVALID_NONCE.value()), - info: "transaction failed because account nonce is too low".into(), - log: format!("transaction failed because account nonce is too low: {err:#}"), - ..response::CheckTx::default() - }); - } - _ => { - return Err(response::CheckTx { - code: Code::Err(AbciErrorCode::TRANSACTION_INSERTION_FAILED.value()), - info: "transaction insertion failed".into(), - log: format!("transaction insertion failed because: {err:#}"), - ..response::CheckTx::default() - }); - } - } + return Err(err.into_check_tx_response()); } metrics diff --git a/crates/astria-sequencer/src/service/mempool/tests.rs b/crates/astria-sequencer/src/service/mempool/tests.rs index bb9b6ea0d8..b6511e9c67 100644 --- a/crates/astria-sequencer/src/service/mempool/tests.rs +++ b/crates/astria-sequencer/src/service/mempool/tests.rs @@ -22,7 +22,7 @@ use crate::{ }; #[tokio::test] -async fn future_nonce_ok() { +async fn future_nonces_are_accepted() { // The mempool should allow future nonces. let storage = cnidarium::TempStorage::new().await.unwrap(); let snapshot = storage.latest_snapshot(); @@ -136,7 +136,7 @@ async fn can_reinsert_after_recheck_fail() { } #[tokio::test] -async fn receck_adds_non_tracked_tx() { +async fn recheck_adds_non_tracked_tx() { // The mempool should be able to insert a transaction on recheck if it isn't in the mempool. // This could happen in the case of a sequencer restart as the cometbft mempool persists but // the appside one does not. From ac66880573f20be22e73ca7ee3b2491bc98ef4a2 Mon Sep 17 00:00:00 2001 From: lilyjjo Date: Fri, 4 Oct 2024 10:54:11 -0400 Subject: [PATCH 11/11] make contained_tx code consistent and improve test clarity --- crates/astria-sequencer/src/mempool/mod.rs | 91 +++++++++++++++++----- 1 file changed, 70 insertions(+), 21 deletions(-) diff --git a/crates/astria-sequencer/src/mempool/mod.rs b/crates/astria-sequencer/src/mempool/mod.rs index c170b84073..6573bbfbd5 100644 --- a/crates/astria-sequencer/src/mempool/mod.rs +++ b/crates/astria-sequencer/src/mempool/mod.rs @@ -228,7 +228,7 @@ impl Mempool { ) { Ok(()) => { // track in contained txs - self.contained_txs.write().await.insert(id); + self.lock_contained_txs().await.add(id); Ok(()) } Err(err) => Err(err), @@ -261,8 +261,7 @@ impl Mempool { { // NOTE: this branch is not expected to be hit so grabbing the lock inside // of the loop is more performant. - let mut contained_lock = self.lock_contained_txs().await; - contained_lock.remove(timemarked_tx.id()); + self.lock_contained_txs().await.remove(timemarked_tx.id()); error!( current_account_nonce, tx_hash = %telemetry::display::hex(&tx_id), @@ -273,8 +272,7 @@ impl Mempool { } // track in contained txs - let mut contained_lock = self.lock_contained_txs().await; - contained_lock.add(timemarked_tx.id()); + self.lock_contained_txs().await.add(timemarked_tx.id()); Ok(()) } @@ -423,9 +421,10 @@ impl Mempool { for tx in promtion_txs { let tx_id = tx.id(); if let Err(error) = pending.add(tx, current_nonce, ¤t_balances) { - // NOTE: this shouldn't happen. Promotions should never fail. - let mut contained_lock = self.lock_contained_txs().await; - contained_lock.remove(tx_id); + // NOTE: this shouldn't happen. Promotions should never fail. This also + // means grabbing the lock inside the loop is more + // performant. + self.lock_contained_txs().await.remove(tx_id); self.metrics.increment_internal_logic_error(); error!( address = %telemetry::display::base64(&address), @@ -442,9 +441,9 @@ impl Mempool { let tx_id = tx.id(); if let Err(error) = parked.add(tx, current_nonce, ¤t_balances) { // NOTE: this shouldn't happen normally but could on the edge case of - // the parked queue being full for the account. - let mut contained_lock = self.lock_contained_txs().await; - contained_lock.remove(tx_id); + // the parked queue being full for the account. This also means + // grabbing the lock inside the loop is more performant. + self.lock_contained_txs().await.remove(tx_id); self.metrics.increment_internal_logic_error(); error!( address = %telemetry::display::base64(&address), @@ -463,10 +462,10 @@ impl Mempool { // add to removal cache for cometbft and remove from the tracked set let mut removal_cache = self.comet_bft_removal_cache.write().await; - let mut tracked_txs = self.contained_txs.write().await; + let mut contained_lock = self.lock_contained_txs().await; for (tx_hash, reason) in removed_txs { removal_cache.add(tx_hash, reason); - tracked_txs.remove(&tx_hash); + contained_lock.remove(tx_hash); } } @@ -528,6 +527,7 @@ mod tests { .is_ok(), "should be able to insert nonce 1 transaction into mempool" ); + assert_eq!(mempool.len().await, 1); // try to insert again assert_eq!( @@ -1092,7 +1092,7 @@ mod tests { } #[tokio::test] - async fn tx_tracked_edge_cases() { + async fn tx_tracked_invalid_removal_removes_all() { let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); let mempool = Mempool::new(metrics); let account_balances = mock_balances(100, 100); @@ -1123,21 +1123,27 @@ mod tests { // check that the transactions are not in the tracked set assert!(!mempool.is_tracked(tx0.id().get()).await); assert!(!mempool.is_tracked(tx1.id().get()).await); + } + + #[tokio::test] + async fn tx_tracked_maintenance_removes_all() { + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); + let account_balances = mock_balances(100, 100); + let tx_cost = mock_tx_cost(10, 10, 0); + + let tx0 = MockTxBuilder::new().nonce(0).build(); + let tx1 = MockTxBuilder::new().nonce(1).build(); - // re-insert the transactions into the mempool mempool - .insert(tx0.clone(), 0, account_balances.clone(), tx_cost.clone()) + .insert(tx1.clone(), 0, account_balances.clone(), tx_cost.clone()) .await .unwrap(); mempool - .insert(tx1.clone(), 0, account_balances.clone(), tx_cost.clone()) + .insert(tx0.clone(), 0, account_balances.clone(), tx_cost.clone()) .await .unwrap(); - // check that the transactions are in the tracked set on re-insertion - assert!(mempool.is_tracked(tx0.id().get()).await); - assert!(mempool.is_tracked(tx1.id().get()).await); - // remove the transacitons from the mempool via maintenance let mut mock_state = mock_state_getter().await; mock_state_put_account_nonce( @@ -1151,4 +1157,47 @@ mod tests { assert!(!mempool.is_tracked(tx0.id().get()).await); assert!(!mempool.is_tracked(tx1.id().get()).await); } + + #[tokio::test] + async fn tx_tracked_reinsertion_ok() { + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); + let account_balances = mock_balances(100, 100); + let tx_cost = mock_tx_cost(10, 10, 0); + + let tx0 = MockTxBuilder::new().nonce(0).build(); + let tx1 = MockTxBuilder::new().nonce(1).build(); + + mempool + .insert(tx1.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap(); + + mempool + .insert(tx0.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap(); + + // remove the transactions from the mempool, should remove both + mempool + .remove_tx_invalid(tx0.clone(), RemovalReason::Expired) + .await; + + assert!(!mempool.is_tracked(tx0.id().get()).await); + assert!(!mempool.is_tracked(tx1.id().get()).await); + + // re-insert the transactions into the mempool + mempool + .insert(tx0.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap(); + mempool + .insert(tx1.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap(); + + // check that the transactions are in the tracked set on re-insertion + assert!(mempool.is_tracked(tx0.id().get()).await); + assert!(mempool.is_tracked(tx1.id().get()).await); + } }