Skip to content

Commit

Permalink
refactor: indexer_error metrics moved to indexer_common, auto inc and…
Browse files Browse the repository at this point in the history
… logs in new fn
  • Loading branch information
hopeyen committed Nov 4, 2023
1 parent d61add4 commit 444f50a
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 81 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ keccak-hash = "0.10.0"
lazy_static = "1.4.0"
log = "0.4.20"
lru = "0.11.1"
once_cell = "1.17"
prometheus = "0.13.3"
regex = "1.7.1"
reqwest = "0.11.20"
secp256k1 = { version = "0.28.0", features = ["recovery"] }
Expand Down
14 changes: 14 additions & 0 deletions common/src/indexer_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ use std::{
fmt::{self, Display},
};

use log::warn;

use crate::metrics;

const ERROR_BASE_URL: &str = "https://github.com/graphprotocol/indexer/blob/main/docs/errors.md";

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -300,8 +304,18 @@ pub struct IndexerError {
}

impl IndexerError {
// Create Indexer Error and automatically increment counter by the error code
pub fn new(code: IndexerErrorCode, cause: Option<IndexerErrorCause>) -> Self {
metrics::INDEXER_ERROR
.with_label_values(&[&code.to_string()])
.inc();
let explanation = code.message();
warn!(
"Encountered error {}: {}. Cause: {:#?}",
code.to_string(),
explanation,
cause
);
Self {
code,
explanation: explanation.to_string(),
Expand Down
3 changes: 2 additions & 1 deletion common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod attestations;
pub mod escrow_accounts;
pub mod graphql;
pub mod indexer_errors;
pub mod metrics;
pub mod signature_verification;
pub mod subgraph_client;
pub mod tap_manager;
Expand All @@ -21,7 +22,7 @@ pub mod prelude {
dispute_manager::dispute_manager, signer::AttestationSigner, signers::attestation_signers,
};
pub use super::escrow_accounts::escrow_accounts;
pub use super::subgraph_client::{DeploymentDetails, SubgraphClient};
pub use super::indexer_errors;
pub use super::subgraph_client::{DeploymentDetails, SubgraphClient};
pub use super::tap_manager::TapManager;
}
33 changes: 33 additions & 0 deletions common/src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2023-, GraphOps and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use once_cell::sync::Lazy;
use prometheus::{core::Collector, register, IntCounterVec, Opts, Registry};

#[allow(dead_code)]
pub static INDEXER_ERROR: Lazy<IntCounterVec> = Lazy::new(|| {
let m = IntCounterVec::new(
Opts::new("indexer_error", "Indexer errors observed over time")
.namespace("indexer")
.subsystem("service"),
&["code"],
)
.expect("Failed to create indexer_error");
register(Box::new(m.clone())).expect("Failed to register indexer_error counter");
m
});

#[allow(dead_code)]
pub static REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);

#[allow(dead_code)]
pub fn register_metrics(registry: &Registry, metrics: Vec<Box<dyn Collector>>) {
for metric in metrics {
registry.register(metric).expect("Cannot register metrics");
}
}

/// Register indexer error metrics in Prometheus registry
pub fn register_indexer_error_metrics() {
register_metrics(&REGISTRY, vec![Box::new(INDEXER_ERROR.clone())]);
}
3 changes: 3 additions & 0 deletions service/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ async fn main() -> Result<(), std::io::Error> {
let query_processor =
QueryProcessor::new(graph_node.clone(), attestation_signers.clone(), tap_manager);

indexer_common::metrics::register_indexer_error_metrics();
metrics::register_query_metrics();

// Start indexer service basic metrics
tokio::spawn(handle_serve_metrics(
String::from("0.0.0.0"),
Expand Down
33 changes: 4 additions & 29 deletions service/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ use autometrics::{encode_global_metrics, global_metrics_exporter};
use axum::http::StatusCode;
use axum::routing::get;
use axum::Router;
use indexer_common::metrics::{register_metrics, REGISTRY};
use once_cell::sync::Lazy;
use prometheus::{core::Collector, Registry};

use prometheus::{linear_buckets, HistogramOpts, HistogramVec, IntCounterVec, Opts};
use std::{net::SocketAddr, str::FromStr};
use tracing::{debug, info};
use tracing::info;

pub static QUERIES: Lazy<IntCounterVec> = Lazy::new(|| {
let m = IntCounterVec::new(
Expand Down Expand Up @@ -96,33 +97,8 @@ pub static QUERY_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
m
});

#[allow(dead_code)]
pub static INDEXER_ERROR: Lazy<IntCounterVec> = Lazy::new(|| {
let m = IntCounterVec::new(
Opts::new("indexer_error", "Indexer errors observed over time")
.namespace("indexer")
.subsystem("service"),
&["code"],
)
.expect("Failed to create indexer_error");
prometheus::register(Box::new(m.clone())).expect("Failed to register indexer_error counter");
m
});

#[allow(dead_code)]
pub static REGISTRY: Lazy<prometheus::Registry> = Lazy::new(prometheus::Registry::new);

#[allow(dead_code)]
pub fn register_metrics(registry: &Registry, metrics: Vec<Box<dyn Collector>>) {
for metric in metrics {
registry.register(metric).expect("Cannot register metrics");
debug!("registered metric");
}
}

/// Start the basic metrics for indexer services
#[allow(dead_code)]
pub fn start_metrics() {
pub fn register_query_metrics() {
register_metrics(
&REGISTRY,
vec![
Expand All @@ -132,7 +108,6 @@ pub fn start_metrics() {
Box::new(QUERIES_WITH_INVALID_RECEIPT_HEADER.clone()),
Box::new(QUERIES_WITHOUT_RECEIPT.clone()),
Box::new(QUERY_DURATION.clone()),
Box::new(INDEXER_ERROR.clone()),
],
);
}
Expand Down
46 changes: 21 additions & 25 deletions service/src/query_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use tap_core::tap_manager::SignedReceipt;
use toolshed::thegraph::attestation::Attestation;
use toolshed::thegraph::DeploymentId;

use crate::metrics;
use indexer_common::indexer_errors::IndexerErrorCode;
use indexer_common::indexer_errors::{IndexerError, IndexerErrorCause, IndexerErrorCode};
use indexer_common::prelude::AttestationSigner;

use crate::graph_node::GraphNodeInstance;
Expand Down Expand Up @@ -93,14 +92,7 @@ impl QueryProcessor {
let response = self
.graph_node
.subgraph_query_raw(&query.subgraph_deployment_id, query.query)
.await
.map_err(|e| {
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE033.to_string()])
.inc();

e
})?;
.await?;

Ok(Response {
result: response,
Expand All @@ -123,9 +115,12 @@ impl QueryProcessor {
{
Ok(r) => r,
Err(e) => {
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE031.to_string()])
.inc();
IndexerError::new(
IndexerErrorCode::IE031,
Some(IndexerErrorCause::new(
"Failed to parse receipt for a paid query",
)),
);

return Err(e);
}
Expand All @@ -137,10 +132,12 @@ impl QueryProcessor {
.verify_and_store_receipt(parsed_receipt)
.await
.map_err(|e| {
//TODO: fit indexer errors to TAP better, currently keeping the old messages
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE053.to_string()])
.inc();
IndexerError::new(
IndexerErrorCode::IE053,
Some(IndexerErrorCause::new(
"Failed to verify and store a parsed receipt",
)),
);

QueryError::Other(e)
})?;
Expand All @@ -150,14 +147,13 @@ impl QueryProcessor {
.value_immediate()
.ok_or_else(|| QueryError::Other(anyhow::anyhow!("System is not ready yet")))?;
let signer = signers.get(&allocation_id).ok_or_else(|| {
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE022.to_string()])
.inc();

QueryError::Other(anyhow::anyhow!(
"No signer found for allocation id {}",
allocation_id
))
let err_msg = format!("No signer found for allocation id {}", allocation_id);
IndexerError::new(
IndexerErrorCode::IE022,
Some(IndexerErrorCause::new(err_msg.clone())),
);

QueryError::Other(anyhow::anyhow!(err_msg))
})?;

let response = self
Expand Down
22 changes: 14 additions & 8 deletions service/src/server/routes/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use hyper::body::Bytes;

use reqwest::{header, Client};

use crate::{metrics, server::ServerOptions};
use indexer_common::{graphql::filter_supported_fields, indexer_errors::IndexerErrorCode};
use crate::server::ServerOptions;
use indexer_common::{graphql::filter_supported_fields, indexer_errors::*};

use super::bad_request_response;

Expand Down Expand Up @@ -68,16 +68,22 @@ pub async fn status_queries(
Ok(r) => match r.json::<Box<serde_json::value::RawValue>>().await {
Ok(r) => (StatusCode::OK, Json(r)).into_response(),
Err(e) => {
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE018.to_string()])
.inc();
IndexerError::new(
IndexerErrorCode::IE018,
Some(IndexerErrorCause::new(
"Failed to parse the indexing status API response",
)),
);
bad_request_response(&e.to_string())
}
},
Err(e) => {
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE018.to_string()])
.inc();
IndexerError::new(
IndexerErrorCode::IE018,
Some(IndexerErrorCause::new(
"Failed to query indexing status API from the graph node status endpoint",
)),
);
bad_request_response(&e.to_string())
}
}
Expand Down
30 changes: 19 additions & 11 deletions service/src/server/routes/subgraphs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
ServerOptions,
},
};
use indexer_common::indexer_errors::IndexerErrorCode;
use indexer_common::indexer_errors::*;

