[Rust] Add LakeSoulHashTable Sink for DataFusion (lakesoul-io#382)
* support LakeSoulHashTable

Signed-off-by: zenghua <[email protected]>

* add parquet_filter_pushdown switches

Signed-off-by: zenghua <[email protected]>

---------

Signed-off-by: zenghua <[email protected]>
Co-authored-by: zenghua <[email protected]>
Ceng23333 and zenghua authored Dec 27, 2023
1 parent f5e075e commit c748f6c
Showing 17 changed files with 1,878 additions and 188 deletions.
3 changes: 3 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default.

37 changes: 15 additions & 22 deletions rust/lakesoul-datafusion/src/catalog/mod.rs
@@ -29,27 +29,11 @@ pub(crate) async fn create_table(client: MetaDataClientRef, table_name: &str, co
         TableInfo {
             table_id: format!("table_{}", uuid::Uuid::new_v4()),
             table_name: table_name.to_string(),
-            table_path: format!("{}default/{}", env::temp_dir().to_str().unwrap(), table_name),
+            table_path: format!("file://{}default/{}", env::temp_dir().to_str().unwrap(), table_name),
             table_schema: serde_json::to_string::<ArrowJavaSchema>(&config.schema().into()).unwrap(),
             table_namespace: "default".to_string(),
-            properties: "{}".to_string(),
-            partitions: ";".to_owned() + config.primary_keys_slice().iter().map(String::as_str).collect::<Vec<_>>().join(",").as_str(),
-            domain: "public".to_string(),
-        }).await?;
-    Ok(())
-}
-
-pub(crate) async fn create_table_with_config_map(client: MetaDataClientRef, table_name: &str, config: HashMap<String, String>) -> Result<()> {
-    let properties = serde_json::to_string(&config).unwrap();
-    client.create_table(
-        TableInfo {
-            table_id: format!("table_{}", uuid::Uuid::new_v4()),
-            table_name: table_name.to_string(),
-            table_path: [env::temp_dir().to_str().unwrap(), table_name].iter().collect::<PathBuf>().to_str().unwrap().to_string(),
-            table_schema: config.get("schema").unwrap().clone(),
-            table_namespace: "default".to_string(),
-            properties,
-            partitions: format!("{};{}", config.get("rangePartitions").unwrap(), config.get("hashPartitions").unwrap()),
+            properties: serde_json::to_string(&LakeSoulTableProperty {hash_bucket_num: Some(4)})?,
+            partitions: format!("{};{}", "", config.primary_keys_slice().iter().map(String::as_str).collect::<Vec<_>>().join(",")),
             domain: "public".to_string(),
         }).await?;
     Ok(())
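
The create_table change above stores table metadata as plain strings: properties becomes a JSON document (here carrying hash_bucket_num = 4), and partitions is a single "<range columns>;<hash columns>" string whose hash side is the comma-joined primary-key list, so a table with no range partitions and primary keys id,name yields ";id,name". The "file://" prefix added to table_path turns the temp-dir path into a URI. Below is a minimal sketch of that string encoding, using a stand-in Property struct and example column names (both are assumptions for illustration, not the crate's actual types):

```rust
use serde::Serialize;

// Stand-in for LakeSoulTableProperty (assumed shape, for this sketch only).
#[derive(Serialize)]
struct Property {
    hash_bucket_num: Option<usize>,
}

fn main() -> Result<(), serde_json::Error> {
    // `properties` column: table options serialized as JSON.
    let properties = serde_json::to_string(&Property { hash_bucket_num: Some(4) })?;
    assert_eq!(properties, r#"{"hash_bucket_num":4}"#);

    // `partitions` column: "<range columns>;<hash columns>".
    let range_partitions: Vec<&str> = vec![]; // no range partitions in this commit's create_table
    let primary_keys = vec!["id", "name"];    // example primary keys (assumption)
    let partitions = format!("{};{}", range_partitions.join(","), primary_keys.join(","));
    assert_eq!(partitions, ";id,name");
    Ok(())
}
```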
@@ -98,11 +82,19 @@ pub(crate) fn parse_table_info_partitions(partitions: String) -> (Vec<String>, V
     )
 }
 
-pub(crate) async fn commit_data(client: MetaDataClientRef, table_name: &str, files: &[String]) -> Result<()>{
+pub(crate) async fn commit_data(client: MetaDataClientRef, table_name: &str, partitions: Vec<(String, String)>, files: &[String]) -> Result<()>{
     let table_name_id = client.get_table_name_id_by_table_name(table_name, "default").await?;
     client.commit_data_commit_info(DataCommitInfo {
         table_id: table_name_id.table_id,
-        partition_desc: "-5".to_string(),
+        partition_desc: if partitions.is_empty() {
+            "-5".to_string()
+        } else {
+            partitions
+                .iter()
+                .map(|(k, v)| format!("{}={}", k, v))
+                .collect::<Vec<_>>()
+                .join(",")
+        },
         file_ops: files
             .iter()
             .map(|file| DataFileOp {
@@ -117,7 +109,8 @@ pub(crate) async fn commit_data(client: MetaDataClientRef, table_name: &str, fil
             let (high, low) = uuid::Uuid::new_v4().as_u64_pair();
             Some(Uuid{high, low})
         },
-        ..Default::default()
+        committed: false,
+        domain: "public".to_string(),
     }).await?;
     Ok(())
 }
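
commit_data now derives partition_desc from the new partitions: Vec<(String, String)> argument: an empty list falls back to the "-5" sentinel used for non-partitioned data, otherwise the pairs are encoded as key=value and joined with commas. A small standalone sketch of that encoding (the same logic extracted for illustration, not the crate's public API):

```rust
/// Mirrors the partition_desc encoding in the new commit_data body:
/// empty partition list -> "-5" sentinel, otherwise "key=value" pairs joined by ','.
fn partition_desc(partitions: &[(String, String)]) -> String {
    if partitions.is_empty() {
        "-5".to_string()
    } else {
        partitions
            .iter()
            .map(|(k, v)| format!("{}={}", k, v))
            .collect::<Vec<_>>()
            .join(",")
    }
}

fn main() {
    assert_eq!(partition_desc(&[]), "-5");
    let parts = vec![
        ("range".to_string(), "20231227".to_string()), // example partition values (assumption)
        ("region".to_string(), "cn".to_string()),
    ];
    assert_eq!(partition_desc(&parts), "range=20231227,region=cn");
}
```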