
TableMetadataBuilder #587

Open
wants to merge 12 commits into
base: main
4 changes: 3 additions & 1 deletion crates/catalog/glue/src/catalog.rs
@@ -355,7 +355,9 @@ impl Catalog for GlueCatalog {
}
};

let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(creation)?
.build()?
.metadata;
let metadata_location = create_metadata_location(&location, 0)?;

self.file_io
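For reference, a minimal sketch of the call-site pattern repeated across the catalog implementations in this PR; the import paths and the helper function name are assumptions, not code from the PR. build() now returns a TableMetadataBuildResult rather than a bare TableMetadata, and the final metadata is read from its metadata field.

// Sketch (assumed paths): how a catalog turns a TableCreation into TableMetadata
// with the new builder API.
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::{Result, TableCreation};

fn metadata_for(creation: TableCreation) -> Result<TableMetadata> {
    let build_result = TableMetadataBuilder::from_table_creation(creation)?.build()?;
    Ok(build_result.metadata)
}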
4 changes: 3 additions & 1 deletion crates/catalog/glue/src/schema.rs
@@ -198,7 +198,9 @@ mod tests {
.location("my_location".to_string())
.schema(schema)
.build();
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
.build()?
.metadata;

Ok(metadata)
}
4 changes: 3 additions & 1 deletion crates/catalog/glue/src/utils.rs
@@ -299,7 +299,9 @@ mod tests {
.location("my_location".to_string())
.schema(schema)
.build();
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
.build()?
.metadata;

Ok(metadata)
}
4 changes: 3 additions & 1 deletion crates/catalog/hms/src/catalog.rs
@@ -346,7 +346,9 @@ impl Catalog for HmsCatalog {
}
};

let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(creation)?
.build()?
.metadata;
let metadata_location = create_metadata_location(&location, 0)?;

self.file_io
4 changes: 3 additions & 1 deletion crates/catalog/memory/src/catalog.rs
@@ -194,7 +194,9 @@ impl Catalog for MemoryCatalog {
}
};

let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
.build()?
.metadata;
let metadata_location = format!(
"{}/metadata/{}-{}.metadata.json",
&location,
4 changes: 3 additions & 1 deletion crates/catalog/sql/src/catalog.rs
@@ -699,7 +699,9 @@ impl Catalog for SqlCatalog {
}
};

let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?.build()?;
let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
.build()?
.metadata;
let tbl_metadata_location = format!(
"{}/metadata/0-{}.metadata.json",
location.clone(),
61 changes: 52 additions & 9 deletions crates/iceberg/src/catalog/mod.rs
@@ -448,8 +448,46 @@ impl TableUpdate {
/// Applies the update to the table metadata builder.
pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
match self {
TableUpdate::AssignUuid { uuid } => builder.assign_uuid(uuid),
_ => unimplemented!(),
TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
TableUpdate::AddSchema {
schema,
last_column_id,
} => {
if let Some(last_column_id) = last_column_id {
if builder.last_column_id() > last_column_id {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Invalid last column ID: {last_column_id} < {} (previous last column ID)",
builder.last_column_id()
),
));
}
};
Ok(builder.add_schema(schema))
}
TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
TableUpdate::SetDefaultSortOrder { sort_order_id } => {
builder.set_default_sort_order(sort_order_id)
}
TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
TableUpdate::SetSnapshotRef {
ref_name,
reference,
} => builder.set_ref(&ref_name, reference),
TableUpdate::RemoveSnapshots { snapshot_ids } => {
Ok(builder.remove_snapshots(&snapshot_ids))
}
TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
TableUpdate::SetProperties { updates } => builder.set_properties(updates),
TableUpdate::RemoveProperties { removals } => builder.remove_properties(&removals),
TableUpdate::UpgradeFormatVersion { format_version } => {
builder.upgrade_format_version(format_version)
}
}
}
}
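As a usage illustration, here is a hedged sketch of applying a sequence of updates by folding them over the builder; the apply_all helper and import paths are assumptions, not part of this PR. It relies only on the signature shown above: apply consumes the builder and returns Result<TableMetadataBuilder>.

// Sketch: thread a TableMetadataBuilder through a list of TableUpdates,
// stopping at the first update that fails to apply.
use iceberg::spec::TableMetadataBuilder;
use iceberg::{Result, TableUpdate};

fn apply_all(
    builder: TableMetadataBuilder,
    updates: Vec<TableUpdate>,
) -> Result<TableMetadataBuilder> {
    updates
        .into_iter()
        .try_fold(builder, |builder, update| update.apply(builder))
}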
@@ -799,9 +837,9 @@ mod tests {
TableMetadataBuilder::from_table_creation(tbl_creation)
.unwrap()
.assign_uuid(uuid::Uuid::nil())
.unwrap()
.build()
.unwrap()
.metadata
}

#[test]
@@ -912,14 +950,14 @@ mod tests {
#[test]
fn test_check_last_assigned_partition_id() {
let metadata = metadata();

println!("{:?}", metadata.last_partition_id);
Contributor: Remove this debug statement?

let requirement = TableRequirement::LastAssignedPartitionIdMatch {
last_assigned_partition_id: 1,
last_assigned_partition_id: 0,
};
assert!(requirement.check(Some(&metadata)).is_err());

let requirement = TableRequirement::LastAssignedPartitionIdMatch {
last_assigned_partition_id: 0,
last_assigned_partition_id: 999,
};
assert!(requirement.check(Some(&metadata)).is_ok());
}
@@ -1578,16 +1616,21 @@ mod tests {
let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
.unwrap()
.build()
.unwrap();
let table_metadata_builder = TableMetadataBuilder::new(table_metadata);
.unwrap()
.metadata;
let table_metadata_builder = TableMetadataBuilder::new_from_metadata(
table_metadata,
Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
);

let uuid = uuid::Uuid::new_v4();
let update = TableUpdate::AssignUuid { uuid };
let updated_metadata = update
.apply(table_metadata_builder)
.unwrap()
.build()
.unwrap();
.unwrap()
.metadata;
assert_eq!(updated_metadata.uuid(), uuid);
}

1 change: 1 addition & 0 deletions crates/iceberg/src/spec/mod.rs
@@ -25,6 +25,7 @@ mod schema;
mod snapshot;
mod sort;
mod table_metadata;
mod table_metadata_builder;
mod transform;
mod values;
mod view_metadata;
6 changes: 5 additions & 1 deletion crates/iceberg/src/spec/partition.rs
@@ -147,7 +147,6 @@ impl BoundPartitionSpec {
}

/// Get the highest field id in the partition spec.
/// If the partition spec is unpartitioned, it returns the last unpartitioned last assigned id (999).
pub fn highest_field_id(&self) -> Option<i32> {
self.fields.iter().map(|f| f.field_id).max()
}
@@ -182,6 +181,11 @@ impl BoundPartitionSpec {

true
}

/// Change the spec id of the partition spec
pub fn with_spec_id(self, spec_id: i32) -> Self {
Self { spec_id, ..self }
}
}

impl SchemalessPartitionSpec {
19 changes: 18 additions & 1 deletion crates/iceberg/src/spec/schema.rs
@@ -91,7 +91,6 @@ impl SchemaBuilder {
/// Reassignment starts from the field-id specified in `start_from` (inclusive).
///
/// All specified aliases and identifier fields will be updated to the new field-ids.
#[allow(dead_code)] // Will be needed in TableMetadataBuilder
pub(crate) fn with_reassigned_field_ids(mut self, start_from: u32) -> Self {
self.reassign_field_ids_from = Some(start_from.try_into().unwrap_or(i32::MAX));
self
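For illustration, a sketch of how with_reassigned_field_ids is intended to be used; since the method is pub(crate), this only compiles inside the iceberg crate, and the field and type constructors below are assumed from the existing schema API rather than taken from this PR.

// Sketch (crate-internal): build a schema whose field ids are reassigned to start
// at 10; aliases and identifier field ids are remapped to the new ids as well.
use std::sync::Arc;

use crate::spec::{NestedField, PrimitiveType, Schema, Type};

fn reassigned_schema() -> crate::Result<Schema> {
    Schema::builder()
        .with_fields(vec![
            Arc::new(NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long))),
            Arc::new(NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String))),
        ])
        .with_identifier_field_ids(vec![1])
        .with_reassigned_field_ids(10)
        .build() // "id" becomes field 10, "name" field 11; the identifier id follows.
}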
@@ -376,6 +375,24 @@ impl Schema {
pub fn accessor_by_field_id(&self, field_id: i32) -> Option<Arc<StructAccessor>> {
self.field_id_to_accessor.get(&field_id).cloned()
}

/// Check if this schema is semantically identical to another schema, ignoring the schema id.
pub(crate) fn is_same_schema(&self, other: &SchemaRef) -> bool {
Contributor: Why not implement PartialEq, Eq trait?

self.as_struct().eq(other.as_struct())
&& self.identifier_field_ids().eq(other.identifier_field_ids())
}

/// Change the schema id of this schema.
// This is redundant with the `with_schema_id` method on the builder, but useful
// as it is infallible in contrast to the builder `build()` method.
pub(crate) fn with_schema_id(self, schema_id: SchemaId) -> Self {
Self { schema_id, ..self }
}

/// Returns a HashMap mapping field ids to field names.
pub(crate) fn field_id_to_name_map(&self) -> &HashMap<i32, String> {
&self.id_to_name
}
}

impl Display for Schema {
7 changes: 7 additions & 0 deletions crates/iceberg/src/spec/snapshot.rs
@@ -324,6 +324,13 @@ pub struct SnapshotReference {
pub retention: SnapshotRetention,
}

impl SnapshotReference {
/// Returns true if the snapshot reference is a branch.
pub fn is_branch(&self) -> bool {
matches!(self.retention, SnapshotRetention::Branch { .. })
}
}

impl SnapshotReference {
/// Create new snapshot reference
pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
113 changes: 17 additions & 96 deletions crates/iceberg/src/spec/table_metadata.rs
@@ -30,12 +30,13 @@ use serde_repr::{Deserialize_repr, Serialize_repr};
use uuid::Uuid;

use super::snapshot::SnapshotReference;
pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
use super::{
BoundPartitionSpec, BoundPartitionSpecRef, SchemaId, SchemaRef, SchemalessPartitionSpecRef,
Snapshot, SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, DEFAULT_PARTITION_SPEC_ID,
BoundPartitionSpecRef, SchemaId, SchemaRef, SchemalessPartitionSpecRef, Snapshot, SnapshotRef,
SnapshotRetention, SortOrder, SortOrderRef, DEFAULT_PARTITION_SPEC_ID,
};
use crate::error::{timestamp_ms_to_utc, Result};
use crate::{Error, ErrorKind, TableCreation};
use crate::{Error, ErrorKind};

static MAIN_BRANCH: &str = "main";
pub(crate) static ONE_MINUTE_MS: i64 = 60_000;
@@ -165,6 +166,17 @@ pub struct TableMetadata {
}

impl TableMetadata {
/// Convert this Table Metadata into a builder for modification.
///
/// `current_file_location` is the location where the current version
/// of the metadata file is stored. This is used to update the metadata log.
/// If `current_file_location` is `None`, the metadata log will not be updated.
/// This should only be used to stage-create tables.
#[must_use]
pub fn into_builder(self, current_file_location: Option<String>) -> TableMetadataBuilder {
TableMetadataBuilder::new_from_metadata(self, current_file_location)
}
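A hedged usage sketch for into_builder, with the helper name and import paths assumed: passing the current metadata file location lets the builder record it in the metadata log, while passing None skips that update (stage-created tables only).

// Sketch (assumed usage): modify existing metadata through the builder and take
// the updated TableMetadata back out of the build result.
use iceberg::spec::TableMetadata;
use iceberg::Result;

fn rotate_uuid(metadata: TableMetadata, current_location: &str) -> Result<TableMetadata> {
    let updated = metadata
        .into_builder(Some(current_location.to_string()))
        .assign_uuid(uuid::Uuid::new_v4())
        .build()?;
    Ok(updated.metadata)
}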

/// Returns format version of this metadata.
#[inline]
pub fn format_version(&self) -> FormatVersion {
@@ -559,98 +571,6 @@ impl TableMetadata {
}
}

/// Manipulating table metadata.
pub struct TableMetadataBuilder(TableMetadata);

impl TableMetadataBuilder {
/// Creates a new table metadata builder from the given table metadata.
pub fn new(origin: TableMetadata) -> Self {
Self(origin)
}

/// Creates a new table metadata builder from the given table creation.
pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
let TableCreation {
name: _,
location,
schema,
partition_spec,
sort_order,
properties,
} = table_creation;

let schema: Arc<super::Schema> = Arc::new(schema);
let unpartition_spec = BoundPartitionSpec::unpartition_spec(schema.clone());
let partition_specs = match partition_spec {
Some(_) => {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"Can't create table with partition spec now",
))
}
None => HashMap::from([(
unpartition_spec.spec_id(),
Arc::new(unpartition_spec.clone().into_schemaless()),
)]),
};

let sort_orders = match sort_order {
Some(_) => {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"Can't create table with sort order now",
))
}
None => HashMap::from([(
SortOrder::UNSORTED_ORDER_ID,
Arc::new(SortOrder::unsorted_order()),
)]),
};

let mut table_metadata = TableMetadata {
format_version: FormatVersion::V2,
table_uuid: Uuid::now_v7(),
location: location.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
"Can't create table without location",
)
})?,
last_sequence_number: 0,
last_updated_ms: Utc::now().timestamp_millis(),
last_column_id: schema.highest_field_id(),
current_schema_id: schema.schema_id(),
schemas: HashMap::from([(schema.schema_id(), schema)]),
partition_specs,
default_spec: BoundPartitionSpecRef::new(unpartition_spec),
last_partition_id: 0,
properties,
current_snapshot_id: None,
snapshots: Default::default(),
snapshot_log: vec![],
sort_orders,
metadata_log: vec![],
default_sort_order_id: SortOrder::UNSORTED_ORDER_ID,
refs: Default::default(),
};

table_metadata.try_normalize()?;

Ok(Self(table_metadata))
}

/// Changes uuid of table metadata.
pub fn assign_uuid(mut self, uuid: Uuid) -> Result<Self> {
self.0.table_uuid = uuid;
Ok(self)
}

/// Returns the new table metadata after changes.
pub fn build(self) -> Result<TableMetadata> {
Ok(self.0)
}
}

pub(super) mod _serde {
use std::borrow::BorrowMut;
/// This is a helper module that defines types to help with serialization/deserialization.
@@ -2328,7 +2248,8 @@ mod tests {
let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
.unwrap()
.build()
.unwrap();
.unwrap()
.metadata;
assert_eq!(table_metadata.location, "s3://db/table");
assert_eq!(table_metadata.schemas.len(), 1);
assert_eq!(