Skip to content

Commit

Permalink
Merge pull request #1975 from fermyon/http-fix
Browse files Browse the repository at this point in the history
Fix outbound http in the Rust SDK (take 2)
  • Loading branch information
rylev authored Oct 28, 2023
2 parents 8bafcb2 + 2ac71e4 commit 09c7e5c
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use spin_sdk::{
/// Send an HTTP request and return the response.
#[http_component]
async fn send_outbound(_req: Request) -> Result<impl IntoResponse> {
let mut res: http::Response<()> = spin_sdk::http::send(
let mut res: http::Response<String> = spin_sdk::http::send(
http::Request::builder()
.method("GET")
.uri("/hello")
Expand Down
2 changes: 1 addition & 1 deletion examples/http-rust-outbound-http/outbound-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use spin_sdk::{
/// Send an HTTP request and return the response.
#[http_component]
async fn send_outbound(_req: Request) -> Result<impl IntoResponse> {
let mut res: http::Response<()> = spin_sdk::http::send(
let mut res: http::Response<String> = spin_sdk::http::send(
http::Request::builder()
.method("GET")
.uri("https://random-data-api.fermyon.app/animals/json")
Expand Down
2 changes: 1 addition & 1 deletion sdk/rust/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ server, modifies the result, then returns it:
```rust
#[http_component]
async fn hello_world(_req: Request) -> Result<Response> {
let mut res: http::Response<()> = spin_sdk::http::send(
let mut res: http::Response<String> = spin_sdk::http::send(
http::Request::builder()
.method("GET")
.uri("https://fermyon.com")
Expand Down
98 changes: 80 additions & 18 deletions sdk/rust/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ use std::collections::HashMap;

#[doc(inline)]
pub use conversions::IntoResponse;

use self::conversions::TryFromIncomingResponse;

use super::wit::wasi::http::types;
#[doc(inline)]
pub use types::{
Error, Fields, Headers, IncomingRequest, IncomingResponse, Method, OutgoingBody,
OutgoingRequest, OutgoingResponse, Scheme, StatusCode, Trailers,
};

use self::conversions::{TryFromIncomingResponse, TryIntoOutgoingRequest};
use super::wit::wasi::http::types;
use crate::wit::wasi::io::streams;
use futures::SinkExt;

/// A unified request object that can represent both incoming and outgoing requests.
///
/// This should be used in favor of `IncomingRequest` and `OutgoingRequest` when there
Expand Down Expand Up @@ -107,6 +108,34 @@ impl Request {
uri,
)
}

/// Whether the request is an HTTPS request
fn is_https(&self) -> bool {
self.uri
.0
.as_ref()
.and_then(|u| u.scheme())
.map(|s| s == &hyperium::uri::Scheme::HTTPS)
.unwrap_or(true)
}

/// The URI's authority
fn authority(&self) -> Option<&str> {
self.uri
.0
.as_ref()
.and_then(|u| u.authority())
.map(|a| a.as_str())
}

/// The request path and query combined
pub fn path_and_query(&self) -> Option<&str> {
self.uri
.0
.as_ref()
.and_then(|u| u.path_and_query())
.map(|s| s.as_str())
}
}

/// A request builder
Expand Down Expand Up @@ -410,12 +439,12 @@ impl IncomingRequest {
/// # Panics
///
/// Panics if the body was already consumed.
pub fn into_body_stream(self) -> impl futures::Stream<Item = anyhow::Result<Vec<u8>>> {
pub fn into_body_stream(self) -> impl futures::Stream<Item = Result<Vec<u8>, streams::Error>> {
executor::incoming_body(self.consume().expect("request body was already consumed"))
}

/// Return a `Vec<u8>` of the body or fails
pub async fn into_body(self) -> anyhow::Result<Vec<u8>> {
pub async fn into_body(self) -> Result<Vec<u8>, streams::Error> {
use futures::TryStreamExt;
let mut stream = self.into_body_stream();
let mut body = Vec::new();
Expand All @@ -432,12 +461,12 @@ impl IncomingResponse {
/// # Panics
///
/// Panics if the body was already consumed.
pub fn into_body_stream(self) -> impl futures::Stream<Item = anyhow::Result<Vec<u8>>> {
pub fn into_body_stream(self) -> impl futures::Stream<Item = Result<Vec<u8>, streams::Error>> {
executor::incoming_body(self.consume().expect("response body was already consumed"))
}

/// Return a `Vec<u8>` of the body or fails
pub async fn into_body(self) -> anyhow::Result<Vec<u8>> {
pub async fn into_body(self) -> Result<Vec<u8>, streams::Error> {
use futures::TryStreamExt;
let mut stream = self.into_body_stream();
let mut body = Vec::new();
Expand All @@ -454,11 +483,22 @@ impl OutgoingResponse {
/// # Panics
///
/// Panics if the body was already taken.
pub fn take_body(&self) -> impl futures::Sink<Vec<u8>, Error = anyhow::Error> {
pub fn take_body(&self) -> impl futures::Sink<Vec<u8>, Error = Error> {
executor::outgoing_body(self.write().expect("response body was already taken"))
}
}

impl OutgoingRequest {
/// Construct a `Sink` which writes chunks to the body of the specified response.
///
/// # Panics
///
/// Panics if the body was already taken.
pub fn take_body(&self) -> impl futures::Sink<Vec<u8>, Error = Error> {
executor::outgoing_body(self.write().expect("request body was already taken"))
}
}

/// The out param for setting an `OutgoingResponse`
pub struct ResponseOutparam(types::ResponseOutparam);

Expand All @@ -482,8 +522,7 @@ impl ResponseOutparam {
self,
response: OutgoingResponse,
buffer: Vec<u8>,
) -> anyhow::Result<()> {
use futures::SinkExt;
) -> Result<(), Error> {
let mut body = response.take_body();
self.set(response);
body.send(buffer).await
Expand All @@ -496,20 +535,35 @@ impl ResponseOutparam {
}

/// Send an outgoing request
///
/// If `request`` is an `OutgoingRequest` and you are streaming the body to the
/// outgoing request body sink, you need to ensure it is dropped before awaiting this function.
pub async fn send<I, O>(request: I) -> Result<O, SendError>
where
I: TryInto<OutgoingRequest>,
I: TryIntoOutgoingRequest,
I::Error: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
O: TryFromIncomingResponse,
O::Error: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
{
let response = executor::outgoing_request_send(
request
.try_into()
.map_err(|e| SendError::RequestConversion(e.into()))?,
)
.await
let (request, body_buffer) = I::try_into_outgoing_request(request)
.map_err(|e| SendError::RequestConversion(e.into()))?;
let response = if let Some(body_buffer) = body_buffer {
// It is part of the contract of the trait that implementors of `TryIntoOutgoingRequest`
// do not call `OutgoingRequest::write`` if they return a buffered body.
let mut body_sink = request.take_body();
let response = executor::outgoing_request_send(request);
body_sink
.send(body_buffer)
.await
.map_err(|e| SendError::Http(Error::UnexpectedError(e.to_string())))?;
// The body sink needs to be dropped before we await the response, otherwise we deadlock
drop(body_sink);
response.await
} else {
executor::outgoing_request_send(request).await
}
.map_err(SendError::Http)?;

TryFromIncomingResponse::try_from_incoming_response(response)
.await
.map_err(|e: O::Error| SendError::ResponseConversion(e.into()))
Expand Down Expand Up @@ -614,6 +668,14 @@ pub mod responses {
}
}

impl std::fmt::Display for streams::Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.to_debug_string())
}
}

impl std::error::Error for streams::Error {}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
96 changes: 78 additions & 18 deletions sdk/rust/src/http/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::collections::HashMap;

use async_trait::async_trait;

use crate::wit::wasi::io::streams;

use super::{Headers, IncomingRequest, IncomingResponse, OutgoingRequest, OutgoingResponse};

use super::{responses, NonUtf8BodyError, Request, Response};
Expand Down Expand Up @@ -61,12 +63,12 @@ impl TryFromIncomingRequest for Request {
.method(request.method())
.uri(request.uri())
.headers(request.headers())
.body(
request
.into_body()
.await
.map_err(IncomingRequestError::BodyConversionError)?,
)
.body(request.into_body().await.map_err(|e| {
IncomingRequestError::BodyConversionError(anyhow::anyhow!(
"{}",
e.to_debug_string()
))
})?)
.build())
}
}
Expand Down Expand Up @@ -482,15 +484,59 @@ where
}
}

/// A trait for converting a type into an `OutgoingRequest`
pub trait TryIntoOutgoingRequest {
/// The error if the conversion fails
type Error;

/// Turn the type into an `OutgoingRequest`
///
/// If the implementor can be sure that the `OutgoingRequest::write` has not been called they
/// can return a buffer as the second element of the returned tuple and `send` will send
/// that as the request body.
fn try_into_outgoing_request(self) -> Result<(OutgoingRequest, Option<Vec<u8>>), Self::Error>;
}

impl TryIntoOutgoingRequest for OutgoingRequest {
type Error = std::convert::Infallible;

fn try_into_outgoing_request(self) -> Result<(OutgoingRequest, Option<Vec<u8>>), Self::Error> {
Ok((self, None))
}
}

impl TryIntoOutgoingRequest for Request {
type Error = std::convert::Infallible;

fn try_into_outgoing_request(self) -> Result<(OutgoingRequest, Option<Vec<u8>>), Self::Error> {
let headers = self
.headers()
.map(|(k, v)| (k.to_owned(), v.as_bytes().to_owned()))
.collect::<Vec<_>>();
let request = OutgoingRequest::new(
self.method(),
self.path_and_query(),
Some(if self.is_https() {
&super::Scheme::Https
} else {
&super::Scheme::Http
}),
self.authority(),
&Headers::new(&headers),
);
Ok((request, Some(self.into_body())))
}
}

#[cfg(feature = "http")]
impl<B> TryFrom<hyperium::Request<B>> for OutgoingRequest
impl<B> TryIntoOutgoingRequest for hyperium::Request<B>
where
B: TryIntoBody,
B::Error: std::error::Error + Send + Sync + 'static,
{
type Error = anyhow::Error;
fn try_from(req: hyperium::Request<B>) -> Result<Self, Self::Error> {
let method = match req.method() {
fn try_into_outgoing_request(self) -> Result<(OutgoingRequest, Option<Vec<u8>>), Self::Error> {
let method = match self.method() {
&hyperium::Method::GET => super::Method::Get,
&hyperium::Method::POST => super::Method::Post,
&hyperium::Method::PUT => super::Method::Put,
Expand All @@ -500,34 +546,36 @@ where
&hyperium::Method::OPTIONS => super::Method::Options,
m => anyhow::bail!("Unsupported method: {m}"),
};
let headers = req
let headers = self
.headers()
.into_iter()
.map(|(n, v)| (n.as_str().to_owned(), v.as_bytes().to_owned()))
.collect::<Vec<_>>();
Ok(OutgoingRequest::new(
let request = OutgoingRequest::new(
&method,
req.uri().path_and_query().map(|p| p.as_str()),
req.uri()
self.uri().path_and_query().map(|p| p.as_str()),
self.uri()
.scheme()
.map(|s| match s.as_str() {
"http" => super::Scheme::Http,
"https" => super::Scheme::Https,
s => super::Scheme::Other(s.to_owned()),
})
.as_ref(),
req.uri().authority().map(|a| a.as_str()),
self.uri().authority().map(|a| a.as_str()),
&Headers::new(&headers),
))
);
let buffer = TryIntoBody::try_into_body(self.into_body())?;
Ok((request, Some(buffer)))
}
}

/// A trait for converting from an `IncomingRequest`
#[async_trait]
/// TODO
pub trait TryFromIncomingResponse {
/// TODO
/// The error if conversion fails
type Error;
/// TODO
/// Turn the `IncomingResponse` into the type
async fn try_from_incoming_response(resp: IncomingResponse) -> Result<Self, Self::Error>
where
Self: Sized;
Expand All @@ -541,6 +589,18 @@ impl TryFromIncomingResponse for IncomingResponse {
}
}

#[async_trait]
impl TryFromIncomingResponse for Response {
type Error = streams::Error;
async fn try_from_incoming_response(resp: IncomingResponse) -> Result<Self, Self::Error> {
Ok(Response::builder()
.status(resp.status())
.headers(resp.headers())
.body(resp.into_body().await?)
.build())
}
}

#[cfg(feature = "http")]
#[async_trait]
impl<B: TryFromBody> TryFromIncomingResponse for hyperium::Response<B> {
Expand Down
Loading

0 comments on commit 09c7e5c

Please sign in to comment.