Skip to content

Commit

Permalink
feat: add drop UO rpc endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Feb 21, 2024
1 parent 21797d5 commit f8c1c93
Show file tree
Hide file tree
Showing 31 changed files with 550 additions and 72 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 17 additions & 2 deletions bin/rundler/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use builder::BuilderCliArgs;
use node::NodeCliArgs;
use pool::PoolCliArgs;
use rpc::RpcCliArgs;
use rundler_rpc::EthApiSettings;
use rundler_rpc::{EthApiSettings, RundlerApiSettings};
use rundler_sim::{
EstimationSettings, PrecheckSettings, PriorityFeeMode, SimulationSettings, MIN_CALL_GAS_LIMIT,
};
Expand Down Expand Up @@ -298,7 +298,7 @@ impl TryFrom<&CommonArgs> for EstimationSettings {
impl TryFrom<&CommonArgs> for PrecheckSettings {
type Error = anyhow::Error;

fn try_from(value: &CommonArgs) -> anyhow::Result<Self> {
fn try_from(value: &CommonArgs) -> Result<Self, Self::Error> {
Ok(Self {
max_verification_gas: value.max_verification_gas.into(),
max_total_execution_gas: value.max_bundle_gas.into(),
Expand Down Expand Up @@ -330,6 +330,21 @@ impl From<&CommonArgs> for EthApiSettings {
}
}

impl TryFrom<&CommonArgs> for RundlerApiSettings {
type Error = anyhow::Error;

fn try_from(value: &CommonArgs) -> Result<Self, Self::Error> {
Ok(Self {
priority_fee_mode: PriorityFeeMode::try_from(
value.priority_fee_mode_kind.as_str(),
value.priority_fee_mode_value,
)?,
bundle_priority_fee_overhead_percent: value.bundle_priority_fee_overhead_percent,
max_verification_gas: value.max_verification_gas,
})
}
}

/// CLI options for the metrics server
#[derive(Debug, Args)]
#[command(next_help_heading = "Metrics")]
Expand Down
1 change: 1 addition & 0 deletions bin/rundler/src/cli/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub async fn run(
(&common_args).try_into()?,
(&common_args).into(),
(&common_args).try_into()?,
(&common_args).try_into()?,
)?;

let (event_sender, event_rx) =
Expand Down
9 changes: 9 additions & 0 deletions bin/rundler/src/cli/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ pub struct PoolArgs {
default_value = "true"
)]
pub reputation_tracking_enabled: bool,

#[arg(
long = "pool.drop_min_num_blocks",
name = "pool.drop_min_num_blocks",
env = "POOL_DROP_MIN_NUM_BLOCKS",
default_value = "10"
)]
pub drop_min_num_blocks: u64,
}

impl PoolArgs {
Expand Down Expand Up @@ -182,6 +190,7 @@ impl PoolArgs {
throttled_entity_live_blocks: self.throttled_entity_live_blocks,
paymaster_tracking_enabled: self.paymaster_tracking_enabled,
reputation_tracking_enabled: self.reputation_tracking_enabled,
drop_min_num_blocks: self.drop_min_num_blocks,
};

Ok(PoolTaskArgs {
Expand Down
5 changes: 4 additions & 1 deletion bin/rundler/src/cli/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use anyhow::Context;
use clap::Args;
use rundler_builder::RemoteBuilderClient;
use rundler_pool::RemotePoolClient;
use rundler_rpc::{EthApiSettings, RpcTask, RpcTaskArgs};
use rundler_rpc::{EthApiSettings, RpcTask, RpcTaskArgs, RundlerApiSettings};
use rundler_sim::{EstimationSettings, PrecheckSettings};
use rundler_task::{server::connect_with_retries_shutdown, spawn_tasks_with_shutdown};
use rundler_types::chain::ChainSpec;
Expand Down Expand Up @@ -86,6 +86,7 @@ impl RpcArgs {
common: &CommonArgs,
precheck_settings: PrecheckSettings,
eth_api_settings: EthApiSettings,
rundler_api_settings: RundlerApiSettings,
estimation_settings: EstimationSettings,
) -> anyhow::Result<RpcTaskArgs> {
let apis = self
Expand All @@ -105,6 +106,7 @@ impl RpcArgs {
api_namespaces: apis,
precheck_settings,
eth_api_settings,
rundler_api_settings,
estimation_settings,
rpc_timeout: Duration::from_secs(self.timeout_seconds.parse()?),
max_connections: self.max_connections,
Expand Down Expand Up @@ -154,6 +156,7 @@ pub async fn run(
(&common_args).try_into()?,
(&common_args).into(),
(&common_args).try_into()?,
(&common_args).try_into()?,
)?;

let pool = connect_with_retries_shutdown(
Expand Down
25 changes: 25 additions & 0 deletions crates/pool/proto/op_pool/op_pool.proto
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ service OpPool {
// Removes UserOperations from the mempool
rpc RemoveOps(RemoveOpsRequest) returns (RemoveOpsResponse);

// Remove a UserOperation by its id
rpc RemoveOpById(RemoveOpByIdRequest) returns (RemoveOpByIdResponse);

// Handles a list of updates to be performed on entities
rpc UpdateEntities(UpdateEntitiesRequest) returns (UpdateEntitiesResponse);

Expand Down Expand Up @@ -281,6 +284,21 @@ message RemoveOpsResponse {
}
message RemoveOpsSuccess {}

message RemoveOpByIdRequest {
bytes entry_point = 1;
bytes sender = 2;
bytes nonce = 3;
}
message RemoveOpByIdResponse {
oneof result {
RemoveOpByIdSuccess success = 1;
MempoolError failure = 2;
}
}
message RemoveOpByIdSuccess {
bytes hash = 1;
}

message UpdateEntitiesRequest {
// The serilaized entry point address
bytes entry_point = 1;
Expand Down Expand Up @@ -428,6 +446,7 @@ message MempoolError {
SenderAddressUsedAsAlternateEntity sender_address_used_as_alternate_entity = 13;
AssociatedStorageIsAlternateSender associated_storage_is_alternate_sender = 14;
PaymasterBalanceTooLow paymaster_balance_too_low = 15;
OperationDropTooSoon operation_drop_too_soon = 16;
}
}

Expand Down Expand Up @@ -474,6 +493,12 @@ message UnsupportedAggregatorError {

message InvalidSignatureError {}

message OperationDropTooSoon {
uint64 added_at = 1;
uint64 attempted_at = 2;
uint64 must_wait = 3;
}

// PRECHECK VIOLATIONS
message PrecheckViolationError {
oneof violation {
Expand Down
3 changes: 3 additions & 0 deletions crates/pool/src/mempool/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ pub enum MempoolError {
/// An unknown entry point was specified
#[error("Unknown entry point {0}")]
UnknownEntryPoint(Address),
/// The operation drop attempt too soon after being added to the pool
#[error("Operation drop attempt too soon after being added to the pool. Added at {0}, attempted to drop at {1}, must wait {2} blocks.")]
OperationDropTooSoon(u64, u64, u64),
}

impl From<SimulationError> for MempoolError {
Expand Down
9 changes: 8 additions & 1 deletion crates/pool/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ use ethers::types::{Address, H256, U256};
#[cfg(test)]
use mockall::automock;
use rundler_sim::{EntityInfos, MempoolConfig, PrecheckSettings, SimulationSettings};
use rundler_types::{Entity, EntityType, EntityUpdate, UserOperation, ValidTimeRange};
use rundler_types::{
Entity, EntityType, EntityUpdate, UserOperation, UserOperationId, ValidTimeRange,
};
use tonic::async_trait;
pub(crate) use uo_pool::UoPool;

Expand All @@ -63,6 +65,9 @@ pub trait Mempool: Send + Sync + 'static {
/// Removes a set of operations from the pool.
fn remove_operations(&self, hashes: &[H256]);

/// Removes an operation from the pool by its ID.
fn remove_op_by_id(&self, id: UserOperationId) -> MempoolResult<Option<H256>>;

/// Updates the reputation of an entity.
fn update_entity(&self, entity_update: EntityUpdate);

Expand Down Expand Up @@ -153,6 +158,8 @@ pub struct PoolConfig {
pub paymaster_tracking_enabled: bool,
/// Boolean field used to toggle the operation of the reputation tracker
pub reputation_tracking_enabled: bool,
/// The minimum number of blocks a user operation must be in the mempool before it can be dropped
pub drop_min_num_blocks: u64,
}

/// Stake status structure
Expand Down
34 changes: 34 additions & 0 deletions crates/pool/src/mempool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ impl PoolInner {
self.by_hash.get(&hash).map(|o| o.po.clone())
}

pub(crate) fn get_operation_by_id(&self, id: UserOperationId) -> Option<Arc<PoolOperation>> {
self.by_id.get(&id).map(|o| o.po.clone())
}

pub(crate) fn remove_operation_by_hash(&mut self, hash: H256) -> Option<Arc<PoolOperation>> {
let ret = self.remove_operation_internal(hash, None);
self.update_metrics();
Expand Down Expand Up @@ -634,6 +638,36 @@ mod tests {
check_map_entry(pool.best.iter().next(), Some(&op));
}

#[test]
fn test_get_by_hash() {
let mut pool = PoolInner::new(conf());
let op = create_op(Address::random(), 0, 1);
let hash = pool.add_operation(op.clone(), None).unwrap();

let get_op = pool.get_operation_by_hash(hash).unwrap();
assert_eq!(op, *get_op);

assert_eq!(pool.get_operation_by_hash(H256::random()), None);
}

#[test]
fn test_get_by_id() {
let mut pool = PoolInner::new(conf());
let op = create_op(Address::random(), 0, 1);
pool.add_operation(op.clone(), None).unwrap();
let id = op.uo.id();

let get_op = pool.get_operation_by_id(id).unwrap();
assert_eq!(op, *get_op);

let bad_id = UserOperationId {
sender: Address::random(),
nonce: 0.into(),
};

assert_eq!(pool.get_operation_by_id(bad_id), None);
}

#[test]
fn add_multiple_ops() {
let mut pool = PoolInner::new(conf());
Expand Down
43 changes: 42 additions & 1 deletion crates/pool/src/mempool/uo_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use itertools::Itertools;
use parking_lot::RwLock;
use rundler_provider::{EntryPoint, PaymasterHelper, ProviderResult};
use rundler_sim::{Prechecker, Simulator};
use rundler_types::{Entity, EntityUpdate, EntityUpdateType, UserOperation};
use rundler_types::{Entity, EntityUpdate, EntityUpdateType, UserOperation, UserOperationId};
use rundler_utils::emit::WithEntryPoint;
use tokio::sync::broadcast;
use tonic::async_trait;
Expand Down Expand Up @@ -509,6 +509,46 @@ where
UoPoolMetrics::increment_removed_operations(count, self.config.entry_point);
}

fn remove_op_by_id(&self, id: UserOperationId) -> MempoolResult<Option<H256>> {
// Check for the operation in the pool and its age
let po = {
let state = self.state.read();
match state.pool.get_operation_by_id(id) {
Some(po) => {
if po.sim_block_number + self.config.drop_min_num_blocks > state.block_number {
// TODO return error
return Err(MempoolError::OperationDropTooSoon(
po.sim_block_number,
state.block_number,
self.config.drop_min_num_blocks,
));
}
po
}
None => return Ok(None),
}
};

let hash = po.uo.op_hash(self.config.entry_point, self.config.chain_id);

// This can return none if the operation was removed by another thread
if self
.state
.write()
.pool
.remove_operation_by_hash(hash)
.is_none()
{
return Ok(None);
}

self.emit(OpPoolEvent::RemovedOp {
op_hash: hash,
reason: OpRemovalReason::Requested,
});
Ok(Some(hash))
}

fn update_entity(&self, update: EntityUpdate) {
let entity = update.entity;
match update.update_type {
Expand Down Expand Up @@ -1426,6 +1466,7 @@ mod tests {
throttled_entity_live_blocks: 10,
paymaster_tracking_enabled: true,
reputation_tracking_enabled: true,
drop_min_num_blocks: 10,
};
let (event_sender, _) = broadcast::channel(4);

Expand Down
37 changes: 36 additions & 1 deletion crates/pool/src/server/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use async_trait::async_trait;
use ethers::types::{Address, H256};
use futures_util::Stream;
use rundler_task::server::{HealthCheck, ServerStatus};
use rundler_types::{EntityUpdate, UserOperation};
use rundler_types::{EntityUpdate, UserOperation, UserOperationId};
use tokio::{
sync::{broadcast, mpsc, oneshot},
task::JoinHandle,
Expand Down Expand Up @@ -170,6 +170,19 @@ impl PoolServer for LocalPoolHandle {
}
}

async fn remove_op_by_id(
&self,
entry_point: Address,
id: UserOperationId,
) -> PoolResult<Option<H256>> {
let req = ServerRequestKind::RemoveOpById { entry_point, id };
let resp = self.send(req).await?;
match resp {
ServerResponse::RemoveOpById { hash } => Ok(hash),
_ => Err(PoolServerError::UnexpectedResponse),
}
}

async fn update_entities(
&self,
entry_point: Address,
Expand Down Expand Up @@ -391,6 +404,15 @@ where
Ok(())
}

fn remove_op_by_id(
&self,
entry_point: Address,
id: UserOperationId,
) -> PoolResult<Option<H256>> {
let mempool = self.get_pool(entry_point)?;
mempool.remove_op_by_id(id).map_err(|e| e.into())
}

fn update_entities<'a>(
&self,
entry_point: Address,
Expand Down Expand Up @@ -572,6 +594,12 @@ where
Err(e) => Err(e),
}
},
ServerRequestKind::RemoveOpById { entry_point, id } => {
match self.remove_op_by_id(entry_point, id) {
Ok(hash) => Ok(ServerResponse::RemoveOpById{ hash }),
Err(e) => Err(e),
}
},
ServerRequestKind::AdminSetTracking{ entry_point, paymaster, reputation } => {
match self.admin_set_tracking(entry_point, paymaster, reputation) {
Ok(_) => Ok(ServerResponse::AdminSetTracking),
Expand Down Expand Up @@ -661,6 +689,10 @@ enum ServerRequestKind {
entry_point: Address,
ops: Vec<H256>,
},
RemoveOpById {
entry_point: Address,
id: UserOperationId,
},
UpdateEntities {
entry_point: Address,
entity_updates: Vec<EntityUpdate>,
Expand Down Expand Up @@ -714,6 +746,9 @@ enum ServerResponse {
op: Option<PoolOperation>,
},
RemoveOps,
RemoveOpById {
hash: Option<H256>,
},
UpdateEntities,
DebugClearState,
AdminSetTracking,
Expand Down
Loading

0 comments on commit f8c1c93

Please sign in to comment.