diff --git a/src/builder/bundle_proposer.rs b/src/builder/bundle_proposer.rs index 5317af922..14f65730e 100644 --- a/src/builder/bundle_proposer.rs +++ b/src/builder/bundle_proposer.rs @@ -12,22 +12,26 @@ use futures::future; use linked_hash_map::LinkedHashMap; #[cfg(test)] use mockall::automock; -use tokio::try_join; +use tokio::{sync::broadcast, try_join}; use tonic::{async_trait, transport::Channel}; use tracing::{error, info}; -use crate::common::{ - contracts::entry_point::UserOpsPerAggregator, - gas::{FeeEstimator, GasFees, PriorityFeeMode}, - math, - protos::{ - self, - op_pool::{op_pool_client::OpPoolClient, GetOpsRequest, MempoolOp}, - }, - simulation::{SimulationError, SimulationSuccess, Simulator}, - types::{ - Entity, EntityType, EntryPointLike, ExpectedStorage, HandleOpsOut, ProviderLike, Timestamp, - UserOperation, +use crate::{ + builder::emit::{BuilderEvent, OpRejectionReason, SkipReason}, + common::{ + contracts::entry_point::UserOpsPerAggregator, + emit::WithEntryPoint, + gas::{FeeEstimator, GasFees, PriorityFeeMode}, + math, + protos::{ + self, + op_pool::{op_pool_client::OpPoolClient, GetOpsRequest, MempoolOp}, + }, + simulation::{SimulationError, SimulationSuccess, Simulator}, + types::{ + Entity, EntityType, EntryPointLike, ExpectedStorage, HandleOpsOut, ProviderLike, + Timestamp, UserOperation, + }, }, }; @@ -57,6 +61,10 @@ impl Bundle { pub fn is_empty(&self) -> bool { self.ops_per_aggregator.is_empty() } + + pub fn iter_ops(&self) -> impl Iterator + '_ { + self.ops_per_aggregator.iter().flat_map(|ops| &ops.user_ops) + } } #[cfg_attr(test, automock)] @@ -79,10 +87,12 @@ where chain_id: u64, settings: Settings, fee_estimator: FeeEstimator

, + event_sender: broadcast::Sender>, } #[derive(Debug)] pub struct Settings { + pub chain_id: u64, pub max_bundle_size: u64, pub beneficiary: Address, pub use_bundle_priority_fee: Option, @@ -176,6 +186,7 @@ where provider: Arc

, chain_id: u64, settings: Settings, + event_sender: broadcast::Sender>, ) -> Self { Self { op_pool, @@ -191,6 +202,7 @@ where settings.bundle_priority_fee_overhead_percent, ), settings, + event_sender, } } @@ -233,17 +245,25 @@ where let mut paymasters_to_reject = Vec::

