diff --git a/Cargo.lock b/Cargo.lock index fcfc104..c05b3d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -408,6 +408,7 @@ dependencies = [ "log", "rpassword", "tokio", + "tokio-util", ] [[package]] @@ -3167,9 +3168,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" dependencies = [ "bytes", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index 8291e1e..31e5e43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,3 +18,4 @@ hex = "0.4.3" chrono = "0.4.26" rpassword = "7.2.0" lazy_static = "1.4.0" +tokio-util = "0.7.9" diff --git a/src/challenger.rs b/src/challenger.rs index 9c1ab2e..a7d612b 100644 --- a/src/challenger.rs +++ b/src/challenger.rs @@ -13,7 +13,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use chrono::Utc; +use chrono::{DateTime, Utc}; use ethers::{ contract::{abigen, Contract, LogMeta}, core::types::{Address, ValueOrArray, U64}, @@ -22,9 +22,9 @@ use ethers::{ use eyre::Result; use log::{debug, error, info}; use scribe_optimistic::OpPokeChallengedSuccessfullyFilter; -use std::sync::Arc; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use tokio::{sync::mpsc::Sender, time}; +use tokio_util::sync::CancellationToken; abigen!(ScribeOptimistic, "./abi/ScribeOptimistic.json"); @@ -34,11 +34,14 @@ abigen!(ScribeOptimistic, "./abi/ScribeOptimistic.json"); const SLOT_PERIOD_SECONDS: u16 = 12; #[allow(dead_code)] +#[derive(Debug)] pub struct Challenger { address: Address, client: Arc, contract: ScribeOptimistic, last_processed_block: Option, + challenge_period_in_sec: u16, + challenge_period_last_updated_at: Option>, } impl Challenger @@ -54,9 +57,42 @@ where client: client, contract: contract, last_processed_block: None, + challenge_period_in_sec: 0, + challenge_period_last_updated_at: None, } } + // Reloads challenge period from contract. + // This function have to be called every N time, because challenge period can be changed by contract owner. + async fn reload_challenge_period(&mut self) -> Result<()> { + let challenge_period_in_sec = self.contract.op_challenge_period().call().await.unwrap(); + debug!( + "Address {:?}, reloaded opChallenge period for contract is {:?}", + self.address, challenge_period_in_sec + ); + self.challenge_period_in_sec = challenge_period_in_sec; + self.challenge_period_last_updated_at = Some(Utc::now()); + + Ok(()) + } + + // Reloads challenge period value if it was not pulled from contract or pulled more than 10 mins ago. + async fn reload_challenge_period_if_needed(&mut self) -> Result<()> { + let need_update = match self.challenge_period_last_updated_at { + None => true, + Some(utc) => { + let diff = Utc::now() - utc; + diff.to_std().unwrap() > Duration::from_secs(600) + } + }; + + if need_update { + self.reload_challenge_period().await.unwrap(); + } + + Ok(()) + } + // Gets earliest block number we can search for non challenged `opPokes` async fn get_starting_block_number( &self, @@ -72,6 +108,7 @@ where async fn get_successful_challenges( &self, from_block: U64, + to_block: U64, ) -> Result> { debug!( "Address {:?}, searching OpPokeChallengedSuccessfully events from block {:?}", @@ -80,7 +117,23 @@ where let event = Contract::event_of_type::(self.client.clone()) .address(ValueOrArray::Array(vec![self.address])) - .from_block(from_block); + .from_block(from_block) + .to_block(to_block); + + Ok(event.query_with_meta().await?) + } + + // Gets list of OpPoked events for blocks gap we need. + async fn get_op_pokes( + &self, + from_block: U64, + to_block: U64, + ) -> Result> { + // Fetches `OpPoked` events + let event = Contract::event_of_type::(self.client.clone()) + .address(ValueOrArray::Array(vec![self.address])) + .from_block(from_block) + .to_block(to_block); Ok(event.query_with_meta().await?) } @@ -99,7 +152,7 @@ where } // TODO: Need tests - fn filter_unchallenged_events( + fn reject_challenged_pokes( &self, pokes: Vec<(OpPokedFilter, LogMeta)>, challenges: Vec<(OpPokeChallengedSuccessfullyFilter, LogMeta)>, @@ -150,18 +203,15 @@ where } async fn process(&mut self) -> Result<()> { - let challenge_period_in_sec = self.contract.op_challenge_period().call().await?; - debug!( - "Address {:?}, opChallenge period for contract is {:?}", - self.address, challenge_period_in_sec - ); + // Reloads challenge period value + self.reload_challenge_period_if_needed().await.unwrap(); // Getting last block from chain - let last_block_number = self.client.get_block_number().await?; + let latest_block_number = self.client.get_block_number().await.unwrap(); // Fetching block we have to start with let from_block = self.last_processed_block.unwrap_or( - self.get_starting_block_number(last_block_number, challenge_period_in_sec) + self.get_starting_block_number(latest_block_number, self.challenge_period_in_sec) .await?, ); @@ -171,31 +221,31 @@ where ); // Updating last processed block with latest chain block - self.last_processed_block = Some(last_block_number); + self.last_processed_block = Some(latest_block_number); // Fetch list of `OpPokeChallengedSuccessfully` events - let challenges = self.get_successful_challenges(from_block).await?; + let challenges = self + .get_successful_challenges(from_block, latest_block_number) + .await?; // Fetches `OpPoked` events - let event = Contract::event_of_type::(self.client.clone()) - .address(ValueOrArray::Array(vec![self.address])) - .from_block(from_block); + let op_pokes = self.get_op_pokes(from_block, latest_block_number).await?; - let logs = event.query_with_meta().await?; + // ignoring already challenged pokes + let unchallenged_pokes = self.reject_challenged_pokes(op_pokes, challenges); - let filtered = self.filter_unchallenged_events(logs, challenges); - - if filtered.len() == 0 { - info!( + // Check if we have unchallenged pokes + if unchallenged_pokes.len() == 0 { + debug!( "Address {:?}, no unchallenged opPokes found, skipping...", self.address ); return Ok(()); } - for (log, meta) in filtered { + for (log, meta) in unchallenged_pokes { let challengeable = self - .is_challengeable(meta.block_number, challenge_period_in_sec) + .is_challengeable(meta.block_number, self.challenge_period_in_sec) .await?; if !challengeable { @@ -226,6 +276,11 @@ where ); if !valid { + debug!( + "Address {:?}, schnorr data is not valid, trying to challenge...", + self.address + ); + // TODO: handle error gracefully, we should go further even if error happened match self.challenge(schnorr_data.clone()).await { Ok(receipt) => { @@ -266,21 +321,32 @@ where } /// Start address processing - pub async fn start(&mut self, _sender: Sender<()>) -> Result<()> { + pub async fn start( + &mut self, + _sender: Sender<()>, + cancellation_token: CancellationToken, + ) -> Result<()> { let mut interval = time::interval(Duration::from_secs(30)); loop { - interval.tick().await; - - match self.process().await { - Ok(_) => { - debug!("All ok, continue with next tick..."); + tokio::select! { + _ = cancellation_token.cancelled() => { + info!("Address {:?}, cancellation token received, stopping...", self.address); + return Ok(()); } - Err(err) => { - error!( - "Address {:?}, failed to process opPokes: {:?}", - self.address, err - ); + _ = interval.tick() => { + debug!("Address {:?}, interval tick", self.address); + match self.process().await { + Ok(_) => { + debug!("All ok, continue with next tick..."); + } + Err(err) => { + error!( + "Address {:?}, failed to process opPokes: {:?}", + self.address, err + ); + } + } } } } diff --git a/src/main.rs b/src/main.rs index 6de3c97..3423cd7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,13 +22,16 @@ use ethers::{ signers::Signer, }; use eyre::Result; -use log::{debug, info}; +use log::{debug, error, info}; use std::sync::Arc; mod challenger; mod wallet; use challenger::Challenger; +use tokio::signal; use tokio::sync::mpsc::channel; +use tokio_util::sync::CancellationToken; + use wallet::{CustomWallet, KeystoreWallet, PrivateKeyWallet}; #[derive(Parser, Debug)] @@ -117,19 +120,22 @@ async fn main() -> Result<()> { let client = Arc::new(SignerMiddleware::new(provider, signer)); - let (send, mut recv) = channel(1); + let token = CancellationToken::new(); + let (send, mut recv) = channel(args.addresses.len()); for address in &args.addresses { let address = address.parse::
()?; - let cloned_client = client.clone(); - let c_send = send.clone(); + let client_clone = client.clone(); + let send_clone = send.clone(); + let token_clone = token.clone(); + tokio::spawn(async move { info!("Address {:?} starting monitoring opPokes", address); - let mut challenger = Challenger::new(address, cloned_client); + let mut challenger = Challenger::new(address, client_clone); - challenger.start(c_send).await + challenger.start(send_clone, token_clone).await }); } @@ -139,12 +145,26 @@ async fn main() -> Result<()> { // sleeps forever. drop(send); - let _ = recv.recv().await; + tokio::select! { + _ = signal::ctrl_c() => { + info!("Received Ctrl-C, shutting down"); + token.cancel(); + + // Waiting for all tasks to finish + recv.recv().await; + }, + + _ = recv.recv() => { + info!("Tasks finished, shutting down"); + }, + } Ok(()) } #[cfg(test)] mod tests { + use std::path::PathBuf; + use super::*; #[test]