Skip to content

Commit

Permalink
feat(pool): remove expired UOs from pool each block
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Jan 17, 2024
1 parent 8927cf5 commit cc4e3e6
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 3 deletions.
16 changes: 15 additions & 1 deletion crates/pool/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use ethers::{
use futures::future;
use rundler_provider::Provider;
use rundler_task::block_watcher;
use rundler_types::{contracts::i_entry_point::UserOperationEventFilter, UserOperationId};
use rundler_types::{
contracts::i_entry_point::UserOperationEventFilter, Timestamp, UserOperationId,
};
use tokio::{
select,
sync::{broadcast, Semaphore},
Expand Down Expand Up @@ -57,6 +59,7 @@ pub(crate) struct Chain<P: Provider> {
pub struct ChainUpdate {
pub latest_block_number: u64,
pub latest_block_hash: H256,
pub latest_block_timestamp: Timestamp,
/// Blocks before this number are no longer tracked in this `Chain`, so no
/// further updates related to them will be sent.
pub earliest_remembered_block_number: u64,
Expand Down Expand Up @@ -93,6 +96,7 @@ pub(crate) struct Settings {
struct BlockSummary {
number: u64,
hash: H256,
timestamp: Timestamp,
parent_hash: H256,
ops: Vec<MinedOp>,
}
Expand Down Expand Up @@ -380,6 +384,7 @@ impl<P: Provider> Chain<P> {
ChainUpdate {
latest_block_number: latest_block.number,
latest_block_hash: latest_block.hash,
latest_block_timestamp: latest_block.timestamp,
earliest_remembered_block_number: self.blocks[0].number,
reorg_depth,
mined_ops,
Expand Down Expand Up @@ -415,6 +420,7 @@ impl BlockSummary {
.context("block number should be present")?
.as_u64(),
hash: block.hash.context("block hash should exist")?,
timestamp: block.timestamp.as_u64().into(),
parent_hash: block.parent_hash,
ops: Vec::new(),
})
Expand Down Expand Up @@ -557,6 +563,7 @@ mod tests {
ChainUpdate {
latest_block_number: 3,
latest_block_hash: hash(3),
latest_block_timestamp: 0.into(),
earliest_remembered_block_number: 1,
reorg_depth: 0,
mined_ops: vec![fake_mined_op(103), fake_mined_op(104), fake_mined_op(105),],
Expand Down Expand Up @@ -584,6 +591,7 @@ mod tests {
ChainUpdate {
latest_block_number: 4,
latest_block_hash: hash(4),
latest_block_timestamp: 0.into(),
earliest_remembered_block_number: 2,
reorg_depth: 0,
mined_ops: vec![fake_mined_op(106)],
Expand Down Expand Up @@ -617,6 +625,7 @@ mod tests {
ChainUpdate {
latest_block_number: 4,
latest_block_hash: hash(14),
latest_block_timestamp: 0.into(),
earliest_remembered_block_number: 2,
reorg_depth: 1,
mined_ops: vec![fake_mined_op(112), fake_mined_op(113), fake_mined_op(114)],
Expand Down Expand Up @@ -650,6 +659,7 @@ mod tests {
ChainUpdate {
latest_block_number: 2,
latest_block_hash: hash(12),
latest_block_timestamp: 0.into(),
earliest_remembered_block_number: 0,
reorg_depth: 2,
mined_ops: vec![fake_mined_op(111), fake_mined_op(112)],
Expand Down Expand Up @@ -680,6 +690,7 @@ mod tests {
ChainUpdate {
latest_block_number: 1,
latest_block_hash: hash(11),
latest_block_timestamp: 0.into(),
earliest_remembered_block_number: 0,
reorg_depth: 2,
mined_ops: vec![fake_mined_op(111)],
Expand Down Expand Up @@ -711,6 +722,7 @@ mod tests {
ChainUpdate {
latest_block_number: 3,
latest_block_hash: hash(13),
latest_block_timestamp: 0.into(),
earliest_remembered_block_number: 1,
reorg_depth: 3,
mined_ops: vec![fake_mined_op(111), fake_mined_op(112), fake_mined_op(113)],
Expand Down Expand Up @@ -740,6 +752,7 @@ mod tests {
ChainUpdate {
latest_block_number: 6,
latest_block_hash: hash(16),
latest_block_timestamp: 0.into(),
earliest_remembered_block_number: 4,
reorg_depth: 0,
mined_ops: vec![fake_mined_op(104), fake_mined_op(105), fake_mined_op(106)],
Expand All @@ -763,6 +776,7 @@ mod tests {
ChainUpdate {
latest_block_number: 1,
latest_block_hash: hash(1),
latest_block_timestamp: 0.into(),
earliest_remembered_block_number: 0,
reorg_depth: 0,
mined_ops: vec![fake_mined_op(101), fake_mined_op(102), fake_mined_op(103),],
Expand Down
5 changes: 5 additions & 0 deletions crates/pool/src/emit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ pub enum OpRemovalReason {
/// The removed entity
entity: Entity,
},
/// Op was removed because it expired
Expired {
/// Op was valid until this timestamp
valid_until: Timestamp,
},
}

impl EntitySummary {
Expand Down
60 changes: 59 additions & 1 deletion crates/pool/src/mempool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use ethers::{
abi::Address,
types::{H256, U256},
};
use rundler_types::{Entity, UserOperation, UserOperationId};
use rundler_types::{Entity, Timestamp, UserOperation, UserOperationId};
use rundler_utils::math;
use tracing::info;

Expand Down Expand Up @@ -141,6 +141,25 @@ impl PoolInner {
self.best.clone().into_iter().map(|v| v.po)
}

/// Removes all operations using the given entity, returning the hashes of the removed operations.
///
/// NOTE: This method is O(n) where n is the number of operations in the pool.
/// It should be called sparingly (e.g. when a block is mined).
pub(crate) fn remove_expired(&mut self, expire_before: Timestamp) -> Vec<(H256, Timestamp)> {
let mut expired = Vec::new();
for (hash, op) in &self.by_hash {
if op.po.valid_time_range.valid_until < expire_before {
expired.push((*hash, op.po.valid_time_range.valid_until));
}
}

for (hash, _) in &expired {
self.remove_operation_by_hash(*hash);
}

expired
}

pub(crate) fn address_count(&self, address: Address) -> usize {
self.count_by_address.get(&address).copied().unwrap_or(0)
}
Expand Down Expand Up @@ -285,6 +304,7 @@ impl PoolInner {
self.by_id.insert(pool_op.uo().id(), pool_op.clone());
self.best.insert(pool_op);

// TODO(danc): This silently drops UOs from the pool without reporting
let removed = self
.enforce_size()
.context("should have succeeded in resizing the pool")?;
Expand Down Expand Up @@ -789,6 +809,44 @@ mod tests {
}
}

#[test]
fn test_expired() {
let conf = conf();
let mut pool = PoolInner::new(conf.clone());
let sender = Address::random();
let mut po1 = create_op(sender, 0, 10);
po1.valid_time_range.valid_until = Timestamp::from(1);
let _ = pool.add_operation(po1.clone()).unwrap();

let res = pool.remove_expired(Timestamp::from(2));
assert_eq!(res.len(), 1);
assert_eq!(res[0].0, po1.uo.op_hash(conf.entry_point, conf.chain_id));
assert_eq!(res[0].1, Timestamp::from(1));
}

#[test]
fn test_multiple_expired() {
let conf = conf();
let mut pool = PoolInner::new(conf.clone());

let mut po1 = create_op(Address::random(), 0, 10);
po1.valid_time_range.valid_until = 5.into();
let _ = pool.add_operation(po1.clone()).unwrap();

let mut po2 = create_op(Address::random(), 0, 10);
po2.valid_time_range.valid_until = 10.into();
let _ = pool.add_operation(po2.clone()).unwrap();

let mut po3 = create_op(Address::random(), 0, 10);
po3.valid_time_range.valid_until = 9.into();
let _ = pool.add_operation(po3.clone()).unwrap();

let res = pool.remove_expired(10.into());
assert_eq!(res.len(), 2);
assert!(res.contains(&(po1.uo.op_hash(conf.entry_point, conf.chain_id), 5.into())));
assert!(res.contains(&(po3.uo.op_hash(conf.entry_point, conf.chain_id), 9.into())));
}

fn conf() -> PoolInnerConfig {
PoolInnerConfig {
entry_point: Address::random(),
Expand Down
54 changes: 53 additions & 1 deletion crates/pool/src/mempool/uo_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,24 @@ where
for hash in to_remove {
state.pool.remove_operation_by_hash(hash);
state.throttled_ops.remove(&hash);
self.emit(OpPoolEvent::RemovedOp {
op_hash: hash,
reason: OpRemovalReason::ThrottledAndOld {
added_at_block_number: state.block_number,
current_block_number: update.latest_block_number,
},
})
}

// expire old UOs
let expired = state.pool.remove_expired(update.latest_block_timestamp);
for (hash, until) in expired {
self.emit(OpPoolEvent::RemovedOp {
op_hash: hash,
reason: OpRemovalReason::Expired { valid_until: until },
})
}

state.block_number = update.latest_block_number;
}

Expand Down Expand Up @@ -452,7 +469,7 @@ mod tests {
MockPrechecker, MockSimulator, PrecheckError, PrecheckSettings, PrecheckViolation,
SimulationError, SimulationSettings, SimulationSuccess, SimulationViolation,
};
use rundler_types::{EntityType, GasFees};
use rundler_types::{EntityType, GasFees, ValidTimeRange};

use super::*;
use crate::chain::MinedOp;
Expand Down Expand Up @@ -533,6 +550,7 @@ mod tests {
pool.on_chain_update(&ChainUpdate {
latest_block_number: 1,
latest_block_hash: H256::random(),
latest_block_timestamp: 0.into(),
earliest_remembered_block_number: 0,
reorg_depth: 0,
mined_ops: vec![MinedOp {
Expand Down Expand Up @@ -561,6 +579,7 @@ mod tests {
pool.on_chain_update(&ChainUpdate {
latest_block_number: 1,
latest_block_hash: H256::random(),
latest_block_timestamp: 0.into(),
earliest_remembered_block_number: 0,
reorg_depth: 0,
mined_ops: vec![MinedOp {
Expand All @@ -580,6 +599,7 @@ mod tests {
pool.on_chain_update(&ChainUpdate {
latest_block_number: 1,
latest_block_hash: H256::random(),
latest_block_timestamp: 0.into(),
earliest_remembered_block_number: 0,
reorg_depth: 0,
mined_ops: vec![],
Expand Down Expand Up @@ -607,6 +627,7 @@ mod tests {
pool.on_chain_update(&ChainUpdate {
latest_block_number: 1,
latest_block_hash: H256::random(),
latest_block_timestamp: 0.into(),
earliest_remembered_block_number: 0,
reorg_depth: 0,
mined_ops: vec![MinedOp {
Expand Down Expand Up @@ -643,6 +664,7 @@ mod tests {
pool.on_chain_update(&ChainUpdate {
latest_block_number: 1,
latest_block_hash: H256::random(),
latest_block_timestamp: 0.into(),
earliest_remembered_block_number: 0,
reorg_depth: 0,
mined_ops: vec![MinedOp {
Expand Down Expand Up @@ -710,6 +732,7 @@ mod tests {
pool.on_chain_update(&ChainUpdate {
latest_block_number: 1,
latest_block_hash: H256::random(),
latest_block_timestamp: 0.into(),
earliest_remembered_block_number: 0,
reorg_depth: 0,
mined_ops: vec![MinedOp {
Expand Down Expand Up @@ -862,9 +885,35 @@ mod tests {
check_ops(pool.best_operations(1, 0).unwrap(), vec![replacement]);
}

#[tokio::test]
async fn test_expiry() {
let mut op = create_op(Address::random(), 0, 0);
op.valid_time_range = ValidTimeRange {
valid_after: 0.into(),
valid_until: 10.into(),
};
let pool = create_pool(vec![op.clone()]);

let _ = pool
.add_operation(OperationOrigin::Local, op.op.clone())
.await
.unwrap();

check_ops(pool.best_operations(1, 0).unwrap(), vec![op.op.clone()]);

pool.on_chain_update(&ChainUpdate {
latest_block_timestamp: 11.into(),
..ChainUpdate::default()
})
.await;

check_ops(pool.best_operations(1, 0).unwrap(), vec![]);
}

#[derive(Clone, Debug)]
struct OpWithErrors {
op: UserOperation,
valid_time_range: ValidTimeRange,
precheck_error: Option<PrecheckViolation>,
simulation_error: Option<SimulationViolation>,
staked: bool,
Expand Down Expand Up @@ -901,6 +950,7 @@ mod tests {
} else {
Ok(SimulationSuccess {
account_is_staked: op.staked,
valid_time_range: op.valid_time_range,
..SimulationSuccess::default()
})
}
Expand Down Expand Up @@ -948,6 +998,7 @@ mod tests {
max_fee_per_gas: max_fee_per_gas.into(),
..UserOperation::default()
},
valid_time_range: ValidTimeRange::default(),
precheck_error: None,
simulation_error: None,
staked: false,
Expand All @@ -969,6 +1020,7 @@ mod tests {
max_fee_per_gas: max_fee_per_gas.into(),
..UserOperation::default()
},
valid_time_range: ValidTimeRange::default(),
precheck_error,
simulation_error,
staked,
Expand Down

0 comments on commit cc4e3e6

Please sign in to comment.