Skip to content

Commit

Permalink
Add hive metastore catalog support (part 1/2) (#237)
Browse files Browse the repository at this point in the history
* fmt members

* setup basic test-infra for hms-catalog

* add license

* add hms create_namespace

* add hms get_namespace

* fix: typo

* add hms namespace_exists and drop_namespace

* add hms update_namespace

* move fns into HmsCatalog

* use `expose` in docker-compose

* add hms list_tables

* fix: clippy

* fix: cargo sort

* fix: cargo workspace

* move fns into utils + add constants

* include database name in error msg

* add pilota to cargo workspace

* add minio version

* change visibility to pub(crate); return namespace from conversion fn

* add minio version in rest-catalog docker-compose

* fix: hms test docker infrastructure

* add version to minio/mc

* fix: license header

* fix: core-site

---------

Co-authored-by: mlanhenke <[email protected]>
  • Loading branch information
marvinlanhenke and mlanhenke authored Mar 10, 2024
1 parent 0914f7a commit 3a947fa
Show file tree
Hide file tree
Showing 12 changed files with 825 additions and 41 deletions.
8 changes: 4 additions & 4 deletions .cargo/audit.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

[advisories]
ignore = [
# rsa
# Marvin Attack: potential key recovery through timing sidechannels
# Issues: https://github.com/apache/iceberg-rust/issues/221
"RUSTSEC-2023-0071",
# rsa
# Marvin Attack: potential key recovery through timing sidechannels
# Issues: https://github.com/apache/iceberg-rust/issues/221
"RUSTSEC-2023-0071",
]
8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@

[workspace]
resolver = "2"
members = ["crates/catalog/*", "crates/examples", "crates/iceberg", "crates/test_utils"]
members = [
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
"crates/test_utils",
]

[workspace.package]
version = "0.2.0"
Expand Down Expand Up @@ -55,6 +60,7 @@ once_cell = "1"
opendal = "0.45"
ordered-float = "4.0.0"
parquet = "50"
pilota = "0.10.0"
pretty_assertions = "1.4.0"
port_scanner = "0.1.5"
reqwest = { version = "^0.11", features = ["json"] }
Expand Down
7 changes: 7 additions & 0 deletions crates/catalog/hms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,12 @@ anyhow = { workspace = true }
async-trait = { workspace = true }
hive_metastore = { workspace = true }
iceberg = { workspace = true }
log = { workspace = true }
pilota = { workspace = true }
typed-builder = { workspace = true }
volo-thrift = { workspace = true }

[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
port_scanner = { workspace = true }
tokio = { workspace = true }
160 changes: 145 additions & 15 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::utils::*;
use async_trait::async_trait;
use hive_metastore::ThriftHiveMetastoreClient;
use hive_metastore::ThriftHiveMetastoreClientBuilder;
use hive_metastore::ThriftHiveMetastoreGetDatabaseException;
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
Expand All @@ -28,6 +29,7 @@ use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::net::ToSocketAddrs;
use typed_builder::TypedBuilder;
use volo_thrift::ResponseError;

/// Which variant of the thrift transport to communicate with HMS
/// See: <https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#framed-vs-unframed-transport>
Expand Down Expand Up @@ -97,7 +99,6 @@ impl HmsCatalog {
}
}

/// Refer to <https://github.com/apache/iceberg/blob/main/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java> for implementation details.
#[async_trait]
impl Catalog for HmsCatalog {
/// HMS doesn't support nested namespaces.
Expand Down Expand Up @@ -125,36 +126,165 @@ impl Catalog for HmsCatalog {
.collect())
}

/// Creates a new namespace with the given identifier and properties.
///
/// Attempts to create a namespace defined by the `namespace`
/// parameter and configured with the specified `properties`.
///
/// This function can return an error in the following situations:
///
/// - If `hive.metastore.database.owner-type` is specified without
/// `hive.metastore.database.owner`,
/// - Errors from `validate_namespace` if the namespace identifier does not
/// meet validation criteria.
/// - Errors from `convert_to_database` if the properties cannot be
/// successfully converted into a database configuration.
/// - Errors from the underlying database creation process, converted using
/// `from_thrift_error`.
async fn create_namespace(
&self,
_namespace: &NamespaceIdent,
_properties: HashMap<String, String>,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<Namespace> {
todo!()
let database = convert_to_database(namespace, &properties)?;

self.client
.0
.create_database(database)
.await
.map_err(from_thrift_error)?;

Ok(Namespace::new(namespace.clone()))
}

async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result<Namespace> {
todo!()
/// Retrieves a namespace by its identifier.
///
/// Validates the given namespace identifier and then queries the
/// underlying database client to fetch the corresponding namespace data.
/// Constructs a `Namespace` object with the retrieved data and returns it.
///
/// This function can return an error in any of the following situations:
/// - If the provided namespace identifier fails validation checks
/// - If there is an error querying the database, returned by
/// `from_thrift_error`.
async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
let name = validate_namespace(namespace)?;

let db = self
.client
.0
.get_database(name.clone().into())
.await
.map_err(from_thrift_error)?;

let ns = convert_to_namespace(&db)?;

Ok(ns)
}

async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result<bool> {
todo!()
/// Checks if a namespace exists within the Hive Metastore.
///
/// Validates the namespace identifier by querying the Hive Metastore
/// to determine if the specified namespace (database) exists.
///
/// # Returns
/// A `Result<bool>` indicating the outcome of the check:
/// - `Ok(true)` if the namespace exists.
/// - `Ok(false)` if the namespace does not exist, identified by a specific
/// `UserException` variant.
/// - `Err(...)` if an error occurs during validation or the Hive Metastore
/// query, with the error encapsulating the issue.
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
let name = validate_namespace(namespace)?;

let resp = self.client.0.get_database(name.clone().into()).await;

match resp {
Ok(_) => Ok(true),
Err(err) => {
if let ResponseError::UserException(ThriftHiveMetastoreGetDatabaseException::O1(
_,
)) = &err
{
Ok(false)
} else {
Err(from_thrift_error(err))
}
}
}
}

/// Asynchronously updates properties of an existing namespace.
///
/// Converts the given namespace identifier and properties into a database
/// representation and then attempts to update the corresponding namespace
/// in the Hive Metastore.
///
/// # Returns
/// Returns `Ok(())` if the namespace update is successful. If the
/// namespace cannot be updated due to missing information or an error
/// during the update process, an `Err(...)` is returned.
async fn update_namespace(
&self,
_namespace: &NamespaceIdent,
_properties: HashMap<String, String>,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<()> {
todo!()
let db = convert_to_database(namespace, &properties)?;

let name = match &db.name {
Some(name) => name,
None => {
return Err(Error::new(
ErrorKind::DataInvalid,
"Database name must be specified",
))
}
};

self.client
.0
.alter_database(name.clone(), db)
.await
.map_err(from_thrift_error)?;

Ok(())
}

async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
todo!()
/// Asynchronously drops a namespace from the Hive Metastore.
///
/// # Returns
/// A `Result<()>` indicating the outcome:
/// - `Ok(())` signifies successful namespace deletion.
/// - `Err(...)` signifies failure to drop the namespace due to validation
/// errors, connectivity issues, or Hive Metastore constraints.
async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
let name = validate_namespace(namespace)?;

self.client
.0
.drop_database(name.into(), false, false)
.await
.map_err(from_thrift_error)?;

Ok(())
}

async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
todo!()
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
let name = validate_namespace(namespace)?;

let tables = self
.client
.0
.get_all_tables(name.clone().into())
.await
.map_err(from_thrift_error)?;

let tables = tables
.iter()
.map(|table| TableIdent::new(namespace.clone(), table.to_string()))
.collect();

Ok(tables)
}

async fn create_table(
Expand Down
Loading

0 comments on commit 3a947fa

Please sign in to comment.