Skip to content

Commit

Permalink
use async tungstenite in graphql service
Browse files Browse the repository at this point in the history
  • Loading branch information
mxfactorial committed Nov 28, 2024
1 parent 44ddf45 commit 1b922da
Showing 1 changed file with 23 additions and 11 deletions.
34 changes: 23 additions & 11 deletions services/graphql/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use axum::{
routing::get,
Router,
};
use futures_util::stream::Stream;
use futures_util::{stream::Stream, StreamExt};
use httpclient::HttpClient as Client;
use serde_json::json;
use shutdown::shutdown_signal;
Expand Down Expand Up @@ -205,7 +205,7 @@ impl Subscription {
.to_string();
let ws_client = WsClient::new(uri, "gdp".to_string(), date, country, region, municipality);
stream! {
let mut measure_socket = match ws_client.connect() {
let measure_socket = match ws_client.connect().await {
Ok(ws) => {
tracing::info!("graphql websocket connection created with measure");
ws
Expand All @@ -215,9 +215,17 @@ impl Subscription {
return;
}
};

// the async-graphql crate limits the scope of the websocket lifecycle to where its upgraded:
// https://github.com/async-graphql/async-graphql/issues/1022#issuecomment-1214541591
// this means it cant send a close message to the measure service from inside the subscription
// todo: add on_close(closed) callback support to graphql subscription context so
// a close message can be written/sent to the measure service
let (_write, mut read) = measure_socket.split();

loop {
match measure_socket.read() {
Ok(msg) => {
match read.next().await {
Some(Ok(msg)) => {
match msg {
tungstenite::Message::Text(text) => {
let gdp: f64 = serde_json::from_str(&text).unwrap();
Expand All @@ -229,18 +237,22 @@ impl Subscription {
}
}
}
Err(WsError::ConnectionClosed) => {
measure_socket.close(None).unwrap();
tracing::info!("graphql received closed message from measure");
Some(Err(e)) => {
match e {
WsError::ConnectionClosed => {
tracing::info!("graphql received closed message from measure");
}
_ => {
tracing::info!("graphql message receipt failure from measure: {:?}", e);
}
}
break;
}
Err(e) => {
tracing::info!("graphql message receipt failure from measure: {:?}", e);
None => {
tracing::info!("graphql received closed message from measure");
break;
}
}
// throttle reads from measure service to avoid blocking close messages from client
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
}
}
Expand Down

0 comments on commit 1b922da

Please sign in to comment.