diff --git a/src/delete.rs b/src/delete.rs new file mode 100644 index 0000000..7b9f8f2 --- /dev/null +++ b/src/delete.rs @@ -0,0 +1,27 @@ +use crate::{error::Result, query::Query, sql::Bind, Client}; + +#[must_use] +#[derive(Clone)] +pub struct Delete { + query: Query, +} + +impl Delete { + pub(crate) fn new(client: &Client, table_name: &str, pk_names: Vec) -> Self { + let mut out = format!("ALTER TABLE `{table_name}` DELETE WHERE"); + for (idx, pk_name) in pk_names.iter().enumerate() { + if idx > 0 { + out.push_str(" AND "); + } + out.push_str(&format!(" `{pk_name}` = ? ")); + } + let query = Query::new(client, &out); + Self { query } + } + pub async fn delete(mut self, delete_pk: Vec) -> Result<()> { + for i in delete_pk { + self.query.bind_ref(i); + } + self.query.execute().await + } +} diff --git a/src/insert.rs b/src/insert.rs index f9af43a..252a1f4 100644 --- a/src/insert.rs +++ b/src/insert.rs @@ -54,10 +54,24 @@ macro_rules! timeout { } impl Insert { + pub(crate) fn new_with_field_names( + client: &Client, + table: &str, + fields_names: Vec, + ) -> Result { + Insert::new_inner(client, table, fields_names.join(",")) + } + pub(crate) fn new(client: &Client, table: &str) -> Result where T: Row, { + let fields_names = row::join_column_names::() + .expect("the row type must be a struct or a wrapper around it"); + Insert::new_inner(client, table, fields_names) + } + + pub(crate) fn new_inner(client: &Client, table: &str, fields_names: String) -> Result { let mut url = Url::parse(&client.url).map_err(|err| Error::InvalidParams(err.into()))?; let mut pairs = url.query_pairs_mut(); pairs.clear(); @@ -66,12 +80,9 @@ impl Insert { pairs.append_pair("database", database); } - let fields = row::join_column_names::() - .expect("the row type must be a struct or a wrapper around it"); - // TODO: what about escaping a table name? // https://clickhouse.yandex/docs/en/query_language/syntax/#syntax-identifiers - let query = format!("INSERT INTO {table}({fields}) FORMAT RowBinary"); + let query = format!("INSERT INTO {table}({fields_names}) FORMAT RowBinary"); pairs.append_pair("query", &query); if client.compression.is_lz4() { diff --git a/src/lib.rs b/src/lib.rs index 3af1e77..14ffefc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,6 +17,7 @@ pub use clickhouse_derive::Row; pub use self::{compression::Compression, row::Row}; use self::{error::Result, http_client::HttpClient}; +pub mod delete; pub mod error; pub mod insert; pub mod inserter; @@ -25,6 +26,7 @@ pub mod serde; pub mod sql; #[cfg(feature = "test-util")] pub mod test; +pub mod update; #[cfg(feature = "watch")] pub mod watch; @@ -73,7 +75,10 @@ impl Default for Client { connector.set_keepalive(Some(TCP_KEEPALIVE)); #[cfg(feature = "tls")] - let connector = HttpsConnector::new_with_connector(connector); + let connector = HttpsConnector::new_with_connector({ + connector.enforce_http(false); + connector + }); let client = hyper::Client::builder() .pool_idle_timeout(POOL_IDLE_TIMEOUT) @@ -180,6 +185,14 @@ impl Client { insert::Insert::new(self, table) } + pub fn insert_with_fields_name( + &self, + table: &str, + fields_names: Vec, + ) -> Result> { + insert::Insert::new_with_field_names(self, table, fields_names) + } + /// Creates an inserter to perform multiple INSERTs. pub fn inserter(&self, table: &str) -> Result> { inserter::Inserter::new(self, table) @@ -190,6 +203,19 @@ impl Client { query::Query::new(self, query) } + pub fn delete(&self, table_name: &str, pk_names: Vec) -> delete::Delete { + delete::Delete::new(self, table_name, pk_names) + } + + pub fn update( + &self, + table_name: &str, + pk_names: Vec, + flieds_names: Vec, + ) -> update::Update { + update::Update::new(self, table_name, pk_names, flieds_names) + } + /// Starts a new WATCH query. #[cfg(feature = "watch")] pub fn watch(&self, query: &str) -> watch::Watch { diff --git a/src/query.rs b/src/query.rs index 198f015..1d9670c 100644 --- a/src/query.rs +++ b/src/query.rs @@ -40,6 +40,10 @@ impl Query { self } + pub fn bind_ref(&mut self, value: impl Bind) { + self.sql.bind_arg(value); + } + /// Executes the query. pub async fn execute(self) -> Result<()> { self.do_execute(false)?.finish().await diff --git a/src/sql/ser.rs b/src/sql/ser.rs index bc5f75f..0253a58 100644 --- a/src/sql/ser.rs +++ b/src/sql/ser.rs @@ -86,7 +86,6 @@ impl<'a, W: Write> Serializer for SqlSerializer<'a, W> { unsupported!( serialize_map(Option) -> Result, serialize_bytes(&[u8]), - serialize_none, serialize_unit, serialize_unit_struct(&'static str), ); @@ -144,6 +143,12 @@ impl<'a, W: Write> Serializer for SqlSerializer<'a, W> { Err(SqlSerializerError::Unsupported("serialize_some")) } + #[inline] + fn serialize_none(self) -> Result<()> { + self.writer.write_fmt(format_args!("null"))?; + Ok(()) + } + #[inline] fn serialize_unit_variant( self, @@ -340,7 +345,6 @@ mod tests { fn it_fails_on_unsupported() { let mut out = String::new(); assert!(write_arg(&mut out, &std::collections::HashMap::::new()).is_err()); - assert!(write_arg(&mut out, &None::).is_err()); assert!(write_arg(&mut out, &Some(42)).is_err()); assert!(write_arg(&mut out, &()).is_err()); diff --git a/src/update.rs b/src/update.rs new file mode 100644 index 0000000..0ee707a --- /dev/null +++ b/src/update.rs @@ -0,0 +1,45 @@ +use crate::{error::Result, query::Query, sql::Bind, Client}; + +#[must_use] +#[derive(Clone)] +pub struct Update { + query: Query, +} + +impl Update { + pub(crate) fn new( + client: &Client, + table_name: &str, + pk_names: Vec, + flieds_names: Vec, + ) -> Self { + let mut out: String = flieds_names.iter().enumerate().fold( + format!("ALTER TABLE `{table_name}` UPDATE"), + |mut res, (idx, key)| { + if idx > 0 { + res.push(','); + } + res.push_str(&format!(" `{key}` = ?")); + res + }, + ); + out.push_str(&format!(" where ")); + for (idx, pk_name) in pk_names.iter().enumerate() { + if idx > 0 { + out.push_str(" AND "); + } + out.push_str(&format!(" `{pk_name}` = ? ")); + } + let query = Query::new(client, &out); + Self { query } + } + pub async fn update_fields(mut self, fields: Vec, pk: Vec) -> Result<()> { + fields.into_iter().for_each(|a| { + self.query.bind_ref(a); + }); + for i in pk { + self.query.bind_ref(i); + } + self.query.execute().await + } +} diff --git a/tests/test_delete_and_update.rs b/tests/test_delete_and_update.rs new file mode 100644 index 0000000..44eb003 --- /dev/null +++ b/tests/test_delete_and_update.rs @@ -0,0 +1,93 @@ +use core::time::Duration; +use std::thread::sleep; + +use serde::{Deserialize, Serialize}; + +use clickhouse::Row; + +mod common; + +#[common::named] +#[tokio::test] +async fn test_update_delete() { + let client = common::prepare_database!(); + + #[derive(Debug, Row, Serialize, Deserialize)] + struct MyRow<'a> { + no: u32, + no2: u32, + name: &'a str, + list: Vec, + } + + // Create a table. + client + .query( + " + CREATE TABLE test(no UInt32, no2 UInt32, name LowCardinality(String) , list Array(UInt32)) + ENGINE = MergeTree + PRIMARY KEY (no,no2) + ", + ) + .execute() + .await + .unwrap(); + + // Write to the table. + let mut insert = client.insert("test").unwrap(); + for i in 0..5 { + insert + .write(&MyRow { + no: i, + no2: 1, + name: "foo", + list: vec![1, 2, 3], + }) + .await + .unwrap(); + } + for i in 0..5 { + insert + .write(&MyRow { + no: i, + no2: 2, + name: "foo", + list: vec![1, 2, 3], + }) + .await + .unwrap(); + } + + insert.end().await.unwrap(); + + let delete = client.delete("test", vec!["no".to_string(), "no2".to_string()]); + delete.delete(vec![1, 1]).await.unwrap(); + sleep(Duration::from_secs(1)); + + let update = client.update( + "test", + vec!["no".to_string(), "no2".to_string()], + vec![format!("name"), format!("list")], + ); + let vec = vec!["name1", "[2,5,8]"]; + update.update_fields(vec, vec![2, 2]).await.unwrap(); + + sleep(Duration::from_secs(2)); + + let mut cursor = client + .query("SELECT ?fields FROM test") + .fetch::>() + .unwrap(); + + while let Some(row) = cursor.next().await.unwrap() { + if row.no == 1 { + assert_ne!(row.no2, 1); + } else if row.no == 2 && row.no2 == 2 { + assert_eq!(row.name, "name1"); + assert_eq!(row.list, vec![2, 5, 8]); + } else { + assert_eq!(row.name, "foo"); + assert_eq!(row.list, vec![1, 2, 3]); + } + } +}