Skip to content

Commit

Permalink
wip: cont on http interface
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Jun 3, 2024
1 parent d895ab0 commit 09b6dee
Showing 1 changed file with 30 additions and 18 deletions.
48 changes: 30 additions & 18 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ use serde::{Deserialize, Serialize};
use tokio::io::AsyncWriteExt;

// needed to convert async-std Async to a tokio Async
use tokio_stream::StreamExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use tokio_util::io::ReaderStream;

use http_body_util::{combinators::BoxBody, BodyExt, Full};
use http_body_util::StreamBody;
use http_body_util::{combinators::BoxBody, BodyExt};
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
Expand Down Expand Up @@ -40,7 +44,7 @@ pub struct Request {
}

#[derive(Default, Debug, Serialize, Deserialize, Clone)]
pub struct Response {
pub struct ResponseMeta {
pub request_id: Scru128Id,
#[serde(skip_serializing_if = "Option::is_none")]
pub status: Option<u16>,
Expand Down Expand Up @@ -110,20 +114,23 @@ async fn handle(
)
.await;

async fn wait_for_response(store: &Store, frame_id: Scru128Id) -> Result<Response, &str> {
async fn wait_for_response(
store: &Store,
frame_id: Scru128Id,
) -> Result<(Option<ssri::Integrity>, ResponseMeta), &str> {
let mut recver = store
.read(ReadOptions {
follow: true,
last_id: Some(frame_id),
})
.await;

while let Some(event_frame) = recver.recv().await {
if event_frame.topic == "http.response" {
if let Some(meta) = event_frame.meta {
if let Ok(res) = serde_json::from_value::<Response>(meta) {
while let Some(frame) = recver.recv().await {
if frame.topic == "http.response" {
if let Some(meta) = frame.meta {
if let Ok(res) = serde_json::from_value::<ResponseMeta>(meta) {
if res.request_id == frame_id {
return Ok(res);
return Ok((frame.hash, res));
}
}
}
Expand All @@ -133,9 +140,8 @@ async fn handle(
Err("event stream ended")
}

let meta = wait_for_response(&store, frame.id).await.unwrap();

eprintln!("RESPONSE {:?}", meta);
let (hash, meta) = wait_for_response(&store, frame.id).await.unwrap();
let hash = hash.unwrap();

let res = hyper::Response::builder();
let mut res = res.status(meta.status.unwrap_or(200));
Expand All @@ -155,7 +161,19 @@ async fn handle(
}
}

Ok(res.body(full(serde_json::to_string(&frame).unwrap()))?)
let reader = store.cas_reader(hash).await?;
// convert reader from async-std -> tokio
let reader = reader.compat();
let stream = ReaderStream::new(reader);

let stream = stream.map(|frame| {
let frame = frame.unwrap();
Ok(hyper::body::Frame::data(frame))
});

let body = StreamBody::new(stream).boxed();

Ok(res.body(body)?)
}

pub async fn serve(
Expand Down Expand Up @@ -191,9 +209,3 @@ pub async fn serve(
});
}
}

fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, BoxError> {
Full::new(chunk.into())
.map_err(|never| match never {})
.boxed()
}

0 comments on commit 09b6dee

Please sign in to comment.