/// Parse an incoming query request and route queries with authenticated
/// free query token to graph node
Expand Down Expand Up @@ -53,10 +53,12 @@ pub async fn subgraph_queries(
metrics::QUERIES_WITH_INVALID_RECEIPT_HEADER
.with_label_values(&[&deployment_label])
.inc();
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE029.to_string()])
.inc();
return bad_request_response("Bad scalar receipt for subgraph query");
let err_msg = "Bad scalar receipt for subgraph query";
IndexerError::new(
IndexerErrorCode::IE029,
Some(IndexerErrorCause::new(err_msg)),
);
return bad_request_response(err_msg);
}
}
} else {
Expand Down Expand Up @@ -97,9 +99,12 @@ pub async fn subgraph_queries(
(StatusCode::OK, Json(res.result)).into_response()
}
_ => {
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE033.to_string()])
.inc();
IndexerError::new(
IndexerErrorCode::IE033,
Some(IndexerErrorCause::new(
"Failed to execute a free subgraph query to graph node",
)),
);
bad_request_response("Failed to execute free query")
}
}
Expand All @@ -122,9 +127,12 @@ pub async fn subgraph_queries(
metrics::FAILED_QUERIES
.with_label_values(&[&deployment_label])
.inc();
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE032.to_string()])
.inc();
IndexerError::new(
IndexerErrorCode::IE032,
Some(IndexerErrorCause::new(
"Failed to execute a paid subgraph query to graph node",
)),
);
return bad_request_response("Failed to execute paid query");
}
}
Expand Down
11 changes: 4 additions & 7 deletions service/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use tracing::{
use tracing_subscriber::{EnvFilter, FmtSubscriber};

use crate::common::address::{build_wallet, wallet_address};
use indexer_common::indexer_errors::{indexer_error, IndexerError, IndexerErrorCode};

/// Struct for version control
#[derive(Serialize, Debug, Clone)]
Expand All @@ -24,16 +23,14 @@ pub struct PackageVersion {
}

/// Read the manfiest
fn read_manifest() -> Result<Value, IndexerError> {
let toml_string = fs::read_to_string("service/Cargo.toml")
.map_err(|_e| indexer_error(IndexerErrorCode::IE074))?;
let toml_value: Value =
toml::from_str(&toml_string).map_err(|_e| indexer_error(IndexerErrorCode::IE074))?;
fn read_manifest() -> Result<Value, anyhow::Error> {
let toml_string = fs::read_to_string("service/Cargo.toml")?;
let toml_value: Value = toml::from_str(&toml_string)?;
Ok(toml_value)
}

/// Parse package versioning from the manifest
pub fn package_version() -> Result<PackageVersion, IndexerError> {
pub fn package_version() -> Result<PackageVersion, anyhow::Error> {
read_manifest().map(|toml_file| {
let pkg = toml_file.as_table().unwrap();
let version = pkg
Expand Down

0 comments on commit 444f50a

Please sign in to comment.