From 69f68145acf374fc037cf45bb10d40d314cc4919 Mon Sep 17 00:00:00 2001 From: David Philipson Date: Thu, 29 Jun 2023 15:30:52 -0700 Subject: [PATCH] feat: Event streams Introduce streams of events from the op pool and builder, which provide detailed debug information and can be consumed from broadcast channels. As an initial application, provide logging based on the content of these streams, which can be used to debug the path a user op takes. To make this possible, several refactors are required: * The input side of the channel is threaded through various structs used throughout, such as the bundle proposer. * A new `emit` submodule is added in each of `cli::node``, `builder`, `op_pool`, and `common`. In each case, this module defines the relevant event types, including `Display` implementations for the default logging. The one in `common` also defines some helper functions for managing event streams. them into log messages (logged via `tracing`). * The transaction tracker now has separate steps for submitting a transaction and then waiting for it, rather than a singule function which does both. This is because we do not know the transaction hash until the transaction is sent, due to last-minute changes made by the transaction sender, and we want to be able to log the send as it occurs. --- src/builder/bundle_proposer.rs | 70 +++++++++++--- src/builder/emit.rs | 145 +++++++++++++++++++++++++++++ src/builder/mod.rs | 1 + src/builder/server.rs | 84 +++++++++++++---- src/builder/task.rs | 15 ++- src/builder/transaction_tracker.rs | 86 +++++++++++------ src/cli/builder.rs | 27 +++++- src/cli/node.rs | 57 ------------ src/cli/node/events.rs | 38 ++++++++ src/cli/node/mod.rs | 92 ++++++++++++++++++ src/cli/pool.rs | 15 ++- src/common/emit.rs | 72 ++++++++++++++ src/common/mod.rs | 2 + src/common/strs.rs | 21 +++++ src/op_pool/emit.rs | 136 +++++++++++++++++++++++++++ src/op_pool/mempool/mod.rs | 2 +- src/op_pool/mempool/pool.rs | 7 +- src/op_pool/mempool/uo_pool.rs | 103 ++++++++++++++++---- src/op_pool/mod.rs | 1 + src/op_pool/task.rs | 28 ++++-- 20 files changed, 847 insertions(+), 155 deletions(-) create mode 100644 src/builder/emit.rs delete mode 100644 src/cli/node.rs create mode 100644 src/cli/node/events.rs create mode 100644 src/cli/node/mod.rs create mode 100644 src/common/emit.rs create mode 100644 src/common/strs.rs create mode 100644 src/op_pool/emit.rs diff --git a/src/builder/bundle_proposer.rs b/src/builder/bundle_proposer.rs index 5317af922..14f65730e 100644 --- a/src/builder/bundle_proposer.rs +++ b/src/builder/bundle_proposer.rs @@ -12,22 +12,26 @@ use futures::future; use linked_hash_map::LinkedHashMap; #[cfg(test)] use mockall::automock; -use tokio::try_join; +use tokio::{sync::broadcast, try_join}; use tonic::{async_trait, transport::Channel}; use tracing::{error, info}; -use crate::common::{ - contracts::entry_point::UserOpsPerAggregator, - gas::{FeeEstimator, GasFees, PriorityFeeMode}, - math, - protos::{ - self, - op_pool::{op_pool_client::OpPoolClient, GetOpsRequest, MempoolOp}, - }, - simulation::{SimulationError, SimulationSuccess, Simulator}, - types::{ - Entity, EntityType, EntryPointLike, ExpectedStorage, HandleOpsOut, ProviderLike, Timestamp, - UserOperation, +use crate::{ + builder::emit::{BuilderEvent, OpRejectionReason, SkipReason}, + common::{ + contracts::entry_point::UserOpsPerAggregator, + emit::WithEntryPoint, + gas::{FeeEstimator, GasFees, PriorityFeeMode}, + math, + protos::{ + self, + op_pool::{op_pool_client::OpPoolClient, GetOpsRequest, MempoolOp}, + }, + simulation::{SimulationError, SimulationSuccess, Simulator}, + types::{ + Entity, EntityType, EntryPointLike, ExpectedStorage, HandleOpsOut, ProviderLike, + Timestamp, UserOperation, + }, }, }; @@ -57,6 +61,10 @@ impl Bundle { pub fn is_empty(&self) -> bool { self.ops_per_aggregator.is_empty() } + + pub fn iter_ops(&self) -> impl Iterator + '_ { + self.ops_per_aggregator.iter().flat_map(|ops| &ops.user_ops) + } } #[cfg_attr(test, automock)] @@ -79,10 +87,12 @@ where chain_id: u64, settings: Settings, fee_estimator: FeeEstimator

, + event_sender: broadcast::Sender>, } #[derive(Debug)] pub struct Settings { + pub chain_id: u64, pub max_bundle_size: u64, pub beneficiary: Address, pub use_bundle_priority_fee: Option, @@ -176,6 +186,7 @@ where provider: Arc

, chain_id: u64, settings: Settings, + event_sender: broadcast::Sender>, ) -> Self { Self { op_pool, @@ -191,6 +202,7 @@ where settings.bundle_priority_fee_overhead_percent, ), settings, + event_sender, } } @@ -233,17 +245,25 @@ where let mut paymasters_to_reject = Vec::

::new(); for (op, simulation) in ops_with_simulations { let Some(simulation) = simulation else { + self.emit(BuilderEvent::RejectedOp { + op_hash: self.op_hash(&op), + reason: OpRejectionReason::FailedRevalidation, + }); rejected_ops.push(op); continue; }; - if simulation + if let Some(&other_sender) = simulation .accessed_addresses .iter() - .any(|&address| address != op.sender && all_sender_addresses.contains(&address)) + .find(|&address| *address != op.sender && all_sender_addresses.contains(address)) { // Exclude ops that access the sender of another op in the // batch, but don't reject them (remove them from pool). info!("Excluding op from {:?} because it accessed the address of another sender in the bundle.", op.sender); + self.emit(BuilderEvent::SkippedOp { + op_hash: self.op_hash(&op), + reason: SkipReason::AccessedOtherSender { other_sender }, + }); continue; } if let Some(paymaster) = op.paymaster() { @@ -344,6 +364,12 @@ where match handle_ops_out { HandleOpsOut::Success => Ok(Some(gas)), HandleOpsOut::FailedOp(index, message) => { + self.emit(BuilderEvent::RejectedOp { + op_hash: self.op_hash(context.get_op_at(index)?), + reason: OpRejectionReason::FailedInBundle { + message: Arc::new(message.clone()), + }, + }); self.process_failed_op(context, index, message).await?; Ok(None) } @@ -440,6 +466,17 @@ where Ok(()) } + + fn emit(&self, event: BuilderEvent) { + let _ = self.event_sender.send(WithEntryPoint { + entry_point: self.entry_point.address(), + event, + }); + } + + fn op_hash(&self, op: &UserOperation) -> H256 { + op.op_hash(self.entry_point.address(), self.settings.chain_id) + } } #[derive(Clone, Debug)] @@ -1155,6 +1192,7 @@ mod tests { provider .expect_aggregate_signatures() .returning(move |address, _| signatures_by_aggregator[&address]()); + let (event_sender, _) = broadcast::channel(16); let proposer = BundleProposerImpl::new( op_pool_handle.client.clone(), simulator, @@ -1162,12 +1200,14 @@ mod tests { Arc::new(provider), 0, Settings { + chain_id: 0, max_bundle_size, beneficiary, use_bundle_priority_fee: Some(true), priority_fee_mode: PriorityFeeMode::PriorityFeePercent(10), bundle_priority_fee_overhead_percent: 0, }, + event_sender, ); proposer .make_bundle(None) diff --git a/src/builder/emit.rs b/src/builder/emit.rs new file mode 100644 index 000000000..55ee97ccc --- /dev/null +++ b/src/builder/emit.rs @@ -0,0 +1,145 @@ +use std::{fmt::Display, sync::Arc}; + +use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256}; + +use crate::common::{gas::GasFees, strs}; + +#[derive(Clone, Debug)] +pub enum BuilderEvent { + FormedBundle { + /// If `None`, means that the bundle contained no operations and so no + /// transaction was created. + tx_details: Option, + nonce: u64, + fee_increase_count: u64, + required_fees: Option, + }, + TransactionMined { + tx_hash: H256, + nonce: u64, + block_number: u64, + }, + LatestTransactionDropped { + nonce: u64, + }, + NonceUsedForOtherTransaction { + nonce: u64, + }, + SkippedOp { + op_hash: H256, + reason: SkipReason, + }, + RejectedOp { + op_hash: H256, + reason: OpRejectionReason, + }, +} + +#[derive(Clone, Debug)] +pub struct BundleTxDetails { + pub tx_hash: H256, + pub tx: TypedTransaction, + pub op_hashes: Arc>, +} + +#[derive(Clone, Debug)] +pub enum SkipReason { + AccessedOtherSender { other_sender: Address }, +} + +#[derive(Clone, Debug)] +pub enum OpRejectionReason { + FailedRevalidation, + FailedInBundle { message: Arc }, +} + +impl Display for BuilderEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + BuilderEvent::FormedBundle { + tx_details, + nonce, + fee_increase_count, + required_fees, + } => { + let required_max_fee_per_gas = + strs::to_string_or(required_fees.map(|fees| fees.max_fee_per_gas), "(default)"); + let required_max_priority_fee_per_gas = strs::to_string_or( + required_fees.map(|fees| fees.max_priority_fee_per_gas), + "(default)", + ); + match tx_details { + Some(tx_details) => { + let op_hashes = tx_details + .op_hashes + .iter() + .map(|hash| format!("{hash:?}")) + .collect::>() + .join(", "); + write!( + f, + concat!( + "Bundle transaction sent!", + " Transaction hash: {:?}", + " Nonce: {}", + " Fee increases: {}", + " Required maxFeePerGas: {}", + " Required maxPriorityFeePerGas: {}", + " Op hashes: {}", + ), + tx_details.tx_hash, + nonce, + fee_increase_count, + required_max_fee_per_gas, + required_max_priority_fee_per_gas, + op_hashes, + ) + } + None => write!( + f, + concat!( + "Bundle was empty.", + " Nonce: {}", + " Fee increases: {}", + " Required maxFeePerGas: {}", + " Required maxPriorityFeePerGas: {}", + ), + nonce, + fee_increase_count, + required_max_fee_per_gas, + required_max_priority_fee_per_gas + ), + } + } + BuilderEvent::TransactionMined { + tx_hash, + nonce, + block_number, + } => write!( + f, + concat!( + "Transaction mined!", + " Transaction hash: {:?}", + " Nonce: {}", + " Block number: {}", + ), + tx_hash, nonce, block_number, + ), + BuilderEvent::LatestTransactionDropped { nonce } => { + write!( + f, + "Latest transaction dropped. Higher fees are needed. Nonce: {nonce}" + ) + } + BuilderEvent::NonceUsedForOtherTransaction { nonce } => { + write!(f, "Transaction failed because nonce was used by another transaction outside of this Rundler. Nonce: {nonce}") + } + BuilderEvent::SkippedOp { op_hash, reason } => { + write!(f, "Op skipped in bundle (but remains in pool). Op hash: {op_hash:?} Reason: {reason:?}") + } + BuilderEvent::RejectedOp { op_hash, reason } => { + write!(f, "Op rejected from bundle and removed from pool. Op hash: {op_hash:?} Reason: {reason:?}") + } + } + } +} diff --git a/src/builder/mod.rs b/src/builder/mod.rs index f53332dd4..d346782c9 100644 --- a/src/builder/mod.rs +++ b/src/builder/mod.rs @@ -1,4 +1,5 @@ mod bundle_proposer; +pub mod emit; mod sender; mod server; mod signer; diff --git a/src/builder/server.rs b/src/builder/server.rs index bf90ef62f..f2a67bee4 100644 --- a/src/builder/server.rs +++ b/src/builder/server.rs @@ -11,16 +11,18 @@ use ethers::{ providers::{Http, Middleware, Provider, RetryClient}, types::{transaction::eip2718::TypedTransaction, Address, H256, U256}, }; -use tokio::{join, time}; +use tokio::{join, sync::broadcast, time}; use tonic::{async_trait, transport::Channel, Request, Response, Status}; use tracing::{debug, error, info, trace, warn}; use crate::{ builder::{ bundle_proposer::BundleProposer, - transaction_tracker::{TrackerUpdate, TransactionTracker}, + emit::{BuilderEvent, BundleTxDetails}, + transaction_tracker::{SendResult, TrackerUpdate, TransactionTracker}, }, common::{ + emit::WithEntryPoint, gas::GasFees, math, protos::{ @@ -64,13 +66,14 @@ where // TODO: Figure out what we really want to do for detecting new blocks. provider: Arc>>, settings: Settings, + event_sender: broadcast::Sender>, } #[derive(Debug)] struct BundleTx { tx: TypedTransaction, expected_storage: ExpectedStorage, - op_count: usize, + op_hashes: Vec, } #[derive(Debug)] @@ -106,6 +109,7 @@ where transaction_tracker: T, provider: Arc>>, settings: Settings, + event_sender: broadcast::Sender>, ) -> Self { Self { is_manual_bundling_mode: AtomicBool::new(false), @@ -118,6 +122,7 @@ where transaction_tracker, provider, settings, + event_sender, } } @@ -186,11 +191,17 @@ where } } TrackerUpdate::StillPendingAfterWait => (), - TrackerUpdate::LatestTxDropped => { + TrackerUpdate::LatestTxDropped { nonce } => { + self.emit(BuilderEvent::LatestTransactionDropped { + nonce: nonce.low_u64(), + }); BuilderMetrics::increment_bundle_txns_dropped(); info!("Previous transaction dropped by sender"); } - TrackerUpdate::NonceUsedForOtherTx => { + TrackerUpdate::NonceUsedForOtherTx { nonce } => { + self.emit(BuilderEvent::NonceUsedForOtherTransaction { + nonce: nonce.low_u64(), + }); BuilderMetrics::increment_bundle_txns_nonce_used(); info!("Nonce used by external transaction") } @@ -222,14 +233,20 @@ where let (nonce, mut required_fees) = self.transaction_tracker.get_nonce_and_required_fees()?; BuilderMetrics::set_nonce(nonce); let mut initial_op_count: Option = None; - for num_increases in 0..=self.settings.max_fee_increases { + for fee_increase_count in 0..=self.settings.max_fee_increases { let Some(bundle_tx) = self.get_bundle_tx(nonce, required_fees).await? else { + self.emit(BuilderEvent::FormedBundle { + tx_details: None, + nonce: nonce.low_u64(), + fee_increase_count, + required_fees, + }); return Ok(match initial_op_count { Some(initial_op_count) => { BuilderMetrics::increment_bundle_txns_abandoned(); SendBundleResult::NoOperationsAfterFeeIncreases { initial_op_count, - attempt_number: num_increases, + attempt_number: fee_increase_count, } } None => SendBundleResult::NoOperationsInitially, @@ -238,27 +255,49 @@ where let BundleTx { tx, expected_storage, - op_count, + op_hashes, } = bundle_tx; if initial_op_count.is_none() { - initial_op_count = Some(op_count); + initial_op_count = Some(op_hashes.len()); } let current_fees = GasFees::from(&tx); BuilderMetrics::increment_bundle_txns_sent(); BuilderMetrics::set_current_fees(¤t_fees); - let update = self + let send_result = self .transaction_tracker - .send_transaction_and_wait(tx, &expected_storage) + .send_transaction(tx.clone(), &expected_storage) .await?; + let update = match send_result { + SendResult::TrackerUpdate(update) => update, + SendResult::TxHash(tx_hash) => { + self.emit(BuilderEvent::FormedBundle { + tx_details: Some(BundleTxDetails { + tx_hash, + tx, + op_hashes: Arc::new(op_hashes), + }), + nonce: nonce.low_u64(), + fee_increase_count, + required_fees, + }); + self.transaction_tracker.wait_for_update().await? + } + }; match update { TrackerUpdate::Mined { tx_hash, + nonce, gas_fees: _, block_number, attempt_number, } => { + self.emit(BuilderEvent::TransactionMined { + tx_hash, + nonce: nonce.low_u64(), + block_number, + }); BuilderMetrics::increment_bundle_txns_success(); return Ok(SendBundleResult::Success { block_number, @@ -269,17 +308,23 @@ where TrackerUpdate::StillPendingAfterWait => { info!("Transaction not mined for several blocks") } - TrackerUpdate::LatestTxDropped => { + TrackerUpdate::LatestTxDropped { nonce } => { + self.emit(BuilderEvent::LatestTransactionDropped { + nonce: nonce.low_u64(), + }); BuilderMetrics::increment_bundle_txns_dropped(); info!("Previous transaction dropped by sender"); } - TrackerUpdate::NonceUsedForOtherTx => { + TrackerUpdate::NonceUsedForOtherTx { nonce } => { + self.emit(BuilderEvent::NonceUsedForOtherTransaction { + nonce: nonce.low_u64(), + }); BuilderMetrics::increment_bundle_txns_nonce_used(); bail!("nonce used by external transaction") } }; info!( - "Bundle transaction failed to mine after {num_increases} fee increases (maxFeePerGas: {}, maxPriorityFeePerGas: {}).", + "Bundle transaction failed to mine after {fee_increase_count} fee increases (maxFeePerGas: {}, maxPriorityFeePerGas: {}).", current_fees.max_fee_per_gas, current_fees.max_priority_fee_per_gas, ); @@ -356,7 +401,7 @@ where bundle.rejected_entities.len() ); let gas = math::increase_by_percent(bundle.gas_estimate, GAS_ESTIMATE_OVERHEAD_PERCENT); - let op_count = bundle.len(); + let op_hashes: Vec<_> = bundle.iter_ops().map(|op| self.op_hash(op)).collect(); let mut tx = self.entry_point.get_send_bundle_transaction( bundle.ops_per_aggregator, self.beneficiary, @@ -367,7 +412,7 @@ where Ok(Some(BundleTx { tx, expected_storage: bundle.expected_storage, - op_count, + op_hashes, })) } @@ -401,6 +446,13 @@ where fn op_hash(&self, op: &UserOperation) -> H256 { op.op_hash(self.entry_point.address(), self.chain_id) } + + fn emit(&self, event: BuilderEvent) { + let _ = self.event_sender.send(WithEntryPoint { + entry_point: self.entry_point.address(), + event, + }); + } } #[async_trait] diff --git a/src/builder/task.rs b/src/builder/task.rs index 009bd4347..cd7f7ffef 100644 --- a/src/builder/task.rs +++ b/src/builder/task.rs @@ -7,7 +7,7 @@ use ethers::{ }; use ethers_signers::Signer; use rusoto_core::Region; -use tokio::{select, time}; +use tokio::{select, sync::broadcast, time}; use tokio_util::sync::CancellationToken; use tonic::{ async_trait, @@ -20,6 +20,7 @@ use crate::{ builder::{ self, bundle_proposer::{self, BundleProposerImpl}, + emit::BuilderEvent, sender::get_sender, server::{BuilderImpl, DummyBuilder}, signer::{BundlerSigner, KmsSigner, LocalSigner}, @@ -27,6 +28,7 @@ use crate::{ }, common::{ contracts::i_entry_point::IEntryPoint, + emit::WithEntryPoint, gas::PriorityFeeMode, handle::{SpawnGuard, Task}, mempool::MempoolConfig, @@ -69,6 +71,7 @@ pub struct Args { #[derive(Debug)] pub struct BuilderTask { args: Args, + event_sender: broadcast::Sender>, } #[async_trait] @@ -111,6 +114,7 @@ impl Task for BuilderTask { }; let beneficiary = signer.address(); let proposer_settings = bundle_proposer::Settings { + chain_id: self.args.chain_id, max_bundle_size: self.args.max_bundle_size, beneficiary, use_bundle_priority_fee: self.args.use_bundle_priority_fee, @@ -133,6 +137,7 @@ impl Task for BuilderTask { Arc::clone(&provider), self.args.chain_id, proposer_settings, + self.event_sender.clone(), ); let submit_provider = new_provider(&self.args.submit_url, self.args.eth_poll_interval)?; let transaction_sender = get_sender( @@ -166,6 +171,7 @@ impl Task for BuilderTask { transaction_tracker, provider, builder_settings, + self.event_sender.clone(), )); let _builder_loop_guard = { @@ -208,8 +214,11 @@ impl Task for BuilderTask { } impl BuilderTask { - pub fn new(args: Args) -> BuilderTask { - Self { args } + pub fn new( + args: Args, + event_sender: broadcast::Sender>, + ) -> BuilderTask { + Self { args, event_sender } } pub fn boxed(self) -> Box { diff --git a/src/builder/transaction_tracker.rs b/src/builder/transaction_tracker.rs index d96d2a86c..3455c6208 100644 --- a/src/builder/transaction_tracker.rs +++ b/src/builder/transaction_tracker.rs @@ -27,34 +27,52 @@ use crate::{ pub trait TransactionTracker: Send + Sync + 'static { fn get_nonce_and_required_fees(&self) -> anyhow::Result<(U256, Option)>; - async fn check_for_update_now(&self) -> anyhow::Result>; + /// Sends the provided transaction and typically returns its transaction + /// hash, but if the transaction failed to send because another transaction + /// with the same nonce mined first, then returns information about that + /// transaction instead. + async fn send_transaction( + &self, + tx: TypedTransaction, + expected_stroage: &ExpectedStorage, + ) -> anyhow::Result; - /// Sends a transaction then waits until one of the following occurs: + /// Waits until one of the following occurs: /// - /// 1. One of our transactions mines (not necessarily the most recent one). + /// 1. One of our transactions mines (not necessarily the one just sent). /// 2. All our send transactions have dropped. /// 3. Our nonce has changed but none of our transactions mined. This means /// that a transaction from our account other than one of the ones we are /// tracking has mined. This should not normally happen. /// 4. Several new blocks have passed. - async fn send_transaction_and_wait( - &self, - tx: TypedTransaction, - expected_storage: &ExpectedStorage, - ) -> anyhow::Result; + async fn wait_for_update(&self) -> anyhow::Result; + + /// Like `wait_for_update`, except it returns immediately if there is no + /// update rather than waiting for several new blocks. + async fn check_for_update_now(&self) -> anyhow::Result>; +} + +pub enum SendResult { + TxHash(H256), + TrackerUpdate(TrackerUpdate), } #[derive(Debug)] pub enum TrackerUpdate { Mined { tx_hash: H256, + nonce: U256, gas_fees: GasFees, block_number: u64, attempt_number: u64, }, StillPendingAfterWait, - LatestTxDropped, - NonceUsedForOtherTx, + LatestTxDropped { + nonce: U256, + }, + NonceUsedForOtherTx { + nonce: U256, + }, } #[derive(Debug)] @@ -102,18 +120,20 @@ where Ok(self.inner()?.get_nonce_and_required_fees()) } - async fn check_for_update_now(&self) -> anyhow::Result> { - self.inner()?.check_for_update_now().await - } - - async fn send_transaction_and_wait( + async fn send_transaction( &self, tx: TypedTransaction, expected_storage: &ExpectedStorage, - ) -> anyhow::Result { - self.inner()? - .send_transaction_and_wait(tx, expected_storage) - .await + ) -> anyhow::Result { + self.inner()?.send_transaction(tx, expected_storage).await + } + + async fn wait_for_update(&self) -> anyhow::Result { + self.inner()?.wait_for_update().await + } + + async fn check_for_update_now(&self) -> anyhow::Result> { + self.inner()?.check_for_update_now().await } } @@ -169,17 +189,20 @@ where (self.nonce, gas_fees) } - async fn send_transaction_and_wait( + async fn send_transaction( &mut self, tx: TypedTransaction, expected_storage: &ExpectedStorage, - ) -> anyhow::Result { + ) -> anyhow::Result { self.validate_transaction(&tx)?; let gas_fees = GasFees::from(&tx); let send_result = self.sender.send_transaction(tx, expected_storage).await; let sent_tx = match send_result { Ok(sent_tx) => sent_tx, - Err(error) => return self.handle_send_error(error).await, + Err(error) => { + let tracker_update = self.handle_send_error(error).await?; + return Ok(SendResult::TrackerUpdate(tracker_update)); + } }; info!("Sent transaction {:?}", sent_tx.tx_hash); self.transactions.push(PendingTransaction { @@ -189,7 +212,7 @@ where }); self.has_dropped = false; self.attempt_count += 1; - self.wait_for_update_or_new_blocks().await + Ok(SendResult::TxHash(sent_tx.tx_hash)) } /// When we fail to send a transaction, it may be because another @@ -201,12 +224,14 @@ where return Err(error); }; match &update { - TrackerUpdate::Mined { .. } | TrackerUpdate::NonceUsedForOtherTx => Ok(update), - TrackerUpdate::StillPendingAfterWait | TrackerUpdate::LatestTxDropped => Err(error), + TrackerUpdate::Mined { .. } | TrackerUpdate::NonceUsedForOtherTx { .. } => Ok(update), + TrackerUpdate::StillPendingAfterWait | TrackerUpdate::LatestTxDropped { .. } => { + Err(error) + } } } - async fn wait_for_update_or_new_blocks(&mut self) -> anyhow::Result { + async fn wait_for_update(&mut self) -> anyhow::Result { let start_block_number = self .provider .get_block_number() @@ -235,7 +260,7 @@ where if self.nonce < external_nonce { // The nonce has changed. Check to see which of our transactions has // mined, if any. - let mut out = TrackerUpdate::NonceUsedForOtherTx; + let mut out = TrackerUpdate::NonceUsedForOtherTx { nonce: self.nonce }; for tx in self.transactions.iter().rev() { let status = self .sender @@ -245,6 +270,7 @@ where if let TxStatus::Mined { block_number } = status { out = TrackerUpdate::Mined { tx_hash: tx.tx_hash, + nonce: self.nonce, gas_fees: tx.gas_fees, block_number, attempt_number: tx.attempt_number, @@ -274,9 +300,11 @@ where Ok(match status { TxStatus::Pending => None, TxStatus::Mined { block_number } => { - self.set_nonce_and_clear_state(self.nonce + 1); + let nonce = self.nonce; + self.set_nonce_and_clear_state(nonce + 1); Some(TrackerUpdate::Mined { tx_hash: last_tx.tx_hash, + nonce, gas_fees: last_tx.gas_fees, block_number, attempt_number: last_tx.attempt_number, @@ -284,7 +312,7 @@ where } TxStatus::Dropped => { self.has_dropped = true; - Some(TrackerUpdate::LatestTxDropped) + Some(TrackerUpdate::LatestTxDropped { nonce: self.nonce }) } }) } diff --git a/src/cli/builder.rs b/src/cli/builder.rs index 46bf3728c..2463aa58e 100644 --- a/src/cli/builder.rs +++ b/src/cli/builder.rs @@ -3,12 +3,16 @@ use std::{collections::HashMap, time::Duration}; use anyhow::Context; use clap::Args; use ethers::types::H256; +use tokio::sync::broadcast; use super::{json::get_json_config, CommonArgs}; use crate::{ - builder::{self, BuilderTask}, + builder::{self, emit::BuilderEvent, BuilderTask}, common::{ - gas::PriorityFeeMode, handle::spawn_tasks_with_shutdown, mempool::MempoolConfig, + emit::{self, WithEntryPoint, EVENT_CHANNEL_CAPACITY}, + gas::PriorityFeeMode, + handle::spawn_tasks_with_shutdown, + mempool::MempoolConfig, server::format_server_addr, }, }; @@ -230,11 +234,28 @@ pub async fn run(builder_args: BuilderCliArgs, common_args: CommonArgs) -> anyho pool_url, } = builder_args; let task_args = builder_args.to_args(&common_args, pool_url).await?; + let (event_sender, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY); + + emit::receive_and_log_events_with_filter(event_rx, is_nonspammy_event); spawn_tasks_with_shutdown( - [BuilderTask::new(task_args).boxed()], + [BuilderTask::new(task_args, event_sender).boxed()], tokio::signal::ctrl_c(), ) .await; Ok(()) } + +pub fn is_nonspammy_event(event: &WithEntryPoint) -> bool { + if let BuilderEvent::FormedBundle { + tx_details, + fee_increase_count, + .. + } = &event.event + { + if tx_details.is_none() && *fee_increase_count == 0 { + return false; + } + } + true +} diff --git a/src/cli/node.rs b/src/cli/node.rs deleted file mode 100644 index 2d0c807c4..000000000 --- a/src/cli/node.rs +++ /dev/null @@ -1,57 +0,0 @@ -use clap::Args; - -use super::{builder::BuilderArgs, pool::PoolArgs, rpc::RpcArgs, CommonArgs}; -use crate::{ - builder::BuilderTask, - common::{handle, server::format_server_addr}, - op_pool::PoolTask, - rpc::RpcTask, -}; - -#[derive(Debug, Args)] -pub struct NodeCliArgs { - #[command(flatten)] - pool: PoolArgs, - - #[command(flatten)] - builder: BuilderArgs, - - #[command(flatten)] - rpc: RpcArgs, -} - -pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow::Result<()> { - let NodeCliArgs { - pool: pool_args, - builder: builder_args, - rpc: rpc_args, - } = bundler_args; - - let pool_url = format_server_addr(&pool_args.host, pool_args.port, false); - let builder_url = builder_args.url(false); - - let pool_task_args = pool_args.to_args(&common_args).await?; - let builder_task_args = builder_args.to_args(&common_args, pool_url.clone()).await?; - let rpc_task_args = rpc_args - .to_args( - &common_args, - pool_url, - builder_url, - (&common_args).try_into()?, - (&common_args).into(), - (&common_args).try_into()?, - ) - .await?; - - handle::spawn_tasks_with_shutdown( - [ - PoolTask::new(pool_task_args).boxed(), - BuilderTask::new(builder_task_args).boxed(), - RpcTask::new(rpc_task_args).boxed(), - ], - tokio::signal::ctrl_c(), - ) - .await; - - Ok(()) -} diff --git a/src/cli/node/events.rs b/src/cli/node/events.rs new file mode 100644 index 000000000..1236a686b --- /dev/null +++ b/src/cli/node/events.rs @@ -0,0 +1,38 @@ +use std::fmt::Display; + +use ethers::types::Address; + +use crate::{builder::emit::BuilderEvent, op_pool::emit::OpPoolEvent}; + +#[derive(Clone, Debug)] +pub enum Event { + OpPoolEvent(OpPoolEvent), + BuilderEvent(BuilderEvent), +} + +#[derive(Clone, Debug)] +pub struct WithEntryPoint { + pub entry_point: Address, + pub event: T, +} + +impl From for Event { + fn from(event: OpPoolEvent) -> Self { + Self::OpPoolEvent(event) + } +} + +impl From for Event { + fn from(event: BuilderEvent) -> Self { + Self::BuilderEvent(event) + } +} + +impl Display for Event { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Event::OpPoolEvent(event) => event.fmt(f), + Event::BuilderEvent(event) => event.fmt(f), + } + } +} diff --git a/src/cli/node/mod.rs b/src/cli/node/mod.rs new file mode 100644 index 000000000..c08c1ea5c --- /dev/null +++ b/src/cli/node/mod.rs @@ -0,0 +1,92 @@ +use clap::Args; +use tokio::sync::broadcast; + +use self::events::Event; +use crate::{ + builder::{emit::BuilderEvent, BuilderTask}, + cli::{ + builder::{self, BuilderArgs}, + pool::PoolArgs, + rpc::RpcArgs, + CommonArgs, + }, + common::{ + emit::{self, WithEntryPoint, EVENT_CHANNEL_CAPACITY}, + handle, + server::format_server_addr, + }, + op_pool::{emit::OpPoolEvent, PoolTask}, + rpc::RpcTask, +}; +mod events; + +#[derive(Debug, Args)] +pub struct NodeCliArgs { + #[command(flatten)] + pool: PoolArgs, + + #[command(flatten)] + builder: BuilderArgs, + + #[command(flatten)] + rpc: RpcArgs, +} + +pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow::Result<()> { + let NodeCliArgs { + pool: pool_args, + builder: builder_args, + rpc: rpc_args, + } = bundler_args; + + let pool_url = format_server_addr(&pool_args.host, pool_args.port, false); + let builder_url = builder_args.url(false); + + let pool_task_args = pool_args.to_args(&common_args).await?; + let builder_task_args = builder_args.to_args(&common_args, pool_url.clone()).await?; + let rpc_task_args = rpc_args + .to_args( + &common_args, + pool_url, + builder_url, + (&common_args).try_into()?, + (&common_args).into(), + (&common_args).try_into()?, + ) + .await?; + + let (event_sender, event_rx) = + broadcast::channel::>(EVENT_CHANNEL_CAPACITY); + let (op_pool_event_sender, op_pool_event_rx) = + broadcast::channel::>(EVENT_CHANNEL_CAPACITY); + let (builder_event_sender, builder_event_rx) = + broadcast::channel::>(EVENT_CHANNEL_CAPACITY); + + emit::receive_and_log_events_with_filter(event_rx, |_| true); + emit::receive_events("op pool", op_pool_event_rx, { + let event_sender = event_sender.clone(); + move |event| { + let _ = event_sender.send(WithEntryPoint::of(event)); + } + }); + emit::receive_events("builder", builder_event_rx, { + let event_sender = event_sender.clone(); + move |event| { + if builder::is_nonspammy_event(&event) { + let _ = event_sender.send(WithEntryPoint::of(event)); + } + } + }); + + handle::spawn_tasks_with_shutdown( + [ + PoolTask::new(pool_task_args, op_pool_event_sender).boxed(), + BuilderTask::new(builder_task_args, builder_event_sender).boxed(), + RpcTask::new(rpc_task_args).boxed(), + ], + tokio::signal::ctrl_c(), + ) + .await; + + Ok(()) +} diff --git a/src/cli/pool.rs b/src/cli/pool.rs index 93b1d06df..65a29c387 100644 --- a/src/cli/pool.rs +++ b/src/cli/pool.rs @@ -2,11 +2,15 @@ use std::time::Duration; use anyhow::Context; use clap::Args; +use tokio::sync::broadcast; use super::CommonArgs; use crate::{ cli::json::get_json_config, - common::handle::spawn_tasks_with_shutdown, + common::{ + emit::{self, EVENT_CHANNEL_CAPACITY}, + handle::spawn_tasks_with_shutdown, + }, op_pool::{self, PoolConfig, PoolTask}, }; @@ -136,7 +140,14 @@ pub struct PoolCliArgs { pub async fn run(pool_args: PoolCliArgs, common_args: CommonArgs) -> anyhow::Result<()> { let PoolCliArgs { pool: pool_args } = pool_args; let task_args = pool_args.to_args(&common_args).await?; + let (event_sender, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY); - spawn_tasks_with_shutdown([PoolTask::new(task_args).boxed()], tokio::signal::ctrl_c()).await; + emit::receive_and_log_events_with_filter(event_rx, |_| true); + + spawn_tasks_with_shutdown( + [PoolTask::new(task_args, event_sender).boxed()], + tokio::signal::ctrl_c(), + ) + .await; Ok(()) } diff --git a/src/common/emit.rs b/src/common/emit.rs new file mode 100644 index 000000000..84a284475 --- /dev/null +++ b/src/common/emit.rs @@ -0,0 +1,72 @@ +use std::fmt::Display; + +use ethers::types::Address; +use tokio::{ + sync::broadcast::{self, error::RecvError}, + task::JoinHandle, +}; +use tracing::{info, warn}; + +// Events should be in the tens of kilobytes at most (they can contain the +// contents of an op or transaction), so allocating tens of megabytes for the +// channel should be fine. +pub const EVENT_CHANNEL_CAPACITY: usize = 1000; + +#[derive(Clone, Debug)] +pub struct WithEntryPoint { + pub entry_point: Address, + pub event: T, +} + +impl WithEntryPoint { + pub fn of>(value: WithEntryPoint) -> Self { + Self { + entry_point: value.entry_point, + event: value.event.into(), + } + } +} + +impl Display for WithEntryPoint { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} Entrypoint: {:?}", self.event, self.entry_point) + } +} + +pub fn receive_events( + description: &'static str, + mut rx: broadcast::Receiver, + handler: impl Fn(T) + Send + 'static, +) -> JoinHandle<()> +where + T: Clone + Send + 'static, +{ + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(event) => handler(event), + Err(RecvError::Closed) => { + info!("Event stream for {description} closed. Logging complete"); + break; + } + Err(RecvError::Lagged(count)) => { + warn!("Event stream for {description} lagged. Missed {count} messages.") + } + } + } + }) +} + +pub fn receive_and_log_events_with_filter( + rx: broadcast::Receiver, + filter: impl (Fn(&T) -> bool) + Send + 'static, +) -> JoinHandle<()> +where + T: Clone + Display + Send + 'static, +{ + receive_events("logging", rx, move |event| { + if filter(&event) { + info!("{}", event); + } + }) +} diff --git a/src/common/mod.rs b/src/common/mod.rs index 5040e1475..30104d5f6 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,6 +1,7 @@ pub mod context; pub mod contracts; pub mod dev; +pub mod emit; pub mod eth; pub mod gas; pub mod grpc; @@ -11,5 +12,6 @@ pub mod precheck; pub mod protos; pub mod server; pub mod simulation; +pub mod strs; pub mod tracer; pub mod types; diff --git a/src/common/strs.rs b/src/common/strs.rs new file mode 100644 index 000000000..8d4c48a10 --- /dev/null +++ b/src/common/strs.rs @@ -0,0 +1,21 @@ +use std::{borrow::Cow, fmt::Debug}; + +// Formatting helpers + +/// Given an `Option`, converts the contents of the `Option` to a `String`, +/// returning the provided default `&str` if the option is `None`. Returns a +/// `Cow` in order to avoid allocation in the latter case. +pub fn to_string_or(x: Option, default: &str) -> Cow<'_, str> { + x.map(|x| Cow::Owned(x.to_string())) + .unwrap_or(Cow::Borrowed(default)) +} + +/// Like `to_string_or`, but uses debug formatting. +pub fn to_debug_or(x: Option, default: &str) -> Cow<'_, str> { + x.map(|x| Cow::Owned(format!("{x:?}"))) + .unwrap_or(Cow::Borrowed(default)) +} + +pub fn to_string_or_empty(x: Option) -> String { + x.map(|x| x.to_string()).unwrap_or_default() +} diff --git a/src/op_pool/emit.rs b/src/op_pool/emit.rs new file mode 100644 index 000000000..fba1e6101 --- /dev/null +++ b/src/op_pool/emit.rs @@ -0,0 +1,136 @@ +use std::{fmt::Display, sync::Arc}; + +use ethers::types::{Address, H256}; + +use crate::{ + common::{ + strs, + types::{Entity, EntityType, Timestamp, UserOperation}, + }, + op_pool::mempool::OperationOrigin, +}; + +#[derive(Clone, Debug)] +pub enum OpPoolEvent { + ReceivedOp { + op_hash: H256, + op: UserOperation, + block_number: u64, + origin: OperationOrigin, + valid_after: Timestamp, + valid_until: Timestamp, + entities: EntitySummary, + error: Option>, + }, + RemovedOp { + op_hash: H256, + reason: OpRemovalReason, + }, + RemovedEntity { + entity: Entity, + }, +} + +#[derive(Clone, Debug, Default)] +pub struct EntitySummary { + pub sender: EntityStatus, + pub factory: Option, + pub paymaster: Option, + pub aggregator: Option, +} + +#[derive(Clone, Debug, Default)] +pub struct EntityStatus { + pub address: Address, + pub needs_stake: bool, + pub reputation: EntityReputation, +} + +#[derive(Clone, Debug, Default)] +pub enum EntityReputation { + #[default] + Ok, + ThrottledButOk, + ThrottledAndRejected, + Banned, +} + +#[derive(Clone, Debug)] +pub enum OpRemovalReason { + Requsted, + Mined { + block_number: u64, + block_hash: H256, + tx_hash: H256, + }, + ThrottledAndOld { + added_at_block_number: u64, + current_block_number: u64, + }, + EntityRemoved { + entity: Entity, + }, +} + +impl EntitySummary { + pub fn set_status(&mut self, kind: EntityType, status: EntityStatus) { + match kind { + EntityType::Account => self.sender = status, + EntityType::Paymaster => self.paymaster = Some(status), + EntityType::Aggregator => self.aggregator = Some(status), + EntityType::Factory => self.factory = Some(status), + }; + } +} + +impl Display for OpPoolEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OpPoolEvent::ReceivedOp { + op_hash, + entities, + error, + .. + } => { + let intro = strs::to_string_or( + error + .as_ref() + .map(|error| format!("Pool rejected op with error. Error: {error}")), + "Pool accepted op.", + ); + write!( + f, + concat!("{}", " Op hash: {:?}", "{}", "{}", "{}", "{}",), + intro, + op_hash, + format_entity_status("Sender", Some(&entities.sender)), + format_entity_status("Factory", entities.factory.as_ref()), + format_entity_status("Paymaster", entities.paymaster.as_ref()), + format_entity_status("Aggregator", entities.aggregator.as_ref()), + ) + } + OpPoolEvent::RemovedOp { op_hash, reason } => { + write!( + f, + concat!( + "Removed op from pool.", + " Op hash: {:?}", + " Reason: {:?}", + ), + op_hash, reason, + ) + } + OpPoolEvent::RemovedEntity { entity } => { + write!( + f, + concat!("Removed entity from pool.", " Entity: {}",), + entity, + ) + } + } + } +} + +fn format_entity_status(name: &str, status: Option<&EntityStatus>) -> String { + strs::to_string_or_empty(status.map(|status| format!(" {name}: {:?}", status.address))) +} diff --git a/src/op_pool/mempool/mod.rs b/src/op_pool/mempool/mod.rs index 527f36416..e720cb68f 100644 --- a/src/op_pool/mempool/mod.rs +++ b/src/op_pool/mempool/mod.rs @@ -119,7 +119,7 @@ impl PoolOperation { /// Returns true if the operation requires the given entity to stake. /// /// For non-accounts, its possible that the entity is staked, but doesn't - /// ~need~ to take for this operation. For example, if the operation does not + /// _need_ to stake for this operation. For example, if the operation does not /// access any storage slots that require staking. In that case this function /// will return false. /// diff --git a/src/op_pool/mempool/pool.rs b/src/op_pool/mempool/pool.rs index 231866b31..fa20d11b2 100644 --- a/src/op_pool/mempool/pool.rs +++ b/src/op_pool/mempool/pool.rs @@ -166,16 +166,19 @@ impl PoolInner { None } - pub fn remove_entity(&mut self, entity: Entity) { + /// Removes all operations using the given entity, returning the hashes of + /// the removed operations. + pub fn remove_entity(&mut self, entity: Entity) -> Vec { let to_remove = self .by_hash .iter() .filter(|(_, uo)| uo.po.contains_entity(&entity)) .map(|(hash, _)| *hash) .collect::>(); - for hash in to_remove { + for &hash in &to_remove { self.remove_operation_by_hash(hash); } + to_remove } pub fn clear(&mut self) { diff --git a/src/op_pool/mempool/uo_pool.rs b/src/op_pool/mempool/uo_pool.rs index 60ee461fb..c26806a64 100644 --- a/src/op_pool/mempool/uo_pool.rs +++ b/src/op_pool/mempool/uo_pool.rs @@ -14,8 +14,9 @@ use super::{ Mempool, OperationOrigin, PoolConfig, PoolOperation, }; use crate::{ - common::{contracts::i_entry_point::IEntryPointEvents, types::Entity}, + common::{contracts::i_entry_point::IEntryPointEvents, emit::WithEntryPoint, types::Entity}, op_pool::{ + emit::{EntityReputation, EntityStatus, EntitySummary, OpPoolEvent, OpRemovalReason}, event::NewBlockEvent, reputation::{Reputation, ReputationManager, ReputationStatus}, }, @@ -31,8 +32,10 @@ const THROTTLED_OPS_BLOCK_LIMIT: u64 = 10; /// block on write locks. pub struct UoPool { entry_point: Address, + chain_id: u64, reputation: Arc, state: RwLock, + event_sender: broadcast::Sender>, } struct UoPoolState { @@ -45,15 +48,21 @@ impl UoPool where R: ReputationManager, { - pub fn new(args: PoolConfig, reputation: Arc) -> Self { + pub fn new( + args: PoolConfig, + reputation: Arc, + event_sender: broadcast::Sender>, + ) -> Self { Self { entry_point: args.entry_point, + chain_id: args.chain_id, reputation, state: RwLock::new(UoPoolState { pool: PoolInner::new(args), throttled_ops: HashMap::new(), block_number: 0, }), + event_sender, } } @@ -76,6 +85,13 @@ where } } } + + fn emit(&self, event: OpPoolEvent) { + let _ = self.event_sender.send(WithEntryPoint { + entry_point: self.entry_point, + event, + }); + } } impl Mempool for UoPool @@ -124,34 +140,79 @@ where state.block_number = new_block_number; } - fn add_operation(&self, _origin: OperationOrigin, op: PoolOperation) -> MempoolResult { + fn add_operation(&self, origin: OperationOrigin, op: PoolOperation) -> MempoolResult { let mut throttled = false; - for e in op.entities() { - match self.reputation.status(e.address) { - ReputationStatus::Ok => {} + let mut rejected_entity: Option = None; + let mut entity_summary = EntitySummary::default(); + for entity in op.entities() { + let address = entity.address; + let reputation = match self.reputation.status(address) { + ReputationStatus::Ok => EntityReputation::Ok, ReputationStatus::Throttled => { - if self.state.read().pool.address_count(e.address) > 0 { - return Err(MempoolError::EntityThrottled(e)); + if self.state.read().pool.address_count(address) > 0 { + rejected_entity = Some(entity); + EntityReputation::ThrottledAndRejected + } else { + throttled = true; + EntityReputation::ThrottledButOk } - throttled = true; } ReputationStatus::Banned => { - return Err(MempoolError::EntityThrottled(e)); + rejected_entity = Some(entity); + EntityReputation::Banned } + }; + let needs_stake = op.is_staked(entity.kind); + if needs_stake { + self.reputation.add_seen(address); } + entity_summary.set_status( + entity.kind, + EntityStatus { + address, + needs_stake, + reputation, + }, + ); + } - if op.is_staked(e.kind) { - self.reputation.add_seen(e.address); + let emit_event = { + let op_hash = op.uo.op_hash(self.entry_point, self.chain_id); + let valid_after = op.valid_time_range.valid_after; + let valid_until = op.valid_time_range.valid_until; + let op = op.uo.clone(); + move |block_number: u64, error: Option| { + self.emit(OpPoolEvent::ReceivedOp { + op_hash, + op, + block_number, + origin, + valid_after, + valid_until, + entities: entity_summary, + error: error.map(Arc::new), + }) } - } + }; + if let Some(entity) = rejected_entity { + let error = MempoolError::EntityThrottled(entity); + emit_event(self.state.read().block_number, Some(error.to_string())); + return Err(MempoolError::EntityThrottled(entity)); + } let mut state = self.state.write(); - let hash = state.pool.add_operation(op)?; let bn = state.block_number; + let hash = match state.pool.add_operation(op) { + Ok(hash) => hash, + Err(error) => { + emit_event(bn, Some(error.to_string())); + return Err(error); + } + }; if throttled { state.throttled_ops.insert(hash, bn); } - + emit_event(bn, None); Ok(hash) } @@ -172,7 +233,14 @@ where } fn remove_entity(&self, entity: Entity) { - self.state.write().pool.remove_entity(entity); + let removed_op_hashes = self.state.write().pool.remove_entity(entity); + self.emit(OpPoolEvent::RemovedEntity { entity }); + for op_hash in removed_op_hashes { + self.emit(OpPoolEvent::RemovedOp { + op_hash, + reason: OpRemovalReason::EntityRemoved { entity }, + }) + } } fn best_operations(&self, max: usize) -> Vec> { @@ -265,7 +333,8 @@ mod tests { blocklist: None, allowlist: None, }; - UoPool::new(args, mock_reputation()) + let (event_sender, _) = broadcast::channel(4); + UoPool::new(args, mock_reputation(), event_sender) } fn create_op(sender: Address, nonce: usize, max_fee_per_gas: usize) -> PoolOperation { diff --git a/src/op_pool/mod.rs b/src/op_pool/mod.rs index 1fa837dce..47862bbad 100644 --- a/src/op_pool/mod.rs +++ b/src/op_pool/mod.rs @@ -1,3 +1,4 @@ +pub mod emit; mod event; mod mempool; mod reputation; diff --git a/src/op_pool/task.rs b/src/op_pool/task.rs index ef5515d50..88f749afa 100644 --- a/src/op_pool/task.rs +++ b/src/op_pool/task.rs @@ -2,23 +2,21 @@ use std::{sync::Arc, time::Duration}; use anyhow::{bail, Context}; use futures::future; -use tokio::{task::JoinHandle, try_join}; +use tokio::{sync::broadcast, task::JoinHandle, try_join}; use tokio_util::sync::CancellationToken; use tonic::{async_trait, transport::Server}; -use super::{ - event::EventProvider, - mempool::{Mempool, PoolConfig}, -}; use crate::{ common::{ + emit::WithEntryPoint, grpc::metrics::GrpcMetricsLayer, handle::{flatten_handle, Task}, protos::op_pool::{op_pool_server::OpPoolServer, OP_POOL_FILE_DESCRIPTOR_SET}, }, op_pool::{ - event::{EventListener, HttpBlockProviderFactory, WsBlockProviderFactory}, - mempool::uo_pool::UoPool, + emit::OpPoolEvent, + event::{EventListener, EventProvider, HttpBlockProviderFactory, WsBlockProviderFactory}, + mempool::{uo_pool::UoPool, Mempool, PoolConfig}, reputation::{HourlyMovingAverageReputation, ReputationParams}, server::OpPoolImpl, }, @@ -38,6 +36,7 @@ pub struct Args { #[derive(Debug)] pub struct PoolTask { args: Args, + event_sender: broadcast::Sender>, } #[async_trait] @@ -73,6 +72,7 @@ impl Task for PoolTask { let (pool, handle) = PoolTask::create_mempool( pool_config, event_provider.as_ref(), + self.event_sender.clone(), shutdown_token.clone(), ) .await @@ -143,8 +143,11 @@ impl Task for PoolTask { } impl PoolTask { - pub fn new(args: Args) -> PoolTask { - Self { args } + pub fn new( + args: Args, + event_sender: broadcast::Sender>, + ) -> PoolTask { + Self { args, event_sender } } pub fn boxed(self) -> Box { @@ -154,6 +157,7 @@ impl PoolTask { async fn create_mempool( pool_config: &PoolConfig, event_provider: &dyn EventProvider, + event_sender: broadcast::Sender>, shutdown_token: CancellationToken, ) -> anyhow::Result<(Arc>, JoinHandle<()>)> { let entry_point = pool_config.entry_point; @@ -168,7 +172,11 @@ impl PoolTask { tokio::spawn(async move { reputation_runner.run().await }); // Mempool - let mp = Arc::new(UoPool::new(pool_config.clone(), Arc::clone(&reputation))); + let mp = Arc::new(UoPool::new( + pool_config.clone(), + Arc::clone(&reputation), + event_sender, + )); // Start mempool let mempool_events = event_provider .subscribe_by_entrypoint(entry_point)