Skip to content

Commit

Permalink
Fixes staging issues with constant failing (#5)
Browse files Browse the repository at this point in the history
* Updated rpc calls to return error rather than panic + fixes in getLogs requets
  • Loading branch information
konstantinzolotarev authored Oct 11, 2023
1 parent 7adebd5 commit 775892b
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 44 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ hex = "0.4.3"
chrono = "0.4.26"
rpassword = "7.2.0"
lazy_static = "1.4.0"
tokio-util = "0.7.9"
136 changes: 101 additions & 35 deletions src/challenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use chrono::Utc;
use chrono::{DateTime, Utc};
use ethers::{
contract::{abigen, Contract, LogMeta},
core::types::{Address, ValueOrArray, U64},
Expand All @@ -22,9 +22,9 @@ use ethers::{
use eyre::Result;
use log::{debug, error, info};
use scribe_optimistic::OpPokeChallengedSuccessfullyFilter;
use std::sync::Arc;
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use tokio::{sync::mpsc::Sender, time};
use tokio_util::sync::CancellationToken;

abigen!(ScribeOptimistic, "./abi/ScribeOptimistic.json");

Expand All @@ -34,11 +34,14 @@ abigen!(ScribeOptimistic, "./abi/ScribeOptimistic.json");
const SLOT_PERIOD_SECONDS: u16 = 12;

#[allow(dead_code)]
#[derive(Debug)]
pub struct Challenger<M> {
address: Address,
client: Arc<M>,
contract: ScribeOptimistic<M>,
last_processed_block: Option<U64>,
challenge_period_in_sec: u16,
challenge_period_last_updated_at: Option<DateTime<Utc>>,
}

impl<M: Middleware> Challenger<M>
Expand All @@ -54,9 +57,42 @@ where
client: client,
contract: contract,
last_processed_block: None,
challenge_period_in_sec: 0,
challenge_period_last_updated_at: None,
}
}

// Reloads challenge period from contract.
// This function have to be called every N time, because challenge period can be changed by contract owner.
async fn reload_challenge_period(&mut self) -> Result<()> {
let challenge_period_in_sec = self.contract.op_challenge_period().call().await.unwrap();
debug!(
"Address {:?}, reloaded opChallenge period for contract is {:?}",
self.address, challenge_period_in_sec
);
self.challenge_period_in_sec = challenge_period_in_sec;
self.challenge_period_last_updated_at = Some(Utc::now());

Ok(())
}

// Reloads challenge period value if it was not pulled from contract or pulled more than 10 mins ago.
async fn reload_challenge_period_if_needed(&mut self) -> Result<()> {
let need_update = match self.challenge_period_last_updated_at {
None => true,
Some(utc) => {
let diff = Utc::now() - utc;
diff.to_std().unwrap() > Duration::from_secs(600)
}
};

if need_update {
self.reload_challenge_period().await.unwrap();
}

Ok(())
}

// Gets earliest block number we can search for non challenged `opPokes`
async fn get_starting_block_number(
&self,
Expand All @@ -72,6 +108,7 @@ where
async fn get_successful_challenges(
&self,
from_block: U64,
to_block: U64,
) -> Result<Vec<(OpPokeChallengedSuccessfullyFilter, LogMeta)>> {
debug!(
"Address {:?}, searching OpPokeChallengedSuccessfully events from block {:?}",
Expand All @@ -80,7 +117,23 @@ where
let event =
Contract::event_of_type::<OpPokeChallengedSuccessfullyFilter>(self.client.clone())
.address(ValueOrArray::Array(vec![self.address]))
.from_block(from_block);
.from_block(from_block)
.to_block(to_block);

Ok(event.query_with_meta().await?)
}

// Gets list of OpPoked events for blocks gap we need.
async fn get_op_pokes(
&self,
from_block: U64,
to_block: U64,
) -> Result<Vec<(OpPokedFilter, LogMeta)>> {
// Fetches `OpPoked` events
let event = Contract::event_of_type::<OpPokedFilter>(self.client.clone())
.address(ValueOrArray::Array(vec![self.address]))
.from_block(from_block)
.to_block(to_block);

Ok(event.query_with_meta().await?)
}
Expand All @@ -99,7 +152,7 @@ where
}

// TODO: Need tests
fn filter_unchallenged_events(
fn reject_challenged_pokes(
&self,
pokes: Vec<(OpPokedFilter, LogMeta)>,
challenges: Vec<(OpPokeChallengedSuccessfullyFilter, LogMeta)>,
Expand Down Expand Up @@ -150,18 +203,15 @@ where
}

async fn process(&mut self) -> Result<()> {
let challenge_period_in_sec = self.contract.op_challenge_period().call().await?;
debug!(
"Address {:?}, opChallenge period for contract is {:?}",
self.address, challenge_period_in_sec
);
// Reloads challenge period value
self.reload_challenge_period_if_needed().await.unwrap();

// Getting last block from chain
let last_block_number = self.client.get_block_number().await?;
let latest_block_number = self.client.get_block_number().await.unwrap();

// Fetching block we have to start with
let from_block = self.last_processed_block.unwrap_or(
self.get_starting_block_number(last_block_number, challenge_period_in_sec)
self.get_starting_block_number(latest_block_number, self.challenge_period_in_sec)
.await?,
);

Expand All @@ -171,31 +221,31 @@ where
);

// Updating last processed block with latest chain block
self.last_processed_block = Some(last_block_number);
self.last_processed_block = Some(latest_block_number);

// Fetch list of `OpPokeChallengedSuccessfully` events
let challenges = self.get_successful_challenges(from_block).await?;
let challenges = self
.get_successful_challenges(from_block, latest_block_number)
.await?;

// Fetches `OpPoked` events
let event = Contract::event_of_type::<OpPokedFilter>(self.client.clone())
.address(ValueOrArray::Array(vec![self.address]))
.from_block(from_block);
let op_pokes = self.get_op_pokes(from_block, latest_block_number).await?;

let logs = event.query_with_meta().await?;
// ignoring already challenged pokes
let unchallenged_pokes = self.reject_challenged_pokes(op_pokes, challenges);

let filtered = self.filter_unchallenged_events(logs, challenges);

if filtered.len() == 0 {
info!(
// Check if we have unchallenged pokes
if unchallenged_pokes.len() == 0 {
debug!(
"Address {:?}, no unchallenged opPokes found, skipping...",
self.address
);
return Ok(());
}

for (log, meta) in filtered {
for (log, meta) in unchallenged_pokes {
let challengeable = self
.is_challengeable(meta.block_number, challenge_period_in_sec)
.is_challengeable(meta.block_number, self.challenge_period_in_sec)
.await?;

if !challengeable {
Expand Down Expand Up @@ -226,6 +276,11 @@ where
);

if !valid {
debug!(
"Address {:?}, schnorr data is not valid, trying to challenge...",
self.address
);

// TODO: handle error gracefully, we should go further even if error happened
match self.challenge(schnorr_data.clone()).await {
Ok(receipt) => {
Expand Down Expand Up @@ -266,21 +321,32 @@ where
}

/// Start address processing
pub async fn start(&mut self, _sender: Sender<()>) -> Result<()> {
pub async fn start(
&mut self,
_sender: Sender<()>,
cancellation_token: CancellationToken,
) -> Result<()> {
let mut interval = time::interval(Duration::from_secs(30));

loop {
interval.tick().await;

match self.process().await {
Ok(_) => {
debug!("All ok, continue with next tick...");
tokio::select! {
_ = cancellation_token.cancelled() => {
info!("Address {:?}, cancellation token received, stopping...", self.address);
return Ok(());
}
Err(err) => {
error!(
"Address {:?}, failed to process opPokes: {:?}",
self.address, err
);
_ = interval.tick() => {
debug!("Address {:?}, interval tick", self.address);
match self.process().await {
Ok(_) => {
debug!("All ok, continue with next tick...");
}
Err(err) => {
error!(
"Address {:?}, failed to process opPokes: {:?}",
self.address, err
);
}
}
}
}
}
Expand Down
34 changes: 27 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ use ethers::{
signers::Signer,
};
use eyre::Result;
use log::{debug, info};
use log::{debug, error, info};
use std::sync::Arc;

mod challenger;
mod wallet;
use challenger::Challenger;
use tokio::signal;
use tokio::sync::mpsc::channel;
use tokio_util::sync::CancellationToken;

use wallet::{CustomWallet, KeystoreWallet, PrivateKeyWallet};

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -117,19 +120,22 @@ async fn main() -> Result<()> {

let client = Arc::new(SignerMiddleware::new(provider, signer));

let (send, mut recv) = channel(1);
let token = CancellationToken::new();
let (send, mut recv) = channel(args.addresses.len());

for address in &args.addresses {
let address = address.parse::<Address>()?;

let cloned_client = client.clone();
let c_send = send.clone();
let client_clone = client.clone();
let send_clone = send.clone();
let token_clone = token.clone();

tokio::spawn(async move {
info!("Address {:?} starting monitoring opPokes", address);

let mut challenger = Challenger::new(address, cloned_client);
let mut challenger = Challenger::new(address, client_clone);

challenger.start(c_send).await
challenger.start(send_clone, token_clone).await
});
}

Expand All @@ -139,12 +145,26 @@ async fn main() -> Result<()> {
// sleeps forever.
drop(send);

let _ = recv.recv().await;
tokio::select! {
_ = signal::ctrl_c() => {
info!("Received Ctrl-C, shutting down");
token.cancel();

// Waiting for all tasks to finish
recv.recv().await;
},

_ = recv.recv() => {
info!("Tasks finished, shutting down");
},
}
Ok(())
}

#[cfg(test)]
mod tests {
use std::path::PathBuf;

use super::*;

#[test]
Expand Down

0 comments on commit 775892b

Please sign in to comment.