::new(); for (op, simulation) in ops_with_simulations { let Some(simulation) = simulation else { + self.emit(BuilderEvent::RejectedOp { + op_hash: self.op_hash(&op), + reason: OpRejectionReason::FailedRevalidation, + }); rejected_ops.push(op); continue; }; - if simulation + if let Some(&other_sender) = simulation .accessed_addresses .iter() - .any(|&address| address != op.sender && all_sender_addresses.contains(&address)) + .find(|&address| *address != op.sender && all_sender_addresses.contains(address)) { // Exclude ops that access the sender of another op in the // batch, but don't reject them (remove them from pool). info!("Excluding op from {:?} because it accessed the address of another sender in the bundle.", op.sender); + self.emit(BuilderEvent::SkippedOp { + op_hash: self.op_hash(&op), + reason: SkipReason::AccessedOtherSender { other_sender }, + }); continue; } if let Some(paymaster) = op.paymaster() { @@ -344,6 +364,12 @@ where match handle_ops_out { HandleOpsOut::Success => Ok(Some(gas)), HandleOpsOut::FailedOp(index, message) => { + self.emit(BuilderEvent::RejectedOp { + op_hash: self.op_hash(context.get_op_at(index)?), + reason: OpRejectionReason::FailedInBundle { + message: Arc::new(message.clone()), + }, + }); self.process_failed_op(context, index, message).await?; Ok(None) } @@ -440,6 +466,17 @@ where Ok(()) } + + fn emit(&self, event: BuilderEvent) { + let _ = self.event_sender.send(WithEntryPoint { + entry_point: self.entry_point.address(), + event, + }); + } + + fn op_hash(&self, op: &UserOperation) -> H256 { + op.op_hash(self.entry_point.address(), self.settings.chain_id) + } } #[derive(Clone, Debug)] @@ -1155,6 +1192,7 @@ mod tests { provider .expect_aggregate_signatures() .returning(move |address, _| signatures_by_aggregator[&address]()); + let (event_sender, _) = broadcast::channel(16); let proposer = BundleProposerImpl::new( op_pool_handle.client.clone(), simulator, @@ -1162,12 +1200,14 @@ mod tests { Arc::new(provider), 0, Settings { + chain_id: 0, max_bundle_size, beneficiary, use_bundle_priority_fee: Some(true), priority_fee_mode: PriorityFeeMode::PriorityFeePercent(10), bundle_priority_fee_overhead_percent: 0, }, + event_sender, ); proposer .make_bundle(None) diff --git a/src/builder/emit.rs b/src/builder/emit.rs new file mode 100644 index 000000000..55ee97ccc --- /dev/null +++ b/src/builder/emit.rs @@ -0,0 +1,145 @@ +use std::{fmt::Display, sync::Arc}; + +use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256}; + +use crate::common::{gas::GasFees, strs}; + +#[derive(Clone, Debug)] +pub enum BuilderEvent { + FormedBundle { + /// If `None`, means that the bundle contained no operations and so no + /// transaction was created. + tx_details: Option, + nonce: u64, + fee_increase_count: u64, + required_fees: Option, + }, + TransactionMined { + tx_hash: H256, + nonce: u64, + block_number: u64, + }, + LatestTransactionDropped { + nonce: u64, + }, + NonceUsedForOtherTransaction { + nonce: u64, + }, + SkippedOp { + op_hash: H256, + reason: SkipReason, + }, + RejectedOp { + op_hash: H256, + reason: OpRejectionReason, + }, +} + +#[derive(Clone, Debug)] +pub struct BundleTxDetails { + pub tx_hash: H256, + pub tx: TypedTransaction, + pub op_hashes: Arc>, +} + +#[derive(Clone, Debug)] +pub enum SkipReason { + AccessedOtherSender { other_sender: Address }, +} + +#[derive(Clone, Debug)] +pub enum OpRejectionReason { + FailedRevalidation, + FailedInBundle { message: Arc }, +} + +impl Display for BuilderEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + BuilderEvent::FormedBundle { + tx_details, + nonce, + fee_increase_count, + required_fees, + } => { + let required_max_fee_per_gas = + strs::to_string_or(required_fees.map(|fees| fees.max_fee_per_gas), "(default)"); + let required_max_priority_fee_per_gas = strs::to_string_or( + required_fees.map(|fees| fees.max_priority_fee_per_gas), + "(default)", + ); + match tx_details { + Some(tx_details) => { + let op_hashes = tx_details + .op_hashes + .iter() + .map(|hash| format!("{hash:?}")) + .collect::>() + .join(", "); + write!( + f, + concat!( + "Bundle transaction sent!", + " Transaction hash: {:?}", + " Nonce: {}", + " Fee increases: {}", + " Required maxFeePerGas: {}", + " Required maxPriorityFeePerGas: {}", + " Op hashes: {}", + ), + tx_details.tx_hash, + nonce, + fee_increase_count, + required_max_fee_per_gas, + required_max_priority_fee_per_gas, + op_hashes, + ) + } + None => write!( + f, + concat!( + "Bundle was empty.", + " Nonce: {}", + " Fee increases: {}", + " Required maxFeePerGas: {}", + " Required maxPriorityFeePerGas: {}", + ), + nonce, + fee_increase_count, + required_max_fee_per_gas, + required_max_priority_fee_per_gas + ), + } + } + BuilderEvent::TransactionMined { + tx_hash, + nonce, + block_number, + } => write!( + f, + concat!( + "Transaction mined!", + " Transaction hash: {:?}", + " Nonce: {}", + " Block number: {}", + ), + tx_hash, nonce, block_number, + ), + BuilderEvent::LatestTransactionDropped { nonce } => { + write!( + f, + "Latest transaction dropped. Higher fees are needed. Nonce: {nonce}" + ) + } + BuilderEvent::NonceUsedForOtherTransaction { nonce } => { + write!(f, "Transaction failed because nonce was used by another transaction outside of this Rundler. Nonce: {nonce}") + } + BuilderEvent::SkippedOp { op_hash, reason } => { + write!(f, "Op skipped in bundle (but remains in pool). Op hash: {op_hash:?} Reason: {reason:?}") + } + BuilderEvent::RejectedOp { op_hash, reason } => { + write!(f, "Op rejected from bundle and removed from pool. Op hash: {op_hash:?} Reason: {reason:?}") + } + } + } +} diff --git a/src/builder/mod.rs b/src/builder/mod.rs index f53332dd4..d346782c9 100644 --- a/src/builder/mod.rs +++ b/src/builder/mod.rs @@ -1,4 +1,5 @@ mod bundle_proposer; +pub mod emit; mod sender; mod server; mod signer; diff --git a/src/builder/server.rs b/src/builder/server.rs index bf90ef62f..f2a67bee4 100644 --- a/src/builder/server.rs +++ b/src/builder/server.rs @@ -11,16 +11,18 @@ use ethers::{ providers::{Http, Middleware, Provider, RetryClient}, types::{transaction::eip2718::TypedTransaction, Address, H256, U256}, }; -use tokio::{join, time}; +use tokio::{join, sync::broadcast, time}; use tonic::{async_trait, transport::Channel, Request, Response, Status}; use tracing::{debug, error, info, trace, warn}; use crate::{ builder::{ bundle_proposer::BundleProposer, - transaction_tracker::{TrackerUpdate, TransactionTracker}, + emit::{BuilderEvent, BundleTxDetails}, + transaction_tracker::{SendResult, TrackerUpdate, TransactionTracker}, }, common::{ + emit::WithEntryPoint, gas::GasFees, math, protos::{ @@ -64,13 +66,14 @@ where // TODO: Figure out what we really want to do for detecting new blocks. provider: Arc>>, settings: Settings, + event_sender: broadcast::Sender>, } #[derive(Debug)] struct BundleTx { tx: TypedTransaction, expected_storage: ExpectedStorage, - op_count: usize, + op_hashes: Vec, } #[derive(Debug)] @@ -106,6 +109,7 @@ where transaction_tracker: T, provider: Arc>>, settings: Settings, + event_sender: broadcast::Sender>, ) -> Self { Self { is_manual_bundling_mode: AtomicBool::new(false), @@ -118,6 +122,7 @@ where transaction_tracker, provider, settings, + event_sender, } } @@ -186,11 +191,17 @@ where } } TrackerUpdate::StillPendingAfterWait => (), - TrackerUpdate::LatestTxDropped => { + TrackerUpdate::LatestTxDropped { nonce } => { + self.emit(BuilderEvent::LatestTransactionDropped { + nonce: nonce.low_u64(), + }); BuilderMetrics::increment_bundle_txns_dropped(); info!("Previous transaction dropped by sender"); } - TrackerUpdate::NonceUsedForOtherTx => { + TrackerUpdate::NonceUsedForOtherTx { nonce } => { + self.emit(BuilderEvent::NonceUsedForOtherTransaction { + nonce: nonce.low_u64(), + }); BuilderMetrics::increment_bundle_txns_nonce_used(); info!("Nonce used by external transaction") } @@ -222,14 +233,20 @@ where let (nonce, mut required_fees) = self.transaction_tracker.get_nonce_and_required_fees()?; BuilderMetrics::set_nonce(nonce); let mut initial_op_count: Option = None; - for num_increases in 0..=self.settings.max_fee_increases { + for fee_increase_count in 0..=self.settings.max_fee_increases { let Some(bundle_tx) = self.get_bundle_tx(nonce, required_fees).await? else { + self.emit(BuilderEvent::FormedBundle { + tx_details: None, + nonce: nonce.low_u64(), + fee_increase_count, + required_fees, + }); return Ok(match initial_op_count { Some(initial_op_count) => { BuilderMetrics::increment_bundle_txns_abandoned(); SendBundleResult::NoOperationsAfterFeeIncreases { initial_op_count, - attempt_number: num_increases, + attempt_number: fee_increase_count, } } None => SendBundleResult::NoOperationsInitially, @@ -238,27 +255,49 @@ where let BundleTx { tx, expected_storage, - op_count, + op_hashes, } = bundle_tx; if initial_op_count.is_none() { - initial_op_count = Some(op_count); + initial_op_count = Some(op_hashes.len()); } let current_fees = GasFees::from(&tx); BuilderMetrics::increment_bundle_txns_sent(); BuilderMetrics::set_current_fees(¤t_fees); - let update = self + let send_result = self .transaction_tracker - .send_transaction_and_wait(tx, &expected_storage) + .send_transaction(tx.clone(), &expected_storage) .await?; + let update = match send_result { + SendResult::TrackerUpdate(update) => update, + SendResult::TxHash(tx_hash) => { + self.emit(BuilderEvent::FormedBundle { + tx_details: Some(BundleTxDetails { + tx_hash, + tx, + op_hashes: Arc::new(op_hashes), + }), + nonce: nonce.low_u64(), + fee_increase_count, + required_fees, + }); + self.transaction_tracker.wait_for_update().await? + } + }; match update { TrackerUpdate::Mined { tx_hash, + nonce, gas_fees: _, block_number, attempt_number, } => { + self.emit(BuilderEvent::TransactionMined { + tx_hash, + nonce: nonce.low_u64(), + block_number, + }); BuilderMetrics::increment_bundle_txns_success(); return Ok(SendBundleResult::Success { block_number, @@ -269,17 +308,23 @@ where TrackerUpdate::StillPendingAfterWait => { info!("Transaction not mined for several blocks") } - TrackerUpdate::LatestTxDropped => { + TrackerUpdate::LatestTxDropped { nonce } => { + self.emit(BuilderEvent::LatestTransactionDropped { + nonce: nonce.low_u64(), + }); BuilderMetrics::increment_bundle_txns_dropped(); info!("Previous transaction dropped by sender"); } - TrackerUpdate::NonceUsedForOtherTx => { + TrackerUpdate::NonceUsedForOtherTx { nonce } => { + self.emit(BuilderEvent::NonceUsedForOtherTransaction { + nonce: nonce.low_u64(), + }); BuilderMetrics::increment_bundle_txns_nonce_used(); bail!("nonce used by external transaction") } }; info!( - "Bundle transaction failed to mine after {num_increases} fee increases (maxFeePerGas: {}, maxPriorityFeePerGas: {}).", + "Bundle transaction failed to mine after {fee_increase_count} fee increases (maxFeePerGas: {}, maxPriorityFeePerGas: {}).", current_fees.max_fee_per_gas, current_fees.max_priority_fee_per_gas, ); @@ -356,7 +401,7 @@ where bundle.rejected_entities.len() ); let gas = math::increase_by_percent(bundle.gas_estimate, GAS_ESTIMATE_OVERHEAD_PERCENT); - let op_count = bundle.len(); + let op_hashes: Vec<_> = bundle.iter_ops().map(|op| self.op_hash(op)).collect(); let mut tx = self.entry_point.get_send_bundle_transaction( bundle.ops_per_aggregator, self.beneficiary, @@ -367,7 +412,7 @@ where Ok(Some(BundleTx { tx, expected_storage: bundle.expected_storage, - op_count, + op_hashes, })) } @@ -401,6 +446,13 @@ where fn op_hash(&self, op: &UserOperation) -> H256 { op.op_hash(self.entry_point.address(), self.chain_id) } + + fn emit(&self, event: BuilderEvent) { + let _ = self.event_sender.send(WithEntryPoint { + entry_point: self.entry_point.address(), + event, + }); + } } #[async_trait] diff --git a/src/builder/task.rs b/src/builder/task.rs index 009bd4347..cd7f7ffef 100644 --- a/src/builder/task.rs +++ b/src/builder/task.rs @@ -7,7 +7,7 @@ use ethers::{ }; use ethers_signers::Signer; use rusoto_core::Region; -use tokio::{select, time}; +use tokio::{select, sync::broadcast, time}; use tokio_util::sync::CancellationToken; use tonic::{ async_trait, @@ -20,6 +20,7 @@ use crate::{ builder::{ self, bundle_proposer::{self, BundleProposerImpl}, + emit::BuilderEvent, sender::get_sender, server::{BuilderImpl, DummyBuilder}, signer::{BundlerSigner, KmsSigner, LocalSigner}, @@ -27,6 +28,7 @@ use crate::{ }, common::{ contracts::i_entry_point::IEntryPoint, + emit::WithEntryPoint, gas::PriorityFeeMode, handle::{SpawnGuard, Task}, mempool::MempoolConfig, @@ -69,6 +71,7 @@ pub struct Args { #[derive(Debug)] pub struct BuilderTask { args: Args, + event_sender: broadcast::Sender>, } #[async_trait] @@ -111,6 +114,7 @@ impl Task for BuilderTask { }; let beneficiary = signer.address(); let proposer_settings = bundle_proposer::Settings { + chain_id: self.args.chain_id, max_bundle_size: self.args.max_bundle_size, beneficiary, use_bundle_priority_fee: self.args.use_bundle_priority_fee, @@ -133,6 +137,7 @@ impl Task for BuilderTask { Arc::clone(&provider), self.args.chain_id, proposer_settings, + self.event_sender.clone(), ); let submit_provider = new_provider(&self.args.submit_url, self.args.eth_poll_interval)?; let transaction_sender = get_sender( @@ -166,6 +171,7 @@ impl Task for BuilderTask { transaction_tracker, provider, builder_settings, + self.event_sender.clone(), )); let _builder_loop_guard = { @@ -208,8 +214,11 @@ impl Task for BuilderTask { } impl BuilderTask { - pub fn new(args: Args) -> BuilderTask { - Self { args } + pub fn new( + args: Args, + event_sender: broadcast::Sender>, + ) -> BuilderTask { + Self { args, event_sender } } pub fn boxed(self) -> Box { diff --git a/src/builder/transaction_tracker.rs b/src/builder/transaction_tracker.rs index d96d2a86c..3455c6208 100644 --- a/src/builder/transaction_tracker.rs +++ b/src/builder/transaction_tracker.rs @@ -27,34 +27,52 @@ use crate::{ pub trait TransactionTracker: Send + Sync + 'static { fn get_nonce_and_required_fees(&self) -> anyhow::Result<(U256, Option)>; - async fn check_for_update_now(&self) -> anyhow::Result>; + /// Sends the provided transaction and typically returns its transaction + /// hash, but if the transaction failed to send because another transaction + /// with the same nonce mined first, then returns information about that + /// transaction instead. + async fn send_transaction( + &self, + tx: TypedTransaction, + expected_stroage: &ExpectedStorage, + ) -> anyhow::Result; - /// Sends a transaction then waits until one of the following occurs: + /// Waits until one of the following occurs: /// - /// 1. One of our transactions mines (not necessarily the most recent one). + /// 1. One of our transactions mines (not necessarily the one just sent). /// 2. All our send transactions have dropped. /// 3. Our nonce has changed but none of our transactions mined. This means /// that a transaction from our account other than one of the ones we are /// tracking has mined. This should not normally happen. /// 4. Several new blocks have passed. - async fn send_transaction_and_wait( - &self, - tx: TypedTransaction, - expected_storage: &ExpectedStorage, - ) -> anyhow::Result; + async fn wait_for_update(&self) -> anyhow::Result; + + /// Like `wait_for_update`, except it returns immediately if there is no + /// update rather than waiting for several new blocks. + async fn check_for_update_now(&self) -> anyhow::Result>; +} + +pub enum SendResult { + TxHash(H256), + TrackerUpdate(TrackerUpdate), } #[derive(Debug)] pub enum TrackerUpdate { Mined { tx_hash: H256, + nonce: U256, gas_fees: GasFees, block_number: u64, attempt_number: u64, }, StillPendingAfterWait, - LatestTxDropped, - NonceUsedForOtherTx, + LatestTxDropped { + nonce: U256, + }, + NonceUsedForOtherTx { + nonce: U256, + }, } #[derive(Debug)] @@ -102,18 +120,20 @@ where Ok(self.inner()?.get_nonce_and_required_fees()) } - async fn check_for_update_now(&self) -> anyhow::Result> { - self.inner()?.check_for_update_now().await - } - - async fn send_transaction_and_wait( + async fn send_transaction( &self, tx: TypedTransaction, expected_storage: &ExpectedStorage, - ) -> anyhow::Result { - self.inner()? - .send_transaction_and_wait(tx, expected_storage) - .await + ) -> anyhow::Result { + self.inner()?.send_transaction(tx, expected_storage).await + } + + async fn wait_for_update(&self) -> anyhow::Result { + self.inner()?.wait_for_update().await + } + + async fn check_for_update_now(&self) -> anyhow::Result> { + self.inner()?.check_for_update_now().await } } @@ -169,17 +189,20 @@ where (self.nonce, gas_fees) } - async fn send_transaction_and_wait( + async fn send_transaction( &mut self, tx: TypedTransaction, expected_storage: &ExpectedStorage, - ) -> anyhow::Result { + ) -> anyhow::Result { self.validate_transaction(&tx)?; let gas_fees = GasFees::from(&tx); let send_result = self.sender.send_transaction(tx, expected_storage).await; let sent_tx = match send_result { Ok(sent_tx) => sent_tx, - Err(error) => return self.handle_send_error(error).await, + Err(error) => { + let tracker_update = self.handle_send_error(error).await?; + return Ok(SendResult::TrackerUpdate(tracker_update)); + } }; info!("Sent transaction {:?}", sent_tx.tx_hash); self.transactions.push(PendingTransaction { @@ -189,7 +212,7 @@ where }); self.has_dropped = false; self.attempt_count += 1; - self.wait_for_update_or_new_blocks().await + Ok(SendResult::TxHash(sent_tx.tx_hash)) } /// When we fail to send a transaction, it may be because another @@ -201,12 +224,14 @@ where return Err(error); }; match &update { - TrackerUpdate::Mined { .. } | TrackerUpdate::NonceUsedForOtherTx => Ok(update), - TrackerUpdate::StillPendingAfterWait | TrackerUpdate::LatestTxDropped => Err(error), + TrackerUpdate::Mined { .. } | TrackerUpdate::NonceUsedForOtherTx { .. } => Ok(update), + TrackerUpdate::StillPendingAfterWait | TrackerUpdate::LatestTxDropped { .. } => { + Err(error) + } } } - async fn wait_for_update_or_new_blocks(&mut self) -> anyhow::Result { + async fn wait_for_update(&mut self) -> anyhow::Result { let start_block_number = self .provider .get_block_number() @@ -235,7 +260,7 @@ where if self.nonce < external_nonce { // The nonce has changed. Check to see which of our transactions has // mined, if any. - let mut out = TrackerUpdate::NonceUsedForOtherTx; + let mut out = TrackerUpdate::NonceUsedForOtherTx { nonce: self.nonce }; for tx in self.transactions.iter().rev() { let status = self .sender @@ -245,6 +270,7 @@ where if let TxStatus::Mined { block_number } = status { out = TrackerUpdate::Mined { tx_hash: tx.tx_hash, + nonce: self.nonce, gas_fees: tx.gas_fees, block_number, attempt_number: tx.attempt_number, @@ -274,9 +300,11 @@ where Ok(match status { TxStatus::Pending => None, TxStatus::Mined { block_number } => { - self.set_nonce_and_clear_state(self.nonce + 1); + let nonce = self.nonce; + self.set_nonce_and_clear_state(nonce + 1); Some(TrackerUpdate::Mined { tx_hash: last_tx.tx_hash, + nonce, gas_fees: last_tx.gas_fees, block_number, attempt_number: last_tx.attempt_number, @@ -284,7 +312,7 @@ where } TxStatus::Dropped => { self.has_dropped = true; - Some(TrackerUpdate::LatestTxDropped) + Some(TrackerUpdate::LatestTxDropped { nonce: self.nonce }) } }) } diff --git a/src/cli/builder.rs b/src/cli/builder.rs index 46bf3728c..2463aa58e 100644 --- a/src/cli/builder.rs +++ b/src/cli/builder.rs @@ -3,12 +3,16 @@ use std::{collections::HashMap, time::Duration}; use anyhow::Context; use clap::Args; use ethers::types::H256; +use tokio::sync::broadcast; use super::{json::get_json_config, CommonArgs}; use crate::{ - builder::{self, BuilderTask}, + builder::{self, emit::BuilderEvent, BuilderTask}, common::{ - gas::PriorityFeeMode, handle::spawn_tasks_with_shutdown, mempool::MempoolConfig, + emit::{self, WithEntryPoint, EVENT_CHANNEL_CAPACITY}, + gas::PriorityFeeMode, + handle::spawn_tasks_with_shutdown, + mempool::MempoolConfig, server::format_server_addr, }, }; @@ -230,11 +234,28 @@ pub async fn run(builder_args: BuilderCliArgs, common_args: CommonArgs) -> anyho pool_url, } = builder_args; let task_args = builder_args.to_args(&common_args, pool_url).await?; + let (event_sender, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY); + + emit::receive_and_log_events_with_filter(event_rx, is_nonspammy_event); spawn_tasks_with_shutdown( - [BuilderTask::new(task_args).boxed()], + [BuilderTask::new(task_args, event_sender).boxed()], tokio::signal::ctrl_c(), ) .await; Ok(()) } + +pub fn is_nonspammy_event(event: &WithEntryPoint) -> bool { + if let BuilderEvent::FormedBundle { + tx_details, + fee_increase_count, + .. + } = &event.event + { + if tx_details.is_none() && *fee_increase_count == 0 { + return false; + } + } + true +} diff --git a/src/cli/node.rs b/src/cli/node.rs deleted file mode 100644 index 2d0c807c4..000000000 --- a/src/cli/node.rs +++ /dev/null @@ -1,57 +0,0 @@ -use clap::Args; - -use super::{builder::BuilderArgs, pool::PoolArgs, rpc::RpcArgs, CommonArgs}; -use crate::{ - builder::BuilderTask, - common::{handle, server::format_server_addr}, - op_pool::PoolTask, - rpc::RpcTask, -}; - -#[derive(Debug, Args)] -pub struct NodeCliArgs { - #[command(flatten)] - pool: PoolArgs, - - #[command(flatten)] - builder: BuilderArgs, - - #[command(flatten)] - rpc: RpcArgs, -} - -pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow::Result<()> { - let NodeCliArgs { - pool: pool_args, - builder: builder_args, - rpc: rpc_args, - } = bundler_args; - - let pool_url = format_server_addr(&pool_args.host, pool_args.port, false); - let builder_url = builder_args.url(false); - - let pool_task_args = pool_args.to_args(&common_args).await?; - let builder_task_args = builder_args.to_args(&common_args, pool_url.clone()).await?; - let rpc_task_args = rpc_args - .to_args( - &common_args, - pool_url, - builder_url, - (&common_args).try_into()?, - (&common_args).into(), - (&common_args).try_into()?, - ) - .await?; - - handle::spawn_tasks_with_shutdown( - [ - PoolTask::new(pool_task_args).boxed(), - BuilderTask::new(builder_task_args).boxed(), - RpcTask::new(rpc_task_args).boxed(), - ], - tokio::signal::ctrl_c(), - ) - .await; - - Ok(()) -} diff --git a/src/cli/node/events.rs b/src/cli/node/events.rs new file mode 100644 index 000000000..1236a686b --- /dev/null +++ b/src/cli/node/events.rs @@ -0,0 +1,38 @@ +use std::fmt::Display; + +use ethers::types::Address; + +use crate::{builder::emit::BuilderEvent, op_pool::emit::OpPoolEvent}; + +#[derive(Clone, Debug)] +pub enum Event { + OpPoolEvent(OpPoolEvent), + BuilderEvent(BuilderEvent), +} + +#[derive(Clone, Debug)] +pub struct WithEntryPoint { + pub entry_point: Address, + pub event: T, +} + +impl From for Event { + fn from(event: OpPoolEvent) -> Self { + Self::OpPoolEvent(event) + } +} + +impl From for Event { + fn from(event: BuilderEvent) -> Self { + Self::BuilderEvent(event) + } +} + +impl Display for Event { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Event::OpPoolEvent(event) => event.fmt(f), + Event::BuilderEvent(event) => event.fmt(f), + } + } +} diff --git a/src/cli/node/mod.rs b/src/cli/node/mod.rs new file mode 100644 index 000000000..c08c1ea5c --- /dev/null +++ b/src/cli/node/mod.rs @@ -0,0 +1,92 @@ +use clap::Args; +use tokio::sync::broadcast; + +use self::events::Event; +use crate::{ + builder::{emit::BuilderEvent, BuilderTask}, + cli::{ + builder::{self, BuilderArgs}, + pool::PoolArgs, + rpc::RpcArgs, + CommonArgs, + }, + common::{ + emit::{self, WithEntryPoint, EVENT_CHANNEL_CAPACITY}, + handle, + server::format_server_addr, + }, + op_pool::{emit::OpPoolEvent, PoolTask}, + rpc::RpcTask, +}; +mod events; + +#[derive(Debug, Args)] +pub struct NodeCliArgs { + #[command(flatten)] + pool: PoolArgs, + + #[command(flatten)] + builder: BuilderArgs, + + #[command(flatten)] + rpc: RpcArgs, +} + +pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow::Result<()> { + let NodeCliArgs { + pool: pool_args, + builder: builder_args, + rpc: rpc_args, + } = bundler_args; + + let pool_url = format_server_addr(&pool_args.host, pool_args.port, false); + let builder_url = builder_args.url(false); + + let pool_task_args = pool_args.to_args(&common_args).await?; + let builder_task_args = builder_args.to_args(&common_args, pool_url.clone()).await?; + let rpc_task_args = rpc_args + .to_args( + &common_args, + pool_url, + builder_url, + (&common_args).try_into()?, + (&common_args).into(), + (&common_args).try_into()?, + ) + .await?; + + let (event_sender, event_rx) = + broadcast::channel::>(EVENT_CHANNEL_CAPACITY); + let (op_pool_event_sender, op_pool_event_rx) = + broadcast::channel::>(EVENT_CHANNEL_CAPACITY); + let (builder_event_sender, builder_event_rx) = + broadcast::channel::>(EVENT_CHANNEL_CAPACITY); + + emit::receive_and_log_events_with_filter(event_rx, |_| true); + emit::receive_events("op pool", op_pool_event_rx, { + let event_sender = event_sender.clone(); + move |event| { + let _ = event_sender.send(WithEntryPoint::of(event)); + } + }); + emit::receive_events("builder", builder_event_rx, { + let event_sender = event_sender.clone(); + move |event| { + if builder::is_nonspammy_event(&event) { + let _ = event_sender.send(WithEntryPoint::of(event)); + } + } + }); + + handle::spawn_tasks_with_shutdown( + [ + PoolTask::new(pool_task_args, op_pool_event_sender).boxed(), + BuilderTask::new(builder_task_args, builder_event_sender).boxed(), + RpcTask::new(rpc_task_args).boxed(), + ], + tokio::signal::ctrl_c(), + ) + .await; + + Ok(()) +} diff --git a/src/cli/pool.rs b/src/cli/pool.rs index 93b1d06df..65a29c387 100644 --- a/src/cli/pool.rs +++ b/src/cli/pool.rs @@ -2,11 +2,15 @@ use std::time::Duration; use anyhow::Context; use clap::Args; +use tokio::sync::broadcast; use super::CommonArgs; use crate::{ cli::json::get_json_config, - common::handle::spawn_tasks_with_shutdown, + common::{ + emit::{self, EVENT_CHANNEL_CAPACITY}, + handle::spawn_tasks_with_shutdown, + }, op_pool::{self, PoolConfig, PoolTask}, }; @@ -136,7 +140,14 @@ pub struct PoolCliArgs { pub async fn run(pool_args: PoolCliArgs, common_args: CommonArgs) -> anyhow::Result<()> { let PoolCliArgs { pool: pool_args } = pool_args; let task_args = pool_args.to_args(&common_args).await?; + let (event_sender, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY); - spawn_tasks_with_shutdown([PoolTask::new(task_args).boxed()], tokio::signal::ctrl_c()).await; + emit::receive_and_log_events_with_filter(event_rx, |_| true); + + spawn_tasks_with_shutdown( + [PoolTask::new(task_args, event_sender).boxed()], + tokio::signal::ctrl_c(), + ) + .await; Ok(()) } diff --git a/src/common/emit.rs b/src/common/emit.rs new file mode 100644 index 000000000..84a284475 --- /dev/null +++ b/src/common/emit.rs @@ -0,0 +1,72 @@ +use std::fmt::Display; + +use ethers::types::Address; +use tokio::{ + sync::broadcast::{self, error::RecvError}, + task::JoinHandle, +}; +use tracing::{info, warn}; + +// Events should be in the tens of kilobytes at most (they can contain the +// contents of an op or transaction), so allocating tens of megabytes for the +// channel should be fine. +pub const EVENT_CHANNEL_CAPACITY: usize = 1000; + +#[derive(Clone, Debug)] +pub struct WithEntryPoint { + pub entry_point: Address, + pub event: T, +} + +impl WithEntryPoint { + pub fn of>(value: WithEntryPoint) -> Self { + Self { + entry_point: value.entry_point, + event: value.event.into(), + } + } +} + +impl Display for WithEntryPoint { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} Entrypoint: {:?}", self.event, self.entry_point) + } +} + +pub fn receive_events( + description: &'static str, + mut rx: broadcast::Receiver, + handler: impl Fn(T) + Send + 'static, +) -> JoinHandle<()> +where + T: Clone + Send + 'static, +{ + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(event) => handler(event), + Err(RecvError::Closed) => { + info!("Event stream for {description} closed. Logging complete"); + break; + } + Err(RecvError::Lagged(count)) => { + warn!("Event stream for {description} lagged. Missed {count} messages.") + } + } + } + }) +} + +pub fn receive_and_log_events_with_filter( + rx: broadcast::Receiver, + filter: impl (Fn(&T) -> bool) + Send + 'static, +) -> JoinHandle<()> +where + T: Clone + Display + Send + 'static, +{ + receive_events("logging", rx, move |event| { + if filter(&event) { + info!("{}", event); + } + }) +} diff --git a/src/common/mod.rs b/src/common/mod.rs index 5040e1475..30104d5f6 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,6 +1,7 @@ pub mod context; pub mod contracts; pub mod dev; +pub mod emit; pub mod eth; pub mod gas; pub mod grpc; @@ -11,5 +12,6 @@ pub mod precheck; pub mod protos; pub mod server; pub mod simulation; +pub mod strs; pub mod tracer; pub mod types; diff --git a/src/common/strs.rs b/src/common/strs.rs new file mode 100644 index 000000000..8d4c48a10 --- /dev/null +++ b/src/common/strs.rs @@ -0,0 +1,21 @@ +use std::{borrow::Cow, fmt::Debug}; + +// Formatting helpers + +/// Given an `Option`, converts the contents of the `Option` to a `String`, +/// returning the provided default `&str` if the option is `None`. Returns a +/// `Cow` in order to avoid allocation in the latter case. +pub fn to_string_or(x: Option, default: &str) -> Cow<'_, str> { + x.map(|x| Cow::Owned(x.to_string())) + .unwrap_or(Cow::Borrowed(default)) +} + +/// Like `to_string_or`, but uses debug formatting. +pub fn to_debug_or(x: Option, default: &str) -> Cow<'_, str> { + x.map(|x| Cow::Owned(format!("{x:?}"))) + .unwrap_or(Cow::Borrowed(default)) +} + +pub fn to_string_or_empty(x: Option) -> String { + x.map(|x| x.to_string()).unwrap_or_default() +} diff --git a/src/op_pool/emit.rs b/src/op_pool/emit.rs new file mode 100644 index 000000000..fba1e6101 --- /dev/null +++ b/src/op_pool/emit.rs @@ -0,0 +1,136 @@ +use std::{fmt::Display, sync::Arc}; + +use ethers::types::{Address, H256}; + +use crate::{ + common::{ + strs, + types::{Entity, EntityType, Timestamp, UserOperation}, + }, + op_pool::mempool::OperationOrigin, +}; + +#[derive(Clone, Debug)] +pub enum OpPoolEvent { + ReceivedOp { + op_hash: H256, + op: UserOperation, + block_number: u64, + origin: OperationOrigin, + valid_after: Timestamp, + valid_until: Timestamp, + entities: EntitySummary, + error: Option>, + }, + RemovedOp { + op_hash: H256, + reason: OpRemovalReason, + }, + RemovedEntity { + entity: Entity, + }, +} + +#[derive(Clone, Debug, Default)] +pub struct EntitySummary { + pub sender: EntityStatus, + pub factory: Option, + pub paymaster: Option, + pub aggregator: Option, +} + +#[derive(Clone, Debug, Default)] +pub struct EntityStatus { + pub address: Address, + pub needs_stake: bool, + pub reputation: EntityReputation, +} + +#[derive(Clone, Debug, Default)] +pub enum EntityReputation { + #[default] + Ok, + ThrottledButOk, + ThrottledAndRejected, + Banned, +} + +#[derive(Clone, Debug)] +pub enum OpRemovalReason { + Requsted, + Mined { + block_number: u64, + block_hash: H256, + tx_hash: H256, + }, + ThrottledAndOld { + added_at_block_number: u64, + current_block_number: u64, + }, + EntityRemoved { + entity: Entity, + }, +} + +impl EntitySummary { + pub fn set_status(&mut self, kind: EntityType, status: EntityStatus) { + match kind { + EntityType::Account => self.sender = status, + EntityType::Paymaster => self.paymaster = Some(status), + EntityType::Aggregator => self.aggregator = Some(status), + EntityType::Factory => self.factory = Some(status), + }; + } +} + +impl Display for OpPoolEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OpPoolEvent::ReceivedOp { + op_hash, + entities, + error, + .. + } => { + let intro = strs::to_string_or( + error + .as_ref() + .map(|error| format!("Pool rejected op with error. Error: {error}")), + "Pool accepted op.", + ); + write!( + f, + concat!("{}", " Op hash: {:?}", "{}", "{}", "{}", "{}",), + intro, + op_hash, + format_entity_status("Sender", Some(&entities.sender)), + format_entity_status("Factory", entities.factory.as_ref()), + format_entity_status("Paymaster", entities.paymaster.as_ref()), + format_entity_status("Aggregator", entities.aggregator.as_ref()), + ) + } + OpPoolEvent::RemovedOp { op_hash, reason } => { + write!( + f, + concat!( + "Removed op from pool.", + " Op hash: {:?}", + " Reason: {:?}", + ), + op_hash, reason, + ) + } + OpPoolEvent::RemovedEntity { entity } => { + write!( + f, + concat!("Removed entity from pool.", " Entity: {}",), + entity, + ) + } + } + } +} + +fn format_entity_status(name: &str, status: Option<&EntityStatus>) -> String { + strs::to_string_or_empty(status.map(|status| format!(" {name}: {:?}", status.address))) +} diff --git a/src/op_pool/mempool/mod.rs b/src/op_pool/mempool/mod.rs index 527f36416..e720cb68f 100644 --- a/src/op_pool/mempool/mod.rs +++ b/src/op_pool/mempool/mod.rs @@ -119,7 +119,7 @@ impl PoolOperation { /// Returns true if the operation requires the given entity to stake. /// /// For non-accounts, its possible that the entity is staked, but doesn't - /// ~need~ to take for this operation. For example, if the operation does not + /// _need_ to stake for this operation. For example, if the operation does not /// access any storage slots that require staking. In that case this function /// will return false. /// diff --git a/src/op_pool/mempool/pool.rs b/src/op_pool/mempool/pool.rs index 231866b31..fa20d11b2 100644 --- a/src/op_pool/mempool/pool.rs +++ b/src/op_pool/mempool/pool.rs @@ -166,16 +166,19 @@ impl PoolInner { None } - pub fn remove_entity(&mut self, entity: Entity) { + /// Removes all operations using the given entity, returning the hashes of + /// the removed operations. + pub fn remove_entity(&mut self, entity: Entity) -> Vec { let to_remove = self .by_hash .iter() .filter(|(_, uo)| uo.po.contains_entity(&entity)) .map(|(hash, _)| *hash) .collect::>(); - for hash in to_remove { + for &hash in &to_remove { self.remove_operation_by_hash(hash); } + to_remove } pub fn clear(&mut self) { diff --git a/src/op_pool/mempool/uo_pool.rs b/src/op_pool/mempool/uo_pool.rs index 60ee461fb..c26806a64 100644 --- a/src/op_pool/mempool/uo_pool.rs +++ b/src/op_pool/mempool/uo_pool.rs @@ -14,8 +14,9 @@ use super::{ Mempool, OperationOrigin, PoolConfig, PoolOperation, }; use crate::{ - common::{contracts::i_entry_point::IEntryPointEvents, types::Entity}, + common::{contracts::i_entry_point::IEntryPointEvents, emit::WithEntryPoint, types::Entity}, op_pool::{ + emit::{EntityReputation, EntityStatus, EntitySummary, OpPoolEvent, OpRemovalReason}, event::NewBlockEvent, reputation::{Reputation, ReputationManager, ReputationStatus}, }, @@ -31,8 +32,10 @@ const THROTTLED_OPS_BLOCK_LIMIT: u64 = 10; /// block on write locks. pub struct UoPool { entry_point: Address, + chain_id: u64, reputation: Arc, state: RwLock, + event_sender: broadcast::Sender>, } struct UoPoolState { @@ -45,15 +48,21 @@ impl UoPool where R: ReputationManager, { - pub fn new(args: PoolConfig, reputation: Arc) -> Self { + pub fn new( + args: PoolConfig, + reputation: Arc, + event_sender: broadcast::Sender>, + ) -> Self { Self { entry_point: args.entry_point, + chain_id: args.chain_id, reputation, state: RwLock::new(UoPoolState { pool: PoolInner::new(args), throttled_ops: HashMap::new(), block_number: 0, }), + event_sender, } } @@ -76,6 +85,13 @@ where } } } + + fn emit(&self, event: OpPoolEvent) { + let _ = self.event_sender.send(WithEntryPoint { + entry_point: self.entry_point, + event, + }); + } } impl Mempool for UoPool @@ -124,34 +140,79 @@ where state.block_number = new_block_number; } - fn add_operation(&self, _origin: OperationOrigin, op: PoolOperation) -> MempoolResult { + fn add_operation(&self, origin: OperationOrigin, op: PoolOperation) -> MempoolResult { let mut throttled = false; - for e in op.entities() { - match self.reputation.status(e.address) { - ReputationStatus::Ok => {} + let mut rejected_entity: Option = None; + let mut entity_summary = EntitySummary::default(); + for entity in op.entities() { + let address = entity.address; + let reputation = match self.reputation.status(address) { + ReputationStatus::Ok => EntityReputation::Ok, ReputationStatus::Throttled => { - if self.state.read().pool.address_count(e.address) > 0 { - return Err(MempoolError::EntityThrottled(e)); + if self.state.read().pool.address_count(address) > 0 { + rejected_entity = Some(entity); + EntityReputation::ThrottledAndRejected + } else { + throttled = true; + EntityReputation::ThrottledButOk } - throttled = true; } ReputationStatus::Banned => { - return Err(MempoolError::EntityThrottled(e)); + rejected_entity = Some(entity); + EntityReputation::Banned } + }; + let needs_stake = op.is_staked(entity.kind); + if needs_stake { + self.reputation.add_seen(address); } + entity_summary.set_status( + entity.kind, + EntityStatus { + address, + needs_stake, + reputation, + }, + ); + } - if op.is_staked(e.kind) { - self.reputation.add_seen(e.address); + let emit_event = { + let op_hash = op.uo.op_hash(self.entry_point, self.chain_id); + let valid_after = op.valid_time_range.valid_after; + let valid_until = op.valid_time_range.valid_until; + let op = op.uo.clone(); + move |block_number: u64, error: Option| { + self.emit(OpPoolEvent::ReceivedOp { + op_hash, + op, + block_number, + origin, + valid_after, + valid_until, + entities: entity_summary, + error: error.map(Arc::new), + }) } - } + }; + if let Some(entity) = rejected_entity { + let error = MempoolError::EntityThrottled(entity); + emit_event(self.state.read().block_number, Some(error.to_string())); + return Err(MempoolError::EntityThrottled(entity)); + } let mut state = self.state.write(); - let hash = state.pool.add_operation(op)?; let bn = state.block_number; + let hash = match state.pool.add_operation(op) { + Ok(hash) => hash, + Err(error) => { + emit_event(bn, Some(error.to_string())); + return Err(error); + } + }; if throttled { state.throttled_ops.insert(hash, bn); } - + emit_event(bn, None); Ok(hash) } @@ -172,7 +233,14 @@ where } fn remove_entity(&self, entity: Entity) { - self.state.write().pool.remove_entity(entity); + let removed_op_hashes = self.state.write().pool.remove_entity(entity); + self.emit(OpPoolEvent::RemovedEntity { entity }); + for op_hash in removed_op_hashes { + self.emit(OpPoolEvent::RemovedOp { + op_hash, + reason: OpRemovalReason::EntityRemoved { entity }, + }) + } } fn best_operations(&self, max: usize) -> Vec> { @@ -265,7 +333,8 @@ mod tests { blocklist: None, allowlist: None, }; - UoPool::new(args, mock_reputation()) + let (event_sender, _) = broadcast::channel(4); + UoPool::new(args, mock_reputation(), event_sender) } fn create_op(sender: Address, nonce: usize, max_fee_per_gas: usize) -> PoolOperation { diff --git a/src/op_pool/mod.rs b/src/op_pool/mod.rs index 1fa837dce..47862bbad 100644 --- a/src/op_pool/mod.rs +++ b/src/op_pool/mod.rs @@ -1,3 +1,4 @@ +pub mod emit; mod event; mod mempool; mod reputation; diff --git a/src/op_pool/task.rs b/src/op_pool/task.rs index ef5515d50..88f749afa 100644 --- a/src/op_pool/task.rs +++ b/src/op_pool/task.rs @@ -2,23 +2,21 @@ use std::{sync::Arc, time::Duration}; use anyhow::{bail, Context}; use futures::future; -use tokio::{task::JoinHandle, try_join}; +use tokio::{sync::broadcast, task::JoinHandle, try_join}; use tokio_util::sync::CancellationToken; use tonic::{async_trait, transport::Server}; -use super::{ - event::EventProvider, - mempool::{Mempool, PoolConfig}, -}; use crate::{ common::{ + emit::WithEntryPoint, grpc::metrics::GrpcMetricsLayer, handle::{flatten_handle, Task}, protos::op_pool::{op_pool_server::OpPoolServer, OP_POOL_FILE_DESCRIPTOR_SET}, }, op_pool::{ - event::{EventListener, HttpBlockProviderFactory, WsBlockProviderFactory}, - mempool::uo_pool::UoPool, + emit::OpPoolEvent, + event::{EventListener, EventProvider, HttpBlockProviderFactory, WsBlockProviderFactory}, + mempool::{uo_pool::UoPool, Mempool, PoolConfig}, reputation::{HourlyMovingAverageReputation, ReputationParams}, server::OpPoolImpl, }, @@ -38,6 +36,7 @@ pub struct Args { #[derive(Debug)] pub struct PoolTask { args: Args, + event_sender: broadcast::Sender>, } #[async_trait] @@ -73,6 +72,7 @@ impl Task for PoolTask { let (pool, handle) = PoolTask::create_mempool( pool_config, event_provider.as_ref(), + self.event_sender.clone(), shutdown_token.clone(), ) .await @@ -143,8 +143,11 @@ impl Task for PoolTask { } impl PoolTask { - pub fn new(args: Args) -> PoolTask { - Self { args } + pub fn new( + args: Args, + event_sender: broadcast::Sender>, + ) -> PoolTask { + Self { args, event_sender } } pub fn boxed(self) -> Box { @@ -154,6 +157,7 @@ impl PoolTask { async fn create_mempool( pool_config: &PoolConfig, event_provider: &dyn EventProvider, + event_sender: broadcast::Sender>, shutdown_token: CancellationToken, ) -> anyhow::Result<(Arc>, JoinHandle<()>)> { let entry_point = pool_config.entry_point; @@ -168,7 +172,11 @@ impl PoolTask { tokio::spawn(async move { reputation_runner.run().await }); // Mempool - let mp = Arc::new(UoPool::new(pool_config.clone(), Arc::clone(&reputation))); + let mp = Arc::new(UoPool::new( + pool_config.clone(), + Arc::clone(&reputation), + event_sender, + )); // Start mempool let mempool_events = event_provider .subscribe_by_entrypoint(entry_point)