Skip to content

Commit

Permalink
chore: upgrade to hyper 1.2.0 (step 1) (#4157)
Browse files Browse the repository at this point in the history
Co-authored-by: Henry Fontanier <[email protected]>
  • Loading branch information
fontanierh and Henry Fontanier authored Mar 5, 2024
1 parent 240b958 commit 3be516b
Show file tree
Hide file tree
Showing 10 changed files with 33 additions and 19 deletions.
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pest_derive = "2.0"
shellexpand = "2.1"
blake3 = "1.3"
async-trait = "0.1"
hyper = { version = "0.14", features = ["full"] }
hyper = { version = "0.14", features = ["full", "backports", "deprecated"] }
tokio = { version = "1.33", features = ["full"] }
tokio-stream = "0.1"
hyper-tls = "0.5"
Expand Down
4 changes: 3 additions & 1 deletion core/src/blocks/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::block::Env;
use crate::project::Project;
use anyhow::{anyhow, Result};
use hyper::body::HttpBody;
use hyper::header;
use hyper::{body::Buf, http::StatusCode, Body, Client, Method, Request};
use hyper_tls::HttpsConnector;
Expand Down Expand Up @@ -84,7 +85,8 @@ pub async fn get_data_source_project(
))?;
}

let body = hyper::body::aggregate(res).await?;
let body = res.collect().await?.aggregate();

let mut b: Vec<u8> = vec![];
body.reader().read_to_end(&mut b)?;

Expand Down
3 changes: 2 additions & 1 deletion core/src/http/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::stores::store::Store;
use crate::utils;
use anyhow::{anyhow, Result};
use dns_lookup::lookup_host;
use hyper::body::HttpBody;
use hyper::header;
use hyper::{body::Buf, Body, Client, Method, Request};
use hyper_tls::HttpsConnector;
Expand Down Expand Up @@ -157,7 +158,7 @@ impl HttpRequest {
let status = res.status();
let headers = res.headers().clone();

let body = hyper::body::aggregate(res).await?;
let body = res.collect().await?.to_bytes();
let mut b: Vec<u8> = vec![];
body.reader().read_to_end(&mut b)?;

Expand Down
3 changes: 2 additions & 1 deletion core/src/providers/ai21.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::run::Credentials;
use crate::utils;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use hyper::body::HttpBody;
use hyper::{body::Buf, Body, Client, Method, Request, Uri};
use hyper_tls::HttpsConnector;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -120,7 +121,7 @@ impl AI21LLM {

let res = cli.request(req).await?;
let status = res.status();
let body = hyper::body::aggregate(res).await?;
let body = res.collect().await?.aggregate();
let mut b: Vec<u8> = vec![];
body.reader().read_to_end(&mut b)?;
let c: &[u8] = &b;
Expand Down
3 changes: 2 additions & 1 deletion core/src/providers/anthropic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use async_trait::async_trait;
use eventsource_client as es;
use eventsource_client::Client as ESClient;
use futures::TryStreamExt;
use hyper::body::HttpBody;
use hyper::{body::Buf, Body, Client, Method, Request, Uri};
use hyper_tls::HttpsConnector;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -398,7 +399,7 @@ impl AnthropicLLM {

let status = res.status();

let body = hyper::body::aggregate(res).await?;
let body = res.collect().await?.to_bytes();
let mut b: Vec<u8> = vec![];
body.reader().read_to_end(&mut b)?;
let c: &[u8] = &b;
Expand Down
5 changes: 3 additions & 2 deletions core/src/providers/azure_openai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::run::Credentials;
use crate::utils;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use hyper::body::HttpBody;
use hyper::header;
use hyper::{body::Buf, http::StatusCode, Body, Client, Method, Request, Uri};
use hyper_tls::HttpsConnector;
Expand Down Expand Up @@ -76,7 +77,7 @@ async fn get_deployments(endpoint: &str, api_key: &str) -> Result<Vec<AzureOpenA
))?;
}

let body = hyper::body::aggregate(res).await?;
let body = res.collect().await?.aggregate();
let mut b: Vec<u8> = vec![];
body.reader().read_to_end(&mut b)?;
let c: &[u8] = &b;
Expand Down Expand Up @@ -125,7 +126,7 @@ async fn get_deployment(
))?;
}

