Skip to content

Commit

Permalink
Align ClickhouseBatchWriter error type
Browse files Browse the repository at this point in the history
  • Loading branch information
john-z-yang committed Oct 25, 2024
1 parent 1576514 commit 85058f0
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions src/reducers/clickhouse.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{ReduceConfig, ReduceShutdownBehaviour, Reducer};
use anyhow::{anyhow, Ok};
use reqwest::{Client, Error, Response};
use anyhow::{anyhow, Error, Ok};
use reqwest::{Client, Response};
use std::{collections::HashMap, time::Duration};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_stream::wrappers::ReceiverStream;
Expand All @@ -22,11 +22,12 @@ impl WriteHandle {
cur_size: 0,
write_stream: sender,
response_handle: tokio::spawn(async move {
client
let res = client
.post(url)
.body(reqwest::Body::wrap_stream(ReceiverStream::new(receiver)))
.send()
.await
.await?;
Ok(res)
}),
}
}
Expand All @@ -38,7 +39,7 @@ impl WriteHandle {
.map_err(|err| anyhow!("Unable to write to socket, got SendError: {:?}", err))
.expect("We are always sending Ok during write");
self.cur_size += 1;
Result::<_, _>::Ok(())
Ok(())
}

fn flush(self) -> JoinHandle<Result<Response, Error>> {
Expand Down

0 comments on commit 85058f0

Please sign in to comment.