Skip to content

Commit

Permalink
Stream request body
Browse files Browse the repository at this point in the history
  • Loading branch information
john-z-yang committed Oct 25, 2024
1 parent 9faa449 commit 1576514
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 20 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ chrono = { version = "0.4.26", features = ["serde"] }
anyhow = "1.0"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
reqwest = { version = "0.12.8", features = ["json"] }
reqwest = { version = "0.12.8", features = ["json", "stream"] }
serde = "1.0.210"
serde_json = "1.0.128"
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn main() -> Result<(), Error> {
host,
port,
table,
32_768,
128,
Duration::from_secs(4),
ReduceShutdownBehaviour::Flush,
),
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async fn main() -> Result<(), Error> {
host,
port,
table,
32_768,
128,
Duration::from_secs(4),
ReduceShutdownBehaviour::Flush,
),
Expand Down
83 changes: 66 additions & 17 deletions src/reducers/clickhouse.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,57 @@
use crate::{ReduceConfig, ReduceShutdownBehaviour, Reducer};
use anyhow::{anyhow, Ok};
use reqwest::Client;
use std::{collections::HashMap, mem::replace, time::Duration};
use reqwest::{Client, Error, Response};
use std::{collections::HashMap, time::Duration};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_stream::wrappers::ReceiverStream;
use tracing::info;

struct WriteHandle {
max_size: usize,
cur_size: usize,
write_stream: mpsc::Sender<Result<Vec<u8>, ::std::io::Error>>,
response_handle: JoinHandle<Result<Response, Error>>,
}

impl WriteHandle {
fn new(client: Client, url: String, max_size: usize) -> Self {
let (sender, receiver) = mpsc::channel(max_size);

Self {
max_size,
cur_size: 0,
write_stream: sender,
response_handle: tokio::spawn(async move {
client
.post(url)
.body(reqwest::Body::wrap_stream(ReceiverStream::new(receiver)))
.send()
.await
}),
}
}

async fn write(&mut self, data: Vec<u8>) -> Result<(), Error> {
self.write_stream
.send(Result::<_, _>::Ok(data))
.await
.map_err(|err| anyhow!("Unable to write to socket, got SendError: {:?}", err))
.expect("We are always sending Ok during write");
self.cur_size += 1;
Result::<_, _>::Ok(())
}

fn flush(self) -> JoinHandle<Result<Response, Error>> {
self.response_handle
}

fn is_full(&self) -> bool {
self.cur_size >= self.max_size
}
}

pub struct ClickhouseBatchWriter {
buffer: Vec<u8>,
write_handle: Option<WriteHandle>,
http_client: Client,
max_buf_size: usize,
url: String,
Expand All @@ -22,7 +68,7 @@ impl ClickhouseBatchWriter {
shutdown_behaviour: ReduceShutdownBehaviour,
) -> Self {
Self {
buffer: Vec::with_capacity(max_buf_size),
write_handle: None,
http_client: Client::new(),
max_buf_size,
url: format!(
Expand All @@ -41,23 +87,24 @@ impl Reducer for ClickhouseBatchWriter {
type Item = Vec<u8>;

async fn reduce(&mut self, t: Self::Item) -> Result<(), anyhow::Error> {
self.buffer.extend(&t);
self.write_handle
.get_or_insert_with(|| {
WriteHandle::new(
self.http_client.clone(),
self.url.clone(),
self.max_buf_size,
)
})
.write(t)
.await?;
Ok(())
}

async fn flush(&mut self) -> Result<(), anyhow::Error> {
if self.buffer.is_empty() {
if self.write_handle.is_none() {
return Ok(());
}
let res = self
.http_client
.post(self.url.clone())
.body(replace(
&mut self.buffer,
Vec::with_capacity(self.max_buf_size),
))
.send()
.await?;
let res = self.write_handle.take().unwrap().flush().await??;

if res.status().as_str() == "200" {
info!(
Expand All @@ -78,11 +125,13 @@ impl Reducer for ClickhouseBatchWriter {
}

fn reset(&mut self) {
self.buffer.clear();
self.write_handle.take();
}

fn is_full(&self) -> bool {
self.buffer.len() >= self.max_buf_size
self.write_handle
.as_ref()
.map_or(false, WriteHandle::is_full)
}

fn get_reduce_config(&self) -> ReduceConfig {
Expand Down

0 comments on commit 1576514

Please sign in to comment.