let body = hyper::body::aggregate(res).await?;
let body = res.collect().await?.aggregate();
let mut b: Vec<u8> = vec![];
body.reader().read_to_end(&mut b)?;
let c: &[u8] = &b;
Expand Down
9 changes: 5 additions & 4 deletions core/src/providers/cohere.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::run::Credentials;
use crate::utils;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use hyper::body::HttpBody;
use hyper::{body::Buf, Body, Client, Method, Request, Uri};
use hyper_tls::HttpsConnector;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -46,7 +47,7 @@ async fn api_encode(api_key: &str, text: &str) -> Result<Vec<usize>> {

let res = cli.request(req).await?;
let status = res.status();
let body = hyper::body::aggregate(res).await?;
let body = res.collect().await?.aggregate();
let mut b: Vec<u8> = vec![];
body.reader().read_to_end(&mut b)?;
let c: &[u8] = &b;
Expand Down Expand Up @@ -98,7 +99,7 @@ async fn api_decode(api_key: &str, tokens: Vec<usize>) -> Result<String> {

let res = cli.request(req).await?;
let status = res.status();
let body = hyper::body::aggregate(res).await?;
let body = res.collect().await?.aggregate();
let mut b: Vec<u8> = vec![];
body.reader().read_to_end(&mut b)?;
let c: &[u8] = &b;
Expand Down Expand Up @@ -225,7 +226,7 @@ impl CohereLLM {

let res = cli.request(req).await?;
let status = res.status();
let body = hyper::body::aggregate(res).await?;
let body = res.collect().await?.aggregate();
let mut b: Vec<u8> = vec![];
body.reader().read_to_end(&mut b)?;
let c: &[u8] = &b;
Expand Down Expand Up @@ -456,7 +457,7 @@ impl CohereEmbedder {

let res = cli.request(req).await?;
let status = res.status();
let body = hyper::body::aggregate(res).await?;
let body = res.collect().await?.aggregate();
let mut b: Vec<u8> = vec![];
body.reader().read_to_end(&mut b)?;
let c: &[u8] = &b;
Expand Down
9 changes: 5 additions & 4 deletions core/src/providers/mistral.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use super::llm::{ChatFunction, ChatMessage as BaseChatMessage};
use super::tiktoken::tiktoken::{decode_async, encode_async, tokenize_async};
use crate::providers::embedder::Embedder;
use crate::providers::llm::{ChatMessageRole, LLMChatGeneration, LLMGeneration, LLM};
use crate::providers::provider::{ModelError, ModelErrorRetryOptions, Provider, ProviderID};
Expand All @@ -9,6 +11,7 @@ use async_trait::async_trait;
use eventsource_client as es;
use eventsource_client::Client as ESClient;
use futures::TryStreamExt;
use hyper::body::HttpBody;
use hyper::{body::Buf, Body, Client, Method, Request, Uri};
use hyper_tls::HttpsConnector;
use parking_lot::{Mutex, RwLock};
Expand All @@ -22,9 +25,6 @@ use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::timeout;

use super::llm::{ChatFunction, ChatMessage as BaseChatMessage};
use super::tiktoken::tiktoken::{decode_async, encode_async, tokenize_async};

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(rename_all = "lowercase")]
pub enum MistralAIChatMessageRole {
Expand Down Expand Up @@ -476,13 +476,14 @@ impl MistralAILLM {
Ok(Err(e)) => Err(e)?,
Err(_) => Err(anyhow!("Timeout sending request to Mistral AI after 180s"))?,
};
let body = match timeout(Duration::new(180, 0), hyper::body::aggregate(res)).await {
let collected = match timeout(Duration::new(180, 0), res.collect()).await {
Ok(Ok(body)) => body,
Ok(Err(e)) => Err(e)?,
Err(_) => Err(anyhow!(
"Timeout reading response from Mistral AI after 180s"
))?,
};
let body = collected.aggregate();

let mut b: Vec<u8> = vec![];
body.reader().read_to_end(&mut b)?;
Expand Down
11 changes: 8 additions & 3 deletions core/src/providers/openai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use async_trait::async_trait;
use eventsource_client as es;
use eventsource_client::Client as ESClient;
use futures::TryStreamExt;
use hyper::body::HttpBody;
use hyper::{body::Buf, Body, Client, Method, Request, Uri};
use hyper_tls::HttpsConnector;
use itertools::izip;
Expand Down Expand Up @@ -488,11 +489,12 @@ pub async fn completion(
Ok(Err(e)) => Err(e)?,
Err(_) => Err(anyhow!("Timeout sending request to OpenAI after 180s"))?,
};
let body = match timeout(Duration::new(180, 0), hyper::body::aggregate(res)).await {
let collected = match timeout(Duration::new(180, 0), res.collect()).await {
Ok(Ok(body)) => body,
Ok(Err(e)) => Err(e)?,
Err(_) => Err(anyhow!("Timeout reading response from OpenAI after 180s"))?,
};
let body = collected.aggregate();

let mut b: Vec<u8> = vec![];
body.reader().read_to_end(&mut b)?;
Expand Down Expand Up @@ -974,11 +976,12 @@ pub async fn chat_completion(
Ok(Err(e)) => Err(e)?,
Err(_) => Err(anyhow!("Timeout sending request to OpenAI after 180s"))?,
};
let body = match timeout(Duration::new(180, 0), hyper::body::aggregate(res)).await {
let collected = match timeout(Duration::new(180, 0), res.collect()).await {
Ok(Ok(body)) => body,
Ok(Err(e)) => Err(e)?,
Err(_) => Err(anyhow!("Timeout reading response from OpenAI after 180s"))?,
};
let body = collected.aggregate();

let mut b: Vec<u8> = vec![];
body.reader().read_to_end(&mut b)?;
Expand Down Expand Up @@ -1064,11 +1067,13 @@ pub async fn embed(
Ok(Err(e)) => Err(e)?,
Err(_) => Err(anyhow!("Timeout sending request to OpenAI after 60s"))?,
};
let body = match timeout(Duration::new(60, 0), hyper::body::aggregate(res)).await {

let collected = match timeout(Duration::new(60, 0), res.collect()).await {
Ok(Ok(body)) => body,
Ok(Err(e)) => Err(e)?,
Err(_) => Err(anyhow!("Timeout reading response from OpenAI after 60s"))?,
};
let body = collected.aggregate();

let mut b: Vec<u8> = vec![];
body.reader().read_to_end(&mut b)?;
Expand Down
3 changes: 2 additions & 1 deletion core/src/sqlite_workers/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{anyhow, Result};
use hyper::body::HttpBody;
use hyper::{body::Bytes, Body, Client, Request};
use serde::{Deserialize, Serialize};
use serde_json::json;
Expand Down Expand Up @@ -147,7 +148,7 @@ async fn get_response_body(req: hyper::Request<hyper::Body>) -> Result<Bytes, Sq
let res = Client::new().request(req).await?;

let status = res.status().as_u16();
let body = hyper::body::to_bytes(res.into_body()).await?;
let body = res.collect().await?.to_bytes();

match status {
200 => Ok(body),
Expand Down

0 comments on commit 3be516b

Please sign in to comment.