diff --git a/src/lib.rs b/src/lib.rs
index 6b5bb09..c475868 100755
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -122,14 +122,15 @@ use crate::{
     pool::PoolBinding,
     retry_guard::retry_guard,
     types::{
-        query_result::stream_blocks::BlockStream, Cmd, Context, IntoOptions, OptionsSource, Packet,
-        Query, QueryResult, SqlType,
+        block::{ChunkIterator, INSERT_BLOCK_SIZE},
+        query_result::stream_blocks::BlockStream,
+        Cmd, Context, IntoOptions, OptionsSource, Packet, Query, QueryResult, SqlType,
     },
 };
 pub use crate::{
     errors::ConnectionError,
     pool::Pool,
-    types::{block::Block, Options},
+    types::{block::Block, Options, Simple},
 };
 
 mod binary;
@@ -437,26 +438,16 @@ impl ClientHandle {
         Query: From<Q>,
         B: AsRef<Block>,
     {
-        let transport = self.insert_(table, block.as_ref()).await?;
+        let query = Self::make_query(table, block.as_ref())?;
+        let transport = self.insert_(query.clone(), block.as_ref()).await?;
         self.inner = Some(transport);
         Ok(())
     }
 
-    async fn insert_<Q>(&mut self, table: Q, block: &Block) -> Result<ClickhouseTransport>
-    where
-        Query: From<Q>,
-    {
+    async fn insert_(&mut self, query: Query, block: &Block) -> Result<ClickhouseTransport> {
         let timeout = try_opt!(self.context.options.get())
             .insert_timeout
             .unwrap_or_else(|| Duration::from_secs(0));
 
-        let mut names: Vec<_> = Vec::with_capacity(block.column_count());
-        for column in block.columns() {
-            names.push(try_opt!(column_name_to_string(column.name())));
-        }
-        let fields = names.join(", ");
-
-        let query =
-            Query::from(table).map_sql(|table| format!("INSERT INTO {table} ({fields}) VALUES"));
 
         let context = self.context.clone();
@@ -468,22 +459,15 @@ impl ClientHandle {
 
             async move {
                 let transport = transport?.clear().await?;
-                let stream = transport.call(Cmd::SendQuery(query, context.clone()));
-                let (transport, b) = stream.read_block().await?;
-                let dst_block = b.unwrap();
-
-                let casted_block = match block.cast_to(&dst_block) {
-                    Ok(value) => value,
-                    Err(err) => return Err(err),
-                };
-
-                let send_cmd = Cmd::Union(
-                    Box::new(Cmd::SendData(casted_block, context.clone())),
-                    Box::new(Cmd::SendData(Block::default(), context.clone())),
-                );
-
-                let (transport, _) = transport.call(send_cmd).read_block().await?;
-                Ok(transport)
+                let (transport, dst_block) =
+                    Self::send_insert_query_(transport, context.clone(), query.clone())
+                        .await?;
+                let casted_block = block.cast_to(&dst_block)?;
+                let mut chunks = casted_block.chunks(INSERT_BLOCK_SIZE);
+                let transport =
+                    Self::insert_block_(transport, context.clone(), chunks.next().unwrap())
+                        .await?;
+                Self::insert_tail_(transport, context, query, chunks).await
             }
         })
         .await
@@ -493,6 +477,56 @@ impl ClientHandle {
         })
         .await
     }
+    async fn insert_tail_(
+        mut transport: ClickhouseTransport,
+        context: Context,
+        query: Query,
+        chunks: ChunkIterator<Simple>,
+    ) -> Result<ClickhouseTransport> {
+        for chunk in chunks {
+            let (transport_, _) =
+                Self::send_insert_query_(transport, context.clone(), query.clone()).await?;
+            transport = Self::insert_block_(transport_, context.clone(), chunk).await?;
+        }
+        Ok(transport)
+    }
+
+    async fn send_insert_query_(
+        transport: ClickhouseTransport,
+        context: Context,
+        query: Query,
+    ) -> Result<(ClickhouseTransport, Block)> {
+        let stream = transport.call(Cmd::SendQuery(query, context));
+        let (transport, b) = stream.read_block().await?;
+        let dst_block = b.unwrap();
+        Ok((transport, dst_block))
+    }
+
+    async fn insert_block_(
+        transport: ClickhouseTransport,
+        context: Context,
+        block: Block,
+    ) -> Result<ClickhouseTransport> {
+        let send_cmd = Cmd::Union(
+            Box::new(Cmd::SendData(block, context.clone())),
+            Box::new(Cmd::SendData(Block::default(), context)),
+        );
+        let (transport, _) = transport.call(send_cmd).read_block().await?;
+        Ok(transport)
+    }
+
+    fn make_query<Q>(table: Q, block: &Block) -> Result<Query>
+    where
+        Query: From<Q>,
+    {
+        let mut names: Vec<_> = Vec::with_capacity(block.as_ref().column_count());
+        for column in block.as_ref().columns() {
+            names.push(try_opt!(column_name_to_string(column.name())));
+        }
+        let fields = names.join(", ");
+        Ok(Query::from(table).map_sql(|table| format!("INSERT INTO {table} ({fields}) VALUES")))
+    }
+
     pub(crate) async fn wrap_future<T, R, F>(&mut self, f: F) -> Result<T>
     where
         F: FnOnce(&mut Self) -> R + Send,
diff --git a/src/types/block/chunk_iterator.rs b/src/types/block/chunk_iterator.rs
index f3e755c..a225554 100644
--- a/src/types/block/chunk_iterator.rs
+++ b/src/types/block/chunk_iterator.rs
@@ -2,13 +2,13 @@ use std::cmp;
 
 use crate::types::{Block, ColumnType};
 
-pub struct ChunkIterator<'a, K: ColumnType> {
+pub(crate) struct ChunkIterator<K: ColumnType> {
     position: usize,
     size: usize,
-    block: &'a Block<K>,
+    block: Block<K>,
 }
 
-impl<'a, K: ColumnType> Iterator for ChunkIterator<'a, K> {
+impl<K: ColumnType> Iterator for ChunkIterator<K> {
     type Item = Block<K>;
 
     fn next(&mut self) -> Option<Self::Item> {
@@ -37,8 +37,8 @@ impl<'a, K: ColumnType> Iterator for ChunkIterator<'a, K> {
     }
 }
 
-impl<'a, K: ColumnType> ChunkIterator<'a, K> {
-    pub fn new(size: usize, block: &Block<K>) -> ChunkIterator<K> {
+impl<K: ColumnType> ChunkIterator<K> {
+    pub fn new(size: usize, block: Block<K>) -> ChunkIterator<K> {
         ChunkIterator {
             position: 0,
             size,
diff --git a/src/types/block/mod.rs b/src/types/block/mod.rs
index c6e5385..4ab5446 100644
--- a/src/types/block/mod.rs
+++ b/src/types/block/mod.rs
@@ -21,13 +21,12 @@ use crate::{
     },
 };
 
-use self::chunk_iterator::ChunkIterator;
-pub(crate) use self::row::BlockRef;
 pub use self::{
     block_info::BlockInfo,
     builder::{RCons, RNil, RowBuilder},
     row::{Row, Rows},
 };
+pub(crate) use self::{chunk_iterator::ChunkIterator, row::BlockRef};
 
 mod block_info;
 mod builder;
@@ -35,7 +34,7 @@ mod chunk_iterator;
 mod compressed;
 mod row;
 
-const INSERT_BLOCK_SIZE: usize = 1_048_576;
+pub(crate) const INSERT_BLOCK_SIZE: usize = 1_048_576;
 
 const DEFAULT_CAPACITY: usize = 100;
 
@@ -372,12 +371,10 @@ impl<K: ColumnType> Block<K> {
     pub(crate) fn send_data(&self, encoder: &mut Encoder, compress: bool) {
         encoder.uvarint(protocol::CLIENT_DATA);
         encoder.string(""); // temporary table
-        for chunk in self.chunks(INSERT_BLOCK_SIZE) {
-            chunk.write(encoder, compress);
-        }
+        self.write(encoder, compress);
     }
 
-    pub(crate) fn chunks(&self, n: usize) -> ChunkIterator<K> {
+    pub(crate) fn chunks(self, n: usize) -> ChunkIterator<K> {
         ChunkIterator::new(n, self)
     }
 }
@@ -571,7 +568,7 @@ mod test {
     #[test]
     fn test_chunks_of_empty_block() {
         let block = Block::default();
-        assert_eq!(1, block.chunks(100_500).count());
+        assert_eq!(1, block.clone().chunks(100_500).count());
         assert_eq!(Some(block.clone()), block.chunks(100_500).next());
     }
 
diff --git a/tests/clickhouse.rs b/tests/clickhouse.rs
index 482a74c..c128837 100644
--- a/tests/clickhouse.rs
+++ b/tests/clickhouse.rs
@@ -2378,3 +2378,31 @@ async fn test_iter_int_128() -> Result<(), Error> {
 
     Ok(())
 }
+
+#[cfg(feature = "tokio_io")]
+#[tokio::test]
+async fn test_insert_big_block() -> Result<(), Error> {
+    let ddl = r"
+        CREATE TABLE clickhouse_test_insert_big_block (
+            int8 Int8
+        ) Engine=Memory";
+    let big_block_size = 1024*1024 + 1;
+
+    let block = Block::new()
+        .column("int8", vec![-1_i8; big_block_size]);
+
+    let expected = block.clone();
+    let pool = Pool::new(database_url());
+    let mut c = pool.get_handle().await?;
+    c.execute("DROP TABLE IF EXISTS clickhouse_test_insert_big_block")
+        .await?;
+    c.execute(ddl).await?;
+    c.insert("clickhouse_test_insert_big_block", block).await?;
+    let actual = c
+        .query("SELECT * FROM clickhouse_test_insert_big_block")
+        .fetch_all()
+        .await?;
+
+    assert_eq!(format!("{:?}", expected.as_ref()), format!("{:?}", &actual));
+    Ok(())
+}