Skip to content

Commit

Permalink
add sql test
Browse files Browse the repository at this point in the history
Signed-off-by: mag1c1an1 <[email protected]>
  • Loading branch information
mag1c1an1 committed Jan 25, 2024
1 parent 4fcdb69 commit 3939077
Show file tree
Hide file tree
Showing 8 changed files with 371 additions and 76 deletions.
105 changes: 77 additions & 28 deletions rust/lakesoul-datafusion/src/catalog/lakesoul_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@
use crate::catalog::LakeSoulNamespace;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::CatalogProvider;
use datafusion::error::DataFusionError;
use datafusion::prelude::SessionContext;
use lakesoul_metadata::MetaDataClientRef;
use proto::proto::entity::Namespace;
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use tokio::runtime::Handle;

/// A metadata wrapper over the LakeSoul metadata client.
/// May need a lock for concurrent catalog mutations.
pub struct LakeSoulCatalog {
metadata_client: MetaDataClientRef,
context: Arc<SessionContext>,
catalog_lock: RwLock<()>,
}

impl Debug for LakeSoulCatalog {
Expand All @@ -29,6 +33,7 @@ impl LakeSoulCatalog {
Self {
metadata_client: meta_data_client_ref,
context,
catalog_lock: RwLock::new(()),
}
}
pub fn metadata_client(&self) -> MetaDataClientRef {
Expand All @@ -37,6 +42,15 @@ impl LakeSoulCatalog {
pub fn context(&self) -> Arc<SessionContext> {
self.context.clone()
}

/// Fetches every namespace known to the metadata service.
///
/// Bridges sync -> async: the `CatalogProvider` trait methods that call this
/// are synchronous, so the async metadata call is spawned onto the current
/// tokio runtime and the spawned task is waited on via a blocking executor.
/// A `JoinError` from the spawn is converted through `?` into
/// `crate::error::LakeSoulError::TokioJoinError`.
/// NOTE(review): `Handle::current()` panics when called outside a tokio
/// runtime — confirm all callers run inside one.
fn get_all_namespace(&self) -> crate::error::Result<Vec<Namespace>> {
let client = self.metadata_client.clone();
futures::executor::block_on(async move {
Handle::current()
.spawn(async move { Ok(client.get_all_namespace().await?) })
.await?
})
}
}

impl CatalogProvider for LakeSoulCatalog {
Expand All @@ -45,45 +59,61 @@ impl CatalogProvider for LakeSoulCatalog {
}

fn schema_names(&self) -> Vec<String> {
let client = self.metadata_client.clone();
futures::executor::block_on(async move {
Handle::current()
.spawn(async move { client.get_all_namespace().await.unwrap() })
.await
.unwrap()
.into_iter()
.map(|t| t.namespace)
.collect()
})
let _guard = self.catalog_lock.read();
if let Ok(v) = self.get_all_namespace() {
v.into_iter().map(|np| np.namespace).collect()
} else {
vec![]
}
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
if self.schema_names().contains(&name.to_string()) {
Some(Arc::new(LakeSoulNamespace::new(
let _guard = self.catalog_lock.read();
match self.get_all_namespace() {
Ok(v) if v.iter().any(|np| np.namespace == name) => Some(Arc::new(LakeSoulNamespace::new(
self.metadata_client.clone(),
self.context.clone(),
name,
)))
} else {
None
))),
_ => None,
}
}

/// Adds a new schema to this catalog.
///
/// If a schema of the same name existed before, it is replaced in
/// the catalog and returned.
///
/// By default returns a "Not Implemented" error
fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
_schema: Arc<dyn SchemaProvider>,
) -> lakesoul_io::lakesoul_io_config::Result<Option<Arc<dyn SchemaProvider>>> {
// the type info of dyn schema is not enough, nothing to use
let _ = name;
let _ = schema;
unimplemented!("Registering new schemas is not supported")
let _guard = self.catalog_lock.write();
let client = self.metadata_client.clone();
let schema: Option<Arc<dyn SchemaProvider>> = {
match self.get_all_namespace() {
Ok(v) if v.iter().any(|np| np.namespace == name) => Some(Arc::new(LakeSoulNamespace::new(
self.metadata_client.clone(),
self.context.clone(),
name,
))),
_ => None,
}
};
// use default value
let np = Namespace {
namespace: name.into(),
properties: "{}".into(),
comment: "created by lakesoul-datafusion".into(),
domain: "public".into(),
};
let _ = futures::executor::block_on(async move {
Handle::current()
.spawn(async move { client.create_namespace(np).await })
.await
.expect("tokio join error in register schema")
});
Ok(schema)
}

/// Removes a schema from this catalog. Implementations of this method should return
Expand All @@ -94,14 +124,33 @@ impl CatalogProvider for LakeSoulCatalog {
///
/// Implementations of this method should return None if schema with `name`
/// does not exist.
///
/// By default returns a "Not Implemented" error
fn deregister_schema(
&self,
_name: &str,
_cascade: bool,
) -> lakesoul_io::lakesoul_io_config::Result<Option<Arc<dyn SchemaProvider>>> {
// the type info of dyn schema is not enough, nothing to use
unimplemented!("Deregistering new schemas is not supported")
// Not supported
// let _guard = self.catalog_lock.write();
// let client = self.metadata_client.clone();
// let schema: Option<Arc<dyn SchemaProvider>> = {
// match self.get_all_namespace() {
// Ok(v) if v.iter().any(|np| np.namespace == name) => Some(Arc::new(LakeSoulNamespace::new(
// self.metadata_client.clone(),
// self.context.clone(),
// name,
// ))),
// _ => None,
// }
// };
// let namespace = name.to_string();
// if let Some(s) = schema {
// if !s.table_names().is_empty() && !cascade {
// return Err(DataFusionError::External("can not delete".into()));
// }
// // delete all tables
// return Ok(Some(s));
// }
// return Ok(None);
Err(DataFusionError::NotImplemented("Not supported".into()))
}
}
}
90 changes: 78 additions & 12 deletions rust/lakesoul-datafusion/src/catalog/lakesoul_namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,34 @@
// SPDX-License-Identifier: Apache-2.0

use crate::catalog::create_io_config_builder;
use crate::error::Result;
use async_trait::async_trait;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::TableProvider;
use datafusion::error::DataFusionError;
use datafusion::prelude::SessionContext;
use lakesoul_io::datasource::file_format::LakeSoulParquetFormat;
use lakesoul_io::datasource::listing::LakeSoulListingTable;
use lakesoul_metadata::error::LakeSoulMetaDataError;
use lakesoul_metadata::MetaDataClientRef;
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::sync::RwLock;
use tracing::debug;
use tracing::field::debug;

/// A ['SchemaProvider`] that query pg to automatically discover tables
/// A [`SchemaProvider`] that queries pg to automatically discover tables.
/// Due to a restriction of datafusion's API, "CREATE [EXTERNAL] TABLE ..." is not supported.
/// May have race conditions.
pub struct LakeSoulNamespace {
metadata_client: MetaDataClientRef,
context: Arc<SessionContext>,
// primary key
namespace: String,
namespace_lock: Arc<RwLock<()>>,
}

impl LakeSoulNamespace {
Expand All @@ -32,6 +39,7 @@ impl LakeSoulNamespace {
metadata_client: meta_data_client_ref,
context,
namespace: namespace.to_string(),
namespace_lock: Arc::new(RwLock::new(())),
}
}

Expand All @@ -46,6 +54,12 @@ impl LakeSoulNamespace {
pub fn namespace(&self) -> &str {
&self.namespace
}

/// Deletes every table in this namespace. Currently unimplemented and panics
/// if called.
///
/// Dangerous: dropping multiple tables is not atomic as sketched here —
/// should this use a metadata transaction? TODO: decide before implementing.
fn _delete_all_tables(&self) -> Result<()> {
unimplemented!()
}
}

impl Debug for LakeSoulNamespace {
Expand All @@ -61,25 +75,29 @@ impl SchemaProvider for LakeSoulNamespace {
}

/// query table_name_id by namespace
/// ref: https://www.modb.pro/db/618126
fn table_names(&self) -> Vec<String> {
let client = self.metadata_client.clone();
let np = self.namespace.clone();
let lock = self.namespace_lock.clone();
futures::executor::block_on(async move {
Handle::current()
.spawn(async move { client.get_all_table_name_id_by_namespace(&np).await.unwrap() })
.spawn(async move {
let _guard = lock.read().await;
client.get_all_table_name_id_by_namespace(&np).await.unwrap()
})
.await
.unwrap()
.into_iter()
.map(|t| t.table_name)
.collect()
})
.into_iter()
.map(|v| v.table_name)
.collect()
}

/// Search table by name
/// return LakeSoulListing table
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
if let Ok(_t) = self
let _guard = self.namespace_lock.read().await;
if let Ok(_) = self
.metadata_client
.get_table_info_by_table_name(name, &self.namespace)
.await
Expand Down Expand Up @@ -132,15 +150,63 @@ impl SchemaProvider for LakeSoulNamespace {
/// If no table of that name exists, returns Ok(None).
#[allow(unused_variables)]
fn deregister_table(&self, name: &str) -> lakesoul_io::lakesoul_io_config::Result<Option<Arc<dyn TableProvider>>> {
// the type info of dyn TableProvider is not enough or use AST??????
unimplemented!("schema provider does not support deregistering tables")
let client = self.metadata_client.clone();
let table_name = name.to_string();
let np = self.namespace.clone();
let cxt = self.context.clone();
let lock = self.namespace_lock.clone();
futures::executor::block_on(async move {
Handle::current()
.spawn(async move {
// get table info
let _guard = lock.write().await;
match client.get_table_info_by_table_name(&table_name, &np).await {
Ok(table_info) => {
let config;
if let Ok(config_builder) =
create_io_config_builder(client.clone(), Some(&table_name), true, &np).await
{
config = config_builder.build();
} else {
return Err(DataFusionError::External("get table provider config failed".into()));
}
// Maybe should change
let file_format = Arc::new(LakeSoulParquetFormat::new(
Arc::new(ParquetFormat::new()),
config.clone(),
));
if let Ok(table_provider) = LakeSoulListingTable::new_with_config_and_format(
&cxt.state(),
config,
file_format,
// care this
false,
)
.await
{
debug!("get table provider success");
client
.delete_table_by_table_info_cascade(&table_info)
.await
.map_err(|_| DataFusionError::External("delete table info failed".into()))?;
return Ok(Some(Arc::new(table_provider) as Arc<dyn TableProvider>));
}
debug("get table provider fail");
return Err(DataFusionError::External("get table provider failed".into()));
}
Err(e) => match e {
LakeSoulMetaDataError::NotFound(_) => Ok(None),
_ => Err(DataFusionError::External("get table info failed".into())),
},
}
})
.await
.unwrap()
})
}

fn table_exist(&self, name: &str) -> bool {
// table name is primary key for `table_name_id`
self.table_names().into_iter().any(|s| s == name)
}
}

#[cfg(test)]
mod test {}
2 changes: 1 addition & 1 deletion rust/lakesoul-datafusion/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::error::Result;
// pub mod lakesoul_sink;
// pub mod lakesoul_source;
mod lakesoul_catalog;
// used in catalog_test, but still say unused_imports, i think it is a bug about rust-lint.
// used in catalog_test, but still say unused_imports, i think it is a bug about rust-lint.
// this is a workaround
#[cfg(test)]
pub use lakesoul_catalog::*;
Expand Down
10 changes: 10 additions & 0 deletions rust/lakesoul-datafusion/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use std::{error::Error, fmt::Display, result, sync::Arc};
use tokio::task::JoinError;

use lakesoul_io::lakesoul_reader::{ArrowError, DataFusionError};
use lakesoul_metadata::error::LakeSoulMetaDataError;
Expand All @@ -22,6 +23,7 @@ pub enum LakeSoulError {
DataFusionError(DataFusionError),
ArrowError(ArrowError),
SerdeJsonError(serde_json::Error),
TokioJoinError(tokio::task::JoinError),
Internal(String),
}

Expand Down Expand Up @@ -49,13 +51,20 @@ impl From<serde_json::Error> for LakeSoulError {
}
}

impl From<tokio::task::JoinError> for LakeSoulError {
fn from(err: JoinError) -> Self {
Self::TokioJoinError(err)
}
}

impl Display for LakeSoulError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match *self {
LakeSoulError::MetaDataError(ref desc) => write!(f, "metadata error: {desc}"),
LakeSoulError::DataFusionError(ref desc) => write!(f, "DataFusion error: {desc}"),
LakeSoulError::SerdeJsonError(ref desc) => write!(f, "serde_json error: {desc}"),
LakeSoulError::ArrowError(ref desc) => write!(f, "arrow error: {desc}"),
LakeSoulError::TokioJoinError(ref desc) => write!(f, "tokio error: {desc}"),
LakeSoulError::Internal(ref desc) => {
write!(
f,
Expand All @@ -74,6 +83,7 @@ impl Error for LakeSoulError {
LakeSoulError::DataFusionError(e) => Some(e),
LakeSoulError::SerdeJsonError(e) => Some(e),
LakeSoulError::ArrowError(e) => Some(e),
LakeSoulError::TokioJoinError(e) => Some(e),
LakeSoulError::Internal(_) => None,
}
}
Expand Down
Loading

0 comments on commit 3939077

Please sign in to comment.