Skip to content

Commit

Permalink
feat(service): finish implementing the new subgraph service
Browse files Browse the repository at this point in the history
  • Loading branch information
Jannis committed Dec 7, 2023
1 parent 5d6b2da commit ca23a3b
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 30 deletions.
95 changes: 71 additions & 24 deletions service/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use axum::{
};
use clap::Parser;
use indexer_common::indexer_service::http::{
IndexerService, IndexerServiceImpl, IndexerServiceOptions, IndexerServiceRelease, IsAttestable,
IndexerService, IndexerServiceImpl, IndexerServiceOptions, IndexerServiceRelease,
IndexerServiceResponse,
};
use reqwest::StatusCode;
use serde::Serialize;
use reqwest::{StatusCode, Url};
use serde_json::{json, Value};
use sqlx::PgPool;
use thegraph::types::DeploymentId;
use thegraph::types::{Attestation, DeploymentId};
use thiserror::Error;
use tracing::error;

Expand All @@ -39,6 +39,10 @@ pub enum SubgraphServiceError {
UnsupportedStatusQueryFields(Vec<String>),
#[error("Internal server error: {0}")]
StatusQueryError(Error),
#[error("Invalid deployment: {0}")]
InvalidDeployment(DeploymentId),
#[error("Failed to process query: {0}")]
QueryForwardingError(reqwest::Error),
}

impl From<&SubgraphServiceError> for StatusCode {
Expand All @@ -48,6 +52,8 @@ impl From<&SubgraphServiceError> for StatusCode {
InvalidStatusQuery(_) => StatusCode::BAD_REQUEST,
UnsupportedStatusQueryFields(_) => StatusCode::BAD_REQUEST,
StatusQueryError(_) => StatusCode::INTERNAL_SERVER_ERROR,
InvalidDeployment(_) => StatusCode::BAD_REQUEST,
QueryForwardingError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}
}
Expand All @@ -59,30 +65,36 @@ impl IntoResponse for SubgraphServiceError {
}
}

#[derive(Serialize)]
#[serde(transparent)]
struct SubgraphResponse {
inner: Value,
#[serde(skip)]
#[derive(Debug)]
struct SubgraphServiceResponse {
inner: String,
attestable: bool,
}

impl SubgraphResponse {
fn new(inner: Value, attestable: bool) -> Self {
impl SubgraphServiceResponse {
fn new(inner: String, attestable: bool) -> Self {
Self { inner, attestable }
}
}

impl IntoResponse for SubgraphResponse {
fn into_response(self) -> Response {
Json(self.inner).into_response()
}
}
impl IndexerServiceResponse for SubgraphServiceResponse {
type Data = Json<Value>;
type Error = SubgraphServiceError; // not used

impl IsAttestable for SubgraphResponse {
fn is_attestable(&self) -> bool {
self.attestable
}

fn as_str(&self) -> Result<&str, Self::Error> {
Ok(self.inner.as_str())
}

fn finalize(self, attestation: Option<Attestation>) -> Self::Data {
Json(json!({
"graphQLResponse": self.inner,
"attestation": attestation
}))
}
}

pub struct SubgraphServiceState {
Expand All @@ -91,31 +103,59 @@ pub struct SubgraphServiceState {
pub cost_schema: routes::cost::CostSchema,
pub graph_node_client: reqwest::Client,
pub graph_node_status_url: String,
pub graph_node_query_base_url: String,
}

struct SubgraphService {
config: Config,
state: Arc<SubgraphServiceState>,
}

impl SubgraphService {
fn new(config: Config) -> Self {
Self { config }
fn new(state: Arc<SubgraphServiceState>) -> Self {
Self { state }
}
}

#[async_trait]
impl IndexerServiceImpl for SubgraphService {
type Error = SubgraphServiceError;
type Request = serde_json::Value;
type Response = SubgraphResponse;
type Response = SubgraphServiceResponse;
type State = SubgraphServiceState;

async fn process_request(
&self,
_manifest_id: DeploymentId,
deployment: DeploymentId,
request: Self::Request,
) -> Result<(Self::Request, Self::Response), Self::Error> {
Ok((request, SubgraphResponse::new(json!("hello"), false)))
let deployment_url = Url::parse(&format!(
"{}/subgraphs/id/{}",
&self.state.graph_node_query_base_url, deployment
))
.map_err(|_| SubgraphServiceError::InvalidDeployment(deployment))?;

let response = self
.state
.graph_node_client
.post(deployment_url)
.json(&request)
.send()
.await
.map_err(SubgraphServiceError::QueryForwardingError)?;

let attestable = response
.headers()
.get("graph-attestable")
.map_or(false, |value| {
value.to_str().map(|value| value == "true").unwrap_or(false)
});

let body = response
.text()
.await
.map_err(|e| SubgraphServiceError::QueryForwardingError(reqwest::Error::from(e)))?;

Ok((request, SubgraphServiceResponse::new(body, attestable)))
}
}

Expand Down Expand Up @@ -165,14 +205,21 @@ async fn main() -> Result<(), Error> {
.expect("Config must have `common.graph_node.status_url` set")
.status_url
.clone(),
graph_node_query_base_url: config
.common
.graph_node
.as_ref()
.expect("config must have `common.graph_node.query_url` set")
.query_base_url
.clone(),
});

IndexerService::run(IndexerServiceOptions {
release,
config: config.common.clone(),
url_namespace: "subgraphs",
metrics_prefix: "subgraph",
service_impl: SubgraphService::new(config),
service_impl: SubgraphService::new(state.clone()),
extra_routes: Router::new()
.route("/cost", post(routes::cost::cost))
.route("/status", post(routes::status))
Expand Down
15 changes: 9 additions & 6 deletions service/src/routes/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub async fn status(
request: GraphQLRequest,
) -> Result<impl IntoResponse, SubgraphServiceError> {
let request = request.into_inner();

let query: q::Document<String> = q::parse_query(request.query.as_str())
.map_err(|e| SubgraphServiceError::InvalidStatusQuery(e.into()))?;

Expand Down Expand Up @@ -105,10 +106,12 @@ pub async fn status(
.await
.map_err(|e| SubgraphServiceError::StatusQueryError(e.into()))?;

result.map(Json).or_else(|e| match e {
ResponseError::Failure { errors } => Ok(Json(json!({
"errors": errors,
}))),
ResponseError::Empty => todo!(),
})
result
.map(|data| Json(json!({"data": data})))
.or_else(|e| match e {
ResponseError::Failure { errors } => Ok(Json(json!({
"errors": errors,
}))),
ResponseError::Empty => todo!(),
})
}

0 comments on commit ca23a3b

Please sign in to comment.