Skip to content

Commit

Permalink
feat: add serving /network and /escrow subgraphs back in
Browse files Browse the repository at this point in the history
  • Loading branch information
Jannis committed Dec 11, 2023
1 parent 6d520f9 commit 9379047
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 42 deletions.
4 changes: 4 additions & 0 deletions common/src/indexer_service/http/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ pub struct DatabaseConfig {

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SubgraphConfig {
#[serde(default)]
pub serve_subgraph: bool,
pub serve_auth_token: Option<String>,

pub deployment: Option<DeploymentId>,
pub query_url: String,
pub syncing_interval: u64,
Expand Down
78 changes: 68 additions & 10 deletions common/src/indexer_service/http/indexer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use axum::{
error_handling::HandleErrorLayer,
response::{IntoResponse, Response},
routing::{get, post},
BoxError, Json, Router, Server,
BoxError, Extension, Json, Router, Server,
};
use build_info::BuildInfo;
use eventuals::Eventual;
Expand All @@ -31,7 +31,9 @@ use tower_governor::{errors::display_error, governor::GovernorConfigBuilder, Gov
use tracing::info;

use crate::{
indexer_service::http::metrics::IndexerServiceMetrics,
indexer_service::http::{
metrics::IndexerServiceMetrics, static_subgraph::static_subgraph_request_handler,
},
prelude::{
attestation_signers, dispute_manager, escrow_accounts, indexer_allocations,
AttestationSigner, DeploymentDetails, SubgraphClient,
Expand Down Expand Up @@ -83,7 +85,7 @@ where
InvalidRequest(anyhow::Error),
#[error("Error while processing the request: {0}")]
ProcessingError(E),
#[error("No receipt or free query auth token provided")]
#[error("No valid receipt or free query auth token provided")]
Unauthorized,
#[error("Invalid free query auth token")]
InvalidFreeQueryAuthToken,
Expand All @@ -93,6 +95,8 @@ where
FailedToProvideAttestation,
#[error("Failed to provide response")]
FailedToProvideResponse,
#[error("Failed to query subgraph: {0}")]
FailedToQueryStaticSubgraph(anyhow::Error),
}

impl<E> From<&IndexerServiceError<E>> for StatusCode
Expand All @@ -119,6 +123,8 @@ where
InvalidRequest(_) => StatusCode::BAD_REQUEST,
InvalidFreeQueryAuthToken => StatusCode::BAD_REQUEST,
ProcessingError(_) => StatusCode::BAD_REQUEST,

FailedToQueryStaticSubgraph(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}
}
Expand Down Expand Up @@ -192,7 +198,7 @@ impl IndexerService {
.build()
.expect("Failed to init HTTP client");

let network_subgraph = Box::leak(Box::new(SubgraphClient::new(
let network_subgraph: &'static SubgraphClient = Box::leak(Box::new(SubgraphClient::new(
http_client.clone(),
options
.config
Expand Down Expand Up @@ -234,7 +240,7 @@ impl IndexerService {
dispute_manager,
);

let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new(
let escrow_subgraph: &'static SubgraphClient = Box::leak(Box::new(SubgraphClient::new(
http_client,
options
.config
Expand Down Expand Up @@ -294,7 +300,7 @@ impl IndexerService {
// Rate limits by allowing bursts of 10 requests and requiring 100ms of
// time between consecutive requests after that, effectively rate
// limiting to 10 req/s.
let rate_limiter = GovernorLayer {
let misc_rate_limiter = GovernorLayer {
config: Box::leak(Box::new(
GovernorConfigBuilder::default()
.per_millisecond(100)
Expand All @@ -304,17 +310,69 @@ impl IndexerService {
)),
};

let misc_routes = Router::new()
let mut misc_routes = Router::new()
.route("/", get("Service is up and running"))
.route("/version", get(Json(options.release)))
.layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(|e: BoxError| async move {
display_error(e)
}))
.layer(rate_limiter),
)
.with_state(state.clone());
.layer(misc_rate_limiter),
);

// Rate limits by allowing bursts of 50 requests and requiring 20ms of
// time between consecutive requests after that, effectively rate
// limiting to 50 req/s.
let static_subgraph_rate_limiter = GovernorLayer {
config: Box::leak(Box::new(
GovernorConfigBuilder::default()
.per_millisecond(20)
.burst_size(50)
.finish()
.expect("Failed to set up rate limiting"),
)),
};

if options.config.network_subgraph.serve_subgraph {
info!("Serving network subgraph at /network");

misc_routes = misc_routes.route(
"/network",
post(static_subgraph_request_handler::<I>)
.route_layer(Extension(network_subgraph))
.route_layer(Extension(
options.config.network_subgraph.serve_auth_token.clone(),
))
.route_layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(|e: BoxError| async move {
display_error(e)
}))
.layer(static_subgraph_rate_limiter.clone()),
),
);
}

if options.config.escrow_subgraph.serve_subgraph {
info!("Serving escrow subgraph at /escrow");

misc_routes = misc_routes
.route("/escrow", post(static_subgraph_request_handler::<I>))
.route_layer(Extension(escrow_subgraph))
.route_layer(Extension(
options.config.escrow_subgraph.serve_auth_token.clone(),
))
.route_layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(|e: BoxError| async move {
display_error(e)
}))
.layer(static_subgraph_rate_limiter),
);
}

misc_routes = misc_routes.with_state(state.clone());

let data_routes = Router::new()
.route(
Expand Down
1 change: 1 addition & 0 deletions common/src/indexer_service/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod indexer_service;
mod metrics;
mod request_handler;
mod scalar_receipt_header;
mod static_subgraph;

pub use config::{
DatabaseConfig, GraphNetworkConfig, IndexerConfig, IndexerServiceConfig, ServerConfig,
Expand Down
49 changes: 49 additions & 0 deletions common/src/indexer_service/http/static_subgraph.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use axum::{body::Bytes, http::HeaderMap, response::IntoResponse, Extension};
use tracing::warn;

use crate::subgraph_client::SubgraphClient;

use super::{indexer_service::IndexerServiceError, IndexerServiceImpl};

#[autometrics::autometrics]
pub async fn static_subgraph_request_handler<I>(
Extension(subgraph_client): Extension<&'static SubgraphClient>,
Extension(required_auth_token): Extension<Option<String>>,
headers: HeaderMap,
body: Bytes,
) -> Result<impl IntoResponse, IndexerServiceError<I::Error>>
where
I: IndexerServiceImpl + Sync + Send + 'static,
{
if let Some(required_auth_token) = required_auth_token {
let authorization = headers
.get("authorization")
.map(|value| value.to_str())
.transpose()
.map_err(|_| IndexerServiceError::Unauthorized)?
.ok_or_else(|| IndexerServiceError::Unauthorized)?
.trim_start_matches("Bearer ");

if authorization != required_auth_token {
return Err(IndexerServiceError::Unauthorized);
}
}

let response = subgraph_client
.query_raw(body)
.await
.map_err(|e| IndexerServiceError::FailedToQueryStaticSubgraph(e))?;

Ok((
response.status(),
response.headers().to_owned(),
response
.text()
.await
.map_err(|e| {
warn!("Failed to read response body: {}", e);
e
})
.map_err(|e| IndexerServiceError::FailedToQueryStaticSubgraph(e.into()))?,
))
}
33 changes: 1 addition & 32 deletions service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,18 @@

use std::path::PathBuf;

use alloy_primitives::Address;
use figment::{
providers::{Format, Toml},
Figment,
};
use indexer_common::indexer_service::http::IndexerServiceConfig;
use serde::{Deserialize, Serialize};
use thegraph::types::DeploymentId;
use serde::Deserialize;

#[derive(Clone, Debug, Deserialize)]
pub struct Config {
// pub receipts: Receipts,
// pub indexer_infrastructure: IndexerInfrastructure,
// pub postgres: Postgres,
// pub network_subgraph: NetworkSubgraph,
// pub escrow_subgraph: EscrowSubgraph,
pub common: IndexerServiceConfig,
}

#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct NetworkSubgraph {
// #[clap(
// long,
// value_name = "serve-network-subgraph",
// env = "SERVE_NETWORK_SUBGRAPH",
// default_value_t = false,
// help = "Whether to serve the network subgraph at /network"
// )]
pub serve_network_subgraph: bool,
}

#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct EscrowSubgraph {
// #[clap(
// long,
// value_name = "serve-escrow-subgraph",
// env = "SERVE_ESCROW_SUBGRAPH",
// default_value_t = false,
// help = "Whether to serve the escrow subgraph at /escrow"
// )]
// pub serve_escrow_subgraph: bool,
}

impl Config {
pub fn load(filename: &PathBuf) -> Result<Self, figment::Error> {
Figment::new().merge(Toml::file(filename)).extract()
Expand Down

0 comments on commit 9379047

Please sign in to comment.