Skip to content

Commit

Permalink
Merge pull request #383 from systemaccounting/382-client-websocket-reuse
Browse files Browse the repository at this point in the history
382 client websocket reuse
  • Loading branch information
mxfactorial authored Sep 6, 2024
2 parents 83c761c + 982380c commit 01f5d60
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 59 deletions.
30 changes: 9 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 38 additions & 17 deletions client/src/routes/measure/+page.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,48 @@
let priceTag: HTMLDivElement;
let map: google.maps.Map;
let markers = [] as any[]; // todo: google.maps.marker.AdvancedMarkerElement does not expect setMap method
let currentWsClient: { dispose: () => void } | null = null;
let messageCount = 0;
let wsClient: any = null; // todo: graphql-ws Client type
let stop = false;
let defaultCoordinates = { lat: 39.8283, lng: -98.5795 }; // usa
async function subscribeGdp(
date: string,
country: string,
region: string,
municipality: string | null
) {
// terminate previous websocket connection
if (currentWsClient) {
currentWsClient.dispose();
currentWsClient = null;
if (wsClient) {
stop = true;
} else {
wsClient = createWSClient({
url: b64.decode(process.env.GRAPHQL_SUBSCRIPTIONS_URI as string),
lazy: false
});
}
const wsClient = createWSClient({
url: b64.decode(process.env.GRAPHQL_SUBSCRIPTIONS_URI as string)
});
currentWsClient = wsClient;
const query = `subscription QueryGdp($date: String!, $country: String, $region: String, $municipality: String) {
queryGdp(date: $date, country: $country, region: $region, municipality: $municipality)
}`;
const variables: any = { date, country, region };
if (municipality) {
variables.municipality = municipality;
}
const subscription: any = wsClient.iterate({
query,
let subscription = wsClient.iterate({
query: `subscription QueryGdp($date: String!, $country: String, $region: String, $municipality: String) {
queryGdp(date: $date, country: $country, region: $region, municipality: $municipality)
}`,
variables
});
messageCount = 0;
for await (const { data } of subscription) {
if (stop) {
// console.log('stopping subscription');
stop = false;
break;
}
if (messageCount == 0) {
// console.log('starting subscription');
}
messageCount++;
price = data.queryGdp.toFixed(3);
}
Expand All @@ -58,6 +65,11 @@
async function handleSearch(event: Event) {
event.preventDefault();
if (searchQuery.trim() === '') {
resetMap();
return;
}
// parse location from search query
let location;
if (searchQuery.includes('gdp')) {
Expand All @@ -72,8 +84,8 @@
location = 'sacramento california';
}
const queryVars = parseQuery(location);
changeMapCenter(location);
const queryVars = parseQuery(location);
await subscribeGdp(queryVars.time, queryVars.country, queryVars.region, queryVars.municipality);
}
Expand Down Expand Up @@ -108,7 +120,7 @@
loader.load().then((google) => {
const { Map } = google.maps;
map = new Map(document.getElementById('map') as HTMLElement, {
center: { lat: 39.8283, lng: -98.5795 }, // usa coordinates
center: defaultCoordinates, // usa coordinates
zoom: 4,
mapId: '4504f8b37365c3d0'
});
Expand All @@ -126,6 +138,15 @@
}, 100); // check every 100 milliseconds
}
function resetMap() {
if (map) {
markers.forEach((marker) => marker.setMap(null));
markers = [];
map.setCenter(defaultCoordinates);
map.setZoom(4);
}
}
function changeMapCenter(location: string) {
if (map) {
// clear existing markers
Expand Down
8 changes: 5 additions & 3 deletions crates/wsclient/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ impl WsClient {
WsClient { uri }
}

pub fn connect(&self) -> WebSocket<MaybeTlsStream<TcpStream>> {
let (socket, _) = connect(self.uri.query_string()).expect("failed to connect");
socket
pub fn connect(
&self,
) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::error::Error> {
let (ws_stream, _) = connect(self.uri.query_string())?;
Ok(ws_stream)
}
}

Expand Down
8 changes: 4 additions & 4 deletions services/graphql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ rust-version.workspace = true
async-graphql = "7.0.0"
async-graphql-axum = "7.0.0"
axum = "0.7.4"
tokio = { version = "1.35.1", features = [
"macros",
"rt-multi-thread",
] }
tokio = { version = "1.35.1", features = ["macros", "rt-multi-thread"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
types = { path = "../../crates/types" }
Expand All @@ -25,6 +22,9 @@ http = "1.0.0"
shutdown = { path = "../../crates/shutdown" }
futures-util = "0.3.30"
wsclient = { path = "../../crates/wsclient" }
tungstenite = { version = "0.24.0", default-features = false }
async-stream = "0.3.5"
log = "0.4.22"

[target.x86_64-unknown-linux-musl.dependencies]
# https://github.com/cross-rs/cross/wiki/Recipes#vendored
Expand Down
76 changes: 62 additions & 14 deletions services/graphql/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,25 @@ use ::types::{
};
use ::wsclient::WsClient;
use async_graphql::*;
use async_graphql_axum::{GraphQL, GraphQLSubscription};
use async_graphql_axum::{GraphQL, GraphQLProtocol, GraphQLWebSocket};
use async_stream::stream;
use aws_lambda_events::event::apigw::ApiGatewayV2httpRequestContext;
use axum::{
extract::{State, WebSocketUpgrade},
http::{HeaderMap, StatusCode},
response::{self, IntoResponse},
routing::get,
Router,
};
use futures_util::{stream, stream::Stream};
use futures_util::stream::Stream;
use httpclient::HttpClient as Client;
use log::debug;
use serde_json::json;
use shutdown::shutdown_signal;
use std::{env, net::ToSocketAddrs, result::Result, task::Poll};
use std::{env, net::ToSocketAddrs, result::Result};
use tokio::net::TcpListener;
use tower_http::cors::CorsLayer;
use tungstenite::error::Error as WsError;

const READINESS_CHECK_PATH: &str = "READINESS_CHECK_PATH";
const GRAPHQL_RESOURCE: &str = "query";
Expand Down Expand Up @@ -198,16 +202,44 @@ impl Subscription {
let resource = env::var("MEASURE_RESOURCE").unwrap();
let uri = format!("{}/{}", base_uri, resource);
let ws_client = WsClient::new(uri, "gdp".to_string(), date, country, region, municipality);
let mut socket = ws_client.connect();

// send socket messages to stream
stream::poll_fn(move |_cx| match socket.read() {
Ok(msg) => {
let gdp = msg.into_text().unwrap().parse::<f64>().unwrap();
Poll::Ready(Some(gdp))
stream! {
let mut measure_socket = match ws_client.connect() {
Ok(ws) => {
debug!("measure websocket connection created");
ws
}
Err(_e) => {
debug!("measure webSocket connection failure: {:?}", _e);
return;
}
};

loop {
match measure_socket.read() {
Ok(msg) => {
match msg {
tungstenite::Message::Text(text) => {
let gdp: f64 = serde_json::from_str(&text).unwrap();
yield gdp;
}
_ => {
debug!("received non-text message: {:?}", msg);
}
}
}
Err(WsError::ConnectionClosed) => {
measure_socket.close(None).unwrap();
debug!("measure websocket closed");
break;
}
Err(e) => {
debug!("measure message receipt failure: {:?}", e);
break;
}
}
}
Err(_e) => Poll::Ready(None),
})
}
}
}

Expand Down Expand Up @@ -266,6 +298,21 @@ async fn graphiql() -> impl IntoResponse {
)
}

async fn graphql_subscription(
State(schema): State<Schema<Query, Mutation, Subscription>>,
protocol: GraphQLProtocol,
ws: WebSocketUpgrade,
) -> impl IntoResponse {
ws.protocols(http::ALL_WEBSOCKET_PROTOCOLS)
.on_upgrade(move |socket| async move {
// println!("connection opened");
GraphQLWebSocket::new(socket, schema, protocol)
.serve()
.await;
// println!("connection closed");
})
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt().with_ansi(false).init();
Expand All @@ -285,11 +332,12 @@ async fn main() {
readiness_check_path.as_str(), // absolute path so format not used
get(|| async { StatusCode::OK }),
)
.route_service(
.route(
format!("/{}", SUBSCRIPTION_RESOURCE).as_str(),
GraphQLSubscription::new(schema),
get(graphql_subscription),
)
.layer(CorsLayer::permissive());
.layer(CorsLayer::permissive())
.with_state(schema);

let hostname_or_ip = env::var("HOSTNAME_OR_IP").unwrap_or("0.0.0.0".to_string());

Expand Down

0 comments on commit 01f5d60

Please sign in to comment.