From 045ff2e2e7645d44889b6ccc9ad1e3ed6f31e040 Mon Sep 17 00:00:00 2001 From: dancoombs Date: Tue, 19 Sep 2023 17:09:18 -0400 Subject: [PATCH] feat: implement pool sharding and n-builders --- Makefile | 12 +- bin/rundler/src/cli/builder.rs | 18 ++- bin/rundler/src/cli/mod.rs | 8 ++ bin/rundler/src/cli/pool.rs | 2 + crates/builder/src/bundle_proposer.rs | 95 +++++++++---- crates/builder/src/bundle_sender.rs | 114 ++++++++-------- crates/builder/src/emit.rs | 118 ++++++++++++++-- crates/builder/src/lib.rs | 2 +- crates/builder/src/server/local.rs | 14 +- crates/builder/src/task.rs | 170 ++++++++++++++---------- crates/pool/proto/op_pool/op_pool.proto | 2 + crates/pool/src/mempool/mod.rs | 25 +++- crates/pool/src/mempool/uo_pool.rs | 66 +++++---- crates/pool/src/server/local.rs | 22 ++- crates/pool/src/server/mod.rs | 7 +- crates/pool/src/server/remote/client.rs | 8 +- crates/pool/src/server/remote/server.rs | 6 +- 17 files changed, 480 insertions(+), 209 deletions(-) diff --git a/Makefile b/Makefile index 641cec4c3..6b3339083 100644 --- a/Makefile +++ b/Makefile @@ -12,10 +12,20 @@ build: ## Build the project. clean: ## Clean the project. cargo clean +## Run all tests. .PHONY: test -test: test-unit ## Run all tests. +test: test-unit test-spec-integrated test-spec-modular .PHONY: test-unit test-unit: ## Run unit tests. cargo install cargo-nextest --locked cargo nextest run $(UNIT_TEST_ARGS) + +.PHONY: test-spec-integrated +test-spec-integrated: ## Run spec tests in integrated mode + test/spec-tests/local/run-spec-tests.sh + +.PHONY: test-spec-modular +test-spec-modular: ## Run spec tests in modular mode + test/spec-tests/remote/run-spec-tests.sh + diff --git a/bin/rundler/src/cli/builder.rs b/bin/rundler/src/cli/builder.rs index 8c10397dd..21d0c22a8 100644 --- a/bin/rundler/src/cli/builder.rs +++ b/bin/rundler/src/cli/builder.rs @@ -3,7 +3,9 @@ use std::{collections::HashMap, net::SocketAddr, time::Duration}; use anyhow::Context; use clap::Args; use ethers::types::H256; -use rundler_builder::{self, BuilderEvent, BuilderTask, BuilderTaskArgs, LocalBuilderBuilder}; +use rundler_builder::{ + self, BuilderEvent, BuilderEventKind, BuilderTask, BuilderTaskArgs, LocalBuilderBuilder, +}; use rundler_pool::RemotePoolClient; use rundler_sim::{MempoolConfig, PriorityFeeMode}; use rundler_task::{ @@ -142,6 +144,14 @@ pub struct BuilderArgs { env = "BUILDER_BLOXROUTE_AUTH_HEADER" )] bloxroute_auth_header: Option, + /// The index offset to apply to the builder index + #[arg( + long = "builder_index_offset", + name = "builder_index_offset", + env = "BUILDER_INDEX_OFFSET", + default_value = "0" + )] + pub builder_index_offset: u64, } impl BuilderArgs { @@ -202,6 +212,8 @@ impl BuilderArgs { max_fee_increases: self.max_fee_increases, remote_address, bloxroute_auth_header: self.bloxroute_auth_header.clone(), + num_bundle_builders: common.num_builders, + bundle_builder_index_offset: self.builder_index_offset, }) } } @@ -261,11 +273,11 @@ pub async fn run(builder_args: BuilderCliArgs, common_args: CommonArgs) -> anyho } pub fn is_nonspammy_event(event: &WithEntryPoint) -> bool { - if let BuilderEvent::FormedBundle { + if let BuilderEventKind::FormedBundle { tx_details, fee_increase_count, .. 
- } = &event.event + } = &event.event.kind { if tx_details.is_none() && *fee_increase_count == 0 { return false; diff --git a/bin/rundler/src/cli/mod.rs b/bin/rundler/src/cli/mod.rs index 02689688f..80059b7f3 100644 --- a/bin/rundler/src/cli/mod.rs +++ b/bin/rundler/src/cli/mod.rs @@ -215,6 +215,14 @@ pub struct CommonArgs { env = "MEMPOOL_CONFIG_PATH" )] pub mempool_config_path: Option, + + #[arg( + long = "num_builders", + name = "num_builders", + env = "NUM_BUILDERS", + default_value = "1" + )] + pub num_builders: u64, } const SIMULATION_GAS_OVERHEAD: u64 = 100_000; diff --git a/bin/rundler/src/cli/pool.rs b/bin/rundler/src/cli/pool.rs index 5758e0bb7..50dd88734 100644 --- a/bin/rundler/src/cli/pool.rs +++ b/bin/rundler/src/cli/pool.rs @@ -114,6 +114,8 @@ impl PoolArgs { Ok(PoolConfig { entry_point, chain_id: common.chain_id, + // Currently use the same shard count as the number of builders + num_shards: common.num_builders, max_userops_per_sender: self.max_userops_per_sender, min_replacement_fee_increase_percentage: self .min_replacement_fee_increase_percentage, diff --git a/crates/builder/src/bundle_proposer.rs b/crates/builder/src/bundle_proposer.rs index cbaf0db0e..573d10575 100644 --- a/crates/builder/src/bundle_proposer.rs +++ b/crates/builder/src/bundle_proposer.rs @@ -73,6 +73,7 @@ where P: Provider, C: PoolServer, { + builder_index: u64, pool: C, simulator: S, entry_point: E, @@ -109,7 +110,18 @@ where )?; // Limit the amount of gas in the bundle - let ops = self.limit_gas_in_bundle(ops); + tracing::debug!( + "Builder index: {}, starting bundle proposal with {} ops", + self.builder_index, + ops.len(), + ); + let (ops, gas_limit) = self.limit_gas_in_bundle(ops); + tracing::debug!( + "Builder index: {}, bundle proposal after limit had {} ops and {:?} gas limit", + self.builder_index, + ops.len(), + gas_limit + ); // Determine fees required for ops to be included in a bundle, and filter out ops that don't // meet the requirements. Simulate unfiltered ops. 
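The two new options above are designed to move together: `--num_builders` sets both how many bundle builders the builder process spawns and how many disjoint shards the pool is configured with, while `--builder_index_offset` lets a second builder process claim a non-overlapping index range against the same pool. A minimal, dependency-free sketch of that arithmetic (the `Deployment` type and its helpers are illustrative, not part of this patch):

```rust
/// Illustrative only: mirrors how builder indices are derived from
/// `num_builders` and `builder_index_offset` in this patch.
#[derive(Debug)]
struct Deployment {
    num_shards: u64,           // the pool is configured with this many shards
    num_builders: u64,         // bundle builders spawned by this process
    builder_index_offset: u64, // index offset for this process's builders
}

impl Deployment {
    /// Global builder indices handled by this process.
    fn builder_indices(&self) -> impl Iterator<Item = u64> + '_ {
        (0..self.num_builders).map(|i| i + self.builder_index_offset)
    }

    /// Every builder index must name an existing shard, otherwise the
    /// pool rejects that builder's `get_ops` request.
    fn is_valid(&self) -> bool {
        self.builder_indices().all(|i| i < self.num_shards)
    }
}

fn main() {
    // Two builder processes sharing one pool with 4 shards:
    // process A runs builders 0..2, process B runs builders 2..4.
    let a = Deployment { num_shards: 4, num_builders: 2, builder_index_offset: 0 };
    let b = Deployment { num_shards: 4, num_builders: 2, builder_index_offset: 2 };
    assert!(a.is_valid() && b.is_valid());
    println!("A -> {:?}", a.builder_indices().collect::<Vec<_>>());
    println!("B -> {:?}", b.builder_indices().collect::<Vec<_>>());
}
```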
@@ -122,16 +134,17 @@ where { true } else { - self.emit(BuilderEvent::SkippedOp { - op_hash: self.op_hash(&op.uo), - reason: SkipReason::InsufficientFees { + self.emit(BuilderEvent::skipped_op( + self.builder_index, + self.op_hash(&op.uo), + SkipReason::InsufficientFees { required_fees: required_op_fees, actual_fees: GasFees { max_fee_per_gas: op.uo.max_fee_per_gas, max_priority_fee_per_gas: op.uo.max_priority_fee_per_gas, }, }, - }); + )); false } }) @@ -164,6 +177,12 @@ where expected_storage.merge(&op.simulation.expected_storage)?; } if let Some(gas_estimate) = gas_estimate { + tracing::debug!( + "Builder index: {}, bundle proposal succeeded with {} ops and {:?} gas limit", + self.builder_index, + context.iter_ops().count(), + gas_estimate + ); return Ok(Bundle { ops_per_aggregator: context.to_ops_per_aggregator(), gas_estimate, @@ -192,6 +211,7 @@ where C: PoolServer, { pub(crate) fn new( + builder_index: u64, pool: C, simulator: S, entry_point: E, @@ -200,6 +220,7 @@ where event_sender: broadcast::Sender>, ) -> Self { Self { + builder_index, pool, simulator, entry_point, @@ -250,10 +271,11 @@ where let simulation = match simulation { Ok(simulation) => simulation, Err(error) => { - self.emit(BuilderEvent::RejectedOp { - op_hash: self.op_hash(&op), - reason: OpRejectionReason::FailedRevalidation { error }, - }); + self.emit(BuilderEvent::rejected_op( + self.builder_index, + self.op_hash(&op), + OpRejectionReason::FailedRevalidation { error }, + )); rejected_ops.push(op); continue; } @@ -264,12 +286,13 @@ where .valid_time_range .contains(Timestamp::now(), TIME_RANGE_BUFFER) { - self.emit(BuilderEvent::SkippedOp { - op_hash: self.op_hash(&op), - reason: SkipReason::InvalidTimeRange { + self.emit(BuilderEvent::skipped_op( + self.builder_index, + self.op_hash(&op), + SkipReason::InvalidTimeRange { valid_range: simulation.valid_time_range, }, - }); + )); rejected_ops.push(op); continue; } @@ -282,10 +305,11 @@ where // Exclude ops that access the sender of another op in the // batch, but don't reject them (remove them from pool). info!("Excluding op from {:?} because it accessed the address of another sender in the bundle.", op.sender); - self.emit(BuilderEvent::SkippedOp { - op_hash: self.op_hash(&op), - reason: SkipReason::AccessedOtherSender { other_sender }, - }); + self.emit(BuilderEvent::skipped_op( + self.builder_index, + self.op_hash(&op), + SkipReason::AccessedOtherSender { other_sender }, + )); continue; } if let Some(paymaster) = op.paymaster() { @@ -386,12 +410,13 @@ where match handle_ops_out { HandleOpsOut::Success => Ok(Some(gas)), HandleOpsOut::FailedOp(index, message) => { - self.emit(BuilderEvent::RejectedOp { - op_hash: self.op_hash(context.get_op_at(index)?), - reason: OpRejectionReason::FailedInBundle { + self.emit(BuilderEvent::rejected_op( + self.builder_index, + self.op_hash(context.get_op_at(index)?), + OpRejectionReason::FailedInBundle { message: Arc::new(message.clone()), }, - }); + )); self.process_failed_op(context, index, message).await?; Ok(None) } @@ -405,8 +430,17 @@ where } async fn get_ops_from_pool(&self) -> anyhow::Result> { + // Use builder's index as the shard index to ensure that two builders don't + // attempt to bundle the same operations. + // + // NOTE: this assumes that the pool server has as many shards as there + // are builders. 
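The note above is the central invariant of this change: a builder only asks the pool for its own shard, and the pool maps a sender to a shard by reducing the sender address modulo the shard count (in `uo_pool.rs` below this is `U256::from_little_endian(op.uo.sender.as_bytes())` mod `num_shards`). A dependency-free sketch of that rule, using a byte-wise reduction in place of ethers' `U256`:

```rust
/// Maps a 20-byte sender address to a shard, mirroring the pool-side rule
/// introduced by this patch. The address is read as a little-endian integer
/// and reduced modulo `num_shards` one byte at a time (Horner's method), so
/// no big-integer type is needed for the sketch.
fn shard_for_sender(sender: &[u8; 20], num_shards: u64) -> u64 {
    assert!(num_shards > 0, "the pool always has at least one shard");
    let mut acc: u128 = 0;
    // Most significant little-endian byte first.
    for &byte in sender.iter().rev() {
        acc = (acc * 256 + byte as u128) % num_shards as u128;
    }
    acc as u64
}

fn main() {
    let num_shards = 3;
    let senders = [[0x11u8; 20], [0x22u8; 20], [0xABu8; 20]];

    // Each builder pulls only the senders that map to its own index,
    // so no two builders ever bundle the same operation.
    for builder_index in 0..num_shards {
        let mine = senders
            .iter()
            .filter(|&s| shard_for_sender(s, num_shards) == builder_index)
            .count();
        println!("builder {builder_index} sees {mine} of {} senders", senders.len());
    }
    assert!(senders.iter().all(|s| shard_for_sender(s, num_shards) < num_shards));
}
```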
self.pool - .get_ops(self.entry_point.address(), self.settings.max_bundle_size) + .get_ops( + self.entry_point.address(), + self.settings.max_bundle_size, + self.builder_index, + ) .await .context("should get ops from pool") } @@ -483,18 +517,28 @@ where Ok(()) } - fn limit_gas_in_bundle(&self, ops: Vec) -> Vec { + fn limit_gas_in_bundle(&self, ops: Vec) -> (Vec, u64) { let mut gas_left = U256::from(self.settings.max_bundle_gas); let mut ops_in_bundle = Vec::new(); for op in ops { let gas = gas::user_operation_execution_gas_limit(&op.uo, self.settings.chain_id); if gas_left < gas { + self.emit(BuilderEvent::skipped_op( + self.builder_index, + self.op_hash(&op.uo), + SkipReason::GasLimit, + )); continue; } gas_left -= gas; ops_in_bundle.push(op); } - ops_in_bundle + ( + ops_in_bundle, + self.settings + .max_bundle_gas + .saturating_sub(gas_left.as_u64()), + ) } fn emit(&self, event: BuilderEvent) { @@ -1191,7 +1235,7 @@ mod tests { let mut pool_client = MockPoolServer::new(); pool_client .expect_get_ops() - .returning(move |_, _| Ok(ops.clone())); + .returning(move |_, _, _| Ok(ops.clone())); let simulations_by_op: HashMap<_, _> = mock_ops .into_iter() @@ -1241,6 +1285,7 @@ mod tests { .returning(move |address, _| signatures_by_aggregator[&address]()); let (event_sender, _) = broadcast::channel(16); let proposer = BundleProposerImpl::new( + 0, pool_client, simulator, entry_point, diff --git a/crates/builder/src/bundle_sender.rs b/crates/builder/src/bundle_sender.rs index 5335de2ee..b803d0be0 100644 --- a/crates/builder/src/bundle_sender.rs +++ b/crates/builder/src/bundle_sender.rs @@ -14,7 +14,7 @@ use rundler_pool::PoolServer; use rundler_provider::EntryPoint; use rundler_sim::ExpectedStorage; use rundler_types::{Entity, GasFees, UserOperation}; -use rundler_utils::{emit::WithEntryPoint, math}; +use rundler_utils::emit::WithEntryPoint; use tokio::{ join, sync::{broadcast, mpsc, oneshot}, @@ -28,9 +28,6 @@ use crate::{ transaction_tracker::{SendResult, TrackerUpdate, TransactionTracker}, }; -// Overhead on gas estimates to account for inaccuracies. -const GAS_ESTIMATE_OVERHEAD_PERCENT: u64 = 10; - #[async_trait] pub(crate) trait BundleSender: Send + Sync + 'static { async fn send_bundles_in_loop(&mut self); @@ -50,6 +47,7 @@ where T: TransactionTracker, C: PoolServer, { + builder_index: u64, manual_bundling_mode: Arc, send_bundle_receiver: mpsc::Receiver, chain_id: u64, @@ -188,7 +186,7 @@ where } => info!("Bundle initially had {initial_op_count} operations, but after increasing gas fees {attempt_number} time(s) it was empty"), SendBundleResult::StalledAtMaxFeeIncreases => warn!("Bundle failed to mine after {} fee increases", self.settings.max_fee_increases), SendBundleResult::Error(error) => { - BuilderMetrics::increment_bundle_txns_failed(); + BuilderMetrics::increment_bundle_txns_failed(self.builder_index); error!("Failed to send bundle. Will retry next block: {error:#?}"); } } @@ -211,6 +209,7 @@ where { #[allow(clippy::too_many_arguments)] pub(crate) fn new( + builder_index: u64, manual_bundling_mode: Arc, send_bundle_receiver: mpsc::Receiver, chain_id: u64, @@ -224,6 +223,7 @@ where event_sender: broadcast::Sender>, ) -> Self { Self { + builder_index, manual_bundling_mode, send_bundle_receiver, chain_id, @@ -257,7 +257,7 @@ where attempt_number, .. 
} => { - BuilderMetrics::increment_bundle_txns_success(); + BuilderMetrics::increment_bundle_txns_success(self.builder_index); if attempt_number == 0 { info!("Bundle with hash {tx_hash:?} landed in block {block_number}"); } else { @@ -266,17 +266,19 @@ where } TrackerUpdate::StillPendingAfterWait => (), TrackerUpdate::LatestTxDropped { nonce } => { - self.emit(BuilderEvent::LatestTransactionDropped { - nonce: nonce.low_u64(), - }); - BuilderMetrics::increment_bundle_txns_dropped(); + self.emit(BuilderEvent::latest_transaction_dropped( + self.builder_index, + nonce.low_u64(), + )); + BuilderMetrics::increment_bundle_txns_dropped(self.builder_index); info!("Previous transaction dropped by sender"); } TrackerUpdate::NonceUsedForOtherTx { nonce } => { - self.emit(BuilderEvent::NonceUsedForOtherTransaction { - nonce: nonce.low_u64(), - }); - BuilderMetrics::increment_bundle_txns_nonce_used(); + self.emit(BuilderEvent::nonce_used_for_other_transaction( + self.builder_index, + nonce.low_u64(), + )); + BuilderMetrics::increment_bundle_txns_nonce_used(self.builder_index); info!("Nonce used by external transaction") } }; @@ -308,15 +310,16 @@ where let mut initial_op_count: Option = None; for fee_increase_count in 0..=self.settings.max_fee_increases { let Some(bundle_tx) = self.get_bundle_tx(nonce, required_fees).await? else { - self.emit(BuilderEvent::FormedBundle { - tx_details: None, - nonce: nonce.low_u64(), + self.emit(BuilderEvent::formed_bundle( + self.builder_index, + None, + nonce.low_u64(), fee_increase_count, required_fees, - }); + )); return Ok(match initial_op_count { Some(initial_op_count) => { - BuilderMetrics::increment_bundle_txns_abandoned(); + BuilderMetrics::increment_bundle_txns_abandoned(self.builder_index); SendBundleResult::NoOperationsAfterFeeIncreases { initial_op_count, attempt_number: fee_increase_count, @@ -335,7 +338,7 @@ where } let current_fees = GasFees::from(&tx); - BuilderMetrics::increment_bundle_txns_sent(); + BuilderMetrics::increment_bundle_txns_sent(self.builder_index); BuilderMetrics::set_current_fees(¤t_fees); let send_result = self @@ -345,16 +348,17 @@ where let update = match send_result { SendResult::TrackerUpdate(update) => update, SendResult::TxHash(tx_hash) => { - self.emit(BuilderEvent::FormedBundle { - tx_details: Some(BundleTxDetails { + self.emit(BuilderEvent::formed_bundle( + self.builder_index, + Some(BundleTxDetails { tx_hash, tx, op_hashes: Arc::new(op_hashes), }), - nonce: nonce.low_u64(), + nonce.low_u64(), fee_increase_count, required_fees, - }); + )); self.transaction_tracker.wait_for_update().await? 
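The emit calls above go through constructor helpers because `BuilderEvent` is no longer a bare enum: it now pairs an event kind with the index of the builder that produced it (the full definition appears in `emit.rs` further down in this patch). A stripped-down sketch of that shape, keeping just two kinds for brevity:

```rust
use std::fmt;

/// Simplified model of the new event shape: every event records which
/// bundle builder produced it, alongside the event kind.
#[derive(Clone, Debug)]
struct BuilderEvent {
    builder_index: u64,
    kind: BuilderEventKind,
}

#[derive(Clone, Debug)]
enum BuilderEventKind {
    TransactionMined { nonce: u64, block_number: u64 },
    LatestTransactionDropped { nonce: u64 },
}

impl BuilderEvent {
    // Constructor helpers keep the emit call sites short, in the style of
    // `BuilderEvent::transaction_mined(...)` used by this patch.
    fn transaction_mined(builder_index: u64, nonce: u64, block_number: u64) -> Self {
        Self {
            builder_index,
            kind: BuilderEventKind::TransactionMined { nonce, block_number },
        }
    }

    fn latest_transaction_dropped(builder_index: u64, nonce: u64) -> Self {
        Self {
            builder_index,
            kind: BuilderEventKind::LatestTransactionDropped { nonce },
        }
    }
}

impl fmt::Display for BuilderEvent {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Matching on `kind` while the builder index lives on the wrapper
        // mirrors how the real Display impl prints "Builder index:".
        match &self.kind {
            BuilderEventKind::TransactionMined { nonce, block_number } => write!(
                f,
                "Transaction mined! Builder index: {} Nonce: {} Block number: {}",
                self.builder_index, nonce, block_number
            ),
            BuilderEventKind::LatestTransactionDropped { nonce } => write!(
                f,
                "Latest transaction dropped. Builder index: {} Nonce: {}",
                self.builder_index, nonce
            ),
        }
    }
}

fn main() {
    println!("{}", BuilderEvent::transaction_mined(1, 7, 1234));
    println!("{}", BuilderEvent::latest_transaction_dropped(1, 8));
}
```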
} }; @@ -365,12 +369,13 @@ where block_number, attempt_number, } => { - self.emit(BuilderEvent::TransactionMined { + self.emit(BuilderEvent::transaction_mined( + self.builder_index, tx_hash, - nonce: nonce.low_u64(), + nonce.low_u64(), block_number, - }); - BuilderMetrics::increment_bundle_txns_success(); + )); + BuilderMetrics::increment_bundle_txns_success(self.builder_index); return Ok(SendBundleResult::Success { block_number, attempt_number, @@ -381,17 +386,19 @@ where info!("Transaction not mined for several blocks") } TrackerUpdate::LatestTxDropped { nonce } => { - self.emit(BuilderEvent::LatestTransactionDropped { - nonce: nonce.low_u64(), - }); - BuilderMetrics::increment_bundle_txns_dropped(); + self.emit(BuilderEvent::latest_transaction_dropped( + self.builder_index, + nonce.low_u64(), + )); + BuilderMetrics::increment_bundle_txns_dropped(self.builder_index); info!("Previous transaction dropped by sender"); } TrackerUpdate::NonceUsedForOtherTx { nonce } => { - self.emit(BuilderEvent::NonceUsedForOtherTransaction { - nonce: nonce.low_u64(), - }); - BuilderMetrics::increment_bundle_txns_nonce_used(); + self.emit(BuilderEvent::nonce_used_for_other_transaction( + self.builder_index, + nonce.low_u64(), + )); + BuilderMetrics::increment_bundle_txns_nonce_used(self.builder_index); bail!("nonce used by external transaction") } }; @@ -400,12 +407,12 @@ where current_fees.max_fee_per_gas, current_fees.max_priority_fee_per_gas, ); - BuilderMetrics::increment_bundle_txn_fee_increases(); + BuilderMetrics::increment_bundle_txn_fee_increases(self.builder_index); required_fees = Some( current_fees.increase_by_percent(self.settings.replacement_fee_percent_increase), ); } - BuilderMetrics::increment_bundle_txns_abandoned(); + BuilderMetrics::increment_bundle_txns_abandoned(self.builder_index); Ok(SendBundleResult::StalledAtMaxFeeIncreases) } @@ -452,12 +459,11 @@ where bundle.rejected_ops.len(), bundle.rejected_entities.len() ); - let gas = math::increase_by_percent(bundle.gas_estimate, GAS_ESTIMATE_OVERHEAD_PERCENT); let op_hashes: Vec<_> = bundle.iter_ops().map(|op| self.op_hash(op)).collect(); let mut tx = self.entry_point.get_send_bundle_transaction( bundle.ops_per_aggregator, self.beneficiary, - gas, + bundle.gas_estimate, bundle.gas_fees, ); tx.set_nonce(nonce); @@ -502,34 +508,34 @@ where struct BuilderMetrics {} impl BuilderMetrics { - fn increment_bundle_txns_sent() { - metrics::increment_counter!("builder_bundle_txns_sent"); + fn increment_bundle_txns_sent(builder_index: u64) { + metrics::increment_counter!("builder_bundle_txns_sent", "builder_index" => builder_index.to_string()); } - fn increment_bundle_txns_success() { - metrics::increment_counter!("builder_bundle_txns_success"); + fn increment_bundle_txns_success(builder_index: u64) { + metrics::increment_counter!("builder_bundle_txns_success", "builder_index" => builder_index.to_string()); } - fn increment_bundle_txns_dropped() { - metrics::increment_counter!("builder_bundle_txns_dropped"); + fn increment_bundle_txns_dropped(builder_index: u64) { + metrics::increment_counter!("builder_bundle_txns_dropped", "builder_index" => builder_index.to_string()); } // used when we decide to stop trying a transaction - fn increment_bundle_txns_abandoned() { - metrics::increment_counter!("builder_bundle_txns_abandoned"); + fn increment_bundle_txns_abandoned(builder_index: u64) { + metrics::increment_counter!("builder_bundle_txns_abandoned", "builder_index" => builder_index.to_string()); } // used when sending a transaction fails - fn 
increment_bundle_txns_failed() { - metrics::increment_counter!("builder_bundle_txns_failed"); + fn increment_bundle_txns_failed(builder_index: u64) { + metrics::increment_counter!("builder_bundle_txns_failed", "builder_index" => builder_index.to_string()); } - fn increment_bundle_txns_nonce_used() { - metrics::increment_counter!("builder_bundle_txns_nonce_used"); + fn increment_bundle_txns_nonce_used(builder_index: u64) { + metrics::increment_counter!("builder_bundle_txns_nonce_used", "builder_index" => builder_index.to_string()); } - fn increment_bundle_txn_fee_increases() { - metrics::increment_counter!("builder_bundle_fee_increases"); + fn increment_bundle_txn_fee_increases(builder_index: u64) { + metrics::increment_counter!("builder_bundle_fee_increases", "builder_index" => builder_index.to_string()); } fn set_current_fees(fees: &GasFees) { diff --git a/crates/builder/src/emit.rs b/crates/builder/src/emit.rs index 85ee41fd6..6d416f980 100644 --- a/crates/builder/src/emit.rs +++ b/crates/builder/src/emit.rs @@ -7,7 +7,91 @@ use rundler_utils::strs; /// Builder event #[derive(Clone, Debug)] -pub enum BuilderEvent { +pub struct BuilderEvent { + /// Builder index that emitted the event + pub builder_index: u64, + /// Event kind + pub kind: BuilderEventKind, +} + +impl BuilderEvent { + pub(crate) fn new(builder_index: u64, kind: BuilderEventKind) -> Self { + Self { + builder_index, + kind, + } + } + + pub(crate) fn formed_bundle( + builder_index: u64, + tx_details: Option, + nonce: u64, + fee_increase_count: u64, + required_fees: Option, + ) -> Self { + Self::new( + builder_index, + BuilderEventKind::FormedBundle { + tx_details, + nonce, + fee_increase_count, + required_fees, + }, + ) + } + + pub(crate) fn transaction_mined( + builder_index: u64, + tx_hash: H256, + nonce: u64, + block_number: u64, + ) -> Self { + Self::new( + builder_index, + BuilderEventKind::TransactionMined { + tx_hash, + nonce, + block_number, + }, + ) + } + + pub(crate) fn latest_transaction_dropped(builder_index: u64, nonce: u64) -> Self { + Self::new( + builder_index, + BuilderEventKind::LatestTransactionDropped { nonce }, + ) + } + + pub(crate) fn nonce_used_for_other_transaction(builder_index: u64, nonce: u64) -> Self { + Self::new( + builder_index, + BuilderEventKind::NonceUsedForOtherTransaction { nonce }, + ) + } + + pub(crate) fn skipped_op(builder_index: u64, op_hash: H256, reason: SkipReason) -> Self { + Self::new( + builder_index, + BuilderEventKind::SkippedOp { op_hash, reason }, + ) + } + + pub(crate) fn rejected_op( + builder_index: u64, + op_hash: H256, + reason: OpRejectionReason, + ) -> Self { + Self::new( + builder_index, + BuilderEventKind::RejectedOp { op_hash, reason }, + ) + } +} + +/// BuilderEventKind +#[derive(Clone, Debug)] +pub enum BuilderEventKind { /// A bundle was formed FormedBundle { /// Details of the transaction that was sent @@ -79,6 +163,8 @@ pub enum SkipReason { required_fees: GasFees, actual_fees: GasFees, }, + /// Bundle ran out of space by gas limit to include the operation + GasLimit, } /// Reason for rejecting an operation from a bundle @@ -92,8 +178,8 @@ pub enum OpRejectionReason { impl Display for BuilderEvent { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - BuilderEvent::FormedBundle { + match &self.kind { + BuilderEventKind::FormedBundle { tx_details, nonce, fee_increase_count, @@ -117,6 +203,7 @@ impl Display for BuilderEvent { f, concat!( "Bundle transaction sent!", + " Builder index: {:?}", " Transaction hash: {:?}", " Nonce: 
{}", " Fee increases: {}", @@ -124,6 +211,7 @@ impl Display for BuilderEvent { " Required maxPriorityFeePerGas: {}", " Op hashes: {}", ), + self.builder_index, tx_details.tx_hash, nonce, fee_increase_count, @@ -136,11 +224,13 @@ impl Display for BuilderEvent { f, concat!( "Bundle was empty.", + " Builder index: {:?}", " Nonce: {}", " Fee increases: {}", " Required maxFeePerGas: {}", " Required maxPriorityFeePerGas: {}", ), + self.builder_index, nonce, fee_increase_count, required_max_fee_per_gas, @@ -148,7 +238,7 @@ impl Display for BuilderEvent { ), } } - BuilderEvent::TransactionMined { + BuilderEventKind::TransactionMined { tx_hash, nonce, block_number, @@ -156,26 +246,28 @@ impl Display for BuilderEvent { f, concat!( "Transaction mined!", + " Builder index: {:?}", " Transaction hash: {:?}", " Nonce: {}", " Block number: {}", ), - tx_hash, nonce, block_number, + self.builder_index, tx_hash, nonce, block_number, ), - BuilderEvent::LatestTransactionDropped { nonce } => { + BuilderEventKind::LatestTransactionDropped { nonce } => { write!( f, - "Latest transaction dropped. Higher fees are needed. Nonce: {nonce}" + "Latest transaction dropped. Higher fees are needed. Builder index: {:?} Nonce: {nonce}", + self.builder_index ) } - BuilderEvent::NonceUsedForOtherTransaction { nonce } => { - write!(f, "Transaction failed because nonce was used by another transaction outside of this Rundler. Nonce: {nonce}") + BuilderEventKind::NonceUsedForOtherTransaction { nonce } => { + write!(f, "Transaction failed because nonce was used by another transaction outside of this Rundler. Builder index: {:?} Nonce: {nonce}", self.builder_index) } - BuilderEvent::SkippedOp { op_hash, reason } => { - write!(f, "Op skipped in bundle (but remains in pool). Op hash: {op_hash:?} Reason: {reason:?}") + BuilderEventKind::SkippedOp { op_hash, reason } => { + write!(f, "Op skipped in bundle (but remains in pool). Builder index: {:?} Op hash: {op_hash:?} Reason: {reason:?}", self.builder_index) } - BuilderEvent::RejectedOp { op_hash, reason } => { - write!(f, "Op rejected from bundle and removed from pool. Op hash: {op_hash:?} Reason: {reason:?}") + BuilderEventKind::RejectedOp { op_hash, reason } => { + write!(f, "Op rejected from bundle and removed from pool. Builder index: {:?} Op hash: {op_hash:?} Reason: {reason:?}", self.builder_index) } } } diff --git a/crates/builder/src/lib.rs b/crates/builder/src/lib.rs index 9bbc22467..9bb110bf0 100644 --- a/crates/builder/src/lib.rs +++ b/crates/builder/src/lib.rs @@ -10,7 +10,7 @@ mod bundle_proposer; mod bundle_sender; mod emit; -pub use emit::BuilderEvent; +pub use emit::{BuilderEvent, BuilderEventKind}; mod sender; diff --git a/crates/builder/src/server/local.rs b/crates/builder/src/server/local.rs index 792134f47..a75ab92bd 100644 --- a/crates/builder/src/server/local.rs +++ b/crates/builder/src/server/local.rs @@ -45,14 +45,14 @@ impl LocalBuilderBuilder { pub fn run( self, manual_bundling_mode: Arc, - send_bundle_requester: mpsc::Sender, + send_bundle_requesters: Vec>, entry_points: Vec
, shutdown_token: CancellationToken, ) -> JoinHandle> { let mut runner = LocalBuilderServerRunner::new( self.req_receiver, manual_bundling_mode, - send_bundle_requester, + send_bundle_requesters, entry_points, ); tokio::spawn(async move { runner.run(shutdown_token).await }) @@ -67,7 +67,7 @@ pub struct LocalBuilderHandle { struct LocalBuilderServerRunner { req_receiver: mpsc::Receiver, - send_bundle_requester: mpsc::Sender, + send_bundle_requesters: Vec>, manual_bundling_mode: Arc, entry_points: Vec
, } @@ -138,13 +138,13 @@ impl LocalBuilderServerRunner { fn new( req_receiver: mpsc::Receiver, manual_bundling_mode: Arc, - send_bundle_requester: mpsc::Sender, + send_bundle_requesters: Vec>, entry_points: Vec
, ) -> Self { Self { req_receiver, manual_bundling_mode, - send_bundle_requester, + send_bundle_requesters, entry_points, } } @@ -166,10 +166,12 @@ impl LocalBuilderServerRunner { ServerRequestKind::DebugSendBundleNow => { if !self.manual_bundling_mode.load(Ordering::Relaxed) { break 'a Err(anyhow::anyhow!("bundling mode is not manual").into()) + } else if self.send_bundle_requesters.len() != 1 { + break 'a Err(anyhow::anyhow!("more than 1 bundle builder not supported in debug mode").into()) } let (tx, rx) = oneshot::channel(); - match self.send_bundle_requester.send(SendBundleRequest{ + match self.send_bundle_requesters[0].send(SendBundleRequest{ responder: tx }).await { Ok(()) => {}, diff --git a/crates/builder/src/task.rs b/crates/builder/src/task.rs index cbc79c195..71d83583b 100644 --- a/crates/builder/src/task.rs +++ b/crates/builder/src/task.rs @@ -7,7 +7,10 @@ use std::{ use anyhow::{bail, Context}; use async_trait::async_trait; -use ethers::types::{Address, H256}; +use ethers::{ + providers::{JsonRpcClient, Provider}, + types::{Address, H256}, +}; use ethers_signers::Signer; use rundler_pool::PoolServer; use rundler_sim::{ @@ -30,7 +33,7 @@ use tracing::info; use crate::{ bundle_proposer::{self, BundleProposerImpl}, - bundle_sender::{self, BundleSender, BundleSenderImpl}, + bundle_sender::{self, BundleSender, BundleSenderImpl, SendBundleRequest}, emit::BuilderEvent, sender::get_sender, server::{spawn_remote_builder_server, LocalBuilderBuilder}, @@ -94,6 +97,10 @@ pub struct Args { /// /// Checked ~after~ checking for conditional sender or Flashbots sender. pub bloxroute_auth_header: Option, + /// Number of bundle builders to start + pub num_bundle_builders: u64, + /// Index offset for bundle builders + pub bundle_builder_index_offset: u64, } /// Builder task @@ -114,6 +121,93 @@ where tracing::info!("Mempool config: {:?}", self.args.mempool_configs); let provider = eth::new_provider(&self.args.rpc_url, self.args.eth_poll_interval)?; + let manual_bundling_mode = Arc::new(AtomicBool::new(false)); + + let mut spawn_guards = vec![]; + let mut send_bundle_txs = vec![]; + for i in 0..self.args.num_bundle_builders { + let (spawn_guard, send_bundle_tx) = self + .create_bundle_builder( + i + self.args.bundle_builder_index_offset, + Arc::clone(&manual_bundling_mode), + Arc::clone(&provider), + ) + .await?; + spawn_guards.push(spawn_guard); + send_bundle_txs.push(send_bundle_tx); + } + + let builder_handle = self.builder_builder.get_handle(); + let builder_runnder_handle = self.builder_builder.run( + manual_bundling_mode, + send_bundle_txs, + vec![self.args.entry_point_address], + shutdown_token.clone(), + ); + + let remote_handle = match self.args.remote_address { + Some(addr) => { + spawn_remote_builder_server( + addr, + self.args.chain_id, + builder_handle, + shutdown_token, + ) + .await? + } + None => tokio::spawn(async { Ok(()) }), + }; + + info!("Started bundle builder"); + + match try_join!( + handle::flatten_handle(builder_runnder_handle), + handle::flatten_handle(remote_handle), + ) { + Ok(_) => { + tracing::info!("Builder server shutdown"); + Ok(()) + } + Err(e) => { + tracing::error!("Builder server error: {e:?}"); + bail!("Builder server error: {e:?}") + } + } + } +} + +impl

<P> BuilderTask<P>

+where + P: PoolServer + Clone, +{ + /// Create a new builder task + pub fn new( + args: Args, + event_sender: broadcast::Sender>, + builder_builder: LocalBuilderBuilder, + pool: P, + ) -> Self { + Self { + args, + event_sender, + builder_builder, + pool, + } + } + + /// Convert this task into a boxed task + pub fn boxed(self) -> Box { + Box::new(self) + } + + async fn create_bundle_builder( + &self, + index: u64, + manual_bundling_mode: Arc, + provider: Arc>, + ) -> anyhow::Result<(SpawnGuard, mpsc::Sender)> { + let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1); + let signer = if let Some(pk) = &self.args.private_key { info!("Using local signer"); BundlerSigner::Local( @@ -193,10 +287,9 @@ where replacement_fee_percent_increase: self.args.replacement_fee_percent_increase, max_fee_increases: self.args.max_fee_increases, }; - let manual_bundling_mode = Arc::new(AtomicBool::new(false)); - let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1); let proposer = BundleProposerImpl::new( + index, self.pool.clone(), simulator, entry_point.clone(), @@ -205,6 +298,7 @@ where self.event_sender.clone(), ); let mut builder = BundleSenderImpl::new( + index, manual_bundling_mode.clone(), send_bundle_rx, self.args.chain_id, @@ -213,74 +307,14 @@ where proposer, entry_point, transaction_tracker, - self.pool, + self.pool.clone(), builder_settings, self.event_sender.clone(), ); - let _builder_loop_guard = - { SpawnGuard::spawn_with_guard(async move { builder.send_bundles_in_loop().await }) }; - - let builder_handle = self.builder_builder.get_handle(); - let builder_runnder_handle = self.builder_builder.run( - manual_bundling_mode, + Ok(( + SpawnGuard::spawn_with_guard(async move { builder.send_bundles_in_loop().await }), send_bundle_tx, - vec![self.args.entry_point_address], - shutdown_token.clone(), - ); - - let remote_handle = match self.args.remote_address { - Some(addr) => { - spawn_remote_builder_server( - addr, - self.args.chain_id, - builder_handle, - shutdown_token, - ) - .await? - } - None => tokio::spawn(async { Ok(()) }), - }; - - info!("Started bundle builder"); - - match try_join!( - handle::flatten_handle(builder_runnder_handle), - handle::flatten_handle(remote_handle), - ) { - Ok(_) => { - tracing::info!("Builder server shutdown"); - Ok(()) - } - Err(e) => { - tracing::error!("Builder server error: {e:?}"); - bail!("Builder server error: {e:?}") - } - } - } -} - -impl

<P> BuilderTask<P>

-where - P: PoolServer + Clone, -{ - /// Create a new builder task - pub fn new( - args: Args, - event_sender: broadcast::Sender>, - builder_builder: LocalBuilderBuilder, - pool: P, - ) -> Self { - Self { - args, - event_sender, - builder_builder, - pool, - } - } - - /// Convert this task into a boxed task - pub fn boxed(self) -> Box { - Box::new(self) + )) } } diff --git a/crates/pool/proto/op_pool/op_pool.proto b/crates/pool/proto/op_pool/op_pool.proto index 1e29ea935..3ec7d2ed8 100644 --- a/crates/pool/proto/op_pool/op_pool.proto +++ b/crates/pool/proto/op_pool/op_pool.proto @@ -153,6 +153,8 @@ message GetOpsRequest { bytes entry_point = 1; // The maximum number of UserOperations to return uint64 max_ops = 2; + // The mempool shard num retrieve UserOperations from + uint64 shard_index = 3; } message GetOpsResponse { oneof result { diff --git a/crates/pool/src/mempool/mod.rs b/crates/pool/src/mempool/mod.rs index 0a66223db..259c62473 100644 --- a/crates/pool/src/mempool/mod.rs +++ b/crates/pool/src/mempool/mod.rs @@ -47,16 +47,22 @@ pub trait Mempool: Send + Sync + 'static { /// Removes a set of operations from the pool. fn remove_operations(&self, hashes: &[H256]); - /// Removes all operations assocaited with a given entity from the pool. + /// Removes all operations associated with a given entity from the pool. fn remove_entity(&self, entity: Entity); /// Returns the best operations from the pool. /// /// Returns the best operations from the pool based on their gas bids up to - /// the specified maximum number of operations. + /// the specified maximum number of operations, limiting to one per sender. /// - /// NOTE: Will only return one operation per sender. - fn best_operations(&self, max: usize) -> Vec>; + /// The `shard_index` is used to divide the mempool into disjoint shards to ensure + /// that two bundle builders don't attempt to but bundle the same operations. If + /// the supplied `shard_index` does not exist, the call will error. + fn best_operations( + &self, + max: usize, + shard_index: u64, + ) -> MempoolResult>>; /// Returns the all operations from the pool up to a max size fn all_operations(&self, max: usize) -> Vec>; @@ -89,7 +95,7 @@ pub struct PoolConfig { pub max_size_of_pool_bytes: usize, /// Operations that are always banned from the mempool pub blocklist: Option>, - /// Operations that are allways allowed in the mempool, regardless of reputation + /// Operations that are always allowed in the mempool, regardless of reputation pub allowlist: Option>, /// Settings for precheck validation pub precheck_settings: PrecheckSettings, @@ -97,6 +103,11 @@ pub struct PoolConfig { pub sim_settings: SimulationSettings, /// Configuration for the mempool channels, by channel ID pub mempool_channel_configs: HashMap, + /// Number of mempool shards to use. A mempool shard is a disjoint subset of the mempool + /// that is used to ensure that two bundle builders don't attempt to but bundle the same + /// operations. The mempool is divided into shards by taking the hash of the operation + /// and modding it by the number of shards. + pub num_shards: u64, } /// Origin of an operation. @@ -156,7 +167,7 @@ impl PoolOperation { } } - /// Returns an iterator over all entities that are included in this opearation. + /// Returns an iterator over all entities that are included in this operation. 
pub fn entities(&'_ self) -> impl Iterator + '_ { EntityType::iter().filter_map(|entity| { self.entity_address(entity) @@ -164,7 +175,7 @@ impl PoolOperation { }) } - /// Returns an iterator over all staked entities that are included in this opearation. + /// Returns an iterator over all staked entities that are included in this operation. pub fn staked_entities(&'_ self) -> impl Iterator + '_ { EntityType::iter() .filter(|entity| self.is_staked(*entity)) diff --git a/crates/pool/src/mempool/uo_pool.rs b/crates/pool/src/mempool/uo_pool.rs index a41a13168..d40637762 100644 --- a/crates/pool/src/mempool/uo_pool.rs +++ b/crates/pool/src/mempool/uo_pool.rs @@ -3,7 +3,7 @@ use std::{ sync::Arc, }; -use ethers::types::{Address, H256}; +use ethers::types::{Address, H256, U256}; use itertools::Itertools; use parking_lot::RwLock; use rundler_sim::{Prechecker, Simulator}; @@ -35,6 +35,7 @@ const THROTTLED_OPS_BLOCK_LIMIT: u64 = 10; pub(crate) struct UoPool { entry_point: Address, chain_id: u64, + num_shards: u64, reputation: Arc, state: RwLock, event_sender: broadcast::Sender>, @@ -64,6 +65,7 @@ where Self { entry_point: config.entry_point, chain_id: config.chain_id, + num_shards: config.num_shards, reputation, state: RwLock::new(UoPoolState { pool: PoolInner::new(config.into()), @@ -314,20 +316,34 @@ where UoPoolMetrics::increment_removed_entities(self.entry_point); } - fn best_operations(&self, max: usize) -> Vec> { + fn best_operations( + &self, + max: usize, + shard_index: u64, + ) -> MempoolResult>> { + if shard_index >= self.num_shards { + Err(anyhow::anyhow!("Invalid shard ID"))?; + } + // get the best operations from the pool let ordered_ops = self.state.read().pool.best_operations(); // keep track of senders to avoid sending multiple ops from the same sender let mut senders = HashSet::
<Address>
::new(); - ordered_ops + Ok(ordered_ops .into_iter() .filter(|op| { + // short-circuit the mod if there is only 1 shard + ((self.num_shards == 1) || + (U256::from_little_endian(op.uo.sender.as_bytes()) + .div_mod(self.num_shards.into()) + .1 + == shard_index.into())) && // filter out ops from senders we've already seen senders.insert(op.uo.sender) }) .take(max) - .collect() + .collect()) } fn all_operations(&self, max: usize) -> Vec> { @@ -393,9 +409,9 @@ mod tests { .add_operation(OperationOrigin::Local, op.op) .await .unwrap(); - check_ops(pool.best_operations(1), uos); + check_ops(pool.best_operations(1, 0).unwrap(), uos); pool.remove_operations(&[hash]); - assert_eq!(pool.best_operations(1), vec![]); + assert_eq!(pool.best_operations(1, 0).unwrap(), vec![]); } #[tokio::test] @@ -416,9 +432,9 @@ mod tests { .unwrap(); hashes.push(hash); } - check_ops(pool.best_operations(3), uos); + check_ops(pool.best_operations(3, 0).unwrap(), uos); pool.remove_operations(&hashes); - assert_eq!(pool.best_operations(3), vec![]); + assert_eq!(pool.best_operations(3, 0).unwrap(), vec![]); } #[tokio::test] @@ -437,9 +453,9 @@ mod tests { .await .unwrap(); } - check_ops(pool.best_operations(3), uos); + check_ops(pool.best_operations(3, 0).unwrap(), uos); pool.clear(); - assert_eq!(pool.best_operations(3), vec![]); + assert_eq!(pool.best_operations(3, 0).unwrap(), vec![]); } #[tokio::test] @@ -450,7 +466,7 @@ mod tests { create_op(Address::random(), 0, 1), ]) .await; - check_ops(pool.best_operations(3), uos.clone()); + check_ops(pool.best_operations(3, 0).unwrap(), uos.clone()); pool.on_chain_update(&ChainUpdate { latest_block_number: 1, @@ -466,7 +482,7 @@ mod tests { unmined_ops: vec![], }); - check_ops(pool.best_operations(3), uos[1..].to_vec()); + check_ops(pool.best_operations(3, 0).unwrap(), uos[1..].to_vec()); } #[tokio::test] @@ -477,7 +493,7 @@ mod tests { create_op(Address::random(), 0, 1), ]) .await; - check_ops(pool.best_operations(3), uos.clone()); + check_ops(pool.best_operations(3, 0).unwrap(), uos.clone()); pool.on_chain_update(&ChainUpdate { latest_block_number: 1, @@ -492,7 +508,10 @@ mod tests { }], unmined_ops: vec![], }); - check_ops(pool.best_operations(3), uos.clone()[1..].to_vec()); + check_ops( + pool.best_operations(3, 0).unwrap(), + uos.clone()[1..].to_vec(), + ); pool.on_chain_update(&ChainUpdate { latest_block_number: 1, @@ -507,7 +526,7 @@ mod tests { nonce: uos[0].nonce, }], }); - check_ops(pool.best_operations(3), uos); + check_ops(pool.best_operations(3, 0).unwrap(), uos); } #[tokio::test] @@ -518,7 +537,7 @@ mod tests { create_op(Address::random(), 0, 1), ]) .await; - check_ops(pool.best_operations(3), uos.clone()); + check_ops(pool.best_operations(3, 0).unwrap(), uos.clone()); pool.on_chain_update(&ChainUpdate { latest_block_number: 1, @@ -534,7 +553,7 @@ mod tests { unmined_ops: vec![], }); - check_ops(pool.best_operations(3), uos); + check_ops(pool.best_operations(3, 0).unwrap(), uos); } #[tokio::test] @@ -547,7 +566,7 @@ mod tests { ]) .await; // Only return 1 op per sender - check_ops(pool.best_operations(3), vec![uos[0].clone()]); + check_ops(pool.best_operations(3, 0).unwrap(), vec![uos[0].clone()]); let rep = pool.dump_reputation(); assert_eq!(rep.len(), 1); @@ -593,9 +612,9 @@ mod tests { pool.add_operation(OperationOrigin::Local, uos[0].clone()) .await .unwrap(); - check_ops(pool.best_operations(1), vec![uos[0].clone()]); + check_ops(pool.best_operations(1, 0).unwrap(), vec![uos[0].clone()]); - // Second op should be thorottled + // Second op should be throttled 
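The tests above now call `best_operations(max, shard_index)` and unwrap the result, exercising the widened contract: an unknown shard is an error, senders are deduplicated, and only operations whose sender maps to the requested shard come back. A runnable distillation of that contract, with a `u64` sender standing in for the 20-byte `Address` and a `String` error in place of `MempoolError`:

```rust
use std::collections::HashSet;

/// Hypothetical, simplified stand-in for the pool's operation type.
#[derive(Clone, Debug)]
struct PoolOperation {
    sender: u64, // the real pool keys shards off a 20-byte Address
}

/// Sketch of the `best_operations(max, shard_index)` contract introduced here.
fn best_operations(
    ordered_ops: &[PoolOperation], // assumed already sorted by gas bid
    max: usize,
    shard_index: u64,
    num_shards: u64,
) -> Result<Vec<PoolOperation>, String> {
    if shard_index >= num_shards {
        return Err("Invalid shard ID".to_string());
    }
    let mut senders = HashSet::new();
    Ok(ordered_ops
        .iter()
        .filter(|op| {
            // Skip the mod entirely when there is only one shard, then keep
            // at most one operation per sender.
            (num_shards == 1 || op.sender % num_shards == shard_index)
                && senders.insert(op.sender)
        })
        .take(max)
        .cloned()
        .collect())
}

fn main() {
    let ops = vec![
        PoolOperation { sender: 1 },
        PoolOperation { sender: 1 }, // same sender: deduplicated
        PoolOperation { sender: 2 },
    ];
    // With 2 shards, sender 1 maps to shard 1 and sender 2 to shard 0.
    assert_eq!(best_operations(&ops, 10, 1, 2).unwrap().len(), 1);
    assert_eq!(best_operations(&ops, 10, 0, 2).unwrap().len(), 1);
    assert!(best_operations(&ops, 10, 5, 2).is_err());
    println!("shard contract holds");
}
```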
let ret = pool .add_operation(OperationOrigin::Local, uos[1].clone()) .await; @@ -627,7 +646,7 @@ mod tests { pool.add_operation(OperationOrigin::Local, uos[1].clone()) .await .unwrap(); - check_ops(pool.best_operations(1), vec![uos[1].clone()]); + check_ops(pool.best_operations(1, 0).unwrap(), vec![uos[1].clone()]); } #[tokio::test] @@ -669,7 +688,7 @@ mod tests { Err(MempoolError::PrecheckViolation(PrecheckViolation::InitCodeTooShort(_))) => {} _ => panic!("Expected InitCodeTooShort error"), } - assert_eq!(pool.best_operations(1), vec![]); + assert_eq!(pool.best_operations(1, 0).unwrap(), vec![]); } #[tokio::test] @@ -689,7 +708,7 @@ mod tests { Err(MempoolError::SimulationViolation(SimulationViolation::DidNotRevert)) => {} _ => panic!("Expected DidNotRevert error"), } - assert_eq!(pool.best_operations(1), vec![]); + assert_eq!(pool.best_operations(1, 0).unwrap(), vec![]); } #[derive(Clone, Debug)] @@ -739,6 +758,7 @@ mod tests { precheck_settings: PrecheckSettings::default(), sim_settings: SimulationSettings::default(), mempool_channel_configs: HashMap::new(), + num_shards: 1, }; let (event_sender, _) = broadcast::channel(4); UoPool::new(args, reputation, event_sender, prechecker, simulator) diff --git a/crates/pool/src/server/local.rs b/crates/pool/src/server/local.rs index ca6e38e8c..71d653e6a 100644 --- a/crates/pool/src/server/local.rs +++ b/crates/pool/src/server/local.rs @@ -118,10 +118,16 @@ impl PoolServer for LocalPoolHandle { } } - async fn get_ops(&self, entry_point: Address, max_ops: u64) -> PoolResult> { + async fn get_ops( + &self, + entry_point: Address, + max_ops: u64, + shard_index: u64, + ) -> PoolResult> { let req = ServerRequestKind::GetOps { entry_point, max_ops, + shard_index, }; let resp = self.send(req).await?; match resp { @@ -256,10 +262,15 @@ where }) } - fn get_ops(&self, entry_point: Address, max_ops: u64) -> PoolResult> { + fn get_ops( + &self, + entry_point: Address, + max_ops: u64, + shard_index: u64, + ) -> PoolResult> { let mempool = self.get_pool(entry_point)?; Ok(mempool - .best_operations(max_ops as usize) + .best_operations(max_ops as usize, shard_index)? 
.iter() .map(|op| (**op).clone()) .collect()) @@ -365,8 +376,8 @@ where Err(e) => Err(e), } }, - ServerRequestKind::GetOps { entry_point, max_ops } => { - match self.get_ops(entry_point, max_ops) { + ServerRequestKind::GetOps { entry_point, max_ops, shard_index } => { + match self.get_ops(entry_point, max_ops, shard_index) { Ok(ops) => Ok(ServerResponse::GetOps { ops }), Err(e) => Err(e), } @@ -439,6 +450,7 @@ enum ServerRequestKind { GetOps { entry_point: Address, max_ops: u64, + shard_index: u64, }, RemoveOps { entry_point: Address, diff --git a/crates/pool/src/server/mod.rs b/crates/pool/src/server/mod.rs index 8382edbea..003ccaf8b 100644 --- a/crates/pool/src/server/mod.rs +++ b/crates/pool/src/server/mod.rs @@ -37,7 +37,12 @@ pub trait PoolServer: Send + Sync + 'static { async fn add_op(&self, entry_point: Address, op: UserOperation) -> PoolResult; /// Get operations from the pool - async fn get_ops(&self, entry_point: Address, max_ops: u64) -> PoolResult>; + async fn get_ops( + &self, + entry_point: Address, + max_ops: u64, + shard_index: u64, + ) -> PoolResult>; /// Remove operations from the pool by hash async fn remove_ops(&self, entry_point: Address, ops: Vec) -> PoolResult<()>; diff --git a/crates/pool/src/server/remote/client.rs b/crates/pool/src/server/remote/client.rs index 33bda3e58..d3024abde 100644 --- a/crates/pool/src/server/remote/client.rs +++ b/crates/pool/src/server/remote/client.rs @@ -141,13 +141,19 @@ impl PoolServer for RemotePoolClient { } } - async fn get_ops(&self, entry_point: Address, max_ops: u64) -> PoolResult> { + async fn get_ops( + &self, + entry_point: Address, + max_ops: u64, + shard_index: u64, + ) -> PoolResult> { let res = self .op_pool_client .clone() .get_ops(GetOpsRequest { entry_point: entry_point.as_bytes().to_vec(), max_ops, + shard_index, }) .await? .into_inner() diff --git a/crates/pool/src/server/remote/server.rs b/crates/pool/src/server/remote/server.rs index 4d1d3b228..524e994c6 100644 --- a/crates/pool/src/server/remote/server.rs +++ b/crates/pool/src/server/remote/server.rs @@ -143,7 +143,11 @@ impl OpPool for OpPoolImpl { let req = request.into_inner(); let ep = self.get_entry_point(&req.entry_point)?; - let resp = match self.local_pool.get_ops(ep, req.max_ops).await { + let resp = match self + .local_pool + .get_ops(ep, req.max_ops, req.shard_index) + .await + { Ok(ops) => GetOpsResponse { result: Some(get_ops_response::Result::Success(GetOpsSuccess { ops: ops.iter().map(MempoolOp::from).collect(),
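On the wire, the whole feature reduces to one new field: `GetOpsRequest` gains `shard_index` (field 3), which the remote server forwards to the local pool as shown above. Because proto3 scalars default to zero when unset, a client built before this change effectively asks for shard 0, and with the default of a single shard that preserves the old behavior. A hand-written mirror of the message, for illustration only (the real type is generated from `op_pool.proto`):

```rust
/// Hand-written mirror of the protobuf message, for illustration only;
/// the real Rust type is generated from op_pool.proto.
#[derive(Clone, Debug, Default)]
struct GetOpsRequest {
    entry_point: Vec<u8>, // 20-byte entry point address
    max_ops: u64,
    shard_index: u64, // new in this patch; proto3 default is 0
}

fn main() {
    // A pre-sharding client never sets shard_index, so the server sees 0,
    // which is the only valid shard when the pool keeps num_shards = 1.
    let legacy = GetOpsRequest {
        entry_point: vec![0u8; 20],
        max_ops: 64,
        ..Default::default()
    };
    // A sharded builder passes its own builder index as the shard index.
    let sharded = GetOpsRequest {
        entry_point: vec![0u8; 20],
        max_ops: 64,
        shard_index: 2,
    };
    assert_eq!(legacy.shard_index, 0);
    println!("{legacy:?}");
    println!("{sharded:?}");
}
```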