diff --git a/proto/op_pool.proto b/proto/op_pool.proto index 33363f095..5b3d8e883 100644 --- a/proto/op_pool.proto +++ b/proto/op_pool.proto @@ -60,12 +60,23 @@ message GetSupportedEntryPointsResponse { message AddOpRequest { bytes entry_point = 1; - MempoolOp op = 2; + UserOperation op = 2; } message AddOpResponse { + oneof result { + AddOpSuccess success = 1; + AddOpFailure failure = 2; + } +} + +message AddOpSuccess { bytes hash = 1; } +message AddOpFailure { + MempoolError error = 1; +} + message GetOpsRequest { bytes entry_point = 1; uint64 max_ops = 2; @@ -122,22 +133,203 @@ enum ReputationStatus { BANNED = 2; } -message ErrorInfo { - string reason = 1; - map metadata = 2; +// MEMPOOL ERRORS + +message MempoolError { + oneof error { + string internal = 1; + ReplacementUnderpricedError replacement_underpriced = 2; + MaxOperationsReachedError max_operations_reached = 3; + EntityThrottledError entity_throttled = 4; + DiscardedOnInsertError discarded_on_insert = 5; + PrecheckViolationError precheck_violation = 6; + SimulationViolationError simulation_violation = 7; + InvalidSignatureError invalid_signature = 8; + UnsupportedAggregatorError unsupported_aggregator = 9; + } +} + +message ReplacementUnderpricedError { + bytes current_fee = 1; + bytes current_priority_fee = 2; +} + +message MaxOperationsReachedError { + uint64 num_ops = 1; + bytes sender_address = 2; +} + +message EntityThrottledError { + Entity entity = 1; +} + +message DiscardedOnInsertError {} + +message InvalidSignatureError {} + +message UnsupportedAggregatorError { + bytes aggregator_address = 1; +} + +// PRECHECK VIOLATIONS +message PrecheckViolationError { + oneof violation { + InitCodeTooShort init_code_too_short = 1; + SenderIsNotContractAndNoInitCode sender_is_not_contract_and_no_init_code = 2; + ExistingSenderWithInitCode existing_sender_with_init_code = 3; + FactoryIsNotContract factory_is_not_contract = 4; + VerificationGasLimitTooHigh verification_gas_limit_too_high = 5; + PreVerificationGasTooLow pre_verification_gas_too_low = 6; + PaymasterTooShort paymaster_too_short = 7; + PaymasterIsNotContract paymaster_is_not_contract = 8; + PaymasterDepositTooLow paymaster_deposit_too_low = 9; + SenderFundsTooLow sender_funds_too_low = 10; + MaxFeePerGasTooLow max_fee_per_gas_too_low = 11; + MaxPriorityFeePerGasTooLow max_priority_fee_per_gas_too_low = 12; + CallGasLimitTooLow call_gas_limit_too_low = 13; + } +} + +message InitCodeTooShort { + uint64 length = 1; +} + +message SenderIsNotContractAndNoInitCode { + bytes sender_address = 1; +} + +message ExistingSenderWithInitCode { + bytes sender_address = 1; +} + +message FactoryIsNotContract { + bytes factory_address = 1; +} + +message VerificationGasLimitTooHigh { + bytes actual_gas = 1; + bytes max_gas = 2; +} + +message PreVerificationGasTooLow { + bytes actual_gas = 1; + bytes min_gas = 2; +} + +message PaymasterTooShort { + uint64 length = 1; +} + +message PaymasterIsNotContract { + bytes paymaster_address = 1; +} + +message PaymasterDepositTooLow { + bytes actual_deposit = 1; + bytes min_deposit = 2; +} + +message SenderFundsTooLow { + bytes actual_funds = 1; + bytes min_funds = 2; +} + +message MaxFeePerGasTooLow { + bytes actual_fee = 1; + bytes min_fee = 2; +} + +message MaxPriorityFeePerGasTooLow { + bytes actual_fee = 1; + bytes min_fee = 2; +} + +message CallGasLimitTooLow { + bytes actual_gas_limit = 1; + bytes min_gas_limit = 2; +} + +// SIMULATION VIOLATIONS +message SimulationViolationError { + oneof violation { + UnintendedRevertWithMessage unintended_revert_with_message = 1; + UsedForbiddenOpcode used_forbidden_opcode = 2; + UsedForbiddenPrecompile used_forbidden_precompile = 3; + FactoryCalledCreate2Twice factory_called_create2_twice = 4; + InvalidStorageAccess invalid_storage_access = 5; + NotStaked not_staked = 6; + UnintendedRevert unintended_revert = 7; + DidNotRevert did_not_revert = 8; + WrongNumberOfPhases wrong_number_of_phases = 9; + CallHadValue call_had_value = 10; + OutOfGas out_of_gas = 11; + AccessedUndeployedContract accessed_undeployed_contract = 12; + CalledBannedEntryPointMethod called_banned_entry_point_method = 13; + CodeHashChanged code_hash_changed = 14; + AggregatorValidationFailed aggregator_validation_failed = 15; + } +} + +message UnintendedRevertWithMessage { + Entity entity = 1; + string reason = 2; +} + +message UsedForbiddenOpcode { + Entity entity = 1; + bytes contract_address = 2; + uint32 opcode = 3; +} + +message UsedForbiddenPrecompile { + Entity entity = 1; + bytes contract_address = 2; + bytes precompile_address = 3; } -enum ErrorReason { - ERROR_REASON_UNSPECIFIED = 0; - ERROR_REASON_INTERNAL = 1; - ERROR_REASON_ENTITY_THROTTLED = 2; - ERROR_REASON_OPERATION_REJECTED = 3; - ERROR_REASON_REPLACEMENT_UNDERPRICED = 4; - ERROR_REASON_OPERATION_DISCARDED_ON_INSERT = 5; +message FactoryCalledCreate2Twice { + bytes factory_address = 1; } -enum ErrorMetadataKey { - ERROR_METADATA_KEY_UNSPECIFIED = 0; - ERROR_METADATA_KEY_CURRENT_MAX_PRIORITY_FEE_PER_GAS = 1; - ERROR_METADATA_KEY_CURRENT_MAX_FEE_PER_GAS = 2; +message InvalidStorageAccess { + Entity entity = 1; + bytes contract_address = 2; + bytes slot = 3; } + +message NotStaked { + Entity entity = 1; + bytes min_stake = 2; + bytes min_unstake_delay = 3; +} + +message UnintendedRevert { + EntityType entity_type = 1; +} + +message DidNotRevert {} + +message WrongNumberOfPhases { + uint32 num_phases = 1; +} + +message CallHadValue { + Entity entity = 1; +} + +message OutOfGas { + Entity entity = 1; +} + +message AccessedUndeployedContract { + Entity entity = 1; + bytes contract_address = 2; +} + +message CalledBannedEntryPointMethod { + Entity entity = 1; +} + +message CodeHashChanged {} + +message AggregatorValidationFailed {} diff --git a/src/cli/node.rs b/src/cli/node.rs index 2d0c807c4..f71e73f65 100644 --- a/src/cli/node.rs +++ b/src/cli/node.rs @@ -38,8 +38,6 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow:: pool_url, builder_url, (&common_args).try_into()?, - (&common_args).into(), - (&common_args).try_into()?, ) .await?; diff --git a/src/cli/pool.rs b/src/cli/pool.rs index 93b1d06df..432896d1c 100644 --- a/src/cli/pool.rs +++ b/src/cli/pool.rs @@ -1,12 +1,13 @@ -use std::time::Duration; +use std::{collections::HashMap, time::Duration}; use anyhow::Context; use clap::Args; +use ethers::types::H256; use super::CommonArgs; use crate::{ cli::json::get_json_config, - common::handle::spawn_tasks_with_shutdown, + common::{handle::spawn_tasks_with_shutdown, mempool::MempoolConfig}, op_pool::{self, PoolConfig, PoolTask}, }; @@ -96,6 +97,14 @@ impl PoolArgs { tracing::info!("blocklist: {:?}", blocklist); tracing::info!("allowlist: {:?}", allowlist); + let mempool_channel_configs = match &common.mempool_config_path { + Some(path) => { + get_json_config::>(path, &common.aws_region).await? + } + None => HashMap::from([(H256::zero(), MempoolConfig::default())]), + }; + tracing::info!("Mempool channel configs: {:?}", mempool_channel_configs); + let pool_configs = common .entry_points .iter() @@ -110,16 +119,24 @@ impl PoolArgs { max_size_of_pool_bytes: self.max_size_in_bytes, blocklist: blocklist.clone(), allowlist: allowlist.clone(), + precheck_settings: common.try_into()?, + sim_settings: common.try_into()?, + mempool_channel_configs: mempool_channel_configs.clone(), }) }) .collect::>>()?; + let http_url = common + .node_http + .as_ref() + .ok_or_else(|| anyhow::anyhow!("node_http is required"))?; + Ok(op_pool::Args { port: self.port, host: self.host.clone(), chain_id: common.chain_id, ws_url: common.node_ws.clone(), - http_url: common.node_http.clone(), + http_url: http_url.to_owned(), http_poll_interval: Duration::from_millis(self.http_poll_interval_millis), pool_configs, }) diff --git a/src/cli/rpc.rs b/src/cli/rpc.rs index 6658b685d..e07f0e283 100644 --- a/src/cli/rpc.rs +++ b/src/cli/rpc.rs @@ -1,12 +1,11 @@ -use std::{collections::HashMap, time::Duration}; +use std::time::Duration; use anyhow::Context; use clap::Args; -use ethers::types::H256; -use super::{json::get_json_config, CommonArgs}; +use super::CommonArgs; use crate::{ - common::{handle::spawn_tasks_with_shutdown, mempool::MempoolConfig, precheck, simulation}, + common::handle::spawn_tasks_with_shutdown, rpc::{self, estimation, RpcTask}, }; @@ -61,8 +60,6 @@ impl RpcArgs { common: &CommonArgs, pool_url: String, builder_url: String, - precheck_settings: precheck::Settings, - sim_settings: simulation::Settings, estimation_settings: estimation::Settings, ) -> anyhow::Result { let apis = self @@ -71,13 +68,6 @@ impl RpcArgs { .map(|api| api.parse()) .collect::, _>>()?; - let mempool_configs = match &common.mempool_config_path { - Some(path) => { - get_json_config::>(path, &common.aws_region).await? - } - None => HashMap::from([(H256::zero(), MempoolConfig::default())]), - }; - Ok(rpc::Args { port: self.port, host: self.host.clone(), @@ -95,11 +85,8 @@ impl RpcArgs { .context("rpc requires node_http arg")?, chain_id: common.chain_id, api_namespaces: apis, - precheck_settings, - sim_settings, estimation_settings, rpc_timeout: Duration::from_secs(self.timeout_seconds.parse()?), - mempool_configs, }) } } @@ -142,8 +129,6 @@ pub async fn run(rpc_args: RpcCliArgs, common_args: CommonArgs) -> anyhow::Resul pool_url, builder_url, (&common_args).try_into()?, - (&common_args).into(), - (&common_args).try_into()?, ) .await?; diff --git a/src/common/precheck.rs b/src/common/precheck.rs index 69a45ec6d..e5993d209 100644 --- a/src/common/precheck.rs +++ b/src/common/precheck.rs @@ -18,7 +18,7 @@ use crate::common::{ pub const MIN_CALL_GAS_LIMIT: U256 = U256([9100, 0, 0, 0]); #[async_trait] -pub trait Prechecker { +pub trait Prechecker: Send + Sync + 'static { async fn check(&self, op: &UserOperation) -> Result<(), PrecheckError>; } @@ -41,6 +41,17 @@ pub struct Settings { pub priority_fee_mode: gas::PriorityFeeMode, } +impl Default for Settings { + fn default() -> Self { + Self { + max_verification_gas: 5_000_000.into(), + use_bundle_priority_fee: None, + bundle_priority_fee_overhead_percent: 0, + priority_fee_mode: gas::PriorityFeeMode::Fixed(0), + } + } +} + #[derive(Copy, Clone, Debug)] struct AsyncData { factory_exists: bool, @@ -133,7 +144,7 @@ impl PrecheckerImpl { let mut violations = ArrayVec::new(); if op.verification_gas_limit > max_verification_gas { - violations.push(PrecheckViolation::VerificationGasTooHigh( + violations.push(PrecheckViolation::VerificationGasLimitTooHigh( op.verification_gas_limit, max_verification_gas, )); @@ -288,7 +299,7 @@ impl PrecheckerImpl { } } -#[derive(Clone, Debug, parse_display::Display, Eq, PartialEq)] +#[derive(Clone, Debug, parse_display::Display, Eq, PartialEq, Ord, PartialOrd)] pub enum PrecheckViolation { #[display("initCode must start with a 20-byte factory address, but was only {0} bytes")] InitCodeTooShort(usize), @@ -299,7 +310,7 @@ pub enum PrecheckViolation { #[display("initCode indicates factory with no code: {0:?}")] FactoryIsNotContract(Address), #[display("verificationGasLimit is {0} but must be at most {1}")] - VerificationGasTooHigh(U256, U256), + VerificationGasLimitTooHigh(U256, U256), #[display("preVerificationGas is {0} but must be at least {1}")] PreVerificationGasTooLow(U256, U256), #[display("paymasterAndData must start a 20-byte paymaster address, but was only {0} bytes")] diff --git a/src/common/types/mod.rs b/src/common/types/mod.rs index 74c8488f1..285c69d2d 100644 --- a/src/common/types/mod.rs +++ b/src/common/types/mod.rs @@ -20,7 +20,7 @@ use ethers::{ use parse_display::Display; pub use provider_like::*; use serde::{ser::SerializeStruct, Deserialize, Serialize}; -use strum::EnumIter; +use strum::{EnumIter, IntoEnumIterator}; pub use timestamp::*; pub use validation_results::*; pub use violations::*; @@ -99,6 +99,22 @@ impl UserOperation { let max_gas = self.call_gas_limit + self.verification_gas_limit + self.pre_verification_gas; max_gas * self.max_fee_per_gas } + + pub fn entities(&'_ self) -> impl Iterator + '_ { + EntityType::iter().filter_map(|entity| { + self.entity_address(entity) + .map(|address| Entity::new(entity, address)) + }) + } + + fn entity_address(&self, entity: EntityType) -> Option
{ + match entity { + EntityType::Account => Some(self.sender), + EntityType::Paymaster => self.paymaster(), + EntityType::Factory => self.factory(), + EntityType::Aggregator => None, + } + } } #[derive(Display, Debug, Clone, Ord, Copy, Eq, PartialEq, EnumIter, PartialOrd, Deserialize)] diff --git a/src/op_pool/error.rs b/src/op_pool/error.rs new file mode 100644 index 000000000..eff333e16 --- /dev/null +++ b/src/op_pool/error.rs @@ -0,0 +1,559 @@ +use anyhow::{bail, Context}; +use ethers::types::Opcode; + +use super::mempool::error::MempoolError; +use crate::common::{ + precheck::PrecheckViolation, + protos::{ + from_bytes, + op_pool::{ + mempool_error, precheck_violation_error, simulation_violation_error, + AccessedUndeployedContract, AggregatorValidationFailed, CallGasLimitTooLow, + CallHadValue, CalledBannedEntryPointMethod, CodeHashChanged, DidNotRevert, + DiscardedOnInsertError, Entity, EntityThrottledError, EntityType, + ExistingSenderWithInitCode, FactoryCalledCreate2Twice, FactoryIsNotContract, + InitCodeTooShort, InvalidSignatureError, InvalidStorageAccess, MaxFeePerGasTooLow, + MaxOperationsReachedError, MaxPriorityFeePerGasTooLow, + MempoolError as ProtoMempoolError, NotStaked, OutOfGas, PaymasterDepositTooLow, + PaymasterIsNotContract, PaymasterTooShort, PreVerificationGasTooLow, + PrecheckViolationError as ProtoPrecheckViolationError, ReplacementUnderpricedError, + SenderFundsTooLow, SenderIsNotContractAndNoInitCode, + SimulationViolationError as ProtoSimulationViolationError, UnintendedRevert, + UnintendedRevertWithMessage, UnsupportedAggregatorError, UsedForbiddenOpcode, + UsedForbiddenPrecompile, VerificationGasLimitTooHigh, WrongNumberOfPhases, + }, + to_le_bytes, + }, + simulation::{SimulationViolation, StorageSlot}, +}; + +impl TryFrom for MempoolError { + type Error = anyhow::Error; + + fn try_from(value: ProtoMempoolError) -> Result { + Ok(match value.error { + Some(mempool_error::Error::Internal(e)) => MempoolError::Other(anyhow::Error::msg(e)), + Some(mempool_error::Error::ReplacementUnderpriced(e)) => { + MempoolError::ReplacementUnderpriced( + from_bytes(&e.current_fee)?, + from_bytes(&e.current_priority_fee)?, + ) + } + Some(mempool_error::Error::MaxOperationsReached(e)) => { + MempoolError::MaxOperationsReached( + e.num_ops as usize, + from_bytes(&e.sender_address)?, + ) + } + Some(mempool_error::Error::EntityThrottled(e)) => MempoolError::EntityThrottled( + (&e.entity.context("should have entity in error")?).try_into()?, + ), + Some(mempool_error::Error::DiscardedOnInsert(_)) => MempoolError::DiscardedOnInsert, + Some(mempool_error::Error::PrecheckViolation(e)) => { + MempoolError::PrecheckViolation(e.try_into()?) + } + Some(mempool_error::Error::SimulationViolation(e)) => { + MempoolError::SimulationViolation(e.try_into()?) + } + Some(mempool_error::Error::InvalidSignature(_)) => MempoolError::InvalidSignature, + Some(mempool_error::Error::UnsupportedAggregator(e)) => { + MempoolError::UnsupportedAggregator(from_bytes(&e.aggregator_address)?) + } + None => MempoolError::Other(anyhow::anyhow!("unknown proto mempool error")), + }) + } +} + +impl From for ProtoMempoolError { + fn from(value: MempoolError) -> Self { + match value { + MempoolError::Other(e) => ProtoMempoolError { + error: Some(mempool_error::Error::Internal(e.to_string())), + }, + MempoolError::ReplacementUnderpriced(fee, priority_fee) => ProtoMempoolError { + error: Some(mempool_error::Error::ReplacementUnderpriced( + ReplacementUnderpricedError { + current_fee: to_le_bytes(fee), + current_priority_fee: to_le_bytes(priority_fee), + }, + )), + }, + MempoolError::MaxOperationsReached(ops, addr) => ProtoMempoolError { + error: Some(mempool_error::Error::MaxOperationsReached( + MaxOperationsReachedError { + num_ops: ops as u64, + sender_address: addr.as_bytes().to_vec(), + }, + )), + }, + MempoolError::EntityThrottled(entity) => ProtoMempoolError { + error: Some(mempool_error::Error::EntityThrottled( + EntityThrottledError { + entity: Some((&entity).into()), + }, + )), + }, + MempoolError::DiscardedOnInsert => ProtoMempoolError { + error: Some(mempool_error::Error::DiscardedOnInsert( + DiscardedOnInsertError {}, + )), + }, + MempoolError::PrecheckViolation(violation) => ProtoMempoolError { + error: Some(mempool_error::Error::PrecheckViolation(violation.into())), + }, + MempoolError::SimulationViolation(violation) => ProtoMempoolError { + error: Some(mempool_error::Error::SimulationViolation(violation.into())), + }, + MempoolError::InvalidSignature => ProtoMempoolError { + error: Some(mempool_error::Error::InvalidSignature( + InvalidSignatureError {}, + )), + }, + MempoolError::UnsupportedAggregator(agg) => ProtoMempoolError { + error: Some(mempool_error::Error::UnsupportedAggregator( + UnsupportedAggregatorError { + aggregator_address: agg.as_bytes().to_vec(), + }, + )), + }, + } + } +} + +impl From for ProtoPrecheckViolationError { + fn from(value: PrecheckViolation) -> Self { + match value { + PrecheckViolation::InitCodeTooShort(length) => ProtoPrecheckViolationError { + violation: Some(precheck_violation_error::Violation::InitCodeTooShort( + InitCodeTooShort { + length: length as u64, + }, + )), + }, + PrecheckViolation::SenderIsNotContractAndNoInitCode(addr) => { + ProtoPrecheckViolationError { + violation: Some( + precheck_violation_error::Violation::SenderIsNotContractAndNoInitCode( + SenderIsNotContractAndNoInitCode { + sender_address: addr.as_bytes().to_vec(), + }, + ), + ), + } + } + PrecheckViolation::ExistingSenderWithInitCode(addr) => ProtoPrecheckViolationError { + violation: Some( + precheck_violation_error::Violation::ExistingSenderWithInitCode( + ExistingSenderWithInitCode { + sender_address: addr.as_bytes().to_vec(), + }, + ), + ), + }, + PrecheckViolation::FactoryIsNotContract(addr) => ProtoPrecheckViolationError { + violation: Some(precheck_violation_error::Violation::FactoryIsNotContract( + FactoryIsNotContract { + factory_address: addr.as_bytes().to_vec(), + }, + )), + }, + PrecheckViolation::VerificationGasLimitTooHigh(actual, max) => { + ProtoPrecheckViolationError { + violation: Some( + precheck_violation_error::Violation::VerificationGasLimitTooHigh( + VerificationGasLimitTooHigh { + actual_gas: to_le_bytes(actual), + max_gas: to_le_bytes(max), + }, + ), + ), + } + } + PrecheckViolation::PreVerificationGasTooLow(actual, min) => { + ProtoPrecheckViolationError { + violation: Some( + precheck_violation_error::Violation::PreVerificationGasTooLow( + PreVerificationGasTooLow { + actual_gas: to_le_bytes(actual), + min_gas: to_le_bytes(min), + }, + ), + ), + } + } + PrecheckViolation::PaymasterTooShort(length) => ProtoPrecheckViolationError { + violation: Some(precheck_violation_error::Violation::PaymasterTooShort( + PaymasterTooShort { + length: length as u64, + }, + )), + }, + PrecheckViolation::PaymasterIsNotContract(addr) => ProtoPrecheckViolationError { + violation: Some(precheck_violation_error::Violation::PaymasterIsNotContract( + PaymasterIsNotContract { + paymaster_address: addr.as_bytes().to_vec(), + }, + )), + }, + PrecheckViolation::PaymasterDepositTooLow(actual, min) => ProtoPrecheckViolationError { + violation: Some(precheck_violation_error::Violation::PaymasterDepositTooLow( + PaymasterDepositTooLow { + actual_deposit: to_le_bytes(actual), + min_deposit: to_le_bytes(min), + }, + )), + }, + PrecheckViolation::SenderFundsTooLow(actual, min) => ProtoPrecheckViolationError { + violation: Some(precheck_violation_error::Violation::SenderFundsTooLow( + SenderFundsTooLow { + actual_funds: to_le_bytes(actual), + min_funds: to_le_bytes(min), + }, + )), + }, + PrecheckViolation::MaxFeePerGasTooLow(actual, min) => ProtoPrecheckViolationError { + violation: Some(precheck_violation_error::Violation::MaxFeePerGasTooLow( + MaxFeePerGasTooLow { + actual_fee: to_le_bytes(actual), + min_fee: to_le_bytes(min), + }, + )), + }, + PrecheckViolation::MaxPriorityFeePerGasTooLow(actual, min) => { + ProtoPrecheckViolationError { + violation: Some( + precheck_violation_error::Violation::MaxPriorityFeePerGasTooLow( + MaxPriorityFeePerGasTooLow { + actual_fee: to_le_bytes(actual), + min_fee: to_le_bytes(min), + }, + ), + ), + } + } + PrecheckViolation::CallGasLimitTooLow(actual, min) => ProtoPrecheckViolationError { + violation: Some(precheck_violation_error::Violation::CallGasLimitTooLow( + CallGasLimitTooLow { + actual_gas_limit: to_le_bytes(actual), + min_gas_limit: to_le_bytes(min), + }, + )), + }, + } + } +} + +impl TryFrom for PrecheckViolation { + type Error = anyhow::Error; + + fn try_from(value: ProtoPrecheckViolationError) -> Result { + Ok(match value.violation { + Some(precheck_violation_error::Violation::InitCodeTooShort(e)) => { + PrecheckViolation::InitCodeTooShort(e.length as usize) + } + Some(precheck_violation_error::Violation::SenderIsNotContractAndNoInitCode(e)) => { + PrecheckViolation::SenderIsNotContractAndNoInitCode(from_bytes(&e.sender_address)?) + } + Some(precheck_violation_error::Violation::ExistingSenderWithInitCode(e)) => { + PrecheckViolation::ExistingSenderWithInitCode(from_bytes(&e.sender_address)?) + } + Some(precheck_violation_error::Violation::FactoryIsNotContract(e)) => { + PrecheckViolation::FactoryIsNotContract(from_bytes(&e.factory_address)?) + } + Some(precheck_violation_error::Violation::VerificationGasLimitTooHigh(e)) => { + PrecheckViolation::VerificationGasLimitTooHigh( + from_bytes(&e.actual_gas)?, + from_bytes(&e.max_gas)?, + ) + } + Some(precheck_violation_error::Violation::PreVerificationGasTooLow(e)) => { + PrecheckViolation::PreVerificationGasTooLow( + from_bytes(&e.actual_gas)?, + from_bytes(&e.min_gas)?, + ) + } + Some(precheck_violation_error::Violation::PaymasterTooShort(e)) => { + PrecheckViolation::PaymasterTooShort(e.length as usize) + } + Some(precheck_violation_error::Violation::PaymasterIsNotContract(e)) => { + PrecheckViolation::PaymasterIsNotContract(from_bytes(&e.paymaster_address)?) + } + Some(precheck_violation_error::Violation::PaymasterDepositTooLow(e)) => { + PrecheckViolation::PaymasterDepositTooLow( + from_bytes(&e.actual_deposit)?, + from_bytes(&e.min_deposit)?, + ) + } + Some(precheck_violation_error::Violation::SenderFundsTooLow(e)) => { + PrecheckViolation::SenderFundsTooLow( + from_bytes(&e.actual_funds)?, + from_bytes(&e.min_funds)?, + ) + } + Some(precheck_violation_error::Violation::MaxFeePerGasTooLow(e)) => { + PrecheckViolation::MaxFeePerGasTooLow( + from_bytes(&e.actual_fee)?, + from_bytes(&e.min_fee)?, + ) + } + Some(precheck_violation_error::Violation::MaxPriorityFeePerGasTooLow(e)) => { + PrecheckViolation::MaxPriorityFeePerGasTooLow( + from_bytes(&e.actual_fee)?, + from_bytes(&e.min_fee)?, + ) + } + Some(precheck_violation_error::Violation::CallGasLimitTooLow(e)) => { + PrecheckViolation::CallGasLimitTooLow( + from_bytes(&e.actual_gas_limit)?, + from_bytes(&e.min_gas_limit)?, + ) + } + None => { + bail!("unknown proto mempool precheck violation") + } + }) + } +} + +impl From for ProtoSimulationViolationError { + fn from(value: SimulationViolation) -> Self { + match value { + SimulationViolation::UnintendedRevertWithMessage(et, reason, maybe_address) => { + ProtoSimulationViolationError { + violation: Some( + simulation_violation_error::Violation::UnintendedRevertWithMessage( + UnintendedRevertWithMessage { + entity: Some(Entity { + kind: EntityType::from(et) as i32, + address: maybe_address + .map_or(vec![], |addr| addr.as_bytes().to_vec()), + }), + reason, + }, + ), + ), + } + } + SimulationViolation::UsedForbiddenOpcode(entity, addr, opcode) => { + ProtoSimulationViolationError { + violation: Some(simulation_violation_error::Violation::UsedForbiddenOpcode( + UsedForbiddenOpcode { + entity: Some((&entity).into()), + contract_address: addr.as_bytes().to_vec(), + opcode: opcode.0 as u32, + }, + )), + } + } + SimulationViolation::UsedForbiddenPrecompile( + entity, + contract_addr, + precompile_addr, + ) => ProtoSimulationViolationError { + violation: Some( + simulation_violation_error::Violation::UsedForbiddenPrecompile( + UsedForbiddenPrecompile { + entity: Some((&entity).into()), + contract_address: contract_addr.as_bytes().to_vec(), + precompile_address: precompile_addr.as_bytes().to_vec(), + }, + ), + ), + }, + SimulationViolation::FactoryCalledCreate2Twice(addr) => ProtoSimulationViolationError { + violation: Some( + simulation_violation_error::Violation::FactoryCalledCreate2Twice( + FactoryCalledCreate2Twice { + factory_address: addr.as_bytes().to_vec(), + }, + ), + ), + }, + SimulationViolation::InvalidStorageAccess(entity, slot) => { + ProtoSimulationViolationError { + violation: Some(simulation_violation_error::Violation::InvalidStorageAccess( + InvalidStorageAccess { + entity: Some((&entity).into()), + contract_address: slot.address.as_bytes().to_vec(), + slot: to_le_bytes(slot.slot), + }, + )), + } + } + SimulationViolation::NotStaked(entity, min_stake, min_unstake_delay) => { + ProtoSimulationViolationError { + violation: Some(simulation_violation_error::Violation::NotStaked( + NotStaked { + entity: Some((&entity).into()), + min_stake: to_le_bytes(min_stake), + min_unstake_delay: to_le_bytes(min_unstake_delay), + }, + )), + } + } + SimulationViolation::UnintendedRevert(et) => ProtoSimulationViolationError { + violation: Some(simulation_violation_error::Violation::UnintendedRevert( + UnintendedRevert { + entity_type: EntityType::from(et) as i32, + }, + )), + }, + SimulationViolation::DidNotRevert => ProtoSimulationViolationError { + violation: Some(simulation_violation_error::Violation::DidNotRevert( + DidNotRevert {}, + )), + }, + SimulationViolation::WrongNumberOfPhases(num_phases) => ProtoSimulationViolationError { + violation: Some(simulation_violation_error::Violation::WrongNumberOfPhases( + WrongNumberOfPhases { num_phases }, + )), + }, + SimulationViolation::CallHadValue(entity) => ProtoSimulationViolationError { + violation: Some(simulation_violation_error::Violation::CallHadValue( + CallHadValue { + entity: Some((&entity).into()), + }, + )), + }, + SimulationViolation::OutOfGas(entity) => ProtoSimulationViolationError { + violation: Some(simulation_violation_error::Violation::OutOfGas(OutOfGas { + entity: Some((&entity).into()), + })), + }, + SimulationViolation::AccessedUndeployedContract(entity, contract_addr) => { + ProtoSimulationViolationError { + violation: Some( + simulation_violation_error::Violation::AccessedUndeployedContract( + AccessedUndeployedContract { + entity: Some((&entity).into()), + contract_address: contract_addr.as_bytes().to_vec(), + }, + ), + ), + } + } + SimulationViolation::CalledBannedEntryPointMethod(entity) => { + ProtoSimulationViolationError { + violation: Some( + simulation_violation_error::Violation::CalledBannedEntryPointMethod( + CalledBannedEntryPointMethod { + entity: Some((&entity).into()), + }, + ), + ), + } + } + SimulationViolation::CodeHashChanged => ProtoSimulationViolationError { + violation: Some(simulation_violation_error::Violation::CodeHashChanged( + CodeHashChanged {}, + )), + }, + SimulationViolation::AggregatorValidationFailed => ProtoSimulationViolationError { + violation: Some( + simulation_violation_error::Violation::AggregatorValidationFailed( + AggregatorValidationFailed {}, + ), + ), + }, + } + } +} + +impl TryFrom for SimulationViolation { + type Error = anyhow::Error; + + fn try_from(value: ProtoSimulationViolationError) -> Result { + Ok(match value.violation { + Some(simulation_violation_error::Violation::UnintendedRevertWithMessage(e)) => { + let entity = e.entity.context("should have entity in error")?; + let addr = if entity.address.is_empty() { + None + } else { + Some(from_bytes(&entity.address)?) + }; + + SimulationViolation::UnintendedRevertWithMessage( + crate::common::types::EntityType::try_from( + EntityType::from_i32(entity.kind).context("unknown entity type")?, + )?, + e.reason, + addr, + ) + } + Some(simulation_violation_error::Violation::UsedForbiddenOpcode(e)) => { + SimulationViolation::UsedForbiddenOpcode( + (&e.entity.context("should have entity in error")?).try_into()?, + from_bytes(&e.contract_address)?, + crate::common::simulation::ViolationOpCode(Opcode::try_from(e.opcode as u8)?), + ) + } + Some(simulation_violation_error::Violation::UsedForbiddenPrecompile(e)) => { + SimulationViolation::UsedForbiddenPrecompile( + (&e.entity.context("should have entity in error")?).try_into()?, + from_bytes(&e.contract_address)?, + from_bytes(&e.precompile_address)?, + ) + } + Some(simulation_violation_error::Violation::FactoryCalledCreate2Twice(e)) => { + SimulationViolation::FactoryCalledCreate2Twice(from_bytes(&e.factory_address)?) + } + Some(simulation_violation_error::Violation::InvalidStorageAccess(e)) => { + SimulationViolation::InvalidStorageAccess( + (&e.entity.context("should have entity in error")?).try_into()?, + StorageSlot { + address: from_bytes(&e.contract_address)?, + slot: from_bytes(&e.slot)?, + }, + ) + } + Some(simulation_violation_error::Violation::NotStaked(e)) => { + SimulationViolation::NotStaked( + (&e.entity.context("should have entity in error")?).try_into()?, + from_bytes(&e.min_stake)?, + from_bytes(&e.min_unstake_delay)?, + ) + } + Some(simulation_violation_error::Violation::UnintendedRevert(e)) => { + SimulationViolation::UnintendedRevert(crate::common::types::EntityType::try_from( + EntityType::from_i32(e.entity_type).context("unknown entity type")?, + )?) + } + Some(simulation_violation_error::Violation::DidNotRevert(_)) => { + SimulationViolation::DidNotRevert + } + Some(simulation_violation_error::Violation::WrongNumberOfPhases(e)) => { + SimulationViolation::WrongNumberOfPhases(e.num_phases) + } + Some(simulation_violation_error::Violation::CallHadValue(e)) => { + SimulationViolation::CallHadValue( + (&e.entity.context("should have entity in error")?).try_into()?, + ) + } + Some(simulation_violation_error::Violation::OutOfGas(e)) => { + SimulationViolation::OutOfGas( + (&e.entity.context("should have entity in error")?).try_into()?, + ) + } + Some(simulation_violation_error::Violation::AccessedUndeployedContract(e)) => { + SimulationViolation::AccessedUndeployedContract( + (&e.entity.context("should have entity in error")?).try_into()?, + from_bytes(&e.contract_address)?, + ) + } + Some(simulation_violation_error::Violation::CalledBannedEntryPointMethod(e)) => { + SimulationViolation::CalledBannedEntryPointMethod( + (&e.entity.context("should have entity in error")?).try_into()?, + ) + } + Some(simulation_violation_error::Violation::CodeHashChanged(_)) => { + SimulationViolation::CodeHashChanged + } + Some(simulation_violation_error::Violation::AggregatorValidationFailed(_)) => { + SimulationViolation::AggregatorValidationFailed + } + None => { + bail!("unknown proto mempool simulation violation") + } + }) + } +} diff --git a/src/op_pool/mempool/error.rs b/src/op_pool/mempool/error.rs index 640188cf9..3d6d1e695 100644 --- a/src/op_pool/mempool/error.rs +++ b/src/op_pool/mempool/error.rs @@ -1,6 +1,10 @@ use ethers::{abi::Address, types::U256}; -use crate::common::types::Entity; +use crate::common::{ + precheck::{PrecheckError, PrecheckViolation}, + simulation::{SimulationError, SimulationViolation}, + types::Entity, +}; /// Mempool result type. pub type MempoolResult = std::result::Result; @@ -23,4 +27,40 @@ pub enum MempoolError { EntityThrottled(Entity), #[error("Operation was discarded on inserting")] DiscardedOnInsert, + #[error("Operation violation during precheck {0}")] + PrecheckViolation(PrecheckViolation), + #[error("Operation violation during simulation {0}")] + SimulationViolation(SimulationViolation), + #[error("Invalid signature")] + InvalidSignature, + #[error("Unsupported aggregator {0}")] + UnsupportedAggregator(Address), +} + +impl From for MempoolError { + fn from(mut error: SimulationError) -> Self { + let SimulationError::Violations(violations) = &mut error else { + return Self::Other(error.into()); + }; + + let Some(violation) = violations.iter().min() else { + return Self::Other(error.into()); + }; + + Self::SimulationViolation(violation.clone()) + } +} + +impl From for MempoolError { + fn from(mut error: PrecheckError) -> Self { + let PrecheckError::Violations(violations) = &mut error else { + return Self::Other(error.into()); + }; + + let Some(violation) = violations.iter().min() else { + return Self::Other(error.into()); + }; + + Self::PrecheckViolation(violation.clone()) + } } diff --git a/src/op_pool/mempool/mod.rs b/src/op_pool/mempool/mod.rs index a393be327..a02f11237 100644 --- a/src/op_pool/mempool/mod.rs +++ b/src/op_pool/mempool/mod.rs @@ -3,20 +3,26 @@ mod pool; mod size; pub mod uo_pool; -use std::{collections::HashSet, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use ethers::types::{Address, H256}; use strum::IntoEnumIterator; +use tonic::async_trait; use self::error::MempoolResult; -use super::event::NewBlockEvent; +use super::{event::NewBlockEvent, reputation::Reputation}; use crate::common::{ - protos::op_pool::Reputation, + mempool::MempoolConfig, + precheck, simulation, types::{Entity, EntityType, UserOperation, ValidTimeRange}, }; +#[async_trait] /// In-memory operation pool -pub trait Mempool: Send + Sync { +pub trait Mempool: Send + Sync + 'static { /// Returns the entry point address this pool targets. fn entry_point(&self) -> Address; @@ -25,21 +31,12 @@ pub trait Mempool: Send + Sync { /// Pool is updated according to the new blocks events. fn on_new_block(&self, event: &NewBlockEvent); - /// Adds a validated user operation to the pool. - /// - /// Adds a user operation to the pool that was submitted via a local - /// RPC call and was validated before submission. - fn add_operation(&self, origin: OperationOrigin, op: PoolOperation) -> MempoolResult; - - /// Adds multiple validated user operations to the pool. - /// - /// Adds multiple user operations to the pool that were discovered - /// via the P2P gossip protocol. - fn add_operations( + /// Adds a user operation to the pool + async fn add_operation( &self, origin: OperationOrigin, - operations: impl IntoIterator, - ) -> Vec>; + op: UserOperation, + ) -> MempoolResult; /// Removes a set of operations from the pool. fn remove_operations<'a>(&self, hashes: impl IntoIterator); @@ -87,6 +84,12 @@ pub struct PoolConfig { pub blocklist: Option>, /// Operations that are allways allowed in the mempool, regardless of reputation pub allowlist: Option>, + /// Settings for precheck validation + pub precheck_settings: precheck::Settings, + /// Settings for simulation validation + pub sim_settings: simulation::Settings, + /// Configuration for the mempool channels, by channel ID + pub mempool_channel_configs: HashMap, } /// Origin of an operation. diff --git a/src/op_pool/mempool/pool.rs b/src/op_pool/mempool/pool.rs index 231866b31..da11a88af 100644 --- a/src/op_pool/mempool/pool.rs +++ b/src/op_pool/mempool/pool.rs @@ -52,21 +52,26 @@ impl PoolInner { } } - pub fn add_operation(&mut self, op: PoolOperation) -> MempoolResult { - // Check for replacement by ID - if let Some(pool_op) = self.by_id.get(&op.uo.id()) { - if op.uo.max_fee_per_gas > u128::MAX.into() - || op.uo.max_priority_fee_per_gas > u128::MAX.into() + pub fn check_replacement_fees(&self, op: &UserOperation) -> MempoolResult<()> { + if let Some(pool_op) = self.by_id.get(&op.id()) { + let (replacement_priority_fee, replacement_fee) = + self.get_min_replacement_fees(pool_op.uo()); + + if op.max_priority_fee_per_gas < replacement_priority_fee + || op.max_fee_per_gas < replacement_fee { - // TODO(danc): we can likely filter out operations with much smaller fees - // based on the maximum gas limit of the block. Using this for now. - return Err(anyhow::anyhow!( - "Fee is too high: max_fee_per_gas={}, max_priority_fee_per_gas={}", - op.uo.max_fee_per_gas, - op.uo.max_priority_fee_per_gas - ))?; + return Err(MempoolError::ReplacementUnderpriced( + replacement_priority_fee, + replacement_fee, + )); } + } + Ok(()) + } + pub fn add_operation(&mut self, op: PoolOperation) -> MempoolResult { + // Check for replacement by ID + if let Some(pool_op) = self.by_id.get(&op.uo.id()) { let (replacement_priority_fee, replacement_fee) = self.get_min_replacement_fees(pool_op.uo()); @@ -130,16 +135,6 @@ impl PoolInner { Ok(hash) } - pub fn add_operations( - &mut self, - operations: impl IntoIterator, - ) -> Vec> { - operations - .into_iter() - .map(|op| self.add_operation(op)) - .collect() - } - pub fn best_operations(&self) -> impl Iterator> { self.best.clone().into_iter().map(|v| v.po) } @@ -280,6 +275,7 @@ impl PartialEq for OrderedPoolOperation { #[cfg(test)] mod tests { use super::*; + use crate::common::{precheck, simulation}; #[test] fn add_single_op() { @@ -572,6 +568,9 @@ mod tests { max_size_of_pool_bytes: 20 * size_of_op(), blocklist: None, allowlist: None, + precheck_settings: precheck::Settings::default(), + sim_settings: simulation::Settings::default(), + mempool_channel_configs: HashMap::new(), } } diff --git a/src/op_pool/mempool/uo_pool.rs b/src/op_pool/mempool/uo_pool.rs index 4c1ba6d5b..1940f16bd 100644 --- a/src/op_pool/mempool/uo_pool.rs +++ b/src/op_pool/mempool/uo_pool.rs @@ -7,6 +7,7 @@ use ethers::types::{Address, H256}; use parking_lot::RwLock; use tokio::sync::broadcast; use tokio_util::sync::CancellationToken; +use tonic::async_trait; use super::{ error::{MempoolError, MempoolResult}, @@ -16,10 +17,14 @@ use super::{ use crate::{ common::{ contracts::i_entry_point::IEntryPointEvents, - protos::op_pool::{Reputation, ReputationStatus}, - types::Entity, + precheck::Prechecker, + simulation::Simulator, + types::{Entity, UserOperation}, + }, + op_pool::{ + event::NewBlockEvent, + reputation::{Reputation, ReputationManager, ReputationStatus}, }, - op_pool::{event::NewBlockEvent, reputation::ReputationManager}, }; /// The number of blocks that a throttled operation is allowed to be in the mempool @@ -30,10 +35,12 @@ const THROTTLED_OPS_BLOCK_LIMIT: u64 = 10; /// Wrapper around a pool object that implements thread-safety /// via a RwLock. Safe to call from multiple threads. Methods /// block on write locks. -pub struct UoPool { +pub struct UoPool { entry_point: Address, reputation: Arc, state: RwLock, + prechecker: P, + simulator: S, } struct UoPoolState { @@ -42,11 +49,13 @@ struct UoPoolState { block_number: u64, } -impl UoPool +impl UoPool where R: ReputationManager, + P: Prechecker, + S: Simulator, { - pub fn new(args: PoolConfig, reputation: Arc) -> Self { + pub fn new(args: PoolConfig, reputation: Arc, prechecker: P, simulator: S) -> Self { Self { entry_point: args.entry_point, reputation, @@ -55,6 +64,8 @@ where throttled_ops: HashMap::new(), block_number: 0, }), + prechecker, + simulator, } } @@ -79,9 +90,12 @@ where } } -impl Mempool for UoPool +#[async_trait] +impl Mempool for UoPool where R: ReputationManager, + P: Prechecker + 'static, + S: Simulator + 'static, { fn entry_point(&self) -> Address { self.entry_point @@ -116,7 +130,6 @@ where to_remove.insert(*hash); } } - for hash in to_remove { state.pool.remove_operation_by_hash(hash); state.throttled_ops.remove(&hash); @@ -125,7 +138,15 @@ where state.block_number = new_block_number; } - fn add_operation(&self, _origin: OperationOrigin, op: PoolOperation) -> MempoolResult { + async fn add_operation( + &self, + _origin: OperationOrigin, + op: UserOperation, + ) -> MempoolResult { + // TODO(danc) aggregator throttling is not implemented + // TODO(danc) catch ops with aggregators prior to simulation + + // Check if op has a throttled/banned entity let mut throttled = false; for e in op.entities() { match self.reputation.status(e.address) { @@ -140,14 +161,46 @@ where return Err(MempoolError::EntityThrottled(e)); } } + } + + // Check if op is replacing another op, and if so, ensure its fees are high enough + self.state.read().pool.check_replacement_fees(&op)?; + + // Prechecks + self.prechecker.check(&op).await?; + + // Simulation + let sim_result = self + .simulator + .simulate_validation(op.clone(), None, None) + .await?; + + if sim_result.signature_failed { + return Err(MempoolError::InvalidSignature); + } else if let Some(agg) = &sim_result.aggregator { + return Err(MempoolError::UnsupportedAggregator(agg.address)); + } - if op.is_staked(e.kind) { + let pool_op = PoolOperation { + uo: op, + aggregator: None, + valid_time_range: sim_result.valid_time_range, + expected_code_hash: sim_result.code_hash, + sim_block_hash: sim_result.block_hash, + entities_needing_stake: sim_result.entities_needing_stake, + account_is_staked: sim_result.account_is_staked, + }; + + // Update reputation + for e in pool_op.entities() { + if pool_op.is_staked(e.kind) { self.reputation.add_seen(e.address); } } + // Add op to pool let mut state = self.state.write(); - let hash = state.pool.add_operation(op)?; + let hash = state.pool.add_operation(pool_op)?; let bn = state.block_number; if throttled { state.throttled_ops.insert(hash, bn); @@ -156,14 +209,6 @@ where Ok(hash) } - fn add_operations( - &self, - _origin: OperationOrigin, - operations: impl IntoIterator, - ) -> Vec> { - self.state.write().pool.add_operations(operations) - } - fn remove_operations<'a>(&self, hashes: impl IntoIterator) { // hold the lock for the duration of the operation let mut state = self.state.write(); @@ -210,107 +255,121 @@ where } } -#[cfg(test)] -mod tests { - use super::*; - use crate::common::{ - protos::op_pool::{Reputation, ReputationStatus}, - types::UserOperation, - }; - - #[test] - fn add_single_op() { - let pool = create_pool(); - let op = create_op(Address::random(), 0, 0); - let hash = pool - .add_operation(OperationOrigin::Local, op.clone()) - .unwrap(); - check_ops(pool.best_operations(1), vec![op]); - pool.remove_operations(&vec![hash]); - assert_eq!(pool.best_operations(1), vec![]); - } - - #[test] - fn add_multiple_ops() { - let pool = create_pool(); - let ops = vec![ - create_op(Address::random(), 0, 3), - create_op(Address::random(), 0, 2), - create_op(Address::random(), 0, 1), - ]; - let res = pool.add_operations(OperationOrigin::Local, ops.clone()); - let hashes: Vec = res.into_iter().map(|r| r.unwrap()).collect(); - check_ops(pool.best_operations(3), ops); - pool.remove_operations(&hashes); - assert_eq!(pool.best_operations(3), vec![]); - } - - #[test] - fn clear() { - let pool = create_pool(); - let ops = vec![ - create_op(Address::random(), 0, 3), - create_op(Address::random(), 0, 2), - create_op(Address::random(), 0, 1), - ]; - pool.add_operations(OperationOrigin::Local, ops.clone()); - check_ops(pool.best_operations(3), ops); - pool.clear(); - assert_eq!(pool.best_operations(3), vec![]); - } - - fn create_pool() -> UoPool { - let args = PoolConfig { - entry_point: Address::random(), - chain_id: 1, - max_userops_per_sender: 16, - min_replacement_fee_increase_percentage: 10, - max_size_of_pool_bytes: 10000, - blocklist: None, - allowlist: None, - }; - UoPool::new(args, mock_reputation()) - } - - fn create_op(sender: Address, nonce: usize, max_fee_per_gas: usize) -> PoolOperation { - PoolOperation { - uo: UserOperation { - sender, - nonce: nonce.into(), - max_fee_per_gas: max_fee_per_gas.into(), - ..UserOperation::default() - }, - ..PoolOperation::default() - } - } - - fn check_ops(ops: Vec>, expected: Vec) { - assert_eq!(ops.len(), expected.len()); - for (actual, expected) in ops.into_iter().zip(expected) { - assert_eq!(actual.uo, expected.uo); - } - } - - fn mock_reputation() -> Arc { - Arc::new(MockReputationManager::default()) - } - - #[derive(Default, Clone)] - struct MockReputationManager; - - impl ReputationManager for MockReputationManager { - fn status(&self, _address: Address) -> ReputationStatus { - ReputationStatus::Ok - } - - fn add_seen(&self, _address: Address) {} - - fn add_included(&self, _address: Address) {} - - fn dump_reputation(&self) -> Vec { - vec![] - } - - fn set_reputation(&self, _address: Address, _ops_seen: u64, _ops_included: u64) {} - } -} +// TODO(danc): These tests are out of date and need to be updated using mocks + +// #[cfg(test)] +// mod tests { +// use super::*; +// use crate::common::types::UserOperation; + +// #[tokio::test] +// async fn add_single_op() { +// let pool = create_pool(); +// let op = create_op(Address::random(), 0, 0); +// let hash = pool +// .add_operation(OperationOrigin::Local, op.clone()) +// .await +// .unwrap(); +// check_ops(pool.best_operations(1), vec![op]); +// pool.remove_operations(&vec![hash]); +// assert_eq!(pool.best_operations(1), vec![]); +// } + +// #[tokio::test] +// async fn add_multiple_ops() { +// let pool = create_pool(); +// let ops = vec![ +// create_op(Address::random(), 0, 3), +// create_op(Address::random(), 0, 2), +// create_op(Address::random(), 0, 1), +// ]; +// let mut hashes = vec![]; +// for op in &ops { +// let hash = pool +// .add_operation(OperationOrigin::Local, op.clone()) +// .await +// .unwrap(); +// hashes.push(hash); +// } +// check_ops(pool.best_operations(3), ops); +// pool.remove_operations(&hashes); +// assert_eq!(pool.best_operations(3), vec![]); +// } + +// #[tokio::test] +// async fn clear() { +// let pool = create_pool(); +// let ops = vec![ +// create_op(Address::random(), 0, 3), +// create_op(Address::random(), 0, 2), +// create_op(Address::random(), 0, 1), +// ]; +// for op in &ops { +// let _ = pool +// .add_operation(OperationOrigin::Local, op.clone()) +// .await +// .unwrap(); +// } +// check_ops(pool.best_operations(3), ops); +// pool.clear(); +// assert_eq!(pool.best_operations(3), vec![]); +// } + +// fn create_pool() -> UoPool { +// let args = PoolConfig { +// entry_point: Address::random(), +// chain_id: 1, +// max_userops_per_sender: 16, +// min_replacement_fee_increase_percentage: 10, +// max_size_of_pool_bytes: 10000, +// blocklist: None, +// allowlist: None, +// precheck_settings: precheck::Settings::default(), +// sim_settings: simulation::Settings::default(), +// mempool_channel_configs: HashMap::new(), +// }; +// UoPool::new(args, mock_reputation()) +// } + +// fn create_op(sender: Address, nonce: usize, max_fee_per_gas: usize) -> PoolOperation { +// PoolOperation { +// uo: UserOperation { +// sender, +// nonce: nonce.into(), +// max_fee_per_gas: max_fee_per_gas.into(), +// ..UserOperation::default() +// }, +// ..PoolOperation::default() +// } +// } + +// fn check_ops(ops: Vec>, expected: Vec) { +// assert_eq!(ops.len(), expected.len()); +// for (actual, expected) in ops.into_iter().zip(expected) { +// assert_eq!(actual.uo, expected.uo); +// } +// } + +// fn mock_reputation() -> Arc { +// Arc::new(MockReputationManager::default()) +// } + +// #[derive(Default, Clone)] +// struct MockReputationManager; + +// impl ReputationManager for MockReputationManager { +// fn status(&self, _address: Address) -> ReputationStatus { +// ReputationStatus::Ok +// } + +// fn add_seen(&self, _address: Address) {} + +// fn add_included(&self, _address: Address) {} + +// fn dump_reputation(&self) -> Vec { +// vec![] +// } + +// fn set_reputation(&self, _address: Address, _ops_seen: u64, _ops_included: u64) {} +// } +// } diff --git a/src/op_pool/mod.rs b/src/op_pool/mod.rs index 1fa837dce..cafbae818 100644 --- a/src/op_pool/mod.rs +++ b/src/op_pool/mod.rs @@ -1,3 +1,4 @@ +mod error; mod event; mod mempool; mod reputation; @@ -5,5 +6,5 @@ mod server; mod task; mod types; -pub use mempool::PoolConfig; +pub use mempool::{error::MempoolError, PoolConfig}; pub use task::*; diff --git a/src/op_pool/reputation.rs b/src/op_pool/reputation.rs index 3cf24070c..dc3e95950 100644 --- a/src/op_pool/reputation.rs +++ b/src/op_pool/reputation.rs @@ -7,13 +7,26 @@ use ethers::types::Address; use parking_lot::RwLock; use tokio::time::interval; -use crate::common::protos::op_pool::{Reputation, ReputationStatus}; +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum ReputationStatus { + Ok, + Throttled, + Banned, +} + +#[derive(Debug, Clone)] +pub struct Reputation { + pub address: Address, + pub status: ReputationStatus, + pub ops_seen: u64, + pub ops_included: u64, +} /// Reputation manager trait /// /// Interior mutability pattern used as ReputationManagers may /// need to be thread-safe. -pub trait ReputationManager: Send + Sync { +pub trait ReputationManager: Send + Sync + 'static { /// Called by mempool before returning operations to bundler fn status(&self, address: Address) -> ReputationStatus; @@ -79,8 +92,8 @@ impl ReputationManager for HourlyMovingAverageReputation { .counts .iter() .map(|(address, count)| Reputation { - address: address.as_bytes().to_vec(), - status: reputation.status(*address).into(), + address: *address, + status: reputation.status(*address), ops_seen: count.ops_seen, ops_included: count.ops_included, }) @@ -354,8 +367,7 @@ mod tests { for rep in reps { assert_eq!(rep.ops_seen, 1000); assert_eq!(rep.ops_included, 1000); - let a = Address::from_slice(&rep.address); - assert!(addrs.contains(&a)); + assert!(addrs.contains(&rep.address)); } } } diff --git a/src/op_pool/server.rs b/src/op_pool/server.rs index f8e281cfe..412cd9ca6 100644 --- a/src/op_pool/server.rs +++ b/src/op_pool/server.rs @@ -1,21 +1,17 @@ use std::{collections::HashMap, sync::Arc}; -use ethers::{ - types::{Address, H256}, - utils::to_checksum, -}; -use prost::Message; -use tonic::{async_trait, Code, Request, Response, Result, Status}; +use ethers::types::{Address, H256}; +use tonic::{async_trait, Request, Response, Result, Status}; -use super::mempool::{error::MempoolError, Mempool, OperationOrigin}; +use super::mempool::{Mempool, OperationOrigin}; use crate::common::protos::{ self, op_pool::{ - op_pool_server::OpPool, AddOpRequest, AddOpResponse, DebugClearStateRequest, - DebugClearStateResponse, DebugDumpMempoolRequest, DebugDumpMempoolResponse, - DebugDumpReputationRequest, DebugDumpReputationResponse, DebugSetReputationRequest, - DebugSetReputationResponse, ErrorInfo, ErrorMetadataKey, ErrorReason, GetOpsRequest, - GetOpsResponse, GetSupportedEntryPointsRequest, GetSupportedEntryPointsResponse, MempoolOp, + add_op_response, op_pool_server::OpPool, AddOpFailure, AddOpRequest, AddOpResponse, + AddOpSuccess, DebugClearStateRequest, DebugClearStateResponse, DebugDumpMempoolRequest, + DebugDumpMempoolResponse, DebugDumpReputationRequest, DebugDumpReputationResponse, + DebugSetReputationRequest, DebugSetReputationResponse, GetOpsRequest, GetOpsResponse, + GetSupportedEntryPointsRequest, GetSupportedEntryPointsResponse, MempoolOp, RemoveEntitiesRequest, RemoveEntitiesResponse, RemoveOpsRequest, RemoveOpsResponse, }, }; @@ -70,19 +66,26 @@ where let req = request.into_inner(); let mempool = self.get_mempool_for_entry_point(&req.entry_point)?; - let proto_op = req + let op = req .op - .ok_or_else(|| Status::invalid_argument("Operation is required in AddOpRequest"))?; - - let pool_op = proto_op + .ok_or_else(|| Status::invalid_argument("op is required in AddOpRequest"))? .try_into() - .map_err(|e| Status::invalid_argument(format!("Failed to parse operation: {e}")))?; + .map_err(|e| Status::invalid_argument(format!("Invalid operation: {e}")))?; - let hash = mempool.add_operation(OperationOrigin::Local, pool_op)?; + let resp = match mempool.add_operation(OperationOrigin::Local, op).await { + Ok(hash) => AddOpResponse { + result: Some(add_op_response::Result::Success(AddOpSuccess { + hash: hash.as_bytes().to_vec(), + })), + }, + Err(error) => AddOpResponse { + result: Some(add_op_response::Result::Failure(AddOpFailure { + error: Some(error.into()), + })), + }, + }; - Ok(Response::new(AddOpResponse { - hash: hash.as_bytes().to_vec(), - })) + Ok(Response::new(resp)) } async fn get_ops(&self, request: Request) -> Result> { @@ -199,74 +202,15 @@ where let reps = mempool.dump_reputation(); Ok(Response::new(DebugDumpReputationResponse { - reputations: reps, + reputations: reps.into_iter().map(Into::into).collect(), })) } } -impl From for Status { - fn from(e: MempoolError) -> Self { - let ei = match &e { - MempoolError::EntityThrottled(et) => ErrorInfo { - reason: ErrorReason::EntityThrottled.as_str_name().to_string(), - metadata: HashMap::from([(et.kind.to_string(), to_checksum(&et.address, None))]), - }, - MempoolError::MaxOperationsReached(_, _) => ErrorInfo { - reason: ErrorReason::OperationRejected.as_str_name().to_string(), - metadata: HashMap::new(), - }, - MempoolError::ReplacementUnderpriced(prio_fee, fee) => ErrorInfo { - reason: ErrorReason::ReplacementUnderpriced - .as_str_name() - .to_string(), - metadata: HashMap::from([ - ( - ErrorMetadataKey::CurrentMaxPriorityFeePerGas - .as_str_name() - .to_string(), - prio_fee.to_string(), - ), - ( - ErrorMetadataKey::CurrentMaxFeePerGas - .as_str_name() - .to_string(), - fee.to_string(), - ), - ]), - }, - MempoolError::DiscardedOnInsert => ErrorInfo { - reason: ErrorReason::OperationDiscardedOnInsert - .as_str_name() - .to_string(), - metadata: HashMap::new(), - }, - MempoolError::Other(_) => ErrorInfo { - reason: ErrorReason::Unspecified.as_str_name().to_string(), - metadata: HashMap::new(), - }, - }; - - let msg = e.to_string(); - let details = tonic_types::Status { - // code and message are not used by the client - code: 0, - message: "".into(), - details: vec![prost_types::Any { - type_url: "type.alchemy.com/op_pool.ErrorInfo".to_string(), - value: ei.encode_to_vec(), - }], - }; - - Status::with_details( - Code::FailedPrecondition, - msg, - details.encode_to_vec().into(), - ) - } -} - #[cfg(test)] mod tests { + use tonic::Code; + use super::*; const TEST_ADDRESS_ARR: [u8; 20] = [ @@ -275,13 +219,11 @@ mod tests { ]; use crate::{ - common::{ - protos::op_pool::{self, Reputation}, - types::Entity, - }, + common::{protos::op_pool::UserOperation, types, types::Entity}, op_pool::{ event::NewBlockEvent, mempool::{error::MempoolResult, PoolOperation}, + reputation::Reputation, }, }; @@ -327,7 +269,7 @@ mod tests { let oppool = given_oppool(); let request = Request::new(AddOpRequest { entry_point: TEST_ADDRESS_ARR.to_vec(), - op: Some(op_pool::MempoolOp::default()), + op: Some(UserOperation::default()), }); let result = oppool.add_op(request).await; @@ -356,6 +298,7 @@ mod tests { } } + #[async_trait] impl Mempool for MockMempool { fn entry_point(&self) -> Address { self.entry_point @@ -363,22 +306,14 @@ mod tests { fn on_new_block(&self, _event: &NewBlockEvent) {} - fn add_operation( + async fn add_operation( &self, _origin: OperationOrigin, - _opp: PoolOperation, + _opp: types::UserOperation, ) -> MempoolResult { Ok(H256::zero()) } - fn add_operations( - &self, - _origin: OperationOrigin, - _operations: impl IntoIterator, - ) -> Vec> { - vec![] - } - fn remove_operations<'a>(&self, _hashes: impl IntoIterator) {} fn remove_entity(&self, _entity: Entity) {} diff --git a/src/op_pool/task.rs b/src/op_pool/task.rs index ef5515d50..5e8e20b77 100644 --- a/src/op_pool/task.rs +++ b/src/op_pool/task.rs @@ -1,10 +1,18 @@ use std::{sync::Arc, time::Duration}; use anyhow::{bail, Context}; +use ethers::providers::{ + Http, HttpRateLimitRetryPolicy, JsonRpcClient, Provider, RetryClientBuilder, +}; use futures::future; use tokio::{task::JoinHandle, try_join}; use tokio_util::sync::CancellationToken; -use tonic::{async_trait, transport::Server}; +use tonic::{ + async_trait, + transport::{NamedService, Server}, +}; +use tonic_health::server::HealthReporter; +use url::Url; use super::{ event::EventProvider, @@ -12,9 +20,12 @@ use super::{ }; use crate::{ common::{ + contracts::i_entry_point::IEntryPoint, grpc::metrics::GrpcMetricsLayer, handle::{flatten_handle, Task}, + precheck::{Prechecker, PrecheckerImpl}, protos::op_pool::{op_pool_server::OpPoolServer, OP_POOL_FILE_DESCRIPTOR_SET}, + simulation::{Simulator, SimulatorImpl}, }, op_pool::{ event::{EventListener, HttpBlockProviderFactory, WsBlockProviderFactory}, @@ -29,7 +40,7 @@ pub struct Args { pub port: u16, pub host: String, pub ws_url: Option, - pub http_url: Option, + pub http_url: String, pub http_poll_interval: Duration, pub chain_id: u64, pub pool_configs: Vec, @@ -55,25 +66,36 @@ impl Task for PoolTask { let event_provider: Box = if let Some(ws_url) = &self.args.ws_url { let connection_factory = WsBlockProviderFactory::new(ws_url.to_owned(), 10); Box::new(EventListener::new(connection_factory, entry_points)) - } else if let Some(http_url) = &self.args.http_url { + } else { let connection_factory = HttpBlockProviderFactory::new( - http_url.to_owned(), + self.args.http_url.clone(), self.args.http_poll_interval, 10, ); Box::new(EventListener::new(connection_factory, entry_points)) - } else { - bail!("Either ws_url or http_url must be provided"); }; + let parsed_url = Url::parse(&self.args.http_url).context("Invalid RPC URL")?; + let http = Http::new(parsed_url); + // this retry policy will retry on 429ish errors OR connectivity errors + let client = RetryClientBuilder::default() + // these retries are if the server returns a 429 + .rate_limit_retries(10) + // these retries are if the connection is dubious + .timeout_retries(3) + .initial_backoff(Duration::from_millis(500)) + .build(http, Box::::default()); + let provider = Arc::new(Provider::new(client)); + // create mempools - let mut mempools = Vec::new(); + let mut mempools = vec![]; let mut mempool_handles = Vec::new(); for pool_config in &self.args.pool_configs { let (pool, handle) = PoolTask::create_mempool( pool_config, event_provider.as_ref(), shutdown_token.clone(), + provider.clone(), ) .await .context("should have created mempool")?; @@ -107,9 +129,7 @@ impl Task for PoolTask { // health service let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); - health_reporter - .set_serving::>>>() - .await; + Self::set_serving(&mut health_reporter, &op_pool_server).await; let metrics_layer = GrpcMetricsLayer::new("op_pool".to_string()); let server_handle = tokio::spawn(async move { @@ -151,11 +171,15 @@ impl PoolTask { Box::new(self) } - async fn create_mempool( + async fn create_mempool( pool_config: &PoolConfig, event_provider: &dyn EventProvider, shutdown_token: CancellationToken, - ) -> anyhow::Result<(Arc>, JoinHandle<()>)> { + provider: Arc>, + ) -> anyhow::Result<( + Arc>, + JoinHandle<()>, + )> { let entry_point = pool_config.entry_point; // Reputation manager let reputation = Arc::new(HourlyMovingAverageReputation::new( @@ -167,8 +191,27 @@ impl PoolTask { let reputation_runner = Arc::clone(&reputation); tokio::spawn(async move { reputation_runner.run().await }); + let i_entry_point = IEntryPoint::new(pool_config.entry_point, Arc::clone(&provider)); + let prechecker = PrecheckerImpl::new( + Arc::clone(&provider), + pool_config.chain_id, + i_entry_point, + pool_config.precheck_settings, + ); + let simulator = SimulatorImpl::new( + Arc::clone(&provider), + pool_config.entry_point, + pool_config.sim_settings, + pool_config.mempool_channel_configs.clone(), + ); + // Mempool - let mp = Arc::new(UoPool::new(pool_config.clone(), Arc::clone(&reputation))); + let mp = Arc::new(UoPool::new( + pool_config.clone(), + Arc::clone(&reputation), + prechecker, + simulator, + )); // Start mempool let mempool_events = event_provider .subscribe_by_entrypoint(entry_point) @@ -181,4 +224,8 @@ impl PoolTask { Ok((mp, handle)) } + + async fn set_serving(reporter: &mut HealthReporter, _service: &S) { + reporter.set_serving::().await; + } } diff --git a/src/op_pool/types.rs b/src/op_pool/types.rs index 42a017ea0..dd777207e 100644 --- a/src/op_pool/types.rs +++ b/src/op_pool/types.rs @@ -1,11 +1,17 @@ use anyhow::Context; use ethers::types::{Address, H256}; -use super::mempool::PoolOperation; +use super::{ + mempool::PoolOperation, + reputation::{Reputation, ReputationStatus}, +}; use crate::common::{ protos::{ self, - op_pool::{EntityType as ProtoEntityType, MempoolOp, UserOperation}, + op_pool::{ + EntityType as ProtoEntityType, MempoolOp, Reputation as ProtoReputation, + ReputationStatus as ProtoReputationStatus, UserOperation, + }, ConversionError, }, types::ValidTimeRange, @@ -70,6 +76,27 @@ impl TryFrom for PoolOperation { } } +impl From for ProtoReputationStatus { + fn from(status: ReputationStatus) -> Self { + match status { + ReputationStatus::Ok => ProtoReputationStatus::Ok, + ReputationStatus::Throttled => ProtoReputationStatus::Throttled, + ReputationStatus::Banned => ProtoReputationStatus::Banned, + } + } +} + +impl From for ProtoReputation { + fn from(rep: Reputation) -> Self { + ProtoReputation { + address: rep.address.as_bytes().to_vec(), + status: ProtoReputationStatus::from(rep.status).into(), + ops_seen: rep.ops_seen, + ops_included: rep.ops_included, + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/rpc/eth/error.rs b/src/rpc/eth/error.rs index fce8c2401..aaf87d150 100644 --- a/src/rpc/eth/error.rs +++ b/src/rpc/eth/error.rs @@ -2,16 +2,19 @@ use ethers::types::{Address, Opcode, U256}; use jsonrpsee::{ core::Error as RpcError, types::{ - error::{CallError, INTERNAL_ERROR_CODE, INVALID_PARAMS_CODE, INVALID_REQUEST_CODE}, + error::{CallError, INTERNAL_ERROR_CODE, INVALID_PARAMS_CODE}, ErrorObject, }, }; use serde::Serialize; -use crate::common::{ - precheck::PrecheckError, - simulation::SimulationError, - types::{Entity, EntityType, Timestamp}, +use crate::{ + common::{ + precheck::PrecheckViolation, + simulation::SimulationViolation, + types::{Entity, EntityType, Timestamp}, + }, + op_pool::MempoolError, }; // Error codes borrowed from jsonrpsee @@ -33,6 +36,8 @@ const EXECUTION_REVERTED: i32 = -32521; /// Error returned by the RPC server eth namespace #[derive(Debug, thiserror::Error)] pub enum EthRpcError { + #[error(transparent)] + Internal(#[from] anyhow::Error), /// Invalid parameters #[error("{0}")] InvalidParams(String), @@ -71,13 +76,13 @@ pub enum EthRpcError { #[error("Invalid UserOp signature or paymaster signature")] SignatureCheckFailed, #[error("precheck failed: {0}")] - PrecheckFailed(PrecheckError), + PrecheckFailed(PrecheckViolation), #[error("validation simulation failed: {0}")] - SimulationFailed(SimulationError), - #[error(transparent)] - Internal(#[from] anyhow::Error), + SimulationFailed(SimulationViolation), #[error("{0}")] ExecutionReverted(String), + #[error("operation rejected by mempool: {0}")] + OperationRejected(String), } #[derive(Debug, Clone, Serialize)] @@ -134,11 +139,80 @@ pub struct UnsupportedAggregatorData { pub aggregator: Address, } +impl From for EthRpcError { + fn from(value: MempoolError) -> Self { + match value { + MempoolError::Other(e) => EthRpcError::Internal(e), + MempoolError::ReplacementUnderpriced(priority_fee, fee) => { + EthRpcError::ReplacementUnderpriced(ReplacementUnderpricedData { + current_max_priority_fee: priority_fee, + current_max_fee: fee, + }) + } + MempoolError::MaxOperationsReached(count, _) => EthRpcError::OperationRejected( + format!("max operations reached for sender {count} already in pool"), + ), + MempoolError::EntityThrottled(entity) => EthRpcError::ThrottledOrBanned(entity), + MempoolError::DiscardedOnInsert => { + EthRpcError::OperationRejected("discarded on insert".to_owned()) + } + MempoolError::PrecheckViolation(violation) => violation.into(), + MempoolError::SimulationViolation(violation) => violation.into(), + MempoolError::InvalidSignature => EthRpcError::SignatureCheckFailed, + MempoolError::UnsupportedAggregator(a) => { + EthRpcError::UnsupportedAggregator(UnsupportedAggregatorData { aggregator: a }) + } + } + } +} + +impl From for EthRpcError { + fn from(value: PrecheckViolation) -> Self { + Self::PrecheckFailed(value) + } +} + +impl From for EthRpcError { + fn from(value: SimulationViolation) -> Self { + match value { + SimulationViolation::UnintendedRevertWithMessage( + EntityType::Paymaster, + reason, + Some(paymaster), + ) => Self::PaymasterValidationRejected(PaymasterValidationRejectedData { + paymaster, + reason, + }), + SimulationViolation::UnintendedRevertWithMessage(_, reason, _) => { + Self::EntryPointValidationRejected(reason) + } + SimulationViolation::UsedForbiddenOpcode(entity, _, op) => { + Self::OpcodeViolation(entity.kind, op.0) + } + SimulationViolation::UsedForbiddenPrecompile(entity, _, precompile) => { + Self::PrecompileViolation(entity.kind, precompile) + } + SimulationViolation::FactoryCalledCreate2Twice(_) => { + Self::OpcodeViolation(EntityType::Factory, Opcode::CREATE2) + } + SimulationViolation::InvalidStorageAccess(entity, slot) => { + Self::InvalidStorageAccess(entity.kind, slot.address, slot.slot) + } + SimulationViolation::NotStaked(entity, min_stake, min_unstake_delay) => { + Self::StakeTooLow(StakeTooLowData::new(entity, min_stake, min_unstake_delay)) + } + SimulationViolation::AggregatorValidationFailed => Self::SignatureCheckFailed, + _ => Self::SimulationFailed(value), + } + } +} + impl From for RpcError { fn from(error: EthRpcError) -> Self { let msg = error.to_string(); match error { + EthRpcError::Internal(_) => rpc_err(INTERNAL_ERROR_CODE, msg), EthRpcError::InvalidParams(_) => rpc_err(INVALID_PARAMS_CODE, msg), EthRpcError::EntryPointValidationRejected(_) => { rpc_err(ENTRYPOINT_VALIDATION_REJECTED_CODE, msg) @@ -164,9 +238,9 @@ impl From for RpcError { } EthRpcError::SignatureCheckFailed => rpc_err(SIGNATURE_CHECK_FAILED_CODE, msg), EthRpcError::PrecheckFailed(_) => rpc_err(INVALID_PARAMS_CODE, msg), - EthRpcError::SimulationFailed(_) => rpc_err(INVALID_REQUEST_CODE, msg), - EthRpcError::Internal(_) => rpc_err(INTERNAL_ERROR_CODE, msg), + EthRpcError::SimulationFailed(_) => rpc_err(INVALID_PARAMS_CODE, msg), EthRpcError::ExecutionReverted(_) => rpc_err(EXECUTION_REVERTED, msg), + EthRpcError::OperationRejected(_) => rpc_err(INVALID_PARAMS_CODE, msg), } } } @@ -186,3 +260,13 @@ fn create_rpc_err(code: i32, msg: impl Into, data: Option< data, ))) } + +impl From for EthRpcError { + fn from(status: tonic::Status) -> Self { + EthRpcError::Internal(anyhow::anyhow!(format!( + "internal server error code: {} message: {}", + status.code(), + status.message() + ))) + } +} diff --git a/src/rpc/eth/mod.rs b/src/rpc/eth/mod.rs index 4803e1716..0b16def6a 100644 --- a/src/rpc/eth/mod.rs +++ b/src/rpc/eth/mod.rs @@ -2,9 +2,7 @@ mod error; pub mod estimation; use std::{ collections::{HashMap, VecDeque}, - str::FromStr, sync::Arc, - time::Duration, }; use anyhow::{anyhow, bail, Context}; @@ -14,20 +12,16 @@ use ethers::{ providers::{JsonRpcClient, Middleware, Provider}, types::{ Address, BlockNumber, Bytes, Filter, GethDebugBuiltInTracerType, GethDebugTracerType, - GethDebugTracingOptions, GethTrace, GethTraceFrame, Log, Opcode, TransactionReceipt, H256, - U256, U64, + GethDebugTracingOptions, GethTrace, GethTraceFrame, Log, TransactionReceipt, H256, U256, + U64, }, utils::to_checksum, }; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; -use prost::Message; -use tonic::{async_trait, transport::Channel, Status}; -use tracing::{debug, Level}; +use tonic::{async_trait, transport::Channel}; +use tracing::Level; -use self::error::{ - EthRpcError, OutOfTimeRangeData, PaymasterValidationRejectedData, ReplacementUnderpricedData, - StakeTooLowData, UnsupportedAggregatorData, -}; +use self::error::EthRpcError; use crate::{ common::{ context::LogWithContext, @@ -36,17 +30,10 @@ use crate::{ UserOperationRevertReasonFilter, }, eth::log_to_raw_log, - mempool::MempoolConfig, - precheck::{self, PrecheckError, Prechecker, PrecheckerImpl}, - protos::op_pool::{ - op_pool_client::OpPoolClient, AddOpRequest, EntityType as ProtoEntityType, ErrorInfo, - ErrorMetadataKey, ErrorReason, MempoolOp, - }, - simulation::{ - self, SimulationError, SimulationSuccess, SimulationViolation, Simulator, SimulatorImpl, - }, - types::{Entity, EntityType, EntryPointLike, Timestamp, UserOperation}, + protos::op_pool::{add_op_response, op_pool_client::OpPoolClient, AddOpRequest}, + types::UserOperation, }, + op_pool::MempoolError, rpc::{ estimation::{GasEstimationError, GasEstimator, GasEstimatorImpl}, GasEstimate, RichUserOperation, RpcUserOperation, UserOperationOptionalGas, @@ -87,12 +74,9 @@ pub trait EthApi { async fn chain_id(&self) -> RpcResult; } -// TODO: use ProviderLike and EntrypointLike +// TODO: use ProviderLike #[derive(Debug)] struct EntryPointContext { - entry_point: IEntryPoint>, - prechecker: PrecheckerImpl, IEntryPoint>>, - simulator: SimulatorImpl, gas_estimator: GasEstimatorImpl, IEntryPoint>>, } @@ -104,32 +88,12 @@ where address: Address, chain_id: u64, provider: Arc>, - precheck_settings: precheck::Settings, - sim_settings: simulation::Settings, estimator_settings: estimation::Settings, - mempool_configs: HashMap, ) -> Self { let entry_point = IEntryPoint::new(address, Arc::clone(&provider)); - let prechecker = PrecheckerImpl::new( - Arc::clone(&provider), - chain_id, - entry_point.clone(), - precheck_settings, - ); - let simulator = SimulatorImpl::new( - Arc::clone(&provider), - address, - sim_settings, - mempool_configs, - ); let gas_estimator = - GasEstimatorImpl::new(chain_id, provider, entry_point.clone(), estimator_settings); - Self { - entry_point, - prechecker, - simulator, - gas_estimator, - } + GasEstimatorImpl::new(chain_id, provider, entry_point, estimator_settings); + Self { gas_estimator } } } @@ -151,25 +115,14 @@ where entry_points: Vec
, chain_id: u64, op_pool_client: OpPoolClient, - precheck_settings: precheck::Settings, - sim_settings: simulation::Settings, estimation_settings: estimation::Settings, - mempool_configs: HashMap, ) -> Self { let contexts_by_entry_point = entry_points .iter() .map(|&a| { ( a, - EntryPointContext::new( - a, - chain_id, - Arc::clone(&provider), - precheck_settings, - sim_settings, - estimation_settings, - mempool_configs.clone(), - ), + EntryPointContext::new(a, chain_id, Arc::clone(&provider), estimation_settings), ) }) .collect(); @@ -347,8 +300,6 @@ where } } -const EXPIRATION_BUFFER: Duration = Duration::from_secs(30); - #[async_trait] impl EthApiServer for EthApi where @@ -367,82 +318,29 @@ where let op: UserOperation = op.into(); - let EntryPointContext { - entry_point, - prechecker, - simulator, - .. - } = self - .contexts_by_entry_point - .get(&entry_point) - .context("entry point should exist in the map")?; - - // 1. Validate op with simple prechecks - prechecker.check(&op).await.map_err(EthRpcError::from)?; - - // 2. Validate op with simulation - let SimulationSuccess { - valid_time_range, - code_hash, - aggregator, - signature_failed, - entities_needing_stake, - block_hash, - account_is_staked, - .. - } = simulator - .simulate_validation(op.clone(), None, None) - .await - .map_err(EthRpcError::from)?; - - if signature_failed { - Err(EthRpcError::SignatureCheckFailed)? - } else if let Some(agg) = &aggregator { - // TODO(danc): all aggregators are currently unsupported - Err(EthRpcError::UnsupportedAggregator( - UnsupportedAggregatorData { - aggregator: agg.address, - }, - ))? - } - - let now = Timestamp::now(); - let valid_after = valid_time_range.valid_after; - let valid_until = valid_time_range.valid_until; - if !valid_time_range.contains(now, EXPIRATION_BUFFER) { - Err(EthRpcError::OutOfTimeRange(OutOfTimeRangeData { - valid_after, - valid_until, - paymaster: op.paymaster(), - }))? - } - - // 3. send op the mempool let add_op_result = self .op_pool_client .clone() .add_op(AddOpRequest { - entry_point: entry_point.address().as_bytes().to_vec(), - op: Some(MempoolOp { - uo: Some((&op).into()), - aggregator: aggregator.unwrap_or_default().address.as_bytes().to_vec(), - valid_after: valid_after.seconds_since_epoch(), - valid_until: valid_until.seconds_since_epoch(), - expected_code_hash: code_hash.as_bytes().to_vec(), - sim_block_hash: block_hash.as_bytes().to_vec(), - entities_needing_stake: entities_needing_stake - .iter() - .map(|&e| ProtoEntityType::from(e).into()) - .collect(), - account_is_staked, - }), + entry_point: entry_point.as_bytes().to_vec(), + op: Some((&op).into()), }) .await .map_err(EthRpcError::from) .log_on_error_level(Level::DEBUG, "failed to add op to the mempool")?; - // 4. return the op hash - Ok(H256::from_slice(&add_op_result.into_inner().hash)) + match add_op_result.into_inner().result { + Some(add_op_response::Result::Success(s)) => Ok(H256::from_slice(&s.hash)), + Some(add_op_response::Result::Failure(f)) => { + Err(EthRpcError::from(MempoolError::try_from( + f.error.context("should have received error from op pool")?, + )?)) + .log_on_error_level(Level::DEBUG, "failed to add op to the mempool")? + } + None => Err(EthRpcError::Internal(anyhow!( + "should have received result from op pool" + )))?, + } } async fn estimate_user_operation_gas( @@ -611,181 +509,18 @@ where } } -impl From for EthRpcError { - fn from(value: PrecheckError) -> Self { - match &value { - PrecheckError::Violations(_) => Self::PrecheckFailed(value), - PrecheckError::Other(_) => Self::Internal(value.into()), - } - } -} - -impl From for EthRpcError { - fn from(mut value: SimulationError) -> Self { - debug!("Simulation error: {value:?}"); - - let SimulationError::Violations(violations) = &mut value else { - return Self::Internal(value.into()); - }; - - let Some(violation) = violations.iter().min() else { - return Self::Internal(value.into()); - }; - - match violation { - SimulationViolation::UnintendedRevertWithMessage( - EntityType::Paymaster, - reason, - Some(paymaster), - ) => Self::PaymasterValidationRejected(PaymasterValidationRejectedData { - paymaster: *paymaster, - reason: reason.to_string(), - }), - SimulationViolation::UnintendedRevertWithMessage(_, reason, _) => { - Self::EntryPointValidationRejected(reason.clone()) - } - SimulationViolation::UsedForbiddenOpcode(entity, _, op) => { - Self::OpcodeViolation(entity.kind, op.clone().0) - } - SimulationViolation::UsedForbiddenPrecompile(entity, _, precompile) => { - Self::PrecompileViolation(entity.kind, *precompile) - } - SimulationViolation::FactoryCalledCreate2Twice(_) => { - Self::OpcodeViolation(EntityType::Factory, Opcode::CREATE2) - } - SimulationViolation::InvalidStorageAccess(entity, slot) => { - Self::InvalidStorageAccess(entity.kind, slot.address, slot.slot) - } - SimulationViolation::NotStaked(entity, min_stake, min_unstake_delay) => { - Self::StakeTooLow(StakeTooLowData::new( - *entity, - *min_stake, - *min_unstake_delay, - )) - } - SimulationViolation::AggregatorValidationFailed => Self::SignatureCheckFailed, - _ => Self::SimulationFailed(value), - } - } -} - -impl TryFrom for ErrorInfo { - type Error = anyhow::Error; - - fn try_from(value: Status) -> Result { - let decoded_status = - tonic_types::Status::decode(value.details()).context("should have decoded status")?; - - // This could actually contain more error details, but we only use the first right now - let any = decoded_status - .details - .first() - .context("should have details")?; - - if any.type_url == "type.alchemy.com/op_pool.ErrorInfo" { - ErrorInfo::decode(&any.value[..]) - .context("should have decoded successfully into ErrorInfo") - } else { - bail!("Unknown type_url: {}", any.type_url); - } - } -} - -impl From for EthRpcError { - fn from(value: ErrorInfo) -> Self { - let ErrorInfo { reason, metadata } = value; - - if reason == ErrorReason::EntityThrottled.as_str_name() { - let (entity, address) = metadata.iter().next().unwrap(); - let Some(address) = Address::from_str(address).ok() else { - return anyhow!("should have valid address in ErrorInfo metadata").into(); - }; - let Some(entity) = EntityType::from_str(entity).ok() else { - return anyhow!("should be a valid Entity type in ErrorInfo metadata").into(); - }; - return EthRpcError::ThrottledOrBanned(Entity::new(entity, address)); - } else if reason == ErrorReason::ReplacementUnderpriced.as_str_name() { - let prio_fee = metadata - .get(ErrorMetadataKey::CurrentMaxPriorityFeePerGas.as_str_name()) - .and_then(|fee| fee.parse::().ok()) - .unwrap_or_default(); - let fee = metadata - .get(ErrorMetadataKey::CurrentMaxFeePerGas.as_str_name()) - .and_then(|fee| fee.parse::().ok()) - .unwrap_or_default(); - return EthRpcError::ReplacementUnderpriced(ReplacementUnderpricedData::new( - prio_fee, fee, - )); - } else if reason == ErrorReason::OperationDiscardedOnInsert.as_str_name() { - return anyhow!("operation rejected: mempool full try again with higher gas price") - .into(); - } - - anyhow!("operation rejected").into() - } -} - -impl From for EthRpcError { - fn from(status: Status) -> Self { - let status_details: anyhow::Result = status.try_into(); - let Ok(error_info) = status_details else { - return EthRpcError::Internal(status_details.unwrap_err()); - }; - - EthRpcError::from(error_info) - } -} - #[cfg(test)] mod tests { use ethers::{ providers::Http, types::{Log, TransactionReceipt}, - utils::{hex::ToHex, keccak256}, + utils::keccak256, }; use super::*; - use crate::common::protos::op_pool::ErrorReason; const UO_OP_TOPIC: &str = "user-op-event-topic"; - #[test] - fn test_throttled_or_banned_decode() { - let error_info = ErrorInfo { - reason: ErrorReason::EntityThrottled.as_str_name().to_string(), - metadata: HashMap::from([( - EntityType::Paymaster.to_string(), - Address::default().encode_hex(), - )]), - }; - - let details = tonic_types::Status { - code: 0, - message: "".to_string(), - details: vec![prost_types::Any { - type_url: "type.alchemy.com/op_pool.ErrorInfo".to_string(), - value: error_info.encode_to_vec(), - }], - }; - - let status = Status::with_details( - tonic::Code::Internal, - "error_message".to_string(), - details.encode_to_vec().into(), - ); - - let rpc_error: EthRpcError = status.into(); - - assert!( - matches!( - rpc_error, - EthRpcError::ThrottledOrBanned(ref data) if *data == Entity::paymaster(Address::default()) - ), - "{:?}", - rpc_error - ); - } - #[test] fn test_filter_receipt_logs_when_at_begining_of_list() { let reference_log = given_log(UO_OP_TOPIC, "moldy-hash"); diff --git a/src/rpc/task.rs b/src/rpc/task.rs index bc74bb4b9..659efb4e5 100644 --- a/src/rpc/task.rs +++ b/src/rpc/task.rs @@ -1,9 +1,9 @@ -use std::{collections::HashMap, net::SocketAddr, str::FromStr, sync::Arc, time::Duration}; +use std::{net::SocketAddr, str::FromStr, sync::Arc, time::Duration}; use anyhow::{bail, Context}; use ethers::{ providers::{Http, HttpRateLimitRetryPolicy, Provider, RetryClientBuilder}, - types::{Address, H256}, + types::Address, }; use jsonrpsee::{ server::{middleware::proxy_get_request::ProxyGetRequestLayer, ServerBuilder}, @@ -23,11 +23,8 @@ use super::ApiNamespace; use crate::{ common::{ handle::Task, - mempool::MempoolConfig, - precheck, protos::{builder::builder_client::BuilderClient, op_pool::op_pool_client::OpPoolClient}, server::{self, format_socket_addr}, - simulation, }, rpc::{ debug::{DebugApi, DebugApiServer}, @@ -47,11 +44,8 @@ pub struct Args { pub chain_id: u64, pub api_namespaces: Vec, pub rpc_url: String, - pub precheck_settings: precheck::Settings, - pub sim_settings: simulation::Settings, pub estimation_settings: estimation::Settings, pub rpc_timeout: Duration, - pub mempool_configs: HashMap, } #[derive(Debug)] @@ -64,7 +58,6 @@ impl Task for RpcTask { async fn run(&self, shutdown_token: CancellationToken) -> anyhow::Result<()> { let addr: SocketAddr = format_socket_addr(&self.args.host, self.args.port).parse()?; tracing::info!("Starting rpc server on {}", addr); - tracing::info!("Mempool config: {:?}", self.args.mempool_configs); let mut module = RpcModule::new(()); @@ -120,10 +113,7 @@ impl Task for RpcTask { self.args.entry_points.clone(), self.args.chain_id, op_pool_client.clone(), - self.args.precheck_settings, - self.args.sim_settings, self.args.estimation_settings, - self.args.mempool_configs.clone(), ) .into_rpc(), )?,