From 0654565167152042d6ea58a4b44a7a929c8ee33a Mon Sep 17 00:00:00 2001 From: David Philipson Date: Tue, 25 Jul 2023 16:43:57 -0700 Subject: [PATCH] feat(oppool): Return reorged ops to the mempool Previously, ops could be lost forever if they were mined into a block, but then that block got reorged away. Now, we detect reorgs and return any ops contained in them to our mempool. This requires some involved changes: * We replace the entire `events` mod and all its listeners. Instead, we introduce a new type, `Chain`, which represents our current knowledge of what blocks make up the blockchain. * We watch for the latest block hash to change. When it does, we read backwards from the latest block until we connect back to our known blocks, which lets us see any blocks that were replaced as well. * This process produces a `ChainUpdate` event, which is sent to the op pool (replacing `NewBlockEvent`). This may cause the op pool to not only remove mined ops but also restore unmined ops. * In order for the op pool to do so, it remembers mined ops for a time rather than deleting them fully. We add new methods to the op pool for restoring unmined blocks to make this possible. --- src/builder/server.rs | 30 +- src/builder/task.rs | 28 +- src/cli/builder.rs | 5 +- src/cli/mod.rs | 6 +- src/cli/pool.rs | 31 +- src/cli/rpc.rs | 5 +- src/common/block_watcher.rs | 53 ++ src/common/eth.rs | 24 +- src/common/handle.rs | 11 +- src/common/mod.rs | 2 + src/common/retry.rs | 89 ++++ src/common/server.rs | 37 +- src/common/types/provider_like.rs | 15 +- src/op_pool/chain.rs | 802 ++++++++++++++++++++++++++++++ src/op_pool/event/http.rs | 189 ------- src/op_pool/event/listener.rs | 609 ----------------------- src/op_pool/event/mock.rs | 81 --- src/op_pool/event/mod.rs | 99 ---- src/op_pool/event/ws.rs | 144 ------ src/op_pool/mempool/mod.rs | 13 +- src/op_pool/mempool/pool.rs | 223 ++++++--- src/op_pool/mempool/uo_pool.rs | 88 ++-- src/op_pool/mod.rs | 2 +- src/op_pool/reputation.rs | 23 +- src/op_pool/server.rs | 4 +- src/op_pool/task.rs | 61 +-- 26 files changed, 1308 insertions(+), 1366 deletions(-) create mode 100644 src/common/block_watcher.rs create mode 100644 src/common/retry.rs create mode 100644 src/op_pool/chain.rs delete mode 100644 src/op_pool/event/http.rs delete mode 100644 src/op_pool/event/listener.rs delete mode 100644 src/op_pool/event/mock.rs delete mode 100644 src/op_pool/event/mod.rs delete mode 100644 src/op_pool/event/ws.rs diff --git a/src/builder/server.rs b/src/builder/server.rs index f2a67bee4..492989320 100644 --- a/src/builder/server.rs +++ b/src/builder/server.rs @@ -8,7 +8,7 @@ use std::{ use anyhow::{bail, Context}; use ethers::{ - providers::{Http, Middleware, Provider, RetryClient}, + providers::{Http, Provider, RetryClient}, types::{transaction::eip2718::TypedTransaction, Address, H256, U256}, }; use tokio::{join, sync::broadcast, time}; @@ -22,6 +22,7 @@ use crate::{ transaction_tracker::{SendResult, TrackerUpdate, TransactionTracker}, }, common::{ + block_watcher, emit::WithEntryPoint, gas::GasFees, math, @@ -136,7 +137,12 @@ where time::sleep(self.eth_poll_interval).await; continue; } - last_block_number = self.wait_for_new_block_number(last_block_number).await; + last_block_number = block_watcher::wait_for_new_block_number( + &*self.provider, + last_block_number, + self.eth_poll_interval, + ) + .await; self.check_for_and_log_transaction_update().await; let result = self.send_bundle_with_increasing_gas_fees().await; match result { @@ -337,26 +343,6 @@ where Ok(SendBundleResult::StalledAtMaxFeeIncreases) } - async fn wait_for_new_block_number(&self, prev_block_number: u64) -> u64 { - loop { - let block_number = self.provider.get_block_number().await; - match block_number { - Ok(n) => { - let n = n.as_u64(); - if n > prev_block_number { - return n; - } - } - Err(error) => { - error!( - "Failed to load latest block number in builder. Will keep trying: {error}" - ); - } - } - time::sleep(self.eth_poll_interval).await; - } - } - /// Builds a bundle and returns some metadata and the transaction to send /// it, or `None` if there are no valid operations available. async fn get_bundle_tx( diff --git a/src/builder/task.rs b/src/builder/task.rs index cd7f7ffef..4d5e7e05a 100644 --- a/src/builder/task.rs +++ b/src/builder/task.rs @@ -1,10 +1,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use anyhow::{bail, Context}; -use ethers::{ - providers::{Http, HttpRateLimitRetryPolicy, Provider, RetryClient, RetryClientBuilder}, - types::{Address, H256}, -}; +use ethers::types::{Address, H256}; use ethers_signers::Signer; use rusoto_core::Region; use tokio::{select, sync::broadcast, time}; @@ -14,7 +11,6 @@ use tonic::{ transport::{Channel, Server}, }; use tracing::info; -use url::Url; use crate::{ builder::{ @@ -29,6 +25,7 @@ use crate::{ common::{ contracts::i_entry_point::IEntryPoint, emit::WithEntryPoint, + eth, gas::PriorityFeeMode, handle::{SpawnGuard, Task}, mempool::MempoolConfig, @@ -81,7 +78,7 @@ impl Task for BuilderTask { info!("Starting builder server on {}", addr); tracing::info!("Mempool config: {:?}", self.args.mempool_configs); - let provider = new_provider(&self.args.rpc_url, self.args.eth_poll_interval)?; + let provider = eth::new_provider(&self.args.rpc_url, self.args.eth_poll_interval)?; let signer = if let Some(pk) = &self.args.private_key { info!("Using local signer"); BundlerSigner::Local( @@ -139,7 +136,8 @@ impl Task for BuilderTask { proposer_settings, self.event_sender.clone(), ); - let submit_provider = new_provider(&self.args.submit_url, self.args.eth_poll_interval)?; + let submit_provider = + eth::new_provider(&self.args.submit_url, self.args.eth_poll_interval)?; let transaction_sender = get_sender( submit_provider, signer, @@ -240,19 +238,3 @@ impl BuilderTask { } } } - -fn new_provider( - url: &str, - poll_interval: Duration, -) -> anyhow::Result>>> { - let parsed_url = Url::parse(url).context("provider url should be a valid")?; - let http = Http::new(parsed_url); - let client = RetryClientBuilder::default() - // these retries are if the server returns a 429 - .rate_limit_retries(10) - // these retries are if the connection is dubious - .timeout_retries(3) - .initial_backoff(Duration::from_millis(500)) - .build(http, Box::::default()); - Ok(Arc::new(Provider::new(client).interval(poll_interval))) -} diff --git a/src/cli/builder.rs b/src/cli/builder.rs index 2463aa58e..7fef9d4df 100644 --- a/src/cli/builder.rs +++ b/src/cli/builder.rs @@ -159,10 +159,7 @@ impl BuilderArgs { common.priority_fee_mode_value, )?; - let rpc_url = common - .node_http - .clone() - .context("should have a node HTTP URL")?; + let rpc_url = common.node_http.clone(); let submit_url = self.submit_url.clone().unwrap_or_else(|| rpc_url.clone()); let mempool_configs = match &common.mempool_config_path { diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 231dbe46b..7d91a8d20 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -100,10 +100,6 @@ pub struct CommonArgs { )] chain_id: u64, - /// ETH Node websocket URL to connect to - #[arg(long = "node_ws", name = "node_ws", env = "NODE_WS", global = true)] - node_ws: Option, - /// ETH Node HTTP URL to connect to #[arg( long = "node_http", @@ -111,7 +107,7 @@ pub struct CommonArgs { env = "NODE_HTTP", global = true )] - node_http: Option, + node_http: String, #[arg( long = "max_verification_gas", diff --git a/src/cli/pool.rs b/src/cli/pool.rs index 65a29c387..71a2e9af0 100644 --- a/src/cli/pool.rs +++ b/src/cli/pool.rs @@ -2,6 +2,7 @@ use std::time::Duration; use anyhow::Context; use clap::Args; +use ethers::types::Chain; use tokio::sync::broadcast; use super::CommonArgs; @@ -83,6 +84,13 @@ pub struct PoolArgs { env = "POOL_ALLOWLIST_PATH" )] pub allowlist_path: Option, + + #[arg( + long = "pool.chain_history_size", + name = "pool.chain_history_size", + env = "POOL_CHAIN_HISTORY_SIZE" + )] + pub chain_history_size: Option, } impl PoolArgs { @@ -122,7 +130,9 @@ impl PoolArgs { port: self.port, host: self.host.clone(), chain_id: common.chain_id, - ws_url: common.node_ws.clone(), + chain_history_size: self + .chain_history_size + .unwrap_or_else(|| default_chain_history_size(common.chain_id)), http_url: common.node_http.clone(), http_poll_interval: Duration::from_millis(self.http_poll_interval_millis), pool_configs, @@ -130,6 +140,25 @@ impl PoolArgs { } } +const SMALL_HISTORY_SIZE: u64 = 16; +const LARGE_HISTORY_SIZE: u64 = 128; + +// Mainnets that are known to not have large reorgs can use the small history +// size. Use the large history size for all testnets because I don't trust them. +const SMALL_HISTORY_CHAIN_IDS: &[u64] = &[ + Chain::Mainnet as u64, + Chain::Arbitrum as u64, + Chain::Optimism as u64, +]; + +fn default_chain_history_size(chain_id: u64) -> u64 { + if SMALL_HISTORY_CHAIN_IDS.contains(&chain_id) { + SMALL_HISTORY_SIZE + } else { + LARGE_HISTORY_SIZE + } +} + /// CLI options for the Pool server standalone #[derive(Args, Debug)] pub struct PoolCliArgs { diff --git a/src/cli/rpc.rs b/src/cli/rpc.rs index 98af54455..741286864 100644 --- a/src/cli/rpc.rs +++ b/src/cli/rpc.rs @@ -98,10 +98,7 @@ impl RpcArgs { .map(|ep| ep.parse()) .collect::, _>>() .context("Invalid entry_points argument")?, - rpc_url: common - .node_http - .clone() - .context("rpc requires node_http arg")?, + rpc_url: common.node_http.clone(), chain_id: common.chain_id, api_namespaces: apis, precheck_settings, diff --git a/src/common/block_watcher.rs b/src/common/block_watcher.rs new file mode 100644 index 000000000..4eb2532f8 --- /dev/null +++ b/src/common/block_watcher.rs @@ -0,0 +1,53 @@ +use std::time::Duration; + +use ethers::types::{Block, BlockNumber, H256}; +use tokio::time; +use tracing::error; + +use crate::common::{retry, retry::UnlimitedRetryOpts, types::ProviderLike}; + +pub async fn wait_for_new_block( + provider: &impl ProviderLike, + last_block_hash: H256, + poll_interval: Duration, +) -> (H256, Block) { + loop { + let block = retry::with_unlimited_retries( + "watch latest block", + || provider.get_block(BlockNumber::Latest), + UnlimitedRetryOpts::default(), + ) + .await; + let Some(block) = block else { + error!("Latest block should be present when waiting for new block."); + continue; + }; + let Some(hash) = block.hash else { + error!("Latest block should have hash."); + continue; + }; + if last_block_hash != hash { + return (hash, block); + } + time::sleep(poll_interval).await; + } +} + +pub async fn wait_for_new_block_number( + provider: &impl ProviderLike, + last_block_number: u64, + poll_interval: Duration, +) -> u64 { + loop { + let block_number = retry::with_unlimited_retries( + "watch latest block number", + || provider.get_block_number(), + UnlimitedRetryOpts::default(), + ) + .await; + if last_block_number < block_number { + return block_number; + } + time::sleep(poll_interval).await; + } +} diff --git a/src/common/eth.rs b/src/common/eth.rs index 938434141..dedf0c0a6 100644 --- a/src/common/eth.rs +++ b/src/common/eth.rs @@ -1,17 +1,37 @@ -use std::{error, future::Future, ops::Deref, sync::Arc}; +use std::{error, future::Future, ops::Deref, sync::Arc, time::Duration}; use anyhow::Context; use ethers::{ abi::{AbiDecode, AbiEncode, RawLog}, contract::{builders::ContractCall, Contract, ContractDeployer, ContractError}, - providers::{JsonRpcClient, Middleware, PendingTransaction, Provider, ProviderError}, + providers::{ + Http, HttpRateLimitRetryPolicy, JsonRpcClient, Middleware, PendingTransaction, Provider, + ProviderError, RetryClient, RetryClientBuilder, + }, types::{ Address, BlockId, Bytes, Eip1559TransactionRequest, Log, Selector, TransactionReceipt, H256, }, }; +use url::Url; use crate::common::contracts::get_code_hashes::{CodeHashesResult, GETCODEHASHES_BYTECODE}; +pub fn new_provider( + url: &str, + poll_interval: Duration, +) -> anyhow::Result>>> { + let parsed_url = Url::parse(url).context("provider url should be valid")?; + let http = Http::new(parsed_url); + let client = RetryClientBuilder::default() + // these retries are if the server returns a 429 + .rate_limit_retries(10) + // these retries are if the connection is dubious + .timeout_retries(3) + .initial_backoff(Duration::from_millis(500)) + .build(http, Box::::default()); + Ok(Arc::new(Provider::new(client).interval(poll_interval))) +} + /// Waits for a pending transaction to be mined, providing appropriate error /// messages for each point of failure. pub async fn await_mined_tx<'a, Fut, C, Err>( diff --git a/src/common/handle.rs b/src/common/handle.rs index f933ce854..f3f4b7654 100644 --- a/src/common/handle.rs +++ b/src/common/handle.rs @@ -12,9 +12,7 @@ use tracing::{error, info}; /// /// Flattens the two types of errors that can occur when awaiting a handle. /// Useful when using tokio::try_join! to await multiple handles. -pub async fn flatten_handle( - handle: JoinHandle>, -) -> Result { +pub async fn flatten_handle(handle: JoinHandle>) -> anyhow::Result { match handle.await { Ok(Ok(result)) => Ok(result), Ok(Err(err)) => Err(err)?, @@ -22,6 +20,13 @@ pub async fn flatten_handle( } } +/// Converts a JoinHandle result into an `anyhow::Result`. Like +/// `flatten_handle`, useful when using `tokio::try_join!` to await multiple +/// handles. +pub async fn as_anyhow_handle(handle: JoinHandle) -> anyhow::Result { + handle.await.context("handling failed") +} + /// A guard that aborts a spawned task when dropped. #[derive(Debug)] pub struct SpawnGuard(AbortHandle); diff --git a/src/common/mod.rs b/src/common/mod.rs index 30104d5f6..66e976e19 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,3 +1,4 @@ +pub mod block_watcher; pub mod context; pub mod contracts; pub mod dev; @@ -10,6 +11,7 @@ pub mod math; pub mod mempool; pub mod precheck; pub mod protos; +pub mod retry; pub mod server; pub mod simulation; pub mod strs; diff --git a/src/common/retry.rs b/src/common/retry.rs new file mode 100644 index 000000000..4f4923f6b --- /dev/null +++ b/src/common/retry.rs @@ -0,0 +1,89 @@ +use std::{future::Future, time::Duration}; + +use rand::Rng; +use tokio::time; +use tracing::warn; + +#[derive(Clone, Copy, Debug)] +pub struct RetryOpts { + pub max_attempts: u64, + /// The first retry is immediately after the first failure (plus jitter). + /// The next retry after that will wait this long. + pub min_nonzero_wait: Duration, + pub max_wait: Duration, + pub max_jitter: Duration, +} + +impl Default for RetryOpts { + fn default() -> Self { + UnlimitedRetryOpts::default().to_retry_opts_with_max_attempts(10) + } +} + +pub async fn with_retries( + description: &str, + func: Func, + opts: RetryOpts, +) -> Result +where + Func: Fn() -> Fut, + Fut: Future>, +{ + let mut next_wait = Duration::ZERO; + let mut last_error: Option = None; + for attempt_number in 1..opts.max_attempts + 1 { + match func().await { + Ok(out) => return Ok(out), + Err(error) => { + last_error = Some(error); + warn!("Failed to {description} (attempt {attempt_number})"); + } + } + // Grab a new rng each iteration because we can't hold it across awaits. + let jitter = rand::thread_rng().gen_range(Duration::ZERO..opts.max_jitter); + time::sleep(next_wait + jitter).await; + next_wait = (2 * next_wait).clamp(opts.min_nonzero_wait, opts.max_wait); + } + Err(last_error.unwrap()) +} + +#[derive(Clone, Copy, Debug)] +pub struct UnlimitedRetryOpts { + pub min_nonzero_wait: Duration, + pub max_wait: Duration, + pub max_jitter: Duration, +} + +impl Default for UnlimitedRetryOpts { + fn default() -> Self { + Self { + min_nonzero_wait: Duration::from_secs(1), + max_wait: Duration::from_secs(10), + max_jitter: Duration::from_secs(1), + } + } +} + +impl UnlimitedRetryOpts { + fn to_retry_opts_with_max_attempts(self, max_attempts: u64) -> RetryOpts { + RetryOpts { + max_attempts, + min_nonzero_wait: self.min_nonzero_wait, + max_wait: self.max_wait, + max_jitter: self.max_jitter, + } + } +} + +pub async fn with_unlimited_retries( + description: &str, + func: Func, + opts: UnlimitedRetryOpts, +) -> Out +where + Func: Fn() -> Fut, + Fut: Future>, +{ + let opts = opts.to_retry_opts_with_max_attempts(u64::MAX); + with_retries(description, func, opts).await.ok().unwrap() +} diff --git a/src/common/server.rs b/src/common/server.rs index 8c094f921..f5bc64eeb 100644 --- a/src/common/server.rs +++ b/src/common/server.rs @@ -1,7 +1,8 @@ -use std::{cmp, future::Future, time::Duration}; +use std::{future::Future, time::Duration}; -use anyhow::bail; -use rand::Rng; +use anyhow::Context; + +use crate::common::{retry, retry::RetryOpts}; pub fn format_server_addr(host: &String, port: u16, secure: bool) -> String { if secure { @@ -24,21 +25,17 @@ where F: Fn(String) -> FutF, FutF: Future> + Send + 'static, { - for i in 0..10 { - match func(url.to_owned()).await { - Ok(client) => return Ok(client), - Err(e) => tracing::warn!( - "Failed to connect to {server_name} at {url} {e:?} (attempt {})", - i - ), - } - let sleep_dur = { - let mut rng = rand::thread_rng(); - let jitter = rng.gen_range(0..1000); - let millis = cmp::min(10, 2_u64.pow(i)) * 1000 + jitter; - Duration::from_millis(millis) - }; - tokio::time::sleep(sleep_dur).await; - } - bail!("Failed to connect to {server_name} at {url} after 10 attempts"); + let description = format!("connect to {server_name} at {url}"); + retry::with_retries( + &description, + || func(url.to_owned()), + RetryOpts { + max_attempts: 10, + min_nonzero_wait: Duration::from_secs(1), + max_wait: Duration::from_secs(10), + max_jitter: Duration::from_secs(1), + }, + ) + .await + .context("should connect to server when retrying") } diff --git a/src/common/types/provider_like.rs b/src/common/types/provider_like.rs index 237e2c86c..a636fcd8e 100644 --- a/src/common/types/provider_like.rs +++ b/src/common/types/provider_like.rs @@ -4,7 +4,7 @@ use anyhow::Context; use ethers::{ contract::ContractError, providers::{JsonRpcClient, Middleware, Provider}, - types::{Address, Block, BlockId, BlockNumber, Bytes, H160, H256, U256}, + types::{Address, Block, BlockId, BlockNumber, Bytes, Filter, Log, H160, H256, U256}, }; #[cfg(test)] use mockall::automock; @@ -48,6 +48,8 @@ pub trait ProviderLike: Send + Sync + 'static { async fn get_transaction_count(&self, address: Address) -> anyhow::Result; + async fn get_logs(&self, filter: &Filter) -> anyhow::Result>; + async fn aggregate_signatures( self: Arc, aggregator_address: Address, @@ -114,10 +116,15 @@ impl ProviderLike for Provider { } async fn get_max_priority_fee(&self) -> anyhow::Result { - Ok(self - .request("eth_maxPriorityFeePerGas", ()) + self.request("eth_maxPriorityFeePerGas", ()) + .await + .context("should get max priority fee from provider") + } + + async fn get_logs(&self, filter: &Filter) -> anyhow::Result> { + Middleware::get_logs(self, filter) .await - .context("should get max priority fee from provider")?) + .context("provider should get logs") } async fn aggregate_signatures( diff --git a/src/op_pool/chain.rs b/src/op_pool/chain.rs new file mode 100644 index 000000000..05c5aa1b2 --- /dev/null +++ b/src/op_pool/chain.rs @@ -0,0 +1,802 @@ +use std::{ + collections::{HashSet, VecDeque}, + sync::Arc, + time::Duration, +}; + +use anyhow::{ensure, Context}; +use ethers::{ + contract, + prelude::EthEvent, + types::{Address, Block, Filter, H256, U256}, +}; +use futures::future; +use tokio::{ + select, + sync::{broadcast, Semaphore}, + task::JoinHandle, +}; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, warn}; + +use crate::common::{ + block_watcher, contracts::i_entry_point::UserOperationEventFilter, types::ProviderLike, +}; + +const MAX_LOAD_OPS_CONCURRENCY: usize = 64; + +/// A data structure that holds the currently known recent state of the chain, +/// with logic for updating itself and returning what has changed. +/// +/// Will update itself when `.sync_to_block_number` is called, at which point it +/// will query a node to determine the new state of the chain. +#[derive(Debug)] +pub struct Chain { + provider: Arc

, + settings: Settings, + /// Blocks are stored from earliest to latest, so the oldest block is at the + /// front of this deque and the newest at the back. + blocks: VecDeque, + /// Semaphore to limit the number of concurrent `eth_getLogs` calls. + load_ops_semaphore: Semaphore, +} + +#[derive(Debug, Eq, PartialEq)] +pub struct ChainUpdate { + pub latest_block_number: u64, + pub latest_block_hash: H256, + /// Blocks before this number are no longer tracked in this `Chain`, so no + /// further updates related to them will be sent. + pub earliest_remembered_block_number: u64, + pub reorg_depth: u64, + pub mined_ops: Vec, + pub unmined_ops: Vec, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct MinedOp { + pub hash: H256, + pub entry_point: Address, + pub sender: Address, + pub nonce: U256, +} + +#[derive(Debug)] +pub struct Settings { + pub history_size: u64, + pub poll_interval: Duration, + pub entry_point_addresses: Vec

, +} + +#[derive(Debug)] +struct BlockSummary { + pub number: u64, + pub hash: H256, + pub parent_hash: H256, + pub ops: Vec, +} + +impl Chain

