Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add push_value method to Block #154

Open
wants to merge 3 commits into
base: async-await
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 19 additions & 38 deletions src/types/block/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ use std::{borrow::Cow, marker};
use chrono_tz::Tz;

use crate::{
Block,
errors::{Error, FromSqlError, Result},
types::{
block::ColumnIdx, ColumnType,
column::{ArcColumnWrapper, ColumnData, Either}, Column, Value,
block::ColumnIdx,
column::{ArcColumnWrapper, ColumnData, Either},
Column, ColumnType, Value,
},
Block,
};

pub trait RowBuilder {
Expand Down Expand Up @@ -62,53 +63,30 @@ where
{
#[inline(always)]
fn apply<K: ColumnType>(self, block: &mut Block<K>) -> Result<()> {
put_param(self.key, self.value, block)?;
block.push_value(&self.key, self.value)?;
self.tail.apply(block)
}
}

impl RowBuilder for Vec<(String, Value)> {
impl RowBuilder for Vec<(&'static str, Value)> {
fn apply<K: ColumnType>(self, block: &mut Block<K>) -> Result<()> {
for (k, v) in self {
put_param(k.into(), v, block)?;
block.push_value(k, v)?;
}
Ok(())
}
}

fn put_param<K: ColumnType>(
key: Cow<'static, str>,
value: Value,
block: &mut Block<K>,
) -> Result<()> {
let col_index = match key.as_ref().get_index(&block.columns) {
Ok(col_index) => col_index,
Err(Error::FromSql(FromSqlError::OutOfRange)) => {
if block.row_count() <= 1 {
let sql_type = From::from(value.clone());

let timezone = extract_timezone(&value);

let column = Column {
name: key.clone().into(),
data: <dyn ColumnData>::from_type::<ArcColumnWrapper>(sql_type, timezone, block.capacity)?,
_marker: marker::PhantomData,
};

block.columns.push(column);
return put_param(key, value, block);
} else {
return Err(Error::FromSql(FromSqlError::OutOfRange));
}
impl RowBuilder for Vec<(String, Value)> {
fn apply<K: ColumnType>(self, block: &mut Block<K>) -> Result<()> {
for (k, v) in self {
block.push_value(&k, v)?;
}
Err(err) => return Err(err),
};

block.columns[col_index].push(value);
Ok(())
Ok(())
}
}

fn extract_timezone(value: &Value) -> Tz {
pub fn extract_timezone(value: &Value) -> Tz {
match value {
Value::Date(_, tz) => *tz,
Value::DateTime(_, tz) => *tz,
Expand All @@ -131,7 +109,7 @@ mod test {

use crate::{
row,
types::{Decimal, SqlType, Simple, DateTimeType},
types::{DateTimeType, Decimal, Simple, SqlType},
};

use super::*;
Expand Down Expand Up @@ -196,7 +174,10 @@ mod test {
);

assert_eq!(block.columns[13].sql_type(), SqlType::Date);
assert_eq!(block.columns[14].sql_type(), SqlType::DateTime(DateTimeType::Chrono));
assert_eq!(
block.columns[14].sql_type(),
SqlType::DateTime(DateTimeType::Chrono)
);
assert_eq!(block.columns[15].sql_type(), SqlType::Decimal(18, 4));
}
}
91 changes: 85 additions & 6 deletions src/types/block/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{
borrow::Cow,
cmp,
default::Default,
fmt,
io::{Cursor, Read},
os::raw::c_char,
marker::PhantomData,
os::raw::c_char,
};

use byteorder::{LittleEndian, WriteBytesExt};
Expand All @@ -16,16 +17,16 @@ use crate::{
binary::{protocol, Encoder, ReadEx},
errors::{Error, FromSqlError, Result},
types::{
column::{self, ArcColumnWrapper, Column, ColumnFrom},
FromSql, SqlType, ColumnType, Simple, Complex
column::{self, ArcColumnWrapper, Column, ColumnData, ColumnFrom},
ColumnType, Complex, FromSql, Simple, SqlType, Value,
},
};

use self::chunk_iterator::ChunkIterator;
pub(crate) use self::row::BlockRef;
pub use self::{
block_info::BlockInfo,
builder::{RCons, RNil, RowBuilder},
builder::{extract_timezone, RCons, RNil, RowBuilder},
row::{Row, Rows},
};

Expand Down Expand Up @@ -148,7 +149,7 @@ impl Block {
Self {
info: Default::default(),
columns: vec![],
capacity
capacity,
}
}

Expand Down Expand Up @@ -271,10 +272,88 @@ impl<K: ColumnType> Block<K> {
let column = &self.columns[column_index];
Ok(column)
}

/// Adds a value to a block column
///
/// # Arguments
/// * `key` - A string representing the column name
/// * `value` - The value to add to the end of the column
///
/// This method should be cautiously used outside of RowBuilder implementations.
/// Clickhouse requires that every column has the same number of items
///
///
/// This can be used to add the RowBuilder trait to custom-defined structs
/// ```
/// use clickhouse_rs::errors;
/// use clickhouse_rs::types;
///
/// struct Person {
/// name: String,
/// age: i32
/// }
///
/// impl types::RowBuilder for Person {
/// fn apply<K>(
/// self,
/// block: &mut types::Block<K>,
/// ) -> Result<(), errors::Error>
/// where
/// K: types::ColumnType,
/// {
/// block.push_value("name", self.name.into())?;
/// block.push_value("age", self.age.into())?;
/// Ok(())
/// }
///}
///
/// # fn main(){
/// let mut block = types::Block::new();
/// let person = Person {
/// name: "Bob".to_string(),
/// age: 42,
/// };
///
/// block.push(person).unwrap();
/// assert_eq!(block.row_count(), 1);
/// # }
/// ```
/// Note this requires the types to have either Into or From for Value implemented
pub fn push_value(&mut self, key: &str, value: Value) -> Result<()> {
let col_index = match key.get_index(&self.columns) {
Ok(col_index) => col_index,
Err(Error::FromSql(FromSqlError::OutOfRange)) => {
if self.row_count() <= 1 {
let sql_type = From::from(value.clone());

let timezone = extract_timezone(&value);

let column = Column {
name: key.clone().into(),
data: <dyn ColumnData>::from_type::<ArcColumnWrapper>(
sql_type,
timezone,
self.capacity,
)?,
_marker: PhantomData,
};

self.columns.push(column);
return self.push_value(key, value);
} else {
return Err(Error::FromSql(FromSqlError::OutOfRange));
}
}
Err(err) => return Err(err),
};

self.columns[col_index].push(value);
Ok(())
}
}

impl Block<Simple> {
pub(crate) fn concat(blocks: &[Self]) -> Block<Complex> {
pub fn concat(blocks: &[Self]) -> Block<Complex> {
let first = blocks.first().expect("blocks should not be empty.");

for block in blocks {
Expand Down