Skip to content

Commit

Permalink
feat: Event streams
Browse files Browse the repository at this point in the history
Introduce streams of events from the op pool and builder, which provide
detailed debug information and can be consumed from broadcast channels.
As an initial application, provide logging based on the content of these
streams, which can be used to debug the path a user op takes.

To make this possible, several refactors are required:

* The input side of the channel is threaded through various structs used
  throughout, such as the bundle proposer.
* A new `emit` submodule is added in each of `cli::node``, `builder`,
  `op_pool`, and `common`. In each case, this module defines the
  relevant event types, including `Display` implementations for the
  default logging. The one in `common` also defines some helper
  functions for managing event streams.  them into log messages (logged
  via `tracing`).
* The transaction tracker now has separate steps for submitting a
  transaction and then waiting for it, rather than a singule function
  which does both. This is because we do not know the transaction hash
  until the transaction is sent, due to last-minute changes made by the
  transaction sender, and we want to be able to log the send as it
  occurs.
  • Loading branch information
dphilipson committed Jul 26, 2023
1 parent a29ff9f commit 69f6814
Show file tree
Hide file tree
Showing 20 changed files with 847 additions and 155 deletions.
70 changes: 55 additions & 15 deletions src/builder/bundle_proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,26 @@ use futures::future;
use linked_hash_map::LinkedHashMap;
#[cfg(test)]
use mockall::automock;
use tokio::try_join;
use tokio::{sync::broadcast, try_join};
use tonic::{async_trait, transport::Channel};
use tracing::{error, info};

use crate::common::{
contracts::entry_point::UserOpsPerAggregator,
gas::{FeeEstimator, GasFees, PriorityFeeMode},
math,
protos::{
self,
op_pool::{op_pool_client::OpPoolClient, GetOpsRequest, MempoolOp},
},
simulation::{SimulationError, SimulationSuccess, Simulator},
types::{
Entity, EntityType, EntryPointLike, ExpectedStorage, HandleOpsOut, ProviderLike, Timestamp,
UserOperation,
use crate::{
builder::emit::{BuilderEvent, OpRejectionReason, SkipReason},
common::{
contracts::entry_point::UserOpsPerAggregator,
emit::WithEntryPoint,
gas::{FeeEstimator, GasFees, PriorityFeeMode},
math,
protos::{
self,
op_pool::{op_pool_client::OpPoolClient, GetOpsRequest, MempoolOp},
},
simulation::{SimulationError, SimulationSuccess, Simulator},
types::{
Entity, EntityType, EntryPointLike, ExpectedStorage, HandleOpsOut, ProviderLike,
Timestamp, UserOperation,
},
},
};

Expand Down Expand Up @@ -57,6 +61,10 @@ impl Bundle {
pub fn is_empty(&self) -> bool {
self.ops_per_aggregator.is_empty()
}

pub fn iter_ops(&self) -> impl Iterator<Item = &UserOperation> + '_ {
self.ops_per_aggregator.iter().flat_map(|ops| &ops.user_ops)
}
}

#[cfg_attr(test, automock)]
Expand All @@ -79,10 +87,12 @@ where
chain_id: u64,
settings: Settings,
fee_estimator: FeeEstimator<P>,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
}

#[derive(Debug)]
pub struct Settings {
pub chain_id: u64,
pub max_bundle_size: u64,
pub beneficiary: Address,
pub use_bundle_priority_fee: Option<bool>,
Expand Down Expand Up @@ -176,6 +186,7 @@ where
provider: Arc<P>,
chain_id: u64,
settings: Settings,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
) -> Self {
Self {
op_pool,
Expand All @@ -191,6 +202,7 @@ where
settings.bundle_priority_fee_overhead_percent,
),
settings,
event_sender,
}
}

Expand Down Expand Up @@ -233,17 +245,25 @@ where
let mut paymasters_to_reject = Vec::<Address>::new();
for (op, simulation) in ops_with_simulations {
let Some(simulation) = simulation else {
self.emit(BuilderEvent::RejectedOp {
op_hash: self.op_hash(&op),
reason: OpRejectionReason::FailedRevalidation,
});
rejected_ops.push(op);
continue;
};
if simulation
if let Some(&other_sender) = simulation
.accessed_addresses
.iter()
.any(|&address| address != op.sender && all_sender_addresses.contains(&address))
.find(|&address| *address != op.sender && all_sender_addresses.contains(address))
{
// Exclude ops that access the sender of another op in the
// batch, but don't reject them (remove them from pool).
info!("Excluding op from {:?} because it accessed the address of another sender in the bundle.", op.sender);
self.emit(BuilderEvent::SkippedOp {
op_hash: self.op_hash(&op),
reason: SkipReason::AccessedOtherSender { other_sender },
});
continue;
}
if let Some(paymaster) = op.paymaster() {
Expand Down Expand Up @@ -344,6 +364,12 @@ where
match handle_ops_out {
HandleOpsOut::Success => Ok(Some(gas)),
HandleOpsOut::FailedOp(index, message) => {
self.emit(BuilderEvent::RejectedOp {
op_hash: self.op_hash(context.get_op_at(index)?),
reason: OpRejectionReason::FailedInBundle {
message: Arc::new(message.clone()),
},
});
self.process_failed_op(context, index, message).await?;
Ok(None)
}
Expand Down Expand Up @@ -440,6 +466,17 @@ where

Ok(())
}

fn emit(&self, event: BuilderEvent) {
let _ = self.event_sender.send(WithEntryPoint {
entry_point: self.entry_point.address(),
event,
});
}

fn op_hash(&self, op: &UserOperation) -> H256 {
op.op_hash(self.entry_point.address(), self.settings.chain_id)
}
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -1155,19 +1192,22 @@ mod tests {
provider
.expect_aggregate_signatures()
.returning(move |address, _| signatures_by_aggregator[&address]());
let (event_sender, _) = broadcast::channel(16);
let proposer = BundleProposerImpl::new(
op_pool_handle.client.clone(),
simulator,
entry_point,
Arc::new(provider),
0,
Settings {
chain_id: 0,
max_bundle_size,
beneficiary,
use_bundle_priority_fee: Some(true),
priority_fee_mode: PriorityFeeMode::PriorityFeePercent(10),
bundle_priority_fee_overhead_percent: 0,
},
event_sender,
);
proposer
.make_bundle(None)
Expand Down
145 changes: 145 additions & 0 deletions src/builder/emit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use std::{fmt::Display, sync::Arc};

use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256};

use crate::common::{gas::GasFees, strs};

#[derive(Clone, Debug)]
pub enum BuilderEvent {
FormedBundle {
/// If `None`, means that the bundle contained no operations and so no
/// transaction was created.
tx_details: Option<BundleTxDetails>,
nonce: u64,
fee_increase_count: u64,
required_fees: Option<GasFees>,
},
TransactionMined {
tx_hash: H256,
nonce: u64,
block_number: u64,
},
LatestTransactionDropped {
nonce: u64,
},
NonceUsedForOtherTransaction {
nonce: u64,
},
SkippedOp {
op_hash: H256,
reason: SkipReason,
},
RejectedOp {
op_hash: H256,
reason: OpRejectionReason,
},
}

#[derive(Clone, Debug)]
pub struct BundleTxDetails {
pub tx_hash: H256,
pub tx: TypedTransaction,
pub op_hashes: Arc<Vec<H256>>,
}

#[derive(Clone, Debug)]
pub enum SkipReason {
AccessedOtherSender { other_sender: Address },
}

#[derive(Clone, Debug)]
pub enum OpRejectionReason {
FailedRevalidation,
FailedInBundle { message: Arc<String> },
}

impl Display for BuilderEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BuilderEvent::FormedBundle {
tx_details,
nonce,
fee_increase_count,
required_fees,
} => {
let required_max_fee_per_gas =
strs::to_string_or(required_fees.map(|fees| fees.max_fee_per_gas), "(default)");
let required_max_priority_fee_per_gas = strs::to_string_or(
required_fees.map(|fees| fees.max_priority_fee_per_gas),
"(default)",
);
match tx_details {
Some(tx_details) => {
let op_hashes = tx_details
.op_hashes
.iter()
.map(|hash| format!("{hash:?}"))
.collect::<Vec<_>>()
.join(", ");
write!(
f,
concat!(
"Bundle transaction sent!",
" Transaction hash: {:?}",
" Nonce: {}",
" Fee increases: {}",
" Required maxFeePerGas: {}",
" Required maxPriorityFeePerGas: {}",
" Op hashes: {}",
),
tx_details.tx_hash,
nonce,
fee_increase_count,
required_max_fee_per_gas,
required_max_priority_fee_per_gas,
op_hashes,
)
}
None => write!(
f,
concat!(
"Bundle was empty.",
" Nonce: {}",
" Fee increases: {}",
" Required maxFeePerGas: {}",
" Required maxPriorityFeePerGas: {}",
),
nonce,
fee_increase_count,
required_max_fee_per_gas,
required_max_priority_fee_per_gas
),
}
}
BuilderEvent::TransactionMined {
tx_hash,
nonce,
block_number,
} => write!(
f,
concat!(
"Transaction mined!",
" Transaction hash: {:?}",
" Nonce: {}",
" Block number: {}",
),
tx_hash, nonce, block_number,
),
BuilderEvent::LatestTransactionDropped { nonce } => {
write!(
f,
"Latest transaction dropped. Higher fees are needed. Nonce: {nonce}"
)
}
BuilderEvent::NonceUsedForOtherTransaction { nonce } => {
write!(f, "Transaction failed because nonce was used by another transaction outside of this Rundler. Nonce: {nonce}")
}
BuilderEvent::SkippedOp { op_hash, reason } => {
write!(f, "Op skipped in bundle (but remains in pool). Op hash: {op_hash:?} Reason: {reason:?}")
}
BuilderEvent::RejectedOp { op_hash, reason } => {
write!(f, "Op rejected from bundle and removed from pool. Op hash: {op_hash:?} Reason: {reason:?}")
}
}
}
}
1 change: 1 addition & 0 deletions src/builder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod bundle_proposer;
pub mod emit;
mod sender;
mod server;
mod signer;
Expand Down
Loading

0 comments on commit 69f6814

Please sign in to comment.