{ + pub fn new(provider: Arc

, settings: Settings) -> Self { + let history_size = settings.history_size as usize; + assert!(history_size > 0, "history size should be positive"); + Self { + provider, + settings, + blocks: VecDeque::new(), + load_ops_semaphore: Semaphore::new(MAX_LOAD_OPS_CONCURRENCY), + } + } + + pub fn spawn_watcher( + mut self, + sender: broadcast::Sender>, + shutdown_token: CancellationToken, + ) -> JoinHandle<()> { + tokio::spawn(async move { + loop { + select! { + update = self.wait_for_update() => { + let _ = sender.send(Arc::new(update)); + } + _ = shutdown_token.cancelled() => { + info!("Shutting down chain watcher"); + break; + } + } + } + }) + } + + async fn wait_for_update(&mut self) -> ChainUpdate { + let mut block_hash = self + .blocks + .back() + .map(|block| block.hash) + .unwrap_or_default(); + loop { + let (hash, block) = block_watcher::wait_for_new_block( + &*self.provider, + block_hash, + self.settings.poll_interval, + ) + .await; + block_hash = hash; + let update = self.sync_to_block(block).await; + match update { + Ok(update) => return update, + Err(error) => { + error!("Failed to update chain at block {block_hash:?}. Will try again at next block. {error:?}"); + } + } + } + } + + pub async fn sync_to_block(&mut self, new_head: Block) -> anyhow::Result { + let new_head = BlockSummary::try_from_block_without_ops(new_head, None)?; + let Some(current_block) = self.blocks.back() else { + return self.reset_and_initialize(new_head).await; + }; + let current_block_number = current_block.number; + let new_block_number = new_head.number; + ensure!( + current_block_number < new_block_number + self.settings.history_size, + "new block number {new_block_number} should be greater than start of history (current block: {current_block_number})" + ); + if current_block_number + self.settings.history_size < new_block_number { + warn!( + "New block {new_block_number} number is {} blocks ahead of the previously known head. Chain history will skip ahead.", + new_block_number - current_block_number, + ); + return self.reset_and_initialize(new_head).await; + } + let added_blocks = self + .load_added_blocks_connecting_to_existing_chain(current_block_number, new_head) + .await?; + Ok(self.update_with_blocks(current_block_number, added_blocks)) + } + + async fn reset_and_initialize(&mut self, head: BlockSummary) -> anyhow::Result { + let min_block_number = head.number.saturating_sub(self.settings.history_size - 1); + let mut blocks = self + .load_blocks_back_to_number_no_ops(head, min_block_number) + .await + .context("should load full history when resetting chain")?; + self.load_ops_into_block_summaries(&mut blocks).await?; + self.blocks = blocks; + let mined_ops: Vec<_> = self + .blocks + .iter() + .flat_map(|block| &block.ops) + .copied() + .collect(); + Ok(self.new_update(0, mined_ops, vec![])) + } + + /// Given a collection of blocks to add to the chain, whose numbers may + /// overlap the current numbers in the case of reorgs, update the state of + /// this data structure and return an update struct. + fn update_with_blocks( + &mut self, + current_block_number: u64, + added_blocks: VecDeque, + ) -> ChainUpdate { + let mined_ops: Vec<_> = added_blocks + .iter() + .flat_map(|block| &block.ops) + .copied() + .collect(); + let reorg_depth = current_block_number + 1 - added_blocks[0].number; + let unmined_ops: Vec<_> = self + .blocks + .iter() + .skip(self.blocks.len() - reorg_depth as usize) + .flat_map(|block| &block.ops) + .copied() + .collect(); + for _ in 0..reorg_depth { + self.blocks.pop_back(); + } + self.blocks.extend(added_blocks); + while self.blocks.len() > self.settings.history_size as usize { + self.blocks.pop_front(); + } + self.new_update(reorg_depth, mined_ops, unmined_ops) + } + + async fn load_added_blocks_connecting_to_existing_chain( + &self, + current_block_number: u64, + new_head: BlockSummary, + ) -> anyhow::Result> { + // Load blocks from last known number to current. + let mut added_blocks = self + .load_blocks_back_to_number_no_ops(new_head, current_block_number + 1) + .await + .context("chain should load blocks from last processed to latest block")?; + ensure!( + !added_blocks.is_empty(), + "added blocks should never be empty" + ); + // Continue to load blocks backwards until we connect with the known chain, if necessary. + loop { + let earliest_new_block = &added_blocks[0]; + if earliest_new_block.number == 0 { + break; + } + let Some(presumed_parent) = self.block_with_number(earliest_new_block.number - 1) + else { + warn!( + "Reorg is deeper than chain history size ({})", + self.blocks.len() + ); + break; + }; + if presumed_parent.hash == earliest_new_block.parent_hash { + break; + } + // The earliest newly loaded block's parent does not match the known + // chain, so continue to load blocks backwards, replacing the known + // chain, until it does. + let block = self + .provider + .get_block(earliest_new_block.parent_hash) + .await + .context("should load parent block when handling reorg")? + .context("block with parent hash of known block should exist")?; + let block = BlockSummary::try_from_block_without_ops( + block, + Some(earliest_new_block.number - 1), + )?; + added_blocks.push_front(block); + } + self.load_ops_into_block_summaries(&mut added_blocks) + .await?; + Ok(added_blocks) + } + + async fn load_blocks_back_to_number_no_ops( + &self, + head: BlockSummary, + min_block_number: u64, + ) -> anyhow::Result> { + let mut blocks = + VecDeque::with_capacity(head.number.saturating_sub(min_block_number) as usize + 1); + blocks.push_front(head); + while blocks[0].number > min_block_number { + let parent_hash = blocks[0].parent_hash; + let parent = self + .provider + .get_block(parent_hash) + .await + .context("should load parent block by hash")? + .context("block with parent hash of known block should exist")?; + blocks.push_front(BlockSummary::try_from_block_without_ops( + parent, + Some(blocks[0].number - 1), + )?); + } + Ok(blocks) + } + + async fn load_ops_into_block_summaries( + &self, + blocks: &mut VecDeque, + ) -> anyhow::Result<()> { + // As when loading blocks, load op events block-by-block, specifying + // block hash. Don't load with a single call by block number range + // because if the network is in the middle of a reorg, then we can't + // tell which branch we read events from. + let future_opses = blocks + .iter() + .map(|block| self.load_ops_in_block_with_hash(block.hash)); + let opses = future::try_join_all(future_opses) + .await + .context("should load ops for new blocks")?; + for (i, ops) in opses.into_iter().enumerate() { + blocks[i].ops = ops; + } + Ok(()) + } + + async fn load_ops_in_block_with_hash(&self, block_hash: H256) -> anyhow::Result> { + let _permit = self + .load_ops_semaphore + .acquire() + .await + .expect("semaphore should not be closed"); + let filter = Filter::new() + .address(self.settings.entry_point_addresses.clone()) + .event(&UserOperationEventFilter::abi_signature()) + .at_block_hash(block_hash); + let logs = self + .provider + .get_logs(&filter) + .await + .context("chain state should load user operation events")?; + logs.into_iter() + .map(|log| { + let entry_point = log.address; + let event = contract::parse_log::(log)?; + Ok(MinedOp { + hash: event.user_op_hash.into(), + entry_point, + sender: event.sender, + nonce: event.nonce, + }) + }) + .collect() + } + + fn block_with_number(&self, number: u64) -> Option<&BlockSummary> { + let earliest_number = self.blocks.front()?.number; + if number < earliest_number { + return None; + } + self.blocks.get((number - earliest_number) as usize) + } + + fn new_update( + &self, + reorg_depth: u64, + mined_ops: Vec, + unmined_ops: Vec, + ) -> ChainUpdate { + let latest_block = self + .blocks + .back() + .expect("new_update should not be called when blocks is empty"); + ChainUpdate { + latest_block_number: latest_block.number, + latest_block_hash: latest_block.hash, + earliest_remembered_block_number: self.blocks[0].number, + reorg_depth, + mined_ops, + unmined_ops, + } + } +} + +impl BlockSummary { + /// Converts a block returned from a provider into a `BlockSummary` with no + /// ops. Takes an expected block number and returns an error if it doesn't + /// match the block. While a provider should never return a block number + /// that doesn't match what we expect, if the provider does return bad data + /// it's better to catch it now than run into panics from bad indexing math + /// later. + fn try_from_block_without_ops( + block: Block, + expected_block_number: Option, + ) -> anyhow::Result { + let number = block + .number + .context("block number should be present")? + .as_u64(); + if let Some(expected_block_number) = expected_block_number { + ensure!( + number == expected_block_number, + "block number {number} should match expected {expected_block_number}" + ); + } + Ok(Self { + number: block + .number + .context("block number should be present")? + .as_u64(), + hash: block.hash.context("block hash should exist")?, + parent_hash: block.parent_hash, + ops: Vec::new(), + }) + } +} + +#[derive(Debug)] +pub struct DedupedOps { + pub mined_ops: Vec, + pub unmined_ops: Vec, +} + +impl ChainUpdate { + /// "Cancels out" ops that appear in both mined and unmined. + pub fn deduped_ops(&self) -> DedupedOps { + let mined_op_hashes: HashSet<_> = self.mined_ops.iter().map(|op| op.hash).collect(); + let unmined_op_hashes: HashSet<_> = self.unmined_ops.iter().map(|op| op.hash).collect(); + let mined_ops = self + .mined_ops + .iter() + .filter(|op| !unmined_op_hashes.contains(&op.hash)) + .copied() + .collect(); + let unmined_ops = self + .unmined_ops + .iter() + .filter(|op| !mined_op_hashes.contains(&op.hash)) + .copied() + .collect(); + DedupedOps { + mined_ops, + unmined_ops, + } + } +} + +#[cfg(test)] +mod tests { + use std::ops::DerefMut; + + use ethers::{ + abi::AbiEncode, + types::{FilterBlockOption, Log, H160}, + utils, + }; + use parking_lot::RwLock; + + use super::*; + use crate::common::types::MockProviderLike; + + const HISTORY_SIZE: u64 = 3; + const ENTRY_POINT_ADDRESS: Address = H160(*b"01234567890123456789"); + + #[derive(Clone, Debug)] + struct MockBlock { + hash: H256, + op_hashes: Vec, + } + + impl MockBlock { + fn new(hash: H256, op_hashes: Vec) -> Self { + Self { hash, op_hashes } + } + } + + #[derive(Clone, Debug)] + struct ProviderController { + blocks: Arc>>, + } + + impl ProviderController { + fn set_blocks(&self, blocks: Vec) { + *self.blocks.write() = blocks; + } + + fn get_blocks_mut(&self) -> impl DerefMut> + '_ { + self.blocks.write() + } + + fn get_head(&self) -> Block { + let hash = self.blocks.read().last().unwrap().hash; + self.get_block_by_hash(hash).unwrap() + } + + fn get_block_by_hash(&self, hash: H256) -> Option> { + let blocks = self.blocks.read(); + let number = blocks.iter().position(|block| block.hash == hash); + let Some(number) = number else { return None }; + let parent_hash = if number > 0 { + blocks[number - 1].hash + } else { + H256::zero() + }; + Some(Block { + hash: Some(hash), + parent_hash, + number: Some(number.into()), + ..Default::default() + }) + } + + fn get_logs_by_block_hash(&self, block_hash: H256) -> Vec { + let blocks = self.blocks.read(); + let block = blocks.iter().find(|block| block.hash == block_hash); + let Some(block) = block else { + return vec![]; + }; + block.op_hashes.iter().copied().map(fake_log).collect() + } + } + + #[tokio::test] + async fn test_initial_load() { + let (mut chain, controller) = new_chain(); + controller.set_blocks(vec![ + MockBlock::new(hash(0), vec![hash(101), hash(102)]), + MockBlock::new(hash(1), vec![hash(103)]), + MockBlock::new(hash(2), vec![]), + MockBlock::new(hash(3), vec![hash(104), hash(105)]), + ]); + let update = chain.sync_to_block(controller.get_head()).await.unwrap(); + // With a history size of 3, we should get updates from all blocks except the first one. + assert_eq!( + update, + ChainUpdate { + latest_block_number: 3, + latest_block_hash: hash(3), + earliest_remembered_block_number: 1, + reorg_depth: 0, + mined_ops: vec![fake_mined_op(103), fake_mined_op(104), fake_mined_op(105),], + unmined_ops: vec![], + } + ); + } + + #[tokio::test] + async fn test_simple_advance() { + let (mut chain, controller) = new_chain(); + controller.set_blocks(vec![ + MockBlock::new(hash(0), vec![hash(101), hash(102)]), + MockBlock::new(hash(1), vec![hash(103)]), + MockBlock::new(hash(2), vec![]), + MockBlock::new(hash(3), vec![hash(104), hash(105)]), + ]); + chain.sync_to_block(controller.get_head()).await.unwrap(); + controller + .get_blocks_mut() + .push(MockBlock::new(hash(4), vec![hash(106)])); + let update = chain.sync_to_block(controller.get_head()).await.unwrap(); + assert_eq!( + update, + ChainUpdate { + latest_block_number: 4, + latest_block_hash: hash(4), + earliest_remembered_block_number: 2, + reorg_depth: 0, + mined_ops: vec![fake_mined_op(106)], + unmined_ops: vec![], + } + ); + } + + #[tokio::test] + async fn test_forward_reorg() { + let (mut chain, controller) = new_chain(); + controller.set_blocks(vec![ + MockBlock::new(hash(0), vec![hash(100)]), + MockBlock::new(hash(1), vec![hash(101)]), + MockBlock::new(hash(2), vec![hash(102)]), + ]); + chain.sync_to_block(controller.get_head()).await.unwrap(); + { + // Replaces the head of the chain with three new blocks. + let mut blocks = controller.get_blocks_mut(); + blocks.pop(); + blocks.extend([ + MockBlock::new(hash(12), vec![hash(112)]), + MockBlock::new(hash(13), vec![hash(113)]), + MockBlock::new(hash(14), vec![hash(114)]), + ]); + } + let update = chain.sync_to_block(controller.get_head()).await.unwrap(); + assert_eq!( + update, + ChainUpdate { + latest_block_number: 4, + latest_block_hash: hash(14), + earliest_remembered_block_number: 2, + reorg_depth: 1, + mined_ops: vec![fake_mined_op(112), fake_mined_op(113), fake_mined_op(114)], + unmined_ops: vec![fake_mined_op(102)], + } + ); + } + + #[tokio::test] + async fn test_sideways_reorg() { + let (mut chain, controller) = new_chain(); + controller.set_blocks(vec![ + MockBlock::new(hash(0), vec![hash(100)]), + MockBlock::new(hash(1), vec![hash(101)]), + MockBlock::new(hash(2), vec![hash(102)]), + ]); + chain.sync_to_block(controller.get_head()).await.unwrap(); + { + // Replaces the top two blocks with two new ones. + let mut blocks = controller.get_blocks_mut(); + blocks.pop(); + blocks.pop(); + blocks.extend([ + MockBlock::new(hash(11), vec![hash(111)]), + MockBlock::new(hash(12), vec![hash(112)]), + ]); + } + let update = chain.sync_to_block(controller.get_head()).await.unwrap(); + assert_eq!( + update, + ChainUpdate { + latest_block_number: 2, + latest_block_hash: hash(12), + earliest_remembered_block_number: 0, + reorg_depth: 2, + mined_ops: vec![fake_mined_op(111), fake_mined_op(112)], + unmined_ops: vec![fake_mined_op(101), fake_mined_op(102)], + } + ); + } + + #[tokio::test] + async fn test_backwards_reorg() { + let (mut chain, controller) = new_chain(); + controller.set_blocks(vec![ + MockBlock::new(hash(0), vec![hash(100)]), + MockBlock::new(hash(1), vec![hash(101)]), + MockBlock::new(hash(2), vec![hash(102)]), + ]); + chain.sync_to_block(controller.get_head()).await.unwrap(); + { + // Replaces the top two blocks with just one new one. + let mut blocks = controller.get_blocks_mut(); + blocks.pop(); + blocks.pop(); + blocks.push(MockBlock::new(hash(11), vec![hash(111)])); + } + let update = chain.sync_to_block(controller.get_head()).await.unwrap(); + assert_eq!( + update, + ChainUpdate { + latest_block_number: 1, + latest_block_hash: hash(11), + earliest_remembered_block_number: 0, + reorg_depth: 2, + mined_ops: vec![fake_mined_op(111)], + unmined_ops: vec![fake_mined_op(101), fake_mined_op(102)], + } + ); + } + + #[tokio::test] + async fn test_reorg_longer_than_history() { + let (mut chain, controller) = new_chain(); + controller.set_blocks(vec![ + MockBlock::new(hash(0), vec![hash(100)]), + MockBlock::new(hash(1), vec![hash(101)]), + MockBlock::new(hash(2), vec![hash(102)]), + MockBlock::new(hash(3), vec![hash(103)]), + ]); + chain.sync_to_block(controller.get_head()).await.unwrap(); + // The history has size 3, so after this update it's completely unrecognizable. + controller.set_blocks(vec![ + MockBlock::new(hash(0), vec![hash(100)]), + MockBlock::new(hash(11), vec![hash(111)]), + MockBlock::new(hash(12), vec![hash(112)]), + MockBlock::new(hash(13), vec![hash(113)]), + ]); + let update = chain.sync_to_block(controller.get_head()).await.unwrap(); + assert_eq!( + update, + ChainUpdate { + latest_block_number: 3, + latest_block_hash: hash(13), + earliest_remembered_block_number: 1, + reorg_depth: 3, + mined_ops: vec![fake_mined_op(111), fake_mined_op(112), fake_mined_op(113)], + unmined_ops: vec![fake_mined_op(101), fake_mined_op(102), fake_mined_op(103)], + } + ); + } + + #[tokio::test] + async fn test_advance_larger_than_history_size() { + let (mut chain, controller) = new_chain(); + controller.set_blocks(vec![ + MockBlock::new(hash(0), vec![hash(100)]), + MockBlock::new(hash(1), vec![hash(101)]), + MockBlock::new(hash(2), vec![hash(102)]), + ]); + chain.sync_to_block(controller.get_head()).await.unwrap(); + { + let mut blocks = controller.get_blocks_mut(); + for i in 3..7 { + blocks.push(MockBlock::new(hash(10 + i), vec![hash(100 + i)])); + } + } + let update = chain.sync_to_block(controller.get_head()).await.unwrap(); + assert_eq!( + update, + ChainUpdate { + latest_block_number: 6, + latest_block_hash: hash(16), + earliest_remembered_block_number: 4, + reorg_depth: 0, + mined_ops: vec![fake_mined_op(104), fake_mined_op(105), fake_mined_op(106)], + unmined_ops: vec![], + } + ); + } + + /// This test probably only matters for running against a local chain. + #[tokio::test] + async fn test_latest_block_number_smaller_than_history_size() { + let (mut chain, controller) = new_chain(); + let blocks = vec![ + MockBlock::new(hash(0), vec![hash(101), hash(102)]), + MockBlock::new(hash(1), vec![hash(103)]), + ]; + controller.set_blocks(blocks); + let update = chain.sync_to_block(controller.get_head()).await.unwrap(); + assert_eq!( + update, + ChainUpdate { + latest_block_number: 1, + latest_block_hash: hash(1), + earliest_remembered_block_number: 0, + reorg_depth: 0, + mined_ops: vec![fake_mined_op(101), fake_mined_op(102), fake_mined_op(103),], + unmined_ops: vec![], + } + ); + } + + fn new_chain() -> (Chain, ProviderController) { + let (provider, controller) = new_mock_provider(); + let chain = Chain::new( + Arc::new(provider), + Settings { + history_size: HISTORY_SIZE, + poll_interval: Duration::from_secs(250), // Not used in tests. + entry_point_addresses: vec![ENTRY_POINT_ADDRESS], + }, + ); + (chain, controller) + } + + fn new_mock_provider() -> (impl ProviderLike, ProviderController) { + let controller = ProviderController { + blocks: Arc::new(RwLock::new(vec![])), + }; + let mut provider = MockProviderLike::new(); + + provider.expect_get_block::().returning({ + let controller = controller.clone(); + move |hash| Ok(controller.get_block_by_hash(hash)) + }); + + provider.expect_get_logs().returning({ + let controller = controller.clone(); + move |filter| { + let FilterBlockOption::AtBlockHash(block_hash) = filter.block_option else { + panic!("mock provider only suppoerts getLogs at specific block hashes"); + }; + Ok(controller.get_logs_by_block_hash(block_hash)) + } + }); + + (provider, controller) + } + + fn fake_log(op_hash: H256) -> Log { + Log { + address: ENTRY_POINT_ADDRESS, + topics: vec![ + H256::from(utils::keccak256( + UserOperationEventFilter::abi_signature().as_bytes(), + )), + op_hash, + H256::zero(), // Sender + H256::zero(), // Paymaster + ], + data: AbiEncode::encode(( + U256::zero(), // nonce + true, // success + U256::zero(), // actual_gas_cost + U256::zero(), // actual_gas_used + )) + .into(), + ..Default::default() + } + } + + fn fake_mined_op(n: u8) -> MinedOp { + MinedOp { + hash: hash(n), + entry_point: ENTRY_POINT_ADDRESS, + sender: Address::zero(), + nonce: U256::zero(), + } + } + + /// Helper that makes fake hashes. + fn hash(n: u8) -> H256 { + let mut hash = H256::zero(); + hash.0[0] = n; + hash + } +} diff --git a/src/op_pool/event/http.rs b/src/op_pool/event/http.rs deleted file mode 100644 index d042f1bf9..000000000 --- a/src/op_pool/event/http.rs +++ /dev/null @@ -1,189 +0,0 @@ -use std::time::Duration; - -use ethers::{ - providers::{ - Http, HttpRateLimitRetryPolicy, Middleware, Provider, RetryClientBuilder, StreamExt, - }, - types::{Block, Filter, Log, H256}, -}; -use tokio::{ - select, - sync::{mpsc, oneshot}, - try_join, -}; -use tokio_stream::wrappers::ReceiverStream; -use tonic::async_trait; -use tracing::error; -use url::Url; - -use super::{BlockProvider, BlockProviderError, BlockProviderFactory, BlockWithLogs}; - -/// A block provider factory -#[derive(Debug)] -pub struct HttpBlockProviderFactory { - http_url: String, - poll_interval: Duration, - num_retries: u64, -} - -impl HttpBlockProviderFactory { - pub fn new(http_url: String, poll_interval: Duration, num_retries: u64) -> Self { - Self { - http_url, - poll_interval, - num_retries, - } - } -} - -#[async_trait] -impl BlockProviderFactory for HttpBlockProviderFactory { - type Provider = HttpBlockProvider; - - fn new_provider(&self) -> Self::Provider { - HttpBlockProvider::new(self.http_url.clone(), self.poll_interval, self.num_retries) - } -} - -/// An http provider that uses ethers to stream blocks from a node's http endpoint -/// Polls the node for new blocks at a given interval -#[derive(Debug)] -pub struct HttpBlockProvider { - http_url: String, - poll_interval: Duration, - num_retries: u64, - shutdown_sender: Option>, -} - -impl HttpBlockProvider { - pub fn new(http_url: String, poll_interval: Duration, num_retries: u64) -> Self { - Self { - http_url, - poll_interval, - num_retries, - shutdown_sender: None, - } - } -} - -#[async_trait] -impl BlockProvider for HttpBlockProvider { - type BlockStream = ReceiverStream>; - - async fn subscribe(&mut self, filter: Filter) -> Result { - if self.shutdown_sender.is_some() { - return Err(BlockProviderError::ConnectionError( - "already subscribed".to_string(), - )); - } - - let parsed_url = Url::parse(&self.http_url) - .map_err(|e| BlockProviderError::ConnectionError(e.to_string()))?; - let num_retries = self.num_retries.try_into().map_err(|_| { - BlockProviderError::ConnectionError(format!( - "num_retries {} is too large", - self.num_retries - )) - })?; - let http = Http::new(parsed_url); - let client = RetryClientBuilder::default() - .rate_limit_retries(num_retries) - .timeout_retries(num_retries) - .initial_backoff(self.poll_interval) - .build(http, Box::::default()); - let mut provider = Provider::new(client); - provider.set_interval(self.poll_interval); - - // test the connection - let _ = provider - .get_block_number() - .await - .map_err(|err| BlockProviderError::ConnectionError(err.to_string()))?; - - let (tx, rx) = mpsc::channel(10_000); - let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); - self.shutdown_sender = Some(shutdown_tx); - - tokio::spawn(async move { - let mut block_stream = match provider.watch_blocks().await { - Ok(stream) => stream, - Err(err) => { - tracing::error!("Error subscribing to blocks: {:?}", err); - let _ = tx - .send(Err(BlockProviderError::ConnectionError(err.to_string()))) - .await; - return; - } - }; - - loop { - select! { - block = block_stream.next() => { - let msg = match block { - Some(block_hash) => { - let block_filter = filter.clone().at_block_hash(block_hash); - match try_join!(Self::get_block(&provider, block_hash), Self::get_logs(&provider, &block_filter)) { - Ok((block, logs)) => { - Ok(BlockWithLogs { block, logs }) - }, - Err(err) => { - error!("Error getting block or logs: {:?}", err); - Err(err) - } - } - }, - None => { - error!("Block stream ended"); - Err(BlockProviderError::ConnectionError("Block stream ended".to_string())) - } - }; - let was_err = msg.is_err(); - if tx.send(msg).await.is_err() { - error!("Receiver dropped"); - return; - } - if was_err { - error!("Provider error getting block or logs, ending subscription"); - return; - } - }, - _ = &mut shutdown_rx => { - return; - } - } - } - }); - - Ok(ReceiverStream::new(rx)) - } - - async fn unsubscribe(&mut self) { - let shutdown_sender = self.shutdown_sender.take(); - if let Some(ss) = shutdown_sender { - let _ = ss.send(()); - } - } -} - -impl HttpBlockProvider { - async fn get_block( - provider: &P, - block_hash: H256, - ) -> Result, BlockProviderError> { - provider - .get_block(block_hash) - .await - .map_err(|err| BlockProviderError::ConnectionError(err.to_string()))? - .ok_or_else(|| BlockProviderError::RpcError("Block not found".to_string())) - } - - async fn get_logs( - provider: &P, - filter: &Filter, - ) -> Result, BlockProviderError> { - provider - .get_logs(filter) - .await - .map_err(|err| BlockProviderError::ConnectionError(err.to_string())) - } -} diff --git a/src/op_pool/event/listener.rs b/src/op_pool/event/listener.rs deleted file mode 100644 index e7c15b03b..000000000 --- a/src/op_pool/event/listener.rs +++ /dev/null @@ -1,609 +0,0 @@ -use std::{ - cmp, - collections::HashMap, - sync::Arc, - time::{Duration, Instant}, -}; - -use anyhow::Context; -use ethers::{ - prelude::parse_log, - types::{Address, Filter}, -}; -use rand::Rng; -use tokio::{ - select, - sync::broadcast, - task::JoinHandle, - time::{sleep, timeout}, -}; -use tokio_stream::StreamExt; -use tokio_util::sync::CancellationToken; -use tracing::{error, info}; - -use super::{ - BlockProvider, BlockProviderFactory, BlockWithLogs, EntryPointEvent, EventProvider, - NewBlockEvent, -}; - -const MAX_BACKOFF_SECONDS: u64 = 60; - -/// An event listener that listens for new blocks and emits events -/// for each provided entry point -pub struct EventListener { - provider_factory: F, - provider: Option, - log_filter_base: Filter, - entrypoint_event_broadcasts: HashMap>>, - last_reconnect: Option, - backoff_idx: u32, -} - -impl EventProvider for EventListener { - fn subscribe_by_entrypoint( - &self, - entry_point: Address, - ) -> Option>> { - self.entrypoint_event_broadcasts - .get(&entry_point) - .map(|b| b.subscribe()) - } - - fn spawn( - self: Box, - shutdown_token: CancellationToken, - ) -> JoinHandle> { - tokio::spawn(async move { self.listen_with_shutdown(shutdown_token).await }) - } -} - -impl EventListener { - /// Create a new event listener from a block provider factory and list entry points - /// Must call listen_with_shutdown to start listening - pub fn new<'a>( - provider_factory: F, - entry_points: impl IntoIterator, - ) -> Self { - let mut entry_point_addresses = vec![]; - let mut entrypoint_event_broadcasts = HashMap::new(); - for ep in entry_points { - entry_point_addresses.push(*ep); - entrypoint_event_broadcasts.insert(*ep, broadcast::channel(1000).0); - } - - let log_filter_base = Filter::new().address(entry_point_addresses); - - Self { - provider_factory, - provider: None, - log_filter_base, - entrypoint_event_broadcasts, - last_reconnect: None, - backoff_idx: 0, - } - } - - /// Consumes the listener and starts listening for new blocks - /// until the shutdown signal is received - pub async fn listen_with_shutdown( - mut self, - shutdown_token: CancellationToken, - ) -> anyhow::Result<()> { - let mut block_stream = self.reconnect(shutdown_token.clone()).await?; - loop { - select! { - block_with_logs = block_stream.next() => { - match block_with_logs { - Some(Ok(block_with_logs)) => { - if let Err(err) = self.handle_block_event(block_with_logs).await { - error!("Error handling block event: {err:?}"); - EventListenerMetrics::increment_block_handler_errors(); - } - } - Some(Err(err)) => { - error!("Error getting block: {:?}", err); - EventListenerMetrics::increment_provider_errors(); - block_stream = self.reconnect(shutdown_token.clone()).await?; - } - None => { - error!("Block stream ended unexpectedly"); - EventListenerMetrics::increment_provider_errors(); - block_stream = self.reconnect(shutdown_token.clone()).await?; - } - } - } - _ = shutdown_token.cancelled() => { - info!("Shutting down event listener"); - break; - } - } - } - - Ok(()) - } - - async fn reconnect( - &mut self, - shutdown_token: CancellationToken, - ) -> anyhow::Result<::BlockStream> { - if let Some(mut provider) = self.provider.take() { - provider.unsubscribe().await; - } - - let since_last_recovery = self.last_reconnect.map(|t| Instant::now() - t); - if let Some(since_last_recovery) = since_last_recovery { - let expected_backoff = self.backoff_time(); - if since_last_recovery < expected_backoff { - let sleep_duration = expected_backoff - since_last_recovery; - info!( - "Reconnecting too quickly, sleeping {:?} before reconnecting", - sleep_duration - ); - if self - .sleep_or_shutdown(shutdown_token.clone(), sleep_duration) - .await - { - return Err(anyhow::anyhow!("Shutdown signal received")); - } - } else { - info!("Reconnecting after {since_last_recovery:?}, expected backoff {expected_backoff:?}"); - self.backoff_idx = 0; - } - } - - loop { - self.provider = Some(self.provider_factory.new_provider()); - match timeout( - Duration::from_secs(5), - self.provider - .as_mut() - .unwrap() - .subscribe(self.log_filter_base.clone()), - ) - .await - { - Ok(Ok(block_stream)) => { - info!("Connected to event provider"); - self.last_reconnect = Some(Instant::now()); - return Ok(block_stream); - } - Ok(Err(err)) => { - EventListenerMetrics::increment_provider_errors(); - let sleep_duration = self.backoff_time(); - error!( - "Error connecting to event provider, sleeping {sleep_duration:?}: {err:?}" - ); - if self - .sleep_or_shutdown(shutdown_token.clone(), sleep_duration) - .await - { - return Err(anyhow::anyhow!("Shutdown signal received")); - } - } - Err(err) => { - EventListenerMetrics::increment_provider_errors(); - let sleep_duration = self.backoff_time(); - error!( - "Timeout connecting to event provider, sleeping {sleep_duration:?}: {err:?}" - ); - if self - .sleep_or_shutdown(shutdown_token.clone(), sleep_duration) - .await - { - return Err(anyhow::anyhow!("Shutdown signal received")); - } - } - } - } - } - - async fn handle_block_event(&self, mut block_with_logs: BlockWithLogs) -> anyhow::Result<()> { - block_with_logs - .logs - .sort_by(|a, b| a.log_index.cmp(&b.log_index)); - let block_hash = block_with_logs - .block - .hash - .context("block should have hash")?; - let block_number = block_with_logs - .block - .number - .context("block should have number")?; - let mut block_events = HashMap::new(); - - EventListenerMetrics::increment_blocks_seen(); - EventListenerMetrics::set_block_height(block_number.as_u64()); - - for ep in self.entrypoint_event_broadcasts.keys() { - block_events.insert( - *ep, - NewBlockEvent { - address: *ep, - hash: block_hash, - number: block_number, - events: vec![], - }, - ); - } - - for log in block_with_logs.logs { - let ep_address = log.address; - if !block_events.contains_key(&ep_address) { - error!("Received log for unknown entrypoint {ep_address:?}"); - continue; - } - - let txn_hash = log - .transaction_hash - .context("log should have transaction hash")?; - let txn_index = log - .transaction_index - .context("log should have transaction index")?; - - let event = EntryPointEvent { - contract_event: parse_log(log)?, - txn_hash, - txn_index, - }; - - block_events.entry(ep_address).and_modify(|e| { - e.events.push(event); - }); - EventListenerMetrics::increment_events_seen(); - } - - for (ep, block_event) in block_events { - match self.entrypoint_event_broadcasts.get(&ep) { - Some(broadcast) => { - // ignore sender errors, which can only happen if there are no receivers - let _ = broadcast.send(Arc::new(block_event)); - } - None => { - error!("No broadcast channel for entry point: {:?}", ep); - } - } - } - - Ok(()) - } - - fn backoff_time(&self) -> Duration { - let mut rng = rand::thread_rng(); - let jitter = rng.gen_range(0..1000); - let millis = cmp::min(MAX_BACKOFF_SECONDS, 2_u64.pow(self.backoff_idx)) * 1000 + jitter; - Duration::from_millis(millis) - } - - async fn sleep_or_shutdown( - &mut self, - shutdown_token: CancellationToken, - duration: Duration, - ) -> bool { - select! { - _ = sleep(duration) => { - self.backoff_idx += 1; - false - }, - _ = shutdown_token.cancelled() => { - info!("Shutting down event listener"); - true - } - } - } -} - -struct EventListenerMetrics {} - -impl EventListenerMetrics { - fn set_block_height(block_height: u64) { - metrics::gauge!("op_pool_event_listener_block_height", block_height as f64); - } - - fn increment_blocks_seen() { - metrics::increment_counter!("op_pool_event_listener_blocks_seen"); - } - - fn increment_provider_errors() { - metrics::increment_counter!("op_pool_event_listener_provider_errors"); - } - - fn increment_block_handler_errors() { - metrics::increment_counter!("op_pool_event_listener_block_handler_errors"); - } - - fn increment_events_seen() { - metrics::increment_counter!("op_pool_event_listener_events_seen"); - } -} - -#[cfg(test)] -mod tests { - use std::sync::atomic::{AtomicBool, Ordering}; - - use ethers::{ - abi::{encode, Token}, - contract::EthEvent, - types::{Block, Log, H256, U256, U64}, - }; - use tokio::{ - join, - sync::{broadcast, mpsc}, - task::JoinHandle, - }; - - use super::*; - use crate::{ - common::contracts::i_entry_point::{IEntryPointEvents, UserOperationEventFilter}, - op_pool::event::{mock::MockBlockProviderFactory, BlockProviderError}, - }; - - #[tokio::test] - async fn start_stop() { - let state = setup().await; - teardown(state).await; - } - - #[tokio::test] - async fn single_block_no_logs() { - let mut state = setup().await; - - // send a block to the listener - state - .tx - .send(Ok(BlockWithLogs { - block: Block { - hash: Some(H256::zero()), - number: Some(U64::zero()), - ..Default::default() - }, - logs: vec![], - })) - .unwrap(); - - // receive the block from the listener - let block_event = state.events.recv().await.unwrap(); - assert_eq!(block_event.number, U64::zero()); - - teardown(state).await; - } - - #[tokio::test] - async fn single_block_with_log() { - let mut state = setup().await; - let (event, log) = make_random_log(state.ep); - - // send a block to the listener - state - .tx - .send(Ok(BlockWithLogs { - block: Block { - hash: Some(H256::zero()), - number: Some(U64::zero()), - ..Default::default() - }, - logs: vec![log], - })) - .unwrap(); - - // receive the block from the listener - let block_event = state.events.recv().await.unwrap(); - assert_eq!(block_event.number, U64::zero()); - assert_eq!(block_event.events.len(), 1); - - if let IEntryPointEvents::UserOperationEventFilter(block_event) = - &block_event.events[0].contract_event - { - assert_eq!(block_event, &event); - } else { - panic!("wrong event type"); - } - - teardown(state).await; - } - - #[tokio::test] - async fn multile_blocks_with_log() { - let mut state = setup().await; - - for n in 0..5 { - // send a block to the listener - let (event, log) = make_random_log(state.ep); - let block_hash = H256::random(); - state - .tx - .send(Ok(BlockWithLogs { - block: Block { - hash: Some(block_hash), - number: Some(U64::from(n)), - ..Default::default() - }, - logs: vec![log], - })) - .unwrap(); - - // receive the block from the listener - let block_event = state.events.recv().await.unwrap(); - assert_eq!(block_event.number, U64::from(n)); - assert_eq!(block_event.events.len(), 1); - - if let IEntryPointEvents::UserOperationEventFilter(block_event) = - &block_event.events[0].contract_event - { - assert_eq!(block_event, &event); - } else { - panic!("wrong event type"); - } - } - - teardown(state).await; - } - - #[tokio::test] - async fn reconnect_once() { - let mut state = setup().await; - - // send an error to the listener - state - .tx - .send(Err(BlockProviderError::ConnectionError( - "error".to_string(), - ))) - .unwrap(); - - // wait for the listener to reconnect - state.connection_event_rx.recv().await.unwrap(); - - // send a block to the listener - state - .tx - .send(Ok(BlockWithLogs { - block: Block { - hash: Some(H256::zero()), - number: Some(U64::zero()), - ..Default::default() - }, - logs: vec![], - })) - .unwrap(); - - // receive the block from the listener - let block_event = state.events.recv().await.unwrap(); - assert_eq!(block_event.number, U64::zero()); - - teardown(state).await; - } - - #[tokio::test] - async fn reconnect_multiple() { - let mut state = setup().await; - - state.set_fail_connection(); - // send an initial error to the listener, causing reconnect - state - .tx - .send(Err(BlockProviderError::ConnectionError( - "error".to_string(), - ))) - .unwrap(); - - // wait for the listener to reconnect 2 times, then allow through - for _ in 0..2 { - state.connection_event_rx.recv().await.unwrap(); - } - state.unset_fail_connection(); - - // wait for the listener to connect - state.connection_event_rx.recv().await.unwrap(); - - // send a block to the listener - state - .tx - .send(Ok(BlockWithLogs { - block: Block { - hash: Some(H256::zero()), - number: Some(U64::zero()), - ..Default::default() - }, - logs: vec![], - })) - .unwrap(); - - // receive the block from the listener - let block_event = state.events.recv().await.unwrap(); - assert_eq!(block_event.number, U64::zero()); - - teardown(state).await; - } - - struct TestState { - ep: Address, - tx: broadcast::Sender>, - shutdown_token: CancellationToken, - handle: JoinHandle>, - events: broadcast::Receiver>, - connection_event_rx: mpsc::Receiver<()>, - should_fail_connection: Arc, - } - - impl TestState { - fn set_fail_connection(&self) { - self.should_fail_connection.store(true, Ordering::SeqCst); - } - - fn unset_fail_connection(&self) { - self.should_fail_connection.store(false, Ordering::SeqCst); - } - } - - async fn setup() -> TestState { - let (tx, rx) = broadcast::channel(100); - let shutdown_token = CancellationToken::new(); - let (connection_event_tx, mut connection_event_rx) = mpsc::channel(100); - let should_fail_connection = Arc::new(AtomicBool::new(false)); - let factory = - MockBlockProviderFactory::new(rx, connection_event_tx, should_fail_connection.clone()); - let ep = Address::random(); - let listener = EventListener::new(factory, vec![&ep]); - let events = listener.subscribe_by_entrypoint(ep).unwrap(); - let listener_shutdown = shutdown_token.clone(); - let handle = - tokio::spawn(async move { listener.listen_with_shutdown(listener_shutdown).await }); - - // wait for the listener to connect - connection_event_rx.recv().await.unwrap(); - - TestState { - ep, - tx, - shutdown_token, - handle, - events, - connection_event_rx, - should_fail_connection, - } - } - - async fn teardown(state: TestState) { - // send a shutdown signal - state.shutdown_token.cancel(); - // wait for the listener to shutdown - join!(state.handle).0.unwrap().unwrap(); - } - - fn make_random_log(ep: Address) -> (UserOperationEventFilter, Log) { - let hash = H256::random(); - let sender = Address::random(); - let paymaster = Address::random(); - - // UserOperationEvent([INDEXED]bytes32,address,address,[NON_INDEXED]uint256,bool,uint256,uint256) - let log = Log { - address: ep, - topics: vec![ - UserOperationEventFilter::signature(), - hash, - address_to_topic(sender), - address_to_topic(paymaster), - ], - data: encode(&[ - Token::Uint(U256::zero()), - Token::Bool(false), - Token::Uint(U256::zero()), - Token::Uint(U256::zero()), - ]) - .into(), - block_hash: Some(H256::zero()), - block_number: Some(U64::zero()), - transaction_hash: Some(H256::zero()), - transaction_index: Some(U64::zero()), - ..Default::default() - }; - let event: UserOperationEventFilter = parse_log(log.clone()).unwrap(); - (event, log) - } - - fn address_to_topic(src: Address) -> H256 { - let mut bytes = [0; 32]; - bytes[12..32].copy_from_slice(src.as_bytes()); - H256::from(bytes) - } -} diff --git a/src/op_pool/event/mock.rs b/src/op_pool/event/mock.rs deleted file mode 100644 index c6ddd4ea2..000000000 --- a/src/op_pool/event/mock.rs +++ /dev/null @@ -1,81 +0,0 @@ -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, -}; - -use ethers::types::Filter; -use tokio::sync::{broadcast, mpsc}; -use tokio_stream::{wrappers::BroadcastStream, Stream, StreamExt}; -use tonic::async_trait; - -use super::{BlockProvider, BlockProviderError, BlockProviderFactory, BlockWithLogs}; - -pub struct MockBlockProvider { - rx: broadcast::Receiver>, - fail_subscription: bool, - subscription_event: mpsc::Sender<()>, -} - -#[async_trait] -impl BlockProvider for MockBlockProvider { - type BlockStream = - Box> + Send + Unpin>; - - async fn subscribe( - &mut self, - _filter: Filter, - ) -> Result { - let ret = match self.fail_subscription { - true => Err(BlockProviderError::ConnectionError( - "forced error".to_string(), - )), - false => Ok(Box::new( - BroadcastStream::new(self.rx.resubscribe()).map(|res| match res { - Ok(res) => res, - Err(_) => Err(BlockProviderError::ConnectionError( - "strem recv error".to_string(), - )), - }), - ) as Self::BlockStream), - }; - self.subscription_event - .send(()) - .await - .expect("failed to send connection event"); - ret - } - - async fn unsubscribe(&mut self) {} -} - -pub struct MockBlockProviderFactory { - rx: broadcast::Receiver>, - connection_event_tx: mpsc::Sender<()>, - should_fail_connection: Arc, -} - -impl BlockProviderFactory for MockBlockProviderFactory { - type Provider = MockBlockProvider; - - fn new_provider(&self) -> Self::Provider { - MockBlockProvider { - fail_subscription: self.should_fail_connection.load(Ordering::SeqCst), - rx: self.rx.resubscribe(), - subscription_event: self.connection_event_tx.clone(), - } - } -} - -impl MockBlockProviderFactory { - pub fn new( - rx: broadcast::Receiver>, - connection_event_tx: mpsc::Sender<()>, - should_fail_connection: Arc, - ) -> Self { - Self { - rx, - connection_event_tx, - should_fail_connection, - } - } -} diff --git a/src/op_pool/event/mod.rs b/src/op_pool/event/mod.rs deleted file mode 100644 index 974d107c2..000000000 --- a/src/op_pool/event/mod.rs +++ /dev/null @@ -1,99 +0,0 @@ -use std::sync::Arc; - -use ethers::types::{Address, Block, Filter, Log, H256, U64}; -use tokio::{sync::broadcast, task::JoinHandle}; -use tokio_util::sync::CancellationToken; -use tonic::async_trait; - -use crate::common::contracts::i_entry_point::IEntryPointEvents; - -mod listener; -pub use listener::EventListener; -#[cfg(test)] -mod mock; -mod ws; -pub use ws::WsBlockProviderFactory; -mod http; -pub use http::HttpBlockProviderFactory; - -/// Event when a new block is mined. -/// Events correspond to a single entry point -#[derive(Debug)] -pub struct NewBlockEvent { - /// The entry point address - pub address: Address, - /// The block hash - pub hash: H256, - /// The block number - pub number: U64, - /// Ordered EntryPoint events - pub events: Vec, -} - -/// An event emitted by an entry point with metadata -#[derive(Debug)] -pub struct EntryPointEvent { - /// The entry point contract event - pub contract_event: IEntryPointEvents, - /// The transaction hash that emitted the event - pub txn_hash: H256, - /// The transaction index that emitted the event - pub txn_index: U64, -} - -/// A trait that provides a stream of new blocks with their events by entrypoint -pub trait EventProvider: Send + Sync { - /// Subscribe to new blocks by entrypoint - fn subscribe_by_entrypoint( - &self, - entry_point: Address, - ) -> Option>>; - - /// Spawn the event provider - fn spawn( - self: Box, - shutdown_token: CancellationToken, - ) -> JoinHandle>; -} - -/// A factory that creates a new event provider -#[async_trait] -pub trait BlockProviderFactory: Send + Sync + 'static { - type Provider: BlockProvider + Send + Sync + 'static; - - /// Create a new block provider - fn new_provider(&self) -> Self::Provider; -} - -/// Block provider errors -#[derive(Debug, Clone, thiserror::Error)] -pub enum BlockProviderError { - #[error("Connection error: {0}")] - ConnectionError(String), - #[error("Rpc error: {0}")] - RpcError(String), -} - -/// A block with its logs correspoinding to a filter -/// given to a block provider -#[derive(Debug, Clone)] -pub struct BlockWithLogs { - /// The block - block: Block, - /// The logs that correspond to the filter - logs: Vec, -} - -/// A trait that provides a stream of blocks -#[async_trait] -pub trait BlockProvider { - type BlockStream: tokio_stream::Stream> - + Send - + Unpin; - - /// Subscribe to a block stream - async fn subscribe(&mut self, filter: Filter) -> Result; - - /// Unsubscribe from a block stream - async fn unsubscribe(&mut self); -} diff --git a/src/op_pool/event/ws.rs b/src/op_pool/event/ws.rs deleted file mode 100644 index fb3914fda..000000000 --- a/src/op_pool/event/ws.rs +++ /dev/null @@ -1,144 +0,0 @@ -use ethers::{ - providers::{Middleware, Provider, StreamExt, Ws}, - types::{Block, Filter, H256}, -}; -use tokio::sync::{mpsc, oneshot}; -use tokio_stream::wrappers::ReceiverStream; -use tonic::async_trait; - -use super::{BlockProvider, BlockProviderError, BlockProviderFactory, BlockWithLogs}; - -/// A block provider factory that uses an ethers websocket provider -/// to stream blocks from a node's websocket endpoint -pub struct WsBlockProviderFactory { - ws_url: String, - num_retries: usize, -} - -impl WsBlockProviderFactory { - pub fn new(ws_url: String, num_retries: usize) -> Self { - Self { - ws_url, - num_retries, - } - } -} - -#[async_trait] -impl BlockProviderFactory for WsBlockProviderFactory { - type Provider = WsBlockProvider; - - fn new_provider(&self) -> Self::Provider { - WsBlockProvider::new(self.ws_url.clone(), self.num_retries) - } -} - -/// A block provider that uses an ethers websocket provider -#[derive(Debug)] -pub struct WsBlockProvider { - ws_url: String, - num_retries: usize, - shutdown_sender: Option>, -} - -impl WsBlockProvider { - pub fn new(ws_url: String, num_retries: usize) -> Self { - Self { - ws_url, - num_retries, - shutdown_sender: None, - } - } -} - -#[async_trait] -impl BlockProvider for WsBlockProvider { - type BlockStream = ReceiverStream>; - - async fn subscribe(&mut self, filter: Filter) -> Result { - if self.shutdown_sender.is_some() { - return Err(BlockProviderError::ConnectionError( - "already subscribed".to_string(), - )); - } - - let provider = - Provider::::connect_with_reconnects(self.ws_url.clone(), self.num_retries) - .await - .map_err(|err| BlockProviderError::ConnectionError(err.to_string()))?; - - let (tx, rx) = mpsc::channel(10_000); - let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); - self.shutdown_sender = Some(shutdown_tx); - - tokio::spawn(async move { - let mut block_subscription = match provider.subscribe_blocks().await { - Ok(sub) => sub, - Err(err) => { - tracing::error!("Error subscribing to blocks: {:?}", err); - if tx - .send(Err(BlockProviderError::ConnectionError(err.to_string()))) - .await - .is_err() - { - tracing::error!("Receiver dropped"); - } - return; - } - }; - - loop { - tokio::select! { - res = block_subscription.next() => { - let msg = match res { - Some(block) => { - Self::process_block(&provider, block, &filter).await - } - None => { - tracing::error!("Block subscription ended"); - Err(BlockProviderError::ConnectionError("block subscription closed".to_string())) - } - }; - let was_err = msg.is_err(); - if tx.send(msg).await.is_err() { - tracing::error!("Receiver dropped"); - return; - } - if was_err { - return; - } - }, - _ = &mut shutdown_rx => { - return; - } - } - } - }); - - Ok(ReceiverStream::new(rx)) - } - - async fn unsubscribe(&mut self) { - let shutdown_sender = self.shutdown_sender.take(); - if let Some(ss) = shutdown_sender { - let _ = ss.send(()); - } - } -} - -impl WsBlockProvider { - async fn process_block( - provider: &Provider, - block: Block, - filter: &Filter, - ) -> Result { - let block_hash = block.hash.unwrap_or_default(); - let filter = filter.clone().at_block_hash(block_hash); - let logs = provider - .get_logs(&filter) - .await - .map_err(|err| BlockProviderError::ConnectionError(err.to_string()))?; - let block_with_logs = BlockWithLogs { block, logs }; - Ok(block_with_logs) - } -} diff --git a/src/op_pool/mempool/mod.rs b/src/op_pool/mempool/mod.rs index e720cb68f..0e027d3ec 100644 --- a/src/op_pool/mempool/mod.rs +++ b/src/op_pool/mempool/mod.rs @@ -9,8 +9,10 @@ use ethers::types::{Address, H256}; use strum::IntoEnumIterator; use self::error::MempoolResult; -use super::{event::NewBlockEvent, reputation::Reputation}; -use crate::common::types::{Entity, EntityType, UserOperation, ValidTimeRange}; +use crate::{ + common::types::{Entity, EntityType, UserOperation, ValidTimeRange}, + op_pool::{chain::ChainUpdate, reputation::Reputation}, +}; /// In-memory operation pool pub trait Mempool: Send + Sync { @@ -19,8 +21,8 @@ pub trait Mempool: Send + Sync { /// Event listener for when a new block is mined. /// - /// Pool is updated according to the new blocks events. - fn on_new_block(&self, event: &NewBlockEvent); + /// Pool is updated according to the chain update event. + fn on_chain_update(&self, update: &ChainUpdate); /// Adds a validated user operation to the pool. /// @@ -94,6 +96,9 @@ pub enum OperationOrigin { Local, /// The operation was discovered via the P2P gossip protocol. External, + /// The operation was returned to the pool when the block it was in was + /// reorged away. + ReturnedAfterReorg, } /// A user operation with additional metadata from validation. diff --git a/src/op_pool/mempool/pool.rs b/src/op_pool/mempool/pool.rs index fa20d11b2..8b7548f09 100644 --- a/src/op_pool/mempool/pool.rs +++ b/src/op_pool/mempool/pool.rs @@ -9,6 +9,7 @@ use ethers::{ abi::Address, types::{H256, U256}, }; +use tracing::info; use super::{ error::{MempoolError, MempoolResult}, @@ -23,19 +24,26 @@ use crate::common::{ /// Pool of user operations #[derive(Debug)] pub struct PoolInner { - // Pool settings + /// Pool settings config: PoolConfig, - // Operations by hash + /// Operations by hash by_hash: HashMap, - // Operations by operation ID + /// Operations by operation ID by_id: HashMap, - // Best operations, sorted by gas price + /// Best operations, sorted by gas price best: BTreeSet, - // Count of operations by sender + /// Removed operations, temporarily kept around in case their blocks are + /// reorged away. Stored along with the block number at which it was + /// removed. + mined_at_block_number_by_hash: HashMap, + /// Removed operation hashes sorted by block number, so we can forget them + /// when enough new blocks have passed. + mined_hashes_with_block_numbers: BTreeSet<(u64, H256)>, + /// Count of operations by sender count_by_address: HashMap, - // Submission ID counter + /// Submission ID counter submission_id: u64, - // keeps track of the size of the pool in bytes + /// keeps track of the size of the pool in bytes size: SizeTracker, } @@ -46,6 +54,8 @@ impl PoolInner { by_hash: HashMap::new(), by_id: HashMap::new(), best: BTreeSet::new(), + mined_at_block_number_by_hash: HashMap::new(), + mined_hashes_with_block_numbers: BTreeSet::new(), count_by_address: HashMap::new(), submission_id: 0, size: SizeTracker::default(), @@ -53,6 +63,110 @@ impl PoolInner { } pub fn add_operation(&mut self, op: PoolOperation) -> MempoolResult { + self.add_operation_internal(Arc::new(op), None) + } + + pub fn add_operations( + &mut self, + operations: impl IntoIterator, + ) -> Vec> { + operations + .into_iter() + .map(|op| self.add_operation(op)) + .collect() + } + + pub fn best_operations(&self) -> impl Iterator> { + self.best.clone().into_iter().map(|v| v.po) + } + + pub fn address_count(&self, address: Address) -> usize { + self.count_by_address.get(&address).copied().unwrap_or(0) + } + + pub fn remove_operation_by_hash(&mut self, hash: H256) -> Option> { + self.remove_operation_internal(hash, None) + } + + pub fn mine_operation(&mut self, hash: H256, block_number: u64) -> Option> { + self.remove_operation_internal(hash, Some(block_number)) + } + + pub fn unmine_operation(&mut self, hash: H256) -> Option> { + let (op, block_number) = self.mined_at_block_number_by_hash.remove(&hash)?; + self.mined_hashes_with_block_numbers + .remove(&(block_number, hash)); + if let Err(error) = self.put_back_unmined_operation(op.clone()) { + info!("Could not put back unmined operation: {error}"); + }; + Some(op.po) + } + + /// Removes all operations using the given entity, returning the hashes of + /// the removed operations. + pub fn remove_entity(&mut self, entity: Entity) -> Vec { + let to_remove = self + .by_hash + .iter() + .filter(|(_, uo)| uo.po.contains_entity(&entity)) + .map(|(hash, _)| *hash) + .collect::>(); + for &hash in &to_remove { + self.remove_operation_by_hash(hash); + } + to_remove + } + + pub fn forget_mined_operations_before_block(&mut self, block_number: u64) { + while let Some(&(bn, hash)) = self + .mined_hashes_with_block_numbers + .first() + .filter(|(bn, _)| *bn < block_number) + { + self.mined_at_block_number_by_hash.remove(&hash); + self.mined_hashes_with_block_numbers.remove(&(bn, hash)); + } + } + + pub fn clear(&mut self) { + self.by_hash.clear(); + self.by_id.clear(); + self.best.clear(); + self.mined_at_block_number_by_hash.clear(); + self.mined_hashes_with_block_numbers.clear(); + self.count_by_address.clear(); + self.size = SizeTracker::default(); + } + + fn enforce_size(&mut self) -> anyhow::Result> { + let mut removed = Vec::new(); + + while self.size > self.config.max_size_of_pool_bytes { + if let Some(worst) = self.best.pop_last() { + let hash = worst + .uo() + .op_hash(self.config.entry_point, self.config.chain_id); + + let _ = self + .remove_operation_by_hash(hash) + .context("should have removed the worst operation")?; + + removed.push(hash); + } + } + + Ok(removed) + } + + fn put_back_unmined_operation(&mut self, op: OrderedPoolOperation) -> MempoolResult { + self.add_operation_internal(op.po, Some(op.submission_id)) + } + + fn add_operation_internal( + &mut self, + op: Arc, + submission_id: Option, + ) -> MempoolResult { // Check for replacement by ID if let Some(pool_op) = self.by_id.get(&op.uo.id()) { if op.uo.max_fee_per_gas > u128::MAX.into() @@ -99,8 +213,8 @@ impl PoolInner { } let pool_op = OrderedPoolOperation { - po: Arc::new(op), - submission_id: self.next_submission_id(), + po: op, + submission_id: submission_id.unwrap_or_else(|| self.next_submission_id()), }; // update counts @@ -130,83 +244,28 @@ impl PoolInner { Ok(hash) } - pub fn add_operations( + fn remove_operation_internal( &mut self, - operations: impl IntoIterator, - ) -> Vec> { - operations - .into_iter() - .map(|op| self.add_operation(op)) - .collect() - } - - pub fn best_operations(&self) -> impl Iterator> { - self.best.clone().into_iter().map(|v| v.po) - } - - pub fn address_count(&self, address: Address) -> usize { - self.count_by_address.get(&address).copied().unwrap_or(0) - } - - pub fn remove_operation_by_hash(&mut self, hash: H256) -> Option> { - if let Some(op) = self.by_hash.remove(&hash) { - self.by_id.remove(&op.uo().id()); - self.best.remove(&op); - - for e in op.po.entities() { - self.decrement_address_count(e.address); - } - - self.size -= op.size(); - metrics::gauge!("op_pool_num_ops_in_pool", self.by_hash.len() as f64, "entrypoint_addr" => self.config.entry_point.to_string()); - metrics::gauge!("op_pool_size_bytes", self.size.0 as f64, "entrypoint_addr" => self.config.entry_point.to_string()); - return Some(op.po); + hash: H256, + block_number: Option, + ) -> Option> { + let op = self.by_hash.remove(&hash)?; + self.by_id.remove(&op.uo().id()); + self.best.remove(&op); + if let Some(block_number) = block_number { + self.mined_at_block_number_by_hash + .insert(hash, (op.clone(), block_number)); + self.mined_hashes_with_block_numbers + .insert((block_number, hash)); } - - None - } - - /// Removes all operations using the given entity, returning the hashes of - /// the removed operations. - pub fn remove_entity(&mut self, entity: Entity) -> Vec { - let to_remove = self - .by_hash - .iter() - .filter(|(_, uo)| uo.po.contains_entity(&entity)) - .map(|(hash, _)| *hash) - .collect::>(); - for &hash in &to_remove { - self.remove_operation_by_hash(hash); + for e in op.po.entities() { + self.decrement_address_count(e.address); } - to_remove - } - - pub fn clear(&mut self) { - self.by_hash.clear(); - self.by_id.clear(); - self.best.clear(); - self.count_by_address.clear(); - self.size = SizeTracker::default(); - } - - fn enforce_size(&mut self) -> anyhow::Result> { - let mut removed = Vec::new(); - while self.size > self.config.max_size_of_pool_bytes { - if let Some(worst) = self.best.pop_last() { - let hash = worst - .uo() - .op_hash(self.config.entry_point, self.config.chain_id); - - let _ = self - .remove_operation_by_hash(hash) - .context("should have removed the worst operation")?; - - removed.push(hash); - } - } - - Ok(removed) + self.size -= op.size(); + metrics::gauge!("op_pool_num_ops_in_pool", self.by_hash.len() as f64, "entrypoint_addr" => self.config.entry_point.to_string()); + metrics::gauge!("op_pool_size_bytes", self.size.0 as f64, "entrypoint_addr" => self.config.entry_point.to_string()); + Some(op.po) } fn decrement_address_count(&mut self, address: Address) { diff --git a/src/op_pool/mempool/uo_pool.rs b/src/op_pool/mempool/uo_pool.rs index c26806a64..35d9b8eb7 100644 --- a/src/op_pool/mempool/uo_pool.rs +++ b/src/op_pool/mempool/uo_pool.rs @@ -7,6 +7,7 @@ use ethers::types::{Address, H256}; use parking_lot::RwLock; use tokio::sync::broadcast; use tokio_util::sync::CancellationToken; +use tracing::info; use super::{ error::{MempoolError, MempoolResult}, @@ -14,10 +15,10 @@ use super::{ Mempool, OperationOrigin, PoolConfig, PoolOperation, }; use crate::{ - common::{contracts::i_entry_point::IEntryPointEvents, emit::WithEntryPoint, types::Entity}, + common::{emit::WithEntryPoint, types::Entity}, op_pool::{ + chain::ChainUpdate, emit::{EntityReputation, EntityStatus, EntitySummary, OpPoolEvent, OpRemovalReason}, - event::NewBlockEvent, reputation::{Reputation, ReputationManager, ReputationStatus}, }, }; @@ -68,7 +69,7 @@ where pub async fn run( self: Arc, - mut new_block_events: broadcast::Receiver>, + mut chain_events: broadcast::Receiver>, shutdown_token: CancellationToken, ) { loop { @@ -77,9 +78,9 @@ where tracing::info!("Shutting down UoPool"); break; } - new_block = new_block_events.recv() => { - if let Ok(new_block) = new_block { - self.on_new_block(&new_block); + update = chain_events.recv() => { + if let Ok(update) = update { + self.on_chain_update(&update); } } } @@ -102,42 +103,71 @@ where self.entry_point } - fn on_new_block(&self, new_block: &NewBlockEvent) { + fn on_chain_update(&self, update: &ChainUpdate) { let mut state = self.state.write(); - tracing::info!( - "New block: {:?} with {} entrypoint events", - new_block.number, - new_block.events.len() - ); - for event in &new_block.events { - if let IEntryPointEvents::UserOperationEventFilter(uo_event) = &event.contract_event { - let op_hash = uo_event.user_op_hash.into(); - if let Some(op) = state.pool.remove_operation_by_hash(op_hash) { - for e in op.staked_entities() { - self.reputation.add_included(e.address); - } + let deduped_ops = update.deduped_ops(); + let mined_ops = deduped_ops + .mined_ops + .iter() + .filter(|op| op.entry_point == self.entry_point); + let unmined_ops = deduped_ops + .unmined_ops + .iter() + .filter(|op| op.entry_point == self.entry_point); + let mut mined_op_count = 0; + let mut unmined_op_count = 0; + for op in mined_ops { + mined_op_count += 1; + // Remove throttled ops that were included in the block + state.throttled_ops.remove(&op.hash); + if let Some(op) = state + .pool + .mine_operation(op.hash, update.latest_block_number) + { + for entity in op.staked_entities() { + self.reputation.add_included(entity.address); } - - // Remove throttled ops that were included in the block - state.throttled_ops.remove(&op_hash); } } - + for op in unmined_ops { + unmined_op_count += 1; + if let Some(op) = state.pool.unmine_operation(op.hash) { + for entity in op.staked_entities() { + self.reputation.remove_included(entity.address); + } + } + } + if mined_op_count > 0 { + info!( + "{mined_op_count} op(s) mined on entry point {:?} when advancing to block with number {}, hash {:?}.", + self.entry_point, + update.latest_block_number, + update.latest_block_hash, + ); + } + if unmined_op_count > 0 { + info!( + "{unmined_op_count} op(s) unmined in reorg on entry point {:?} when advancing to block with number {}, hash {:?}.", + self.entry_point, + update.latest_block_number, + update.latest_block_hash, + ); + } + state + .pool + .forget_mined_operations_before_block(update.earliest_remembered_block_number); // Remove throttled ops that are too old - let new_block_number = new_block.number.as_u64(); let mut to_remove = HashSet::new(); for (hash, block) in state.throttled_ops.iter() { - if new_block_number - block > THROTTLED_OPS_BLOCK_LIMIT { + if update.latest_block_number - block > THROTTLED_OPS_BLOCK_LIMIT { to_remove.insert(*hash); } } - for hash in to_remove { state.pool.remove_operation_by_hash(hash); state.throttled_ops.remove(&hash); } - - state.block_number = new_block_number; + state.block_number = update.latest_block_number; } fn add_operation(&self, origin: OperationOrigin, op: PoolOperation) -> MempoolResult { @@ -372,6 +402,8 @@ mod tests { fn add_included(&self, _address: Address) {} + fn remove_included(&self, _address: Address) {} + fn dump_reputation(&self) -> Vec { vec![] } diff --git a/src/op_pool/mod.rs b/src/op_pool/mod.rs index 47862bbad..8f54276a9 100644 --- a/src/op_pool/mod.rs +++ b/src/op_pool/mod.rs @@ -1,5 +1,5 @@ +mod chain; pub mod emit; -mod event; mod mempool; mod reputation; mod server; diff --git a/src/op_pool/reputation.rs b/src/op_pool/reputation.rs index b2de9ac59..aaf3370d8 100644 --- a/src/op_pool/reputation.rs +++ b/src/op_pool/reputation.rs @@ -30,12 +30,18 @@ pub trait ReputationManager: Send + Sync { /// Called by mempool before returning operations to bundler fn status(&self, address: Address) -> ReputationStatus; - /// Called by mempool when an operation that requires stake is added to the pool + /// Called by mempool when an operation that requires stake is added to the + /// pool fn add_seen(&self, address: Address); - /// Called by the mempool when an operation that requires stake is removed from the pool + /// Called by the mempool when an operation that requires stake is removed + /// from the pool fn add_included(&self, address: Address); + /// Called by the mempool when a previously mined operation that requires + /// stake is returned to the pool. + fn remove_included(&self, address: Address); + /// Called by debug API fn dump_reputation(&self) -> Vec; @@ -78,14 +84,18 @@ impl ReputationManager for HourlyMovingAverageReputation { self.reputation.read().status(address) } - fn add_seen<'a>(&self, address: Address) { + fn add_seen(&self, address: Address) { self.reputation.write().add_seen(address); } - fn add_included<'a>(&self, address: Address) { + fn add_included(&self, address: Address) { self.reputation.write().add_included(address); } + fn remove_included(&self, address: Address) { + self.reputation.write().remove_included(address); + } + fn dump_reputation(&self) -> Vec { let reputation = self.reputation.read(); reputation @@ -193,6 +203,11 @@ impl AddressReputation { count.ops_included += 1; } + pub fn remove_included(&mut self, address: Address) { + let count = self.counts.entry(address).or_default(); + count.ops_included = count.ops_included.saturating_sub(1) + } + pub fn set_reputation(&mut self, address: Address, ops_seen: u64, ops_included: u64) { let count = self.counts.entry(address).or_default(); count.ops_seen = ops_seen; diff --git a/src/op_pool/server.rs b/src/op_pool/server.rs index a2e43c49f..09663ab81 100644 --- a/src/op_pool/server.rs +++ b/src/op_pool/server.rs @@ -277,7 +277,7 @@ mod tests { use crate::{ common::types::Entity, op_pool::{ - event::NewBlockEvent, + chain::ChainUpdate, mempool::{error::MempoolResult, PoolOperation}, reputation::Reputation, }, @@ -359,7 +359,7 @@ mod tests { self.entry_point } - fn on_new_block(&self, _event: &NewBlockEvent) {} + fn on_chain_update(&self, _update: &ChainUpdate) {} fn add_operation( &self, diff --git a/src/op_pool/task.rs b/src/op_pool/task.rs index 88f749afa..d21620dda 100644 --- a/src/op_pool/task.rs +++ b/src/op_pool/task.rs @@ -9,13 +9,15 @@ use tonic::{async_trait, transport::Server}; use crate::{ common::{ emit::WithEntryPoint, + eth, grpc::metrics::GrpcMetricsLayer, - handle::{flatten_handle, Task}, + handle, + handle::Task, protos::op_pool::{op_pool_server::OpPoolServer, OP_POOL_FILE_DESCRIPTOR_SET}, }, op_pool::{ + chain::{self, Chain, ChainUpdate}, emit::OpPoolEvent, - event::{EventListener, EventProvider, HttpBlockProviderFactory, WsBlockProviderFactory}, mempool::{uo_pool::UoPool, Mempool, PoolConfig}, reputation::{HourlyMovingAverageReputation, ReputationParams}, server::OpPoolImpl, @@ -26,10 +28,10 @@ use crate::{ pub struct Args { pub port: u16, pub host: String, - pub ws_url: Option, - pub http_url: Option, + pub http_url: String, pub http_poll_interval: Duration, pub chain_id: u64, + pub chain_history_size: u64, pub pool_configs: Vec, } @@ -44,26 +46,25 @@ impl Task for PoolTask { async fn run(&self, shutdown_token: CancellationToken) -> anyhow::Result<()> { let addr = format!("{}:{}", self.args.host, self.args.port).parse()?; let chain_id = self.args.chain_id; - let entry_points = self.args.pool_configs.iter().map(|pc| &pc.entry_point); tracing::info!("Starting server on {addr}"); tracing::info!("Chain id: {chain_id}"); - tracing::info!("Websocket url: {:?}", self.args.ws_url); tracing::info!("Http url: {:?}", self.args.http_url); - // Events listener - let event_provider: Box = if let Some(ws_url) = &self.args.ws_url { - let connection_factory = WsBlockProviderFactory::new(ws_url.to_owned(), 10); - Box::new(EventListener::new(connection_factory, entry_points)) - } else if let Some(http_url) = &self.args.http_url { - let connection_factory = HttpBlockProviderFactory::new( - http_url.to_owned(), - self.args.http_poll_interval, - 10, - ); - Box::new(EventListener::new(connection_factory, entry_points)) - } else { - bail!("Either ws_url or http_url must be provided"); + // create chain + let chain_settings = chain::Settings { + history_size: self.args.chain_history_size, + poll_interval: self.args.http_poll_interval, + entry_point_addresses: self + .args + .pool_configs + .iter() + .map(|config| config.entry_point) + .collect(), }; + let provider = eth::new_provider(&self.args.http_url, self.args.http_poll_interval)?; + let chain = Chain::new(provider, chain_settings); + let (update_sender, _) = broadcast::channel(1000); + let chain_handle = chain.spawn_watcher(update_sender.clone(), shutdown_token.clone()); // create mempools let mut mempools = Vec::new(); @@ -71,7 +72,7 @@ impl Task for PoolTask { for pool_config in &self.args.pool_configs { let (pool, handle) = PoolTask::create_mempool( pool_config, - event_provider.as_ref(), + update_sender.subscribe(), self.event_sender.clone(), shutdown_token.clone(), ) @@ -96,9 +97,6 @@ impl Task for PoolTask { .context("should have joined mempool handles") }); - // Start events listener - let events_provider_handle = event_provider.spawn(shutdown_token.clone()); - // gRPC server let op_pool_server = OpPoolServer::new(OpPoolImpl::new(chain_id, mempool_map)); let reflection_service = tonic_reflection::server::Builder::configure() @@ -126,9 +124,9 @@ impl Task for PoolTask { tracing::info!("Started op_pool"); match try_join!( - flatten_handle(mempool_handle), - flatten_handle(server_handle), - flatten_handle(events_provider_handle) + handle::flatten_handle(mempool_handle), + handle::flatten_handle(server_handle), + handle::as_anyhow_handle(chain_handle), ) { Ok(_) => { tracing::info!("Pool server shutdown"); @@ -156,11 +154,10 @@ impl PoolTask { async fn create_mempool( pool_config: &PoolConfig, - event_provider: &dyn EventProvider, + update_rx: broadcast::Receiver>, event_sender: broadcast::Sender>, shutdown_token: CancellationToken, ) -> anyhow::Result<(Arc>, JoinHandle<()>)> { - let entry_point = pool_config.entry_point; // Reputation manager let reputation = Arc::new(HourlyMovingAverageReputation::new( ReputationParams::bundler_default(), @@ -177,15 +174,9 @@ impl PoolTask { Arc::clone(&reputation), event_sender, )); - // Start mempool - let mempool_events = event_provider - .subscribe_by_entrypoint(entry_point) - .context("event listener should have entrypoint subscriber")?; let mp_runner = Arc::clone(&mp); let handle = - tokio::spawn( - async move { mp_runner.run(mempool_events, shutdown_token.clone()).await }, - ); + tokio::spawn(async move { mp_runner.run(update_rx, shutdown_token.clone()).await }); Ok((mp, handle)) }