>,
- max_failure_count: u8,
- failure_count: u8,
- tick_interval: Duration,
-}
-
-impl Challenger
-where
- P: ScribeOptimisticProvider + 'static,
-{
- pub fn new(
- address: Address,
- contract_provider: P,
- tick_interval: Option,
- max_failure_count: Option,
- ) -> Self {
- Self {
- address,
- contract_provider,
- last_processed_block: None,
- challenge_period_in_sec: 0,
- challenge_period_last_updated_at: None,
- failure_count: 0,
- max_failure_count: max_failure_count.unwrap_or(MAX_FAILURE_COUNT),
- tick_interval: Duration::from_millis(
- tick_interval.unwrap_or(DEFAULT_CHECK_INTERVAL_IN_MS),
- ),
- }
- }
-
- // Sets last processed block number in challenger
- fn set_last_processed_block(&mut self, block: U64) {
- self.last_processed_block = Some(block);
-
- // Updating last scanned block metric
- metrics::set_last_scanned_block(
- self.address,
- self.contract_provider.get_from().unwrap_or_default(),
- block.as_u64() as i64,
- );
- }
-
- // Reloads challenge period from contract.
- // This function have to be called every N time, because challenge period can be changed by contract owner.
- async fn reload_challenge_period(&mut self) -> Result<()> {
- let challenge_period_in_sec = self.contract_provider.get_challenge_period().await?;
-
- debug!(
- "[{:?}] Reloaded opChallenge period for contract is {:?}",
- self.address, challenge_period_in_sec
- );
- self.challenge_period_in_sec = challenge_period_in_sec;
- self.challenge_period_last_updated_at = Some(Utc::now());
-
- Ok(())
- }
-
- // Reloads the challenge period from the contract if it has not been updated within the default challenge period reload interval.
- async fn reload_challenge_period_if_needed(&mut self) -> Result<()> {
- let need_update = match self.challenge_period_last_updated_at {
- None => true,
- Some(utc) => {
- let diff = Utc::now() - utc;
- diff.to_std().unwrap() > DEFAULT_CHALLENGE_PERIOD_RELOAD_INTERVAL
- }
- };
-
- if need_update {
- self.reload_challenge_period().await.unwrap();
- }
-
- Ok(())
- }
-
- // Gets earliest block number we can search for non challenged `opPokes`
- async fn get_starting_block_number(
- &self,
- last_block_number: U64,
- challenge_period_in_sec: u16,
- ) -> Result {
- debug!(
- "[{:?}] Calculating starting block number, latest from chain {:?}, period {:?}",
- self.address, last_block_number, challenge_period_in_sec
- );
-
- let blocks_per_period = challenge_period_in_sec / SLOT_PERIOD_SECONDS;
-
- Ok(last_block_number - blocks_per_period)
- }
-
- // Check if given block_number for log is already non challengeable
- async fn is_challengeable(
- &self,
- block_number: U64,
- challenge_period_in_sec: u16,
- ) -> Result {
- // Checking if log is possible to challenge ?
- let block = self
- .contract_provider
- .get_block(block_number)
- .await?
- .unwrap();
-
- let diff = Utc::now().timestamp() as u64 - block.timestamp.as_u64();
-
- Ok(challenge_period_in_sec > diff as u16)
- }
-
- // This function is called every tick.
- // Deciedes blocks range we have to process, processing them and if no error happened sets new `last_processed_block` for next tick.
- // If error happened on next tick it will again try to process same blocks.
- async fn process(&mut self) -> Result<()> {
- // Reloads challenge period value
- self.reload_challenge_period_if_needed().await.unwrap();
-
- // Getting last block from chain
- let latest_block_number = self
- .contract_provider
- .get_latest_block_number()
- .await
- .unwrap();
-
- // Fetching block we have to start with
- let from_block = self.last_processed_block.unwrap_or(
- self.get_starting_block_number(latest_block_number, self.challenge_period_in_sec)
- .await?,
- );
-
- // In some cases (block drop) our latest processed block can be bigger than latest block from chain,
- // in this case we have to skip processing and reset last processed block, so on next tick we will retry.
- // Also we returning error to increase failure count.
- if from_block > latest_block_number {
- // Resetting last processed block with latest chain block
- self.set_last_processed_block(latest_block_number);
-
- bail!(
- "Invalid block range {:?} - {:?}, from block is bigger than to block, resetting last processed block to latest block from chain.",
- from_block, latest_block_number
- );
- }
-
- // Processing blocks range
- self.process_blocks_range(from_block, latest_block_number)
- .await?;
-
- // Updating last processed block with latest chain block
- self.set_last_processed_block(latest_block_number);
-
- Ok(())
- }
-
- // Validates all `OpPoked` events and challenges them if needed.
- async fn process_blocks_range(&mut self, from_block: U64, to_block: U64) -> Result<()> {
- debug!(
- "[{:?}] Processing blocks range {:?} - {:?}",
- self.address, from_block, to_block
- );
-
- // Fetch list of `OpPokeChallengedSuccessfully` events
- let challenges = self
- .contract_provider
- .get_successful_challenges(from_block, to_block)
- .await?;
-
- // Fetches `OpPoked` events
- let op_pokes = self
- .contract_provider
- .get_op_pokes(from_block, to_block)
- .await?;
-
- // ignoring already challenged pokes
- let unchallenged_pokes = reject_challenged_pokes(op_pokes, challenges);
-
- // Check if we have unchallenged pokes
- if unchallenged_pokes.is_empty() {
- debug!(
- "[{:?}] No unchallenged opPokes found in block range {:?} - {:?}, skipping...",
- self.address, from_block, to_block
- );
- return Ok(());
- }
-
- for (poke, meta) in unchallenged_pokes {
- let challengeable = self
- .is_challengeable(meta.block_number, self.challenge_period_in_sec)
- .await?;
-
- if !challengeable {
- error!(
- "[{:?}] Block is to old for `opChallenge` block number: {:?}",
- self.address, meta.block_number
- );
- continue;
- }
-
- let valid = self
- .contract_provider
- .is_schnorr_signature_valid(poke.clone())
- .await?;
-
- // If schnorr data is valid, we should not challenge it...
- if valid {
- debug!(
- "[{:?}] Schnorr data for block {:?} is valid, nothing to do...",
- self.address, meta.block_number
- );
-
- continue;
- }
-
- info!(
- "[{:?}] Schnorr data for block {:?} is not valid, trying to challenge...",
- self.address, meta.block_number
- );
-
- // TODO: handle error gracefully, we should go further even if error happened
- match self.contract_provider.challenge(poke.schnorr_data).await {
- Ok(receipt) => {
- if let Some(receipt) = receipt {
- info!(
- "[{:?}] Successfully sent `opChallenge` transaction for OpPoke on block {:?}: {:?}",
- self.address, meta.block_number, receipt
- );
- // Add challenge to metrics
- metrics::inc_challenge_counter(
- self.address,
- self.contract_provider.get_from().unwrap_or_default(),
- receipt.transaction_hash,
- );
- } else {
- warn!(
- "[{:?}] Successfully sent `opChallenge` for block {:?} transaction but no receipt returned",
- self.address, meta.block_number
- );
- }
- }
- Err(err) => {
- error!(
- "[{:?}] Failed to make `opChallenge` call for block {:?}: {:?}",
- self.address, meta.block_number, err
- );
- }
- };
- }
-
- Ok(())
- }
-
- /// Starts processing pokes for the given contract address using the specified provider and tick interval.
- ///
- /// The function uses a tokio::time::interval to run the process method at regular intervals specified by the tick_interval field.
- ///
- /// # Used arguments
- ///
- /// * `contract_address` - The address of the contract to process pokes for.
- /// * `provider` - The provider to use for interacting with the Ethereum network.
- /// * `tick_interval` - The interval at which to check for new pokes.
- ///
- /// # Examples
- ///
- /// ```
- /// use eyre::Result;
- /// use ethers::providers::{Http, Provider};
- /// use challenger::{Challenger, HttpScribeOptimisticProvider};
- /// use std::time::Duration;
- ///
- /// #[tokio::main]
- /// async fn main() -> Result<()> {
- /// let rpc_provider = Provider::::connect("https://mainnet.infura.io/v3/your-project-id").await?;
- /// let contract_address = "0x1234567890123456789012345678901234567890".parse()?;
- /// let provider = HttpScribeOptimisticProvider::new(contract_address, rpc_provider);
- /// let mut challenger = Challenger::new(contract_address, provider, Duration::from_secs(30), None);
- ///
- /// challenger.start().await?
- /// }
- /// ```
- pub async fn start(&mut self) -> Result<()> {
- let mut interval = time::interval(self.tick_interval);
-
- loop {
- match self.process().await {
- Ok(_) => {
- // Reset error counter
- self.failure_count = 0;
- }
- Err(err) => {
- error!("[{:?}] Failed to process opPokes: {:?}", self.address, err);
-
- // Increment error counter
- metrics::inc_errors_counter(
- self.address,
- self.contract_provider.get_from().unwrap_or_default(),
- &err.to_string(),
- );
-
- // Increment and check error counter
- self.failure_count += 1;
- if self.failure_count >= self.max_failure_count {
- error!(
- "[{:?}] Reached max failure count, stopping processing...",
- self.address
- );
- return Err(err);
- }
- }
- }
-
- interval.tick().await;
- }
- }
-}
-
-// Removes challenged pokes from list of loaded pokes.
-// Logic is very simple, if `OpPokeChallengedSuccessfully` event is after `OpPoked` event, then we can safely
-// say that `OpPoked` event is already challenged. So we need to validate sequence of events and remove all
-// `OpPoked` events that has `OpPokeChallengedSuccessfully` event after it.
-fn reject_challenged_pokes(
- pokes: Vec<(OpPokedFilter, LogMeta)>,
- challenges: Vec<(OpPokeChallengedSuccessfullyFilter, LogMeta)>,
-) -> Vec<(OpPokedFilter, LogMeta)> {
- if challenges.is_empty() || pokes.is_empty() {
- return pokes;
- }
- let mut result: Vec<(OpPokedFilter, LogMeta)> = vec![];
-
- if pokes.len() == 1 {
- let (_, meta) = &pokes[0];
- for (_, c_meta) in challenges.clone() {
- if c_meta.block_number > meta.block_number {
- // empty result
- return result;
- }
- }
- return pokes;
- }
-
- 'pokes_loop: for i in 0..pokes.len() {
- let (poke, meta) = &pokes.get(i).unwrap();
- // If we do have next poke in list
- if let Some((_, next_meta)) = &pokes.get(i + 1) {
- for (_, c_meta) in challenges.clone() {
- if meta.block_number < c_meta.block_number
- && next_meta.block_number > c_meta.block_number
- {
- // poke already challenged
- continue 'pokes_loop;
- }
- }
- } else {
- for (_, c_meta) in challenges.clone() {
- if c_meta.block_number > meta.block_number {
- // poke already challenged
- continue 'pokes_loop;
- }
- }
- }
- result.push((poke.clone(), meta.clone()));
- }
-
- result
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- use async_trait::async_trait;
- use contract::SchnorrData;
- use ethers::{
- contract::LogMeta,
- types::{Block, TransactionReceipt, H160, H256, U256, U64},
- };
- use eyre::Result;
- use mockall::{mock, predicate::*};
-
- mock! {
- pub TestScribe{}
-
- #[async_trait]
- impl ScribeOptimisticProvider for TestScribe {
- async fn get_latest_block_number(&self) -> Result;
-
- async fn get_block(&self, block_number: U64) -> Result