diff --git a/crates/pool/src/chain.rs b/crates/pool/src/chain.rs index e4d542873..eb418671d 100644 --- a/crates/pool/src/chain.rs +++ b/crates/pool/src/chain.rs @@ -26,7 +26,9 @@ use ethers::{ use futures::future; use rundler_provider::Provider; use rundler_task::block_watcher; -use rundler_types::{contracts::i_entry_point::UserOperationEventFilter, UserOperationId}; +use rundler_types::{ + contracts::i_entry_point::UserOperationEventFilter, Timestamp, UserOperationId, +}; use tokio::{ select, sync::{broadcast, Semaphore}, @@ -57,6 +59,7 @@ pub(crate) struct Chain { pub struct ChainUpdate { pub latest_block_number: u64, pub latest_block_hash: H256, + pub latest_block_timestamp: Timestamp, /// Blocks before this number are no longer tracked in this `Chain`, so no /// further updates related to them will be sent. pub earliest_remembered_block_number: u64, @@ -93,6 +96,7 @@ pub(crate) struct Settings { struct BlockSummary { number: u64, hash: H256, + timestamp: Timestamp, parent_hash: H256, ops: Vec, } @@ -380,6 +384,7 @@ impl Chain

{ ChainUpdate { latest_block_number: latest_block.number, latest_block_hash: latest_block.hash, + latest_block_timestamp: latest_block.timestamp, earliest_remembered_block_number: self.blocks[0].number, reorg_depth, mined_ops, @@ -415,6 +420,7 @@ impl BlockSummary { .context("block number should be present")? .as_u64(), hash: block.hash.context("block hash should exist")?, + timestamp: block.timestamp.as_u64().into(), parent_hash: block.parent_hash, ops: Vec::new(), }) @@ -557,6 +563,7 @@ mod tests { ChainUpdate { latest_block_number: 3, latest_block_hash: hash(3), + latest_block_timestamp: 0.into(), earliest_remembered_block_number: 1, reorg_depth: 0, mined_ops: vec![fake_mined_op(103), fake_mined_op(104), fake_mined_op(105),], @@ -584,6 +591,7 @@ mod tests { ChainUpdate { latest_block_number: 4, latest_block_hash: hash(4), + latest_block_timestamp: 0.into(), earliest_remembered_block_number: 2, reorg_depth: 0, mined_ops: vec![fake_mined_op(106)], @@ -617,6 +625,7 @@ mod tests { ChainUpdate { latest_block_number: 4, latest_block_hash: hash(14), + latest_block_timestamp: 0.into(), earliest_remembered_block_number: 2, reorg_depth: 1, mined_ops: vec![fake_mined_op(112), fake_mined_op(113), fake_mined_op(114)], @@ -650,6 +659,7 @@ mod tests { ChainUpdate { latest_block_number: 2, latest_block_hash: hash(12), + latest_block_timestamp: 0.into(), earliest_remembered_block_number: 0, reorg_depth: 2, mined_ops: vec![fake_mined_op(111), fake_mined_op(112)], @@ -680,6 +690,7 @@ mod tests { ChainUpdate { latest_block_number: 1, latest_block_hash: hash(11), + latest_block_timestamp: 0.into(), earliest_remembered_block_number: 0, reorg_depth: 2, mined_ops: vec![fake_mined_op(111)], @@ -711,6 +722,7 @@ mod tests { ChainUpdate { latest_block_number: 3, latest_block_hash: hash(13), + latest_block_timestamp: 0.into(), earliest_remembered_block_number: 1, reorg_depth: 3, mined_ops: vec![fake_mined_op(111), fake_mined_op(112), fake_mined_op(113)], @@ -740,6 +752,7 @@ mod tests { ChainUpdate { latest_block_number: 6, latest_block_hash: hash(16), + latest_block_timestamp: 0.into(), earliest_remembered_block_number: 4, reorg_depth: 0, mined_ops: vec![fake_mined_op(104), fake_mined_op(105), fake_mined_op(106)], @@ -763,6 +776,7 @@ mod tests { ChainUpdate { latest_block_number: 1, latest_block_hash: hash(1), + latest_block_timestamp: 0.into(), earliest_remembered_block_number: 0, reorg_depth: 0, mined_ops: vec![fake_mined_op(101), fake_mined_op(102), fake_mined_op(103),], diff --git a/crates/pool/src/emit.rs b/crates/pool/src/emit.rs index c8a73c6e3..520a1d4b3 100644 --- a/crates/pool/src/emit.rs +++ b/crates/pool/src/emit.rs @@ -113,6 +113,11 @@ pub enum OpRemovalReason { /// The removed entity entity: Entity, }, + /// Op was removed because it expired + Expired { + /// Op was valid until this timestamp + valid_until: Timestamp, + }, } impl EntitySummary { diff --git a/crates/pool/src/mempool/pool.rs b/crates/pool/src/mempool/pool.rs index 0d74d41b9..21134c908 100644 --- a/crates/pool/src/mempool/pool.rs +++ b/crates/pool/src/mempool/pool.rs @@ -22,7 +22,7 @@ use ethers::{ abi::Address, types::{H256, U256}, }; -use rundler_types::{Entity, UserOperation, UserOperationId}; +use rundler_types::{Entity, Timestamp, UserOperation, UserOperationId}; use rundler_utils::math; use tracing::info; @@ -141,6 +141,25 @@ impl PoolInner { self.best.clone().into_iter().map(|v| v.po) } + /// Removes all operations using the given entity, returning the hashes of the removed operations. + /// + /// NOTE: This method is O(n) where n is the number of operations in the pool. + /// It should be called sparingly (e.g. when a block is mined). + pub(crate) fn remove_expired(&mut self, expire_before: Timestamp) -> Vec<(H256, Timestamp)> { + let mut expired = Vec::new(); + for (hash, op) in &self.by_hash { + if op.po.valid_time_range.valid_until < expire_before { + expired.push((*hash, op.po.valid_time_range.valid_until)); + } + } + + for (hash, _) in &expired { + self.remove_operation_by_hash(*hash); + } + + expired + } + pub(crate) fn address_count(&self, address: Address) -> usize { self.count_by_address.get(&address).copied().unwrap_or(0) } @@ -285,6 +304,7 @@ impl PoolInner { self.by_id.insert(pool_op.uo().id(), pool_op.clone()); self.best.insert(pool_op); + // TODO(danc): This silently drops UOs from the pool without reporting let removed = self .enforce_size() .context("should have succeeded in resizing the pool")?; @@ -789,6 +809,44 @@ mod tests { } } + #[test] + fn test_expired() { + let conf = conf(); + let mut pool = PoolInner::new(conf.clone()); + let sender = Address::random(); + let mut po1 = create_op(sender, 0, 10); + po1.valid_time_range.valid_until = Timestamp::from(1); + let _ = pool.add_operation(po1.clone()).unwrap(); + + let res = pool.remove_expired(Timestamp::from(2)); + assert_eq!(res.len(), 1); + assert_eq!(res[0].0, po1.uo.op_hash(conf.entry_point, conf.chain_id)); + assert_eq!(res[0].1, Timestamp::from(1)); + } + + #[test] + fn test_multiple_expired() { + let conf = conf(); + let mut pool = PoolInner::new(conf.clone()); + + let mut po1 = create_op(Address::random(), 0, 10); + po1.valid_time_range.valid_until = 5.into(); + let _ = pool.add_operation(po1.clone()).unwrap(); + + let mut po2 = create_op(Address::random(), 0, 10); + po2.valid_time_range.valid_until = 10.into(); + let _ = pool.add_operation(po2.clone()).unwrap(); + + let mut po3 = create_op(Address::random(), 0, 10); + po3.valid_time_range.valid_until = 9.into(); + let _ = pool.add_operation(po3.clone()).unwrap(); + + let res = pool.remove_expired(10.into()); + assert_eq!(res.len(), 2); + assert!(res.contains(&(po1.uo.op_hash(conf.entry_point, conf.chain_id), 5.into()))); + assert!(res.contains(&(po3.uo.op_hash(conf.entry_point, conf.chain_id), 9.into()))); + } + fn conf() -> PoolInnerConfig { PoolInnerConfig { entry_point: Address::random(), diff --git a/crates/pool/src/mempool/uo_pool.rs b/crates/pool/src/mempool/uo_pool.rs index ef312e745..b9b6891cb 100644 --- a/crates/pool/src/mempool/uo_pool.rs +++ b/crates/pool/src/mempool/uo_pool.rs @@ -180,7 +180,24 @@ where for hash in to_remove { state.pool.remove_operation_by_hash(hash); state.throttled_ops.remove(&hash); + self.emit(OpPoolEvent::RemovedOp { + op_hash: hash, + reason: OpRemovalReason::ThrottledAndOld { + added_at_block_number: state.block_number, + current_block_number: update.latest_block_number, + }, + }) + } + + // expire old UOs + let expired = state.pool.remove_expired(update.latest_block_timestamp); + for (hash, until) in expired { + self.emit(OpPoolEvent::RemovedOp { + op_hash: hash, + reason: OpRemovalReason::Expired { valid_until: until }, + }) } + state.block_number = update.latest_block_number; } @@ -452,7 +469,7 @@ mod tests { MockPrechecker, MockSimulator, PrecheckError, PrecheckSettings, PrecheckViolation, SimulationError, SimulationSettings, SimulationSuccess, SimulationViolation, }; - use rundler_types::{EntityType, GasFees}; + use rundler_types::{EntityType, GasFees, ValidTimeRange}; use super::*; use crate::chain::MinedOp; @@ -533,6 +550,7 @@ mod tests { pool.on_chain_update(&ChainUpdate { latest_block_number: 1, latest_block_hash: H256::random(), + latest_block_timestamp: 0.into(), earliest_remembered_block_number: 0, reorg_depth: 0, mined_ops: vec![MinedOp { @@ -561,6 +579,7 @@ mod tests { pool.on_chain_update(&ChainUpdate { latest_block_number: 1, latest_block_hash: H256::random(), + latest_block_timestamp: 0.into(), earliest_remembered_block_number: 0, reorg_depth: 0, mined_ops: vec![MinedOp { @@ -580,6 +599,7 @@ mod tests { pool.on_chain_update(&ChainUpdate { latest_block_number: 1, latest_block_hash: H256::random(), + latest_block_timestamp: 0.into(), earliest_remembered_block_number: 0, reorg_depth: 0, mined_ops: vec![], @@ -607,6 +627,7 @@ mod tests { pool.on_chain_update(&ChainUpdate { latest_block_number: 1, latest_block_hash: H256::random(), + latest_block_timestamp: 0.into(), earliest_remembered_block_number: 0, reorg_depth: 0, mined_ops: vec![MinedOp { @@ -643,6 +664,7 @@ mod tests { pool.on_chain_update(&ChainUpdate { latest_block_number: 1, latest_block_hash: H256::random(), + latest_block_timestamp: 0.into(), earliest_remembered_block_number: 0, reorg_depth: 0, mined_ops: vec![MinedOp { @@ -710,6 +732,7 @@ mod tests { pool.on_chain_update(&ChainUpdate { latest_block_number: 1, latest_block_hash: H256::random(), + latest_block_timestamp: 0.into(), earliest_remembered_block_number: 0, reorg_depth: 0, mined_ops: vec![MinedOp { @@ -862,9 +885,35 @@ mod tests { check_ops(pool.best_operations(1, 0).unwrap(), vec![replacement]); } + #[tokio::test] + async fn test_expiry() { + let mut op = create_op(Address::random(), 0, 0); + op.valid_time_range = ValidTimeRange { + valid_after: 0.into(), + valid_until: 10.into(), + }; + let pool = create_pool(vec![op.clone()]); + + let _ = pool + .add_operation(OperationOrigin::Local, op.op.clone()) + .await + .unwrap(); + + check_ops(pool.best_operations(1, 0).unwrap(), vec![op.op.clone()]); + + pool.on_chain_update(&ChainUpdate { + latest_block_timestamp: 11.into(), + ..ChainUpdate::default() + }) + .await; + + check_ops(pool.best_operations(1, 0).unwrap(), vec![]); + } + #[derive(Clone, Debug)] struct OpWithErrors { op: UserOperation, + valid_time_range: ValidTimeRange, precheck_error: Option, simulation_error: Option, staked: bool, @@ -901,6 +950,7 @@ mod tests { } else { Ok(SimulationSuccess { account_is_staked: op.staked, + valid_time_range: op.valid_time_range, ..SimulationSuccess::default() }) } @@ -948,6 +998,7 @@ mod tests { max_fee_per_gas: max_fee_per_gas.into(), ..UserOperation::default() }, + valid_time_range: ValidTimeRange::default(), precheck_error: None, simulation_error: None, staked: false, @@ -969,6 +1020,7 @@ mod tests { max_fee_per_gas: max_fee_per_gas.into(), ..UserOperation::default() }, + valid_time_range: ValidTimeRange::default(), precheck_error, simulation_error, staked,