Skip to content

Commit

Permalink
feat: implement pool sharding and n-builders
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Sep 20, 2023
1 parent 67d441c commit 045ff2e
Show file tree
Hide file tree
Showing 17 changed files with 480 additions and 209 deletions.
12 changes: 11 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,20 @@ build: ## Build the project.
clean: ## Clean the project.
cargo clean

## Run all tests.
.PHONY: test
test: test-unit ## Run all tests.
test: test-unit test-spec-integrated test-spec-modular

.PHONY: test-unit
test-unit: ## Run unit tests.
cargo install cargo-nextest --locked
cargo nextest run $(UNIT_TEST_ARGS)

.PHONY: test-spec-integrated
test-spec-integrated: ## Run spec tests in integrated mode
test/spec-tests/local/run-spec-tests.sh

.PHONY: test-spec-modular
test-spec-modular: ## Run spec tests in modular mode
test/spec-tests/remote/run-spec-tests.sh

18 changes: 15 additions & 3 deletions bin/rundler/src/cli/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::{collections::HashMap, net::SocketAddr, time::Duration};
use anyhow::Context;
use clap::Args;
use ethers::types::H256;
use rundler_builder::{self, BuilderEvent, BuilderTask, BuilderTaskArgs, LocalBuilderBuilder};
use rundler_builder::{
self, BuilderEvent, BuilderEventKind, BuilderTask, BuilderTaskArgs, LocalBuilderBuilder,
};
use rundler_pool::RemotePoolClient;
use rundler_sim::{MempoolConfig, PriorityFeeMode};
use rundler_task::{
Expand Down Expand Up @@ -142,6 +144,14 @@ pub struct BuilderArgs {
env = "BUILDER_BLOXROUTE_AUTH_HEADER"
)]
bloxroute_auth_header: Option<String>,
/// The index offset to apply to the builder index
#[arg(
long = "builder_index_offset",
name = "builder_index_offset",
env = "BUILDER_INDEX_OFFSET",
default_value = "0"
)]
pub builder_index_offset: u64,
}

impl BuilderArgs {
Expand Down Expand Up @@ -202,6 +212,8 @@ impl BuilderArgs {
max_fee_increases: self.max_fee_increases,
remote_address,
bloxroute_auth_header: self.bloxroute_auth_header.clone(),
num_bundle_builders: common.num_builders,
bundle_builder_index_offset: self.builder_index_offset,
})
}
}
Expand Down Expand Up @@ -261,11 +273,11 @@ pub async fn run(builder_args: BuilderCliArgs, common_args: CommonArgs) -> anyho
}

pub fn is_nonspammy_event(event: &WithEntryPoint<BuilderEvent>) -> bool {
if let BuilderEvent::FormedBundle {
if let BuilderEventKind::FormedBundle {
tx_details,
fee_increase_count,
..
} = &event.event
} = &event.event.kind
{
if tx_details.is_none() && *fee_increase_count == 0 {
return false;
Expand Down
8 changes: 8 additions & 0 deletions bin/rundler/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,14 @@ pub struct CommonArgs {
env = "MEMPOOL_CONFIG_PATH"
)]
pub mempool_config_path: Option<String>,

#[arg(
long = "num_builders",
name = "num_builders",
env = "NUM_BUILDERS",
default_value = "1"
)]
pub num_builders: u64,
}

const SIMULATION_GAS_OVERHEAD: u64 = 100_000;
Expand Down
2 changes: 2 additions & 0 deletions bin/rundler/src/cli/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ impl PoolArgs {
Ok(PoolConfig {
entry_point,
chain_id: common.chain_id,
// Currently use the same shard count as the number of builders
num_shards: common.num_builders,
max_userops_per_sender: self.max_userops_per_sender,
min_replacement_fee_increase_percentage: self
.min_replacement_fee_increase_percentage,
Expand Down
95 changes: 70 additions & 25 deletions crates/builder/src/bundle_proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ where
P: Provider,
C: PoolServer,
{
builder_index: u64,
pool: C,
simulator: S,
entry_point: E,
Expand Down Expand Up @@ -109,7 +110,18 @@ where
)?;

// Limit the amount of gas in the bundle
let ops = self.limit_gas_in_bundle(ops);
tracing::debug!(
"Builder index: {}, starting bundle proposal with {} ops",
self.builder_index,
ops.len(),
);
let (ops, gas_limit) = self.limit_gas_in_bundle(ops);
tracing::debug!(
"Builder index: {}, bundle proposal after limit had {} ops and {:?} gas limit",
self.builder_index,
ops.len(),
gas_limit
);

// Determine fees required for ops to be included in a bundle, and filter out ops that don't
// meet the requirements. Simulate unfiltered ops.
Expand All @@ -122,16 +134,17 @@ where
{
true
} else {
self.emit(BuilderEvent::SkippedOp {
op_hash: self.op_hash(&op.uo),
reason: SkipReason::InsufficientFees {
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.op_hash(&op.uo),
SkipReason::InsufficientFees {
required_fees: required_op_fees,
actual_fees: GasFees {
max_fee_per_gas: op.uo.max_fee_per_gas,
max_priority_fee_per_gas: op.uo.max_priority_fee_per_gas,
},
},
});
));
false
}
})
Expand Down Expand Up @@ -164,6 +177,12 @@ where
expected_storage.merge(&op.simulation.expected_storage)?;
}
if let Some(gas_estimate) = gas_estimate {
tracing::debug!(
"Builder index: {}, bundle proposal succeeded with {} ops and {:?} gas limit",
self.builder_index,
context.iter_ops().count(),
gas_estimate
);
return Ok(Bundle {
ops_per_aggregator: context.to_ops_per_aggregator(),
gas_estimate,
Expand Down Expand Up @@ -192,6 +211,7 @@ where
C: PoolServer,
{
pub(crate) fn new(
builder_index: u64,
pool: C,
simulator: S,
entry_point: E,
Expand All @@ -200,6 +220,7 @@ where
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
) -> Self {
Self {
builder_index,
pool,
simulator,
entry_point,
Expand Down Expand Up @@ -250,10 +271,11 @@ where
let simulation = match simulation {
Ok(simulation) => simulation,
Err(error) => {
self.emit(BuilderEvent::RejectedOp {
op_hash: self.op_hash(&op),
reason: OpRejectionReason::FailedRevalidation { error },
});
self.emit(BuilderEvent::rejected_op(
self.builder_index,
self.op_hash(&op),
OpRejectionReason::FailedRevalidation { error },
));
rejected_ops.push(op);
continue;
}
Expand All @@ -264,12 +286,13 @@ where
.valid_time_range
.contains(Timestamp::now(), TIME_RANGE_BUFFER)
{
self.emit(BuilderEvent::SkippedOp {
op_hash: self.op_hash(&op),
reason: SkipReason::InvalidTimeRange {
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.op_hash(&op),
SkipReason::InvalidTimeRange {
valid_range: simulation.valid_time_range,
},
});
));
rejected_ops.push(op);
continue;
}
Expand All @@ -282,10 +305,11 @@ where
// Exclude ops that access the sender of another op in the
// batch, but don't reject them (remove them from pool).
info!("Excluding op from {:?} because it accessed the address of another sender in the bundle.", op.sender);
self.emit(BuilderEvent::SkippedOp {
op_hash: self.op_hash(&op),
reason: SkipReason::AccessedOtherSender { other_sender },
});
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.op_hash(&op),
SkipReason::AccessedOtherSender { other_sender },
));
continue;
}
if let Some(paymaster) = op.paymaster() {
Expand Down Expand Up @@ -386,12 +410,13 @@ where
match handle_ops_out {
HandleOpsOut::Success => Ok(Some(gas)),
HandleOpsOut::FailedOp(index, message) => {
self.emit(BuilderEvent::RejectedOp {
op_hash: self.op_hash(context.get_op_at(index)?),
reason: OpRejectionReason::FailedInBundle {
self.emit(BuilderEvent::rejected_op(
self.builder_index,
self.op_hash(context.get_op_at(index)?),
OpRejectionReason::FailedInBundle {
message: Arc::new(message.clone()),
},
});
));
self.process_failed_op(context, index, message).await?;
Ok(None)
}
Expand All @@ -405,8 +430,17 @@ where
}

async fn get_ops_from_pool(&self) -> anyhow::Result<Vec<PoolOperation>> {
// Use builder's index as the shard index to ensure that two builders don't
// attempt to bundle the same operations.
//
// NOTE: this assumes that the pool server has as many shards as there
// are builders.
self.pool
.get_ops(self.entry_point.address(), self.settings.max_bundle_size)
.get_ops(
self.entry_point.address(),
self.settings.max_bundle_size,
self.builder_index,
)
.await
.context("should get ops from pool")
}
Expand Down Expand Up @@ -483,18 +517,28 @@ where
Ok(())
}

fn limit_gas_in_bundle(&self, ops: Vec<PoolOperation>) -> Vec<PoolOperation> {
fn limit_gas_in_bundle(&self, ops: Vec<PoolOperation>) -> (Vec<PoolOperation>, u64) {
let mut gas_left = U256::from(self.settings.max_bundle_gas);
let mut ops_in_bundle = Vec::new();
for op in ops {
let gas = gas::user_operation_execution_gas_limit(&op.uo, self.settings.chain_id);
if gas_left < gas {
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.op_hash(&op.uo),
SkipReason::GasLimit,
));
continue;
}
gas_left -= gas;
ops_in_bundle.push(op);
}
ops_in_bundle
(
ops_in_bundle,
self.settings
.max_bundle_gas
.saturating_sub(gas_left.as_u64()),
)
}

fn emit(&self, event: BuilderEvent) {
Expand Down Expand Up @@ -1191,7 +1235,7 @@ mod tests {
let mut pool_client = MockPoolServer::new();
pool_client
.expect_get_ops()
.returning(move |_, _| Ok(ops.clone()));
.returning(move |_, _, _| Ok(ops.clone()));

let simulations_by_op: HashMap<_, _> = mock_ops
.into_iter()
Expand Down Expand Up @@ -1241,6 +1285,7 @@ mod tests {
.returning(move |address, _| signatures_by_aggregator[&address]());
let (event_sender, _) = broadcast::channel(16);
let proposer = BundleProposerImpl::new(
0,
pool_client,
simulator,
entry_point,
Expand Down
Loading

0 comments on commit 045ff2e

Please sign in to comment.