Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(delete,update,insert): Support build Insert with schema strings. #80

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
27 changes: 27 additions & 0 deletions src/delete.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::{error::Result, query::Query, sql::Bind, Client};

#[must_use]
#[derive(Clone)]
pub struct Delete {
query: Query,
}

impl Delete {
pub(crate) fn new(client: &Client, table_name: &str, pk_names: Vec<String>) -> Self {
let mut out = format!("ALTER TABLE `{table_name}` DELETE WHERE");
for (idx, pk_name) in pk_names.iter().enumerate() {
if idx > 0 {
out.push_str(" AND ");
}
out.push_str(&format!(" `{pk_name}` = ? "));
}
let query = Query::new(client, &out);
Self { query }
}
pub async fn delete(mut self, delete_pk: Vec<impl Bind>) -> Result<()> {
for i in delete_pk {
self.query.bind_ref(i);
}
self.query.execute().await
}
}
19 changes: 15 additions & 4 deletions src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,24 @@ macro_rules! timeout {
}

impl<T> Insert<T> {
pub(crate) fn new_with_field_names(
client: &Client,
table: &str,
fields_names: Vec<String>,
) -> Result<Self> {
Insert::new_inner(client, table, fields_names.join(","))
}

pub(crate) fn new(client: &Client, table: &str) -> Result<Self>
where
T: Row,
{
let fields_names = row::join_column_names::<T>()
.expect("the row type must be a struct or a wrapper around it");
Insert::new_inner(client, table, fields_names)
}

pub(crate) fn new_inner(client: &Client, table: &str, fields_names: String) -> Result<Self> {
let mut url = Url::parse(&client.url).map_err(|err| Error::InvalidParams(err.into()))?;
let mut pairs = url.query_pairs_mut();
pairs.clear();
Expand All @@ -66,12 +80,9 @@ impl<T> Insert<T> {
pairs.append_pair("database", database);
}

let fields = row::join_column_names::<T>()
.expect("the row type must be a struct or a wrapper around it");

// TODO: what about escaping a table name?
// https://clickhouse.yandex/docs/en/query_language/syntax/#syntax-identifiers
let query = format!("INSERT INTO {table}({fields}) FORMAT RowBinary");
let query = format!("INSERT INTO {table}({fields_names}) FORMAT RowBinary");
pairs.append_pair("query", &query);

if client.compression.is_lz4() {
Expand Down
28 changes: 27 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub use clickhouse_derive::Row;
pub use self::{compression::Compression, row::Row};
use self::{error::Result, http_client::HttpClient};

pub mod delete;
pub mod error;
pub mod insert;
pub mod inserter;
Expand All @@ -25,6 +26,7 @@ pub mod serde;
pub mod sql;
#[cfg(feature = "test-util")]
pub mod test;
pub mod update;
#[cfg(feature = "watch")]
pub mod watch;

Expand Down Expand Up @@ -73,7 +75,10 @@ impl Default for Client {
connector.set_keepalive(Some(TCP_KEEPALIVE));

#[cfg(feature = "tls")]
let connector = HttpsConnector::new_with_connector(connector);
let connector = HttpsConnector::new_with_connector({
connector.enforce_http(false);
connector
});

let client = hyper::Client::builder()
.pool_idle_timeout(POOL_IDLE_TIMEOUT)
Expand Down Expand Up @@ -180,6 +185,14 @@ impl Client {
insert::Insert::new(self, table)
}

pub fn insert_with_fields_name<T: Row>(
&self,
table: &str,
fields_names: Vec<String>,
) -> Result<insert::Insert<T>> {
insert::Insert::new_with_field_names(self, table, fields_names)
}

/// Creates an inserter to perform multiple INSERTs.
pub fn inserter<T: Row>(&self, table: &str) -> Result<inserter::Inserter<T>> {
inserter::Inserter::new(self, table)
Expand All @@ -190,6 +203,19 @@ impl Client {
query::Query::new(self, query)
}

pub fn delete(&self, table_name: &str, pk_names: Vec<String>) -> delete::Delete {
delete::Delete::new(self, table_name, pk_names)
}

pub fn update(
&self,
table_name: &str,
pk_names: Vec<String>,
flieds_names: Vec<String>,
) -> update::Update {
update::Update::new(self, table_name, pk_names, flieds_names)
}

/// Starts a new WATCH query.
#[cfg(feature = "watch")]
pub fn watch(&self, query: &str) -> watch::Watch {
Expand Down
4 changes: 4 additions & 0 deletions src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ impl Query {
self
}

pub fn bind_ref(&mut self, value: impl Bind) {
self.sql.bind_arg(value);
}

/// Executes the query.
pub async fn execute(self) -> Result<()> {
self.do_execute(false)?.finish().await
Expand Down
8 changes: 6 additions & 2 deletions src/sql/ser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ impl<'a, W: Write> Serializer for SqlSerializer<'a, W> {
unsupported!(
serialize_map(Option<usize>) -> Result<Impossible>,
serialize_bytes(&[u8]),
serialize_none,
serialize_unit,
serialize_unit_struct(&'static str),
);
Expand Down Expand Up @@ -144,6 +143,12 @@ impl<'a, W: Write> Serializer for SqlSerializer<'a, W> {
Err(SqlSerializerError::Unsupported("serialize_some"))
}

#[inline]
fn serialize_none(self) -> Result<()> {
self.writer.write_fmt(format_args!("null"))?;
Ok(())
}

#[inline]
fn serialize_unit_variant(
self,
Expand Down Expand Up @@ -340,7 +345,6 @@ mod tests {
fn it_fails_on_unsupported() {
let mut out = String::new();
assert!(write_arg(&mut out, &std::collections::HashMap::<u32, u32>::new()).is_err());
assert!(write_arg(&mut out, &None::<u32>).is_err());
assert!(write_arg(&mut out, &Some(42)).is_err());
assert!(write_arg(&mut out, &()).is_err());

Expand Down
45 changes: 45 additions & 0 deletions src/update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use crate::{error::Result, query::Query, sql::Bind, Client};

#[must_use]
#[derive(Clone)]
pub struct Update {
query: Query,
}

impl Update {
pub(crate) fn new(
client: &Client,
table_name: &str,
pk_names: Vec<String>,
flieds_names: Vec<String>,
) -> Self {
let mut out: String = flieds_names.iter().enumerate().fold(
format!("ALTER TABLE `{table_name}` UPDATE"),
|mut res, (idx, key)| {
if idx > 0 {
res.push(',');
}
res.push_str(&format!(" `{key}` = ?"));
res
},
);
out.push_str(&format!(" where "));
for (idx, pk_name) in pk_names.iter().enumerate() {
if idx > 0 {
out.push_str(" AND ");
}
out.push_str(&format!(" `{pk_name}` = ? "));
}
let query = Query::new(client, &out);
Self { query }
}
pub async fn update_fields(mut self, fields: Vec<impl Bind>, pk: Vec<impl Bind>) -> Result<()> {
fields.into_iter().for_each(|a| {
self.query.bind_ref(a);
});
for i in pk {
self.query.bind_ref(i);
}
self.query.execute().await
}
}
93 changes: 93 additions & 0 deletions tests/test_delete_and_update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use core::time::Duration;
use std::thread::sleep;

use serde::{Deserialize, Serialize};

use clickhouse::Row;

mod common;

#[common::named]
#[tokio::test]
async fn test_update_delete() {
let client = common::prepare_database!();

#[derive(Debug, Row, Serialize, Deserialize)]
struct MyRow<'a> {
no: u32,
no2: u32,
name: &'a str,
list: Vec<i32>,
}

// Create a table.
client
.query(
"
CREATE TABLE test(no UInt32, no2 UInt32, name LowCardinality(String) , list Array(UInt32))
ENGINE = MergeTree
PRIMARY KEY (no,no2)
",
)
.execute()
.await
.unwrap();

// Write to the table.
let mut insert = client.insert("test").unwrap();
for i in 0..5 {
insert
.write(&MyRow {
no: i,
no2: 1,
name: "foo",
list: vec![1, 2, 3],
})
.await
.unwrap();
}
for i in 0..5 {
insert
.write(&MyRow {
no: i,
no2: 2,
name: "foo",
list: vec![1, 2, 3],
})
.await
.unwrap();
}

insert.end().await.unwrap();

let delete = client.delete("test", vec!["no".to_string(), "no2".to_string()]);
delete.delete(vec![1, 1]).await.unwrap();
sleep(Duration::from_secs(1));

let update = client.update(
"test",
vec!["no".to_string(), "no2".to_string()],
vec![format!("name"), format!("list")],
);
let vec = vec!["name1", "[2,5,8]"];
update.update_fields(vec, vec![2, 2]).await.unwrap();

sleep(Duration::from_secs(2));

let mut cursor = client
.query("SELECT ?fields FROM test")
.fetch::<MyRow<'_>>()
.unwrap();

while let Some(row) = cursor.next().await.unwrap() {
if row.no == 1 {
assert_ne!(row.no2, 1);
} else if row.no == 2 && row.no2 == 2 {
assert_eq!(row.name, "name1");
assert_eq!(row.list, vec![2, 5, 8]);
} else {
assert_eq!(row.name, "foo");
assert_eq!(row.list, vec![1, 2, 3]);
}
}
}