diff --git a/src/builder/bundle_proposer.rs b/src/builder/bundle_proposer.rs
index 222080d43..53c30308a 100644
--- a/src/builder/bundle_proposer.rs
+++ b/src/builder/bundle_proposer.rs
@@ -117,7 +117,18 @@ where
         )?;
 
         // Limit the amount of gas in the bundle
-        let ops = self.limit_gas_in_bundle(ops);
+        tracing::info!(
+            "Builder id: {}, starting bundle proposal with {} ops",
+            self.id,
+            ops.len(),
+        );
+        let (ops, gas_limit) = self.limit_gas_in_bundle(ops);
+        tracing::info!(
+            "Builder id: {}, bundle proposal after limit had {} ops and {:?} gas limit",
+            self.id,
+            ops.len(),
+            gas_limit
+        );
 
         // Determine fees required for ops to be included in a bundle, and filter out ops that don't
         // meet the requirements. Simulate unfiltered ops.
@@ -174,6 +185,12 @@ where
             expected_storage.merge(&op.simulation.expected_storage)?;
         }
         if let Some(gas_estimate) = gas_estimate {
+            tracing::info!(
+                "Builder id: {}, bundle proposal succeeded with {} ops and {:?} gas limit",
+                self.id,
+                context.iter_ops().count(),
+                gas_estimate
+            );
             return Ok(Bundle {
                 ops_per_aggregator: context.to_ops_per_aggregator(),
                 gas_estimate,
@@ -309,7 +326,7 @@ where
             };
             let max_cost = op.max_gas_cost();
             if *balance < max_cost {
-                info!("Rejected paymaster {paymaster:?} becauase its balance {balance:?} was too low.");
+                info!("Rejected paymaster {paymaster:?} because its balance {balance:?} was too low.");
                 paymasters_to_reject.push(paymaster);
                 continue;
             } else {
@@ -505,7 +522,7 @@ where
         Ok(())
     }
 
-    fn limit_gas_in_bundle(&self, ops: Vec<PoolOperation>) -> Vec<PoolOperation> {
+    fn limit_gas_in_bundle(&self, ops: Vec<PoolOperation>) -> (Vec<PoolOperation>, u64) {
         let mut gas_left = U256::from(self.settings.max_bundle_gas);
         let mut ops_in_bundle = Vec::new();
         for op in ops {
@@ -516,7 +533,10 @@ where
             gas_left -= gas;
             ops_in_bundle.push(op);
         }
-        ops_in_bundle
+        (
+            ops_in_bundle,
+            self.settings.max_bundle_gas - gas_left.as_u64(),
+        )
     }
 
     fn emit(&self, event: BuilderEvent) {
diff --git a/src/builder/bundle_sender.rs b/src/builder/bundle_sender.rs
index 02bc8aaf2..22a383562 100644
--- a/src/builder/bundle_sender.rs
+++ b/src/builder/bundle_sender.rs
@@ -26,7 +26,6 @@ use crate::{
         block_watcher,
         emit::WithEntryPoint,
         gas::GasFees,
-        math,
        protos::op_pool::{
             self, op_pool_client::OpPoolClient, RemoveEntitiesRequest, RemoveOpsRequest,
         },
@@ -34,9 +33,6 @@ use crate::{
     },
 };
 
-// Overhead on gas estimates to account for inaccuracies.
-const GAS_ESTIMATE_OVERHEAD_PERCENT: u64 = 10;
-
 #[derive(Debug)]
 pub struct Settings {
     pub replacement_fee_percent_increase: u64,
@@ -414,12 +410,11 @@ where
             bundle.rejected_ops.len(),
             bundle.rejected_entities.len()
         );
-        let gas = math::increase_by_percent(bundle.gas_estimate, GAS_ESTIMATE_OVERHEAD_PERCENT);
         let op_hashes: Vec<_> = bundle.iter_ops().map(|op| self.op_hash(op)).collect();
         let mut tx = self.entry_point.get_send_bundle_transaction(
             bundle.ops_per_aggregator,
             self.beneficiary,
-            gas,
+            bundle.gas_estimate,
             bundle.gas_fees,
         );
         tx.set_nonce(nonce);
diff --git a/src/op_pool/mempool/mod.rs b/src/op_pool/mempool/mod.rs
index 790f671ba..e0151f388 100644
--- a/src/op_pool/mempool/mod.rs
+++ b/src/op_pool/mempool/mod.rs
@@ -47,7 +47,7 @@ pub trait Mempool: Send + Sync {
     /// the specified maximum number of operations.
     ///
     /// NOTE: Will only return one operation per sender.
-    fn best_operations(&self, max: usize) -> Vec<Arc<PoolOperation>>;
+    fn best_operations(&self, max: usize, builder_id: u64) -> Vec<Arc<PoolOperation>>;
 
     /// Returns the all operations from the pool up to a max size
     fn all_operations(&self, max: usize) -> Vec<Arc<PoolOperation>>;
diff --git a/src/op_pool/mempool/uo_pool.rs b/src/op_pool/mempool/uo_pool.rs
index 224b1b3e2..b0156514f 100644
--- a/src/op_pool/mempool/uo_pool.rs
+++ b/src/op_pool/mempool/uo_pool.rs
@@ -3,7 +3,7 @@ use std::{
     sync::Arc,
 };
 
-use ethers::types::{Address, H256};
+use ethers::types::{Address, H256, U256};
 use itertools::Itertools;
 use parking_lot::RwLock;
 use tokio::sync::broadcast;
@@ -38,6 +38,7 @@ pub struct UoPool {
     reputation: Arc<R>,
     state: RwLock<UoPoolState>,
     event_sender: broadcast::Sender<WithEntryPoint<OpPoolEvent>>,
+    num_builders: u64,
 }
 
 struct UoPoolState {
@@ -54,6 +55,7 @@ where
         args: PoolConfig,
         reputation: Arc<R>,
         event_sender: broadcast::Sender<WithEntryPoint<OpPoolEvent>>,
+        num_builders: u64,
     ) -> Self {
         Self {
             entry_point: args.entry_point,
@@ -65,6 +67,7 @@ where
                 block_number: 0,
             }),
             event_sender,
+            num_builders,
         }
     }
 
@@ -319,7 +322,7 @@ where
         UoPoolMetrics::increment_removed_entities(self.entry_point);
     }
 
-    fn best_operations(&self, max: usize) -> Vec<Arc<PoolOperation>> {
+    fn best_operations(&self, max: usize, builder_id: u64) -> Vec<Arc<PoolOperation>> {
         // get the best operations from the pool
         let ordered_ops = self.state.read().pool.best_operations();
         // keep track of senders to avoid sending multiple ops from the same sender
@@ -327,6 +330,13 @@ where
 
         ordered_ops
             .into_iter()
+            .filter(|op| {
+                (self.num_builders == 1)
+                    | (U256::from_little_endian(op.uo.sender.as_bytes())
+                        .div_mod(self.num_builders.into())
+                        .1
+                        == builder_id.into())
+            })
             .filter(|op| {
                 // filter out ops from senders we've already seen
                 senders.insert(op.uo.sender)
@@ -391,9 +401,9 @@ mod tests {
         let hash = pool
             .add_operation(OperationOrigin::Local, op.clone())
             .unwrap();
-        check_ops(pool.best_operations(1), vec![op]);
+        check_ops(pool.best_operations(1, 0), vec![op]);
         pool.remove_operations(&vec![hash]);
-        assert_eq!(pool.best_operations(1), vec![]);
+        assert_eq!(pool.best_operations(1, 0), vec![]);
     }
 
     #[test]
@@ -406,9 +416,9 @@ mod tests {
         ];
         let res = pool.add_operations(OperationOrigin::Local, ops.clone());
         let hashes: Vec<H256> = res.into_iter().map(|r| r.unwrap()).collect();
-        check_ops(pool.best_operations(3), ops);
+        check_ops(pool.best_operations(3, 0), ops);
         pool.remove_operations(&hashes);
-        assert_eq!(pool.best_operations(3), vec![]);
+        assert_eq!(pool.best_operations(3, 0), vec![]);
     }
 
     #[test]
@@ -420,9 +430,9 @@ mod tests {
             create_op(Address::random(), 0, 1),
         ];
         pool.add_operations(OperationOrigin::Local, ops.clone());
-        check_ops(pool.best_operations(3), ops);
+        check_ops(pool.best_operations(3, 0), ops);
         pool.clear();
-        assert_eq!(pool.best_operations(3), vec![]);
+        assert_eq!(pool.best_operations(3, 0), vec![]);
     }
 
     #[test]
@@ -434,7 +444,7 @@ mod tests {
             create_op(Address::random(), 0, 1),
         ];
         pool.add_operations(OperationOrigin::Local, ops.clone());
-        check_ops(pool.best_operations(3), ops.clone());
+        check_ops(pool.best_operations(3, 0), ops.clone());
 
         pool.on_chain_update(&ChainUpdate {
             latest_block_number: 1,
@@ -450,7 +460,7 @@ mod tests {
             unmined_ops: vec![],
         });
 
-        check_ops(pool.best_operations(3), ops[1..].to_vec());
+        check_ops(pool.best_operations(3, 0), ops[1..].to_vec());
     }
 
     #[test]
@@ -462,7 +472,7 @@ mod tests {
             create_op(Address::random(), 0, 1),
         ];
         pool.add_operations(OperationOrigin::Local, ops.clone());
-        check_ops(pool.best_operations(3), ops.clone());
+        check_ops(pool.best_operations(3, 0), ops.clone());
 
         pool.on_chain_update(&ChainUpdate {
             latest_block_number: 1,
@@ -477,7 +487,7 @@ mod tests {
             }],
             unmined_ops: vec![],
         });
-        check_ops(pool.best_operations(3), ops.clone()[1..].to_vec());
+        check_ops(pool.best_operations(3, 0), ops.clone()[1..].to_vec());
 
         pool.on_chain_update(&ChainUpdate {
             latest_block_number: 1,
@@ -492,7 +502,7 @@ mod tests {
                 nonce: ops[0].uo.nonce,
             }],
         });
-        check_ops(pool.best_operations(3), ops);
+        check_ops(pool.best_operations(3, 0), ops);
     }
 
     #[test]
@@ -504,7 +514,7 @@ mod tests {
             create_op(Address::random(), 0, 1),
        ];
         pool.add_operations(OperationOrigin::Local, ops.clone());
-        check_ops(pool.best_operations(3), ops.clone());
+        check_ops(pool.best_operations(3, 0), ops.clone());
 
         pool.on_chain_update(&ChainUpdate {
             latest_block_number: 1,
@@ -520,7 +530,7 @@ mod tests {
             unmined_ops: vec![],
         });
 
-        check_ops(pool.best_operations(3), ops);
+        check_ops(pool.best_operations(3, 0), ops);
     }
 
     #[test]
@@ -534,7 +544,7 @@ mod tests {
         ];
         pool.add_operations(OperationOrigin::Local, ops.clone());
         // Only return 1 op per sender
-        check_ops(pool.best_operations(3), vec![ops[0].clone()]);
+        check_ops(pool.best_operations(3, 0), vec![ops[0].clone()]);
 
         let rep = pool.dump_reputation();
         assert_eq!(rep.len(), 1);
@@ -579,7 +589,7 @@ mod tests {
         // First op should be included
         pool.add_operation(OperationOrigin::Local, ops[0].clone())
             .unwrap();
-        check_ops(pool.best_operations(1), vec![ops[0].clone()]);
+        check_ops(pool.best_operations(1, 0), vec![ops[0].clone()]);
 
         // Second op should be thorottled
         let ret = pool.add_operation(OperationOrigin::Local, ops[1].clone());
@@ -610,7 +620,7 @@ mod tests {
         // Second op should be included
         pool.add_operation(OperationOrigin::Local, ops[1].clone())
             .unwrap();
-        check_ops(pool.best_operations(1), vec![ops[1].clone()]);
+        check_ops(pool.best_operations(1, 0), vec![ops[1].clone()]);
     }
 
     #[test]
@@ -650,6 +660,7 @@ mod tests {
             args,
             mock_reputation(THROTTLE_SLACK, BAN_SLACK),
             event_sender,
+            1,
         )
     }
 
diff --git a/src/op_pool/server.rs b/src/op_pool/server.rs
index f9ee47d93..cd38a7816 100644
--- a/src/op_pool/server.rs
+++ b/src/op_pool/server.rs
@@ -1,7 +1,7 @@
 use std::{collections::HashMap, sync::Arc};
 
 use ethers::{
-    types::{Address, H256, U256},
+    types::{Address, H256},
     utils::to_checksum,
 };
 use prost::Message;
@@ -23,19 +23,14 @@ use crate::common::protos::{
 pub struct OpPoolImpl<M: Mempool> {
     chain_id: u64,
     mempools: HashMap<Address, Arc<M>>,
-    num_builders: u64,
 }
 
 impl<M> OpPoolImpl<M>
 where
     M: Mempool,
 {
-    pub fn new(chain_id: u64, mempools: HashMap<Address, Arc<M>>, num_builders: u64) -> Self {
-        Self {
-            chain_id,
-            mempools,
-            num_builders,
-        }
+    pub fn new(chain_id: u64, mempools: HashMap<Address, Arc<M>>) -> Self {
+        Self { chain_id, mempools }
     }
 
     fn get_mempool_for_entry_point(&self, req_entry_point: &[u8]) -> Result<&Arc<M>> {
@@ -95,19 +90,13 @@ where
         let mempool = self.get_mempool_for_entry_point(&req.entry_point)?;
 
         let ops = mempool
-            .best_operations(req.max_ops as usize)
+            .best_operations(req.max_ops as usize, req.builder_id)
             .iter()
-            .filter(|op| {
-                (self.num_builders == 1)
-                    | (U256::from_little_endian(op.uo.sender.as_bytes())
-                        .div_mod(self.num_builders.into())
-                        .1
-                        == req.builder_id.into())
-            })
             .map(|op| MempoolOp::try_from(&(**op)))
             .collect::<Result<Vec<_>, _>>()
             .map_err(|e| Status::internal(format!("Failed to convert to proto mempool op: {e}")))?;
 
+        tracing::info!("Returning {} ops to builder {}", ops.len(), req.builder_id);
         Ok(Response::new(GetOpsResponse { ops }))
     }
 
@@ -354,7 +343,6 @@ mod tests {
         OpPoolImpl::<MockMempool>::new(
             1,
             HashMap::from([(TEST_ADDRESS_ARR.into(), MockMempool::default().into())]),
-            1,
         )
     }
 
@@ -395,7 +383,7 @@ mod tests {
         fn remove_entity(&self, _entity: Entity) {}
 
-        fn best_operations(&self, _max: usize) -> Vec<Arc<PoolOperation>> {
+        fn best_operations(&self, _max: usize, _builder_id: u64) -> Vec<Arc<PoolOperation>> {
             vec![]
         }
 
diff --git a/src/op_pool/task.rs b/src/op_pool/task.rs
index 14e46287e..f1d78ebd5 100644
--- a/src/op_pool/task.rs
+++ b/src/op_pool/task.rs
@@ -71,14 +71,15 @@ impl Task for PoolTask {
         let mut mempools = Vec::new();
         let mut mempool_handles = Vec::new();
         for pool_config in &self.args.pool_configs {
-            let (pool, handle) = PoolTask::create_mempool(
-                pool_config,
-                update_sender.subscribe(),
-                self.event_sender.clone(),
-                shutdown_token.clone(),
-            )
-            .await
-            .context("should have created mempool")?;
+            let (pool, handle) = self
+                .create_mempool(
+                    pool_config,
+                    update_sender.subscribe(),
+                    self.event_sender.clone(),
+                    shutdown_token.clone(),
+                )
+                .await
+                .context("should have created mempool")?;
             mempools.push(pool);
 
             mempool_handles.push(handle);
@@ -99,11 +100,7 @@ impl Task for PoolTask {
         });
 
         // gRPC server
-        let op_pool_server = OpPoolServer::new(OpPoolImpl::new(
-            chain_id,
-            mempool_map,
-            self.args.num_builders,
-        ));
+        let op_pool_server = OpPoolServer::new(OpPoolImpl::new(chain_id, mempool_map));
         let reflection_service = tonic_reflection::server::Builder::configure()
             .register_encoded_file_descriptor_set(OP_POOL_FILE_DESCRIPTOR_SET)
             .build()?;
@@ -158,6 +155,7 @@ impl PoolTask {
     }
 
     async fn create_mempool(
+        &self,
         pool_config: &PoolConfig,
         update_rx: broadcast::Receiver<Arc<ChainUpdate>>,
         event_sender: broadcast::Sender<WithEntryPoint<OpPoolEvent>>,
@@ -178,6 +176,7 @@ impl PoolTask {
             pool_config.clone(),
             Arc::clone(&reputation),
             event_sender,
+            self.args.num_builders,
         ));
         let mp_runner = Arc::clone(&mp);
         let handle =
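
Note for reviewers: the core rule this change relocates from `OpPoolImpl::get_ops` into `UoPool::best_operations` is the sender-to-builder sharding, computed as the sender address modulo `num_builders`. Below is a minimal, self-contained sketch of that rule, assuming builders are numbered `0..num_builders`; the helper name `builder_for_sender` and the `main` harness are illustrative only and are not part of the crate's API.

```rust
use ethers::types::{Address, U256};

/// Maps a sender address to the builder id responsible for it.
/// Illustrative sketch of the filter used in `UoPool::best_operations`.
fn builder_for_sender(sender: Address, num_builders: u64) -> u64 {
    if num_builders == 1 {
        // Single-builder setups skip the sharding check entirely.
        return 0;
    }
    // Interpret the 20-byte address as an integer and take the remainder of
    // dividing by the builder count; `div_mod` returns (quotient, remainder).
    let (_, rem) =
        U256::from_little_endian(sender.as_bytes()).div_mod(U256::from(num_builders));
    rem.as_u64()
}

fn main() {
    let sender = Address::random();
    // With two builders, every sender deterministically maps to builder 0 or 1.
    println!("sender {sender:?} -> builder {}", builder_for_sender(sender, 2));
}
```

Because the mapping depends only on the sender address and the builder count, builders that draw from the same pool receive disjoint sets of senders, which lines up with the pool's "one operation per sender" behavior noted in the `Mempool` trait docs.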