Skip to content

Commit

Permalink
sc-4232 Added prometheus metrics for latest processed block per addre…
Browse files Browse the repository at this point in the history
…ss + small code fixes (#11)

* Added prometheus metrics into challenger

* sc-4232 Added prometheus metrics for latest processed block per address + small code fixes

* sc-4232 Code optimizations + linter fixes

* sc-4232 Fix tests
  • Loading branch information
konstantinzolotarev authored Mar 11, 2024
1 parent 3ab6f29 commit 7789a4d
Show file tree
Hide file tree
Showing 8 changed files with 1,327 additions and 779 deletions.
1,847 changes: 1,104 additions & 743 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 14 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,28 @@ edition = "2021"
[workspace.dependencies]
challenger-lib = { path = "challenger" }

[dependencies]
challenger-lib.workspace = true

ethers = "2.0"
tokio = { version = "1", features = ["full"] }
hex-literal = "0.4.1"
env_logger = "0.10.0"
env_logger = "0.11.3"
futures = "0.3.28"
eyre = "0.6.8"
clap = { version = "4.3.21", features = ["derive", "env"] }
log = "0.4.20"
hex = "0.4.3"
chrono = "0.4.26"
rpassword = "7.2.0"
lazy_static = "1.4.0"
tokio-util = "0.7.9"
async-trait = "0.1.73"
warp = "0.3"

[dependencies]
challenger-lib.workspace = true
tokio = { workspace = true, features = ["full"] }
env_logger = { workspace = true }
log = { workspace = true }
eyre = { workspace = true }
ethers = { workspace = true }
warp = { workspace = true }

clap = { version = "4.3.21", features = ["derive", "env"] }
rpassword = "7.2.0"
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
```

## Metrics & Health checks

By default challenger exposes metrics and health checks on `:9090/metrics` route.
Metrics are exposed in prometheus format and health checks are exposed in json format.

Health check route is `:9090/health`.

`HTTP_PORT` env variable can be used to change the port.
Example: `HTTP_PORT=8080` will expose metrics on `:8080/metrics` and health checks on `:8080/health`.
30 changes: 15 additions & 15 deletions challenger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ edition = "2021"
doctest = false

[dependencies]
ethers = "2.0"
tokio = { version = "1", features = ["full"] }
hex-literal = "0.4.1"
env_logger = "0.10.0"
futures = "0.3.28"
eyre = "0.6.8"
clap = { version = "4.3.21", features = ["derive", "env"] }
log = "0.4.20"
hex = "0.4.3"
chrono = "0.4.26"
rpassword = "7.2.0"
lazy_static = "1.4.0"
tokio-util = "0.7.9"
async-trait = "0.1.73"
ethers = { workspace = true }
tokio = { workspace = true, features = ["full"] }
hex-literal = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
eyre = { workspace = true }
log = { workspace = true }
hex = { workspace = true }
chrono = { workspace = true }
lazy_static = { workspace = true }
tokio-util = { workspace = true }
async-trait = { workspace = true }

prometheus = { version = "0.13.3", features = ["process"] }

[dev-dependencies]
mockall = "0.11.2"
mockall = "0.12.1"
14 changes: 13 additions & 1 deletion challenger/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use ethers::{
types::{Address, Block, TransactionReceipt, ValueOrArray, H256, U64},
};
use eyre::{Result, WrapErr};
use log::debug;
use log::{debug, warn};

// Yes it generates smart contract code based on given ABI
abigen!(ScribeOptimistic, "./abi/ScribeOptimistic.json");
Expand Down Expand Up @@ -61,6 +61,14 @@ pub trait ScribeOptimisticProvider: Send + Sync {

/// Challenges given `OpPoked` event.
async fn challenge(&self, schnorr_data: SchnorrData) -> Result<Option<TransactionReceipt>>;

/// Returns the from account address.
fn get_from(&self) -> Option<Address> {
warn!(
"get_from() uses default sender address, it's not recommended to use it in production"
);
Some(Address::zero())
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -88,6 +96,10 @@ where
M: 'static,
M::Error: 'static,
{
fn get_from(&self) -> Option<Address> {
self.client.default_sender()
}

/// Returns the latest block number from RPC.
async fn get_latest_block_number(&self) -> Result<U64> {
self.client
Expand Down
56 changes: 51 additions & 5 deletions challenger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ use ethers::{
core::types::{Address, U64},
};
use eyre::Result;
use log::{debug, error, info};
use log::{debug, error, info, warn};
use std::time::Duration;
use tokio::time;

pub mod contract;
pub mod metrics;

use contract::{OpPokeChallengedSuccessfullyFilter, OpPokedFilter, ScribeOptimisticProvider};

use crate::metrics::{ERRORS_COUNTER, LAST_SCANNED_BLOCK_GAUGE};

// Note: this is true virtually all of the time but because of leap seconds not always.
// We take minimal time just to be sure, it's always better to check outdated blocks
// rather than miss some.
Expand Down Expand Up @@ -165,6 +168,18 @@ where
// Updating last processed block with latest chain block
self.last_processed_block = Some(latest_block_number);

// Updating last scanned block metric
LAST_SCANNED_BLOCK_GAUGE
.with_label_values(&[
&self.address.to_string(),
&self
.contract_provider
.get_from()
.unwrap_or_default()
.to_string(),
])
.set(latest_block_number.as_u64() as i64);

// Fetch list of `OpPokeChallengedSuccessfully` events
let challenges = self
.contract_provider
Expand Down Expand Up @@ -221,10 +236,29 @@ where
// TODO: handle error gracefully, we should go further even if error happened
match self.contract_provider.challenge(log.schnorr_data).await {
Ok(receipt) => {
info!(
"Address {:?}, successfully sent `opChallenge` transaction {:?}",
self.address, receipt
);
if let Some(receipt) = receipt {
info!(
"Address {:?}, successfully sent `opChallenge` transaction {:?}",
self.address, receipt
);
// Add challenge to metrics
LAST_SCANNED_BLOCK_GAUGE
.with_label_values(&[
&self.address.to_string(),
&self
.contract_provider
.get_from()
.unwrap_or_default()
.to_string(),
&receipt.transaction_hash.to_string(),
])
.inc();
} else {
warn!(
"Address {:?}, successfully sent `opChallenge` transaction but no receipt returned",
self.address
);
}
}
Err(err) => {
error!(
Expand Down Expand Up @@ -283,6 +317,18 @@ where
self.address, err
);

ERRORS_COUNTER
.with_label_values(&[
&self.address.to_string(),
&self
.contract_provider
.get_from()
.unwrap_or_default()
.to_string(),
&err.to_string(),
])
.inc();

// Increment and check error counter
self.failure_count += 1;
if self.failure_count >= self.max_failure_count {
Expand Down
68 changes: 68 additions & 0 deletions challenger/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use eyre::{Context, Result};
use lazy_static::lazy_static;
use prometheus::{IntCounterVec, IntGaugeVec, Opts, Registry};

lazy_static! {
pub static ref REGISTRY: Registry =
Registry::new_custom(Some(String::from("challenger")), None)
.expect("registry can be created");
pub static ref ERRORS_COUNTER: IntCounterVec = IntCounterVec::new(
Opts::new("errors_total", "Challenger Errors Counter"),
&["address", "from", "error"]
)
.expect("metric can be created");
pub static ref CHALLENGE_COUNTER: IntCounterVec = IntCounterVec::new(
Opts::new("challenges_total", "Number of challenges made"),
&["address", "from", "tx"]
)
.expect("metric can be created");
pub static ref LAST_SCANNED_BLOCK_GAUGE: IntGaugeVec = IntGaugeVec::new(
Opts::new("last_scanned_block", "Last scanned block"),
&["address", "from"]
)
.expect("metric can be created");
}

/// `register_custom_metrics` registers custom metrics to the registry.
/// It have to be called before you plan to serve `/metrics` route.
pub fn register_custom_metrics() {
REGISTRY
.register(Box::new(ERRORS_COUNTER.clone()))
.expect("collector can be registered");

REGISTRY
.register(Box::new(CHALLENGE_COUNTER.clone()))
.expect("collector can be registered");

REGISTRY
.register(Box::new(LAST_SCANNED_BLOCK_GAUGE.clone()))
.expect("collector can be registered");
}

pub fn as_encoded_string() -> Result<String> {
use prometheus::Encoder;
let encoder = prometheus::TextEncoder::new();

// Collect and encode custom metrics from `REGISTRY`
let mut buffer = Vec::new();
encoder
.encode(&REGISTRY.gather(), &mut buffer)
.wrap_err("Failed to encode REGISTRY metrics")?;

let mut res = String::from_utf8(buffer.clone())
.wrap_err("Failed to convert REGISTRY metrics from utf8")?;
buffer.clear();

// Collect and encode prometheus metrics from `prometheus::gather()`
let mut buffer = Vec::new();
encoder
.encode(&prometheus::gather(), &mut buffer)
.wrap_err("Failed to encode prometheus metrics")?;

let res_custom = String::from_utf8(buffer.clone())
.wrap_err("Failed to convert prometheus metrics from utf8")?;
buffer.clear();

res.push_str(&res_custom);
Ok(res)
}
62 changes: 53 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,20 @@ use ethers::{
signers::Signer,
};
use eyre::Result;
use log::{debug, error, info};
use std::panic;
use log::{error, info};
use std::sync::Arc;
use std::{env, panic};

mod wallet;

use challenger_lib::contract::HttpScribeOptimisticProvider;
use challenger_lib::Challenger;
use challenger_lib::{contract::HttpScribeOptimisticProvider, metrics::ERRORS_COUNTER};
use challenger_lib::{metrics, Challenger};

use tokio::signal;
use tokio::task::JoinSet;

use wallet::{CustomWallet, KeystoreWallet, PrivateKeyWallet};
use warp::{reject::Rejection, reply::Reply, Filter};

#[derive(Parser, Debug)]
#[command(author, version, about)]
Expand Down Expand Up @@ -107,26 +108,31 @@ async fn main() -> Result<()> {

let provider = Provider::<Http>::try_from(args.rpc_url.as_str())?;

debug!("Connected to {:?}", provider.url());
info!("Connected to {:?}", provider.url());
let chain_id = args
.chain_id
.unwrap_or(provider.get_chainid().await?.as_u64());

debug!("Chain id: {:?}", chain_id);
info!("Chain id: {:?}", chain_id);
// Generating signer from given private key
let signer = args.wallet()?.unwrap().with_chain_id(chain_id);

debug!(
info!(
"Using {:?} for signing and chain_id {:?}",
signer.address(),
signer.chain_id()
);

let signer_address = signer.address();
let client = Arc::new(SignerMiddleware::new(provider, signer));

let mut set = JoinSet::new();

for address in &args.addresses {
// Removing duplicates from list of provided addresses
let mut addresses = args.addresses;
addresses.dedup();

for address in &addresses {
let address = address.parse::<Address>()?;

let client_clone = client.clone();
Expand All @@ -136,10 +142,38 @@ async fn main() -> Result<()> {
let contract_provider = HttpScribeOptimisticProvider::new(address, client_clone);
let mut challenger = Challenger::new(address, contract_provider, None, None);

challenger.start().await
let res = challenger.start().await;
// Check and add error into metrics
if res.is_err() {
ERRORS_COUNTER
.with_label_values(&[
&address.to_string(),
&signer_address.to_string(),
&res.err().unwrap().to_string(),
])
.inc();
}
});
}

// TODO: Start HTTP server for `:9090/metrics`
set.spawn(async move {
metrics::register_custom_metrics();

let metrics_route = warp::path!("metrics").and_then(metrics_handle);
let health_route = warp::path!("health").map(|| warp::reply::json(&"OK"));

let port = env::var("HTTP_PORT")
.unwrap_or(String::from("9090"))
.parse::<u16>()
.unwrap();

info!("Starting HTTP server on port {}", port);
warp::serve(metrics_route.or(health_route))
.run(([0, 0, 0, 0], port))
.await;
});

tokio::select! {
_ = signal::ctrl_c() => {
info!("Received Ctrl-C, shutting down");
Expand All @@ -162,6 +196,16 @@ async fn main() -> Result<()> {
Ok(())
}

async fn metrics_handle() -> Result<impl Reply, Rejection> {
match metrics::as_encoded_string() {
Ok(v) => Ok(v),
Err(e) => {
error!("could not encode custom metrics: {}", e);
Ok(String::default())
}
}
}

#[cfg(test)]
mod tests {
use std::path::PathBuf;
Expand Down

0 comments on commit 7789a4d

Please sign in to comment.