Skip to content

Commit

Permalink
add route handlers so we can keep returning MutationError/QueryError …
Browse files Browse the repository at this point in the history
…after sdk update
  • Loading branch information
BenoitRanque committed Sep 27, 2024
1 parent 487b073 commit a2920f3
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 207 deletions.
226 changes: 19 additions & 207 deletions crates/ndc-graphql/src/connector.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
use self::state::ServerState;
use crate::query_builder::{build_mutation_document, build_query_document};
use async_trait::async_trait;
use common::{
capabilities::capabilities,
client::{execute_graphql, GraphQLRequest},
config::ServerConfig,
schema_response::schema_response,
};
use indexmap::IndexMap;
use common::{capabilities::capabilities, config::ServerConfig, schema_response::schema_response};
use mutation::{handle_mutation, handle_mutation_explain};
use ndc_sdk::{
connector::{self, Connector, MutationError, QueryError},
connector::{self, Connector},
json_response::JsonResponse,
models::{self, FieldName},
models,
};
use std::{collections::BTreeMap, mem};
use tracing::{Instrument, Level};
use query::{handle_query, handle_query_explain};
mod mutation;
mod query;
pub mod setup;
mod state;

Expand Down Expand Up @@ -49,224 +44,41 @@ impl Connector for GraphQLConnector {

async fn query_explain(
configuration: &Self::Configuration,
_state: &Self::State,
state: &Self::State,
request: models::QueryRequest,
) -> connector::Result<JsonResponse<models::ExplainResponse>> {
let operation = tracing::info_span!("Build Query Document", internal.visibility = "user")
.in_scope(|| build_query_document(&request, configuration))
.map_err(|err| QueryError::new_invalid_request(&err))?;

let query = serde_json::to_string_pretty(&GraphQLRequest::new(
&operation.query,
&operation.variables,
Ok(JsonResponse::Value(
handle_query_explain(configuration, state, request).await?,
))
.map_err(|err| QueryError::new_invalid_request(&err))?;

let details = BTreeMap::from_iter(vec![
("SQL Query".to_string(), operation.query),
("Execution Plan".to_string(), query),
(
"Headers".to_string(),
serde_json::to_string(&operation.headers).expect("should convert headers to json"),
),
]);

Ok(JsonResponse::Value(models::ExplainResponse { details }))
}

async fn mutation_explain(
configuration: &Self::Configuration,
_state: &Self::State,
state: &Self::State,
request: models::MutationRequest,
) -> connector::Result<JsonResponse<models::ExplainResponse>> {
let operation =
tracing::info_span!("Build Mutation Document", internal.visibility = "user")
.in_scope(|| build_mutation_document(&request, configuration))
.map_err(|err| MutationError::new_invalid_request(&err))?;

let query = serde_json::to_string_pretty(&GraphQLRequest::new(
&operation.query,
&operation.variables,
Ok(JsonResponse::Value(
handle_mutation_explain(configuration, state, request).await?,
))
.map_err(|err| MutationError::new_invalid_request(&err))?;

let details = BTreeMap::from_iter(vec![
("SQL Query".to_string(), operation.query),
("Execution Plan".to_string(), query),
(
"Headers".to_string(),
serde_json::to_string(&operation.headers).expect("should convert headers to json"),
),
]);

Ok(JsonResponse::Value(models::ExplainResponse { details }))
}

async fn mutation(
configuration: &Self::Configuration,
state: &Self::State,
request: models::MutationRequest,
) -> connector::Result<JsonResponse<models::MutationResponse>> {
#[cfg(debug_assertions)]
{
// this block only present in debug builds, to avoid leaking sensitive information
let request_string = serde_json::to_string(&request)
.map_err(|err| MutationError::new_invalid_request(&err))?;
tracing::event!(Level::DEBUG, "Incoming IR" = request_string);
}

let operation =
tracing::info_span!("Build Mutation Document", internal.visibility = "user").in_scope(
|| {
build_mutation_document(&request, configuration)
.map_err(|err| MutationError::new_invalid_request(&err))
},
)?;

let client = state
.client(configuration)
.await
.map_err(|err| MutationError::new_invalid_request(&err))?;

let execution_span =
tracing::info_span!("Execute GraphQL Mutation", internal.visibility = "user");

let (headers, response) = execute_graphql::<IndexMap<String, serde_json::Value>>(
&operation.query,
operation.variables,
&configuration.connection.endpoint,
&operation.headers,
&client,
&configuration.response.forward_headers,
)
.instrument(execution_span)
.await
.map_err(|err| MutationError::new_invalid_request(&err))?;

Ok(tracing::info_span!("Process Response").in_scope(|| {
if let Some(errors) = response.errors {
Err(MutationError::new_unprocessable_content(&errors[0].message)
.with_details(serde_json::json!({ "errors": errors })))
} else if let Some(mut data) = response.data {
let forward_response_headers = !configuration.response.forward_headers.is_empty();

let operation_results = request
.operations
.iter()
.enumerate()
.map(|(index, operation)| match operation {
models::MutationOperation::Procedure { .. } => Ok({
let alias = format!("procedure_{index}");
let result = data
.get_mut(&alias)
.map(|val| mem::replace(val, serde_json::Value::Null))
.unwrap_or(serde_json::Value::Null);
let result = if forward_response_headers {
serde_json::to_value(BTreeMap::from_iter(vec![
(
configuration.response.headers_field.to_string(),
serde_json::to_value(&headers)?,
),
(configuration.response.response_field.to_string(), result),
]))?
} else {
result
};

models::MutationOperationResults::Procedure { result }
}),
})
.collect::<Result<Vec<_>, serde_json::Error>>()
.map_err(|err| MutationError::new_invalid_request(&err))?;

Ok(JsonResponse::Value(models::MutationResponse {
operation_results,
}))
} else {
Err(MutationError::new_unprocessable_content(
&"No data or errors in response",
))
}
})?)
Ok(JsonResponse::Value(
handle_mutation(configuration, state, request).await?,
))
}

async fn query(
configuration: &Self::Configuration,
state: &Self::State,
request: models::QueryRequest,
) -> connector::Result<JsonResponse<models::QueryResponse>> {
#[cfg(debug_assertions)]
{
// this block only present in debug builds, to avoid leaking sensitive information
let request_string = serde_json::to_string(&request)
.map_err(|err| QueryError::new_invalid_request(&err))?;
tracing::event!(Level::DEBUG, "Incoming IR" = request_string);
}

let operation = tracing::info_span!("Build Query Document", internal.visibility = "user")
.in_scope(|| {
build_query_document(&request, configuration)
.map_err(|err| QueryError::new_invalid_request(&err))
})?;

let client = state
.client(configuration)
.await
.map_err(|err| QueryError::new_invalid_request(&err))?;

let execution_span =
tracing::info_span!("Execute GraphQL Query", internal.visibility = "user");

let (headers, response) = execute_graphql::<IndexMap<FieldName, models::RowFieldValue>>(
&operation.query,
operation.variables,
&configuration.connection.endpoint,
&operation.headers,
&client,
&configuration.response.forward_headers,
)
.instrument(execution_span)
.await
.map_err(|err| QueryError::new_invalid_request(&err))?;

Ok(tracing::info_span!("Process Response").in_scope(|| {
if let Some(errors) = response.errors {
Err(QueryError::new_unprocessable_content(&errors[0].message)
.with_details(serde_json::json!({ "errors": errors })))
} else if let Some(data) = response.data {
let forward_response_headers = !configuration.response.forward_headers.is_empty();

let row = if forward_response_headers {
let headers = serde_json::to_value(headers)
.map_err(|err| QueryError::new_invalid_request(&err))?;
let data = serde_json::to_value(data)
.map_err(|err| QueryError::new_invalid_request(&err))?;

IndexMap::from_iter(vec![
(
configuration.response.headers_field.to_string().into(),
models::RowFieldValue(headers),
),
(
configuration.response.response_field.to_string().into(),
models::RowFieldValue(data),
),
])
} else {
data
};

Ok(JsonResponse::Value(models::QueryResponse(vec![
models::RowSet {
aggregates: None,
rows: Some(vec![row]),
},
])))
} else {
Err(QueryError::new_unprocessable_content(
&"No data or errors in response",
))
}
})?)
Ok(JsonResponse::Value(
handle_query(configuration, state, request).await?,
))
}
}
115 changes: 115 additions & 0 deletions crates/ndc-graphql/src/connector/mutation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use super::state::ServerState;
use crate::query_builder::build_mutation_document;
use common::{
client::{execute_graphql, GraphQLRequest},
config::ServerConfig,
};
use indexmap::IndexMap;
use ndc_sdk::{connector::MutationError, models};
use std::{collections::BTreeMap, mem};
use tracing::{Instrument, Level};

pub async fn handle_mutation_explain(
configuration: &ServerConfig,
_state: &ServerState,
request: models::MutationRequest,
) -> Result<models::ExplainResponse, MutationError> {
let operation = tracing::info_span!("Build Mutation Document", internal.visibility = "user")
.in_scope(|| build_mutation_document(&request, configuration))?;

let query =
serde_json::to_string_pretty(&GraphQLRequest::new(&operation.query, &operation.variables))
.map_err(|err| MutationError::new_invalid_request(&err))?;

let details = BTreeMap::from_iter(vec![
("SQL Query".to_string(), operation.query),
("Execution Plan".to_string(), query),
(
"Headers".to_string(),
serde_json::to_string(&operation.headers).expect("should convert headers to json"),
),
]);

Ok(models::ExplainResponse { details })
}

pub async fn handle_mutation(
configuration: &ServerConfig,
state: &ServerState,
request: models::MutationRequest,
) -> Result<models::MutationResponse, MutationError> {
#[cfg(debug_assertions)]
{
// this block only present in debug builds, to avoid leaking sensitive information
let request_string = serde_json::to_string(&request)
.map_err(|err| MutationError::new_invalid_request(&err))?;
tracing::event!(Level::DEBUG, "Incoming IR" = request_string);
}

let operation = tracing::info_span!("Build Mutation Document", internal.visibility = "user")
.in_scope(|| build_mutation_document(&request, configuration))?;

let client = state
.client(configuration)
.await
.map_err(|err| MutationError::new_invalid_request(&err))?;

let execution_span =
tracing::info_span!("Execute GraphQL Mutation", internal.visibility = "user");

let (headers, response) = execute_graphql::<IndexMap<String, serde_json::Value>>(
&operation.query,
operation.variables,
&configuration.connection.endpoint,
&operation.headers,
&client,
&configuration.response.forward_headers,
)
.instrument(execution_span)
.await
.map_err(|err| MutationError::new_unprocessable_content(&err))?;

tracing::info_span!("Process Response").in_scope(|| {
if let Some(errors) = response.errors {
Err(MutationError::new_unprocessable_content(&errors[0].message)
.with_details(serde_json::json!({ "errors": errors })))
} else if let Some(mut data) = response.data {
let forward_response_headers = !configuration.response.forward_headers.is_empty();

let operation_results = request
.operations
.iter()
.enumerate()
.map(|(index, operation)| match operation {
models::MutationOperation::Procedure { .. } => Ok({
let alias = format!("procedure_{index}");
let result = data
.get_mut(&alias)
.map(|val| mem::replace(val, serde_json::Value::Null))
.unwrap_or(serde_json::Value::Null);
let result = if forward_response_headers {
serde_json::to_value(BTreeMap::from_iter(vec![
(
configuration.response.headers_field.to_string(),
serde_json::to_value(&headers)?,
),
(configuration.response.response_field.to_string(), result),
]))?
} else {
result
};

models::MutationOperationResults::Procedure { result }
}),
})
.collect::<Result<Vec<_>, serde_json::Error>>()
.map_err(|err| MutationError::new_unprocessable_content(&err))?;

Ok(models::MutationResponse { operation_results })
} else {
Err(MutationError::new_unprocessable_content(
&"No data or errors in response",
))
}
})
}
Loading

0 comments on commit a2920f3

Please sign in to comment.