Skip to content

Commit

Permalink
feat: merge schema support for the write operation and Python (with Rust engine)
Browse files Browse the repository at this point in the history

This replaces the old "overwrite_schema" parameter with a
schema_write_mode parameter that allows distinguishing between
overwrite, merge, and none.

Fixes delta-io#1386
  • Loading branch information
aersam authored and rtyler committed Mar 5, 2024
1 parent dadefc3 commit b880f51
Show file tree
Hide file tree
Showing 16 changed files with 702 additions and 126 deletions.
115 changes: 94 additions & 21 deletions crates/core/src/operations/cast.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,109 @@
//! Provide common cast functionality for callers
//!
use arrow_array::{Array, ArrayRef, RecordBatch, StructArray};
use arrow::datatypes::DataType::Dictionary;
use arrow_array::{new_null_array, Array, ArrayRef, RecordBatch, StructArray};
use arrow_cast::{cast_with_options, CastOptions};
use arrow_schema::{DataType, Fields, SchemaRef as ArrowSchemaRef};

use arrow_schema::{
ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema,
SchemaRef as ArrowSchemaRef,
};
use std::sync::Arc;

use crate::DeltaResult;

/// Merge two Arrow fields into one, reconciling dictionary encodings.
///
/// If one side is a dictionary encoding whose value type equals the other
/// side's type, the plain (non-dictionary) field is kept unchanged.
/// Otherwise the merge is delegated to Arrow's [`Field::try_merge`], which
/// unions nullability and metadata and errors on incompatible types.
pub(crate) fn merge_field(left: &ArrowField, right: &ArrowField) -> Result<ArrowField, ArrowError> {
    // Right side dictionary-encodes the left side's type: keep left as-is.
    if let Dictionary(_, right_value) = right.data_type() {
        if right_value.equals_datatype(left.data_type()) {
            return Ok(left.clone());
        }
    }
    // Left side dictionary-encodes the right side's type: keep right as-is.
    if let Dictionary(_, left_value) = left.data_type() {
        if left_value.equals_datatype(right.data_type()) {
            return Ok(right.clone());
        }
    }
    // General case: defer to Arrow's built-in field merging.
    let mut merged = left.clone();
    merged.try_merge(right)?;
    Ok(merged)
}

/// Merge two Arrow schemas field-by-field.
///
/// Fields present in both schemas are merged via [`merge_field`]; fields
/// present only in `left` are kept as-is, and fields present only in `right`
/// are appended after all of `left`'s fields.
///
/// # Errors
/// Returns [`ArrowError::SchemaError`] containing the message of the first
/// incompatible field pair (the `collect` below short-circuits on the first
/// merge failure).
pub(crate) fn merge_schema(
    left: ArrowSchema,
    right: ArrowSchema,
) -> Result<ArrowSchema, ArrowError> {
    let mut errors = Vec::with_capacity(left.fields().len());
    let merged_fields: Result<Vec<ArrowField>, ArrowError> = left
        .fields()
        .iter()
        .map(|field| {
            if let Ok(right_field) = right.field_with_name(field.name()) {
                // Record the message so the final SchemaError can report it.
                merge_field(field.as_ref(), right_field).map_err(|e| {
                    errors.push(e.to_string());
                    e
                })
            } else {
                // Field only exists on the left: keep it unchanged.
                Ok(field.as_ref().clone())
            }
        })
        .collect();
    match merged_fields {
        Ok(mut fields) => {
            // Append fields that only exist in the right schema.
            for field in right.fields() {
                if left.field_with_name(field.name()).is_err() {
                    fields.push(field.as_ref().clone());
                }
            }
            Ok(ArrowSchema::new(fields))
        }
        // `errors` was already populated by the closure above; pushing the
        // error again here would duplicate the message in the joined output.
        Err(_) => Err(ArrowError::SchemaError(errors.join("\n"))),
    }
}

fn cast_struct(
struct_array: &StructArray,
fields: &Fields,
cast_options: &CastOptions,
add_missing: bool,
) -> Result<Vec<Arc<(dyn Array)>>, arrow_schema::ArrowError> {
fields
.iter()
.map(|field| {
let col = struct_array.column_by_name(field.name()).unwrap();
if let (DataType::Struct(_), DataType::Struct(child_fields)) =
(col.data_type(), field.data_type())
{
let child_struct = StructArray::from(col.into_data());
let s = cast_struct(&child_struct, child_fields, cast_options)?;
Ok(Arc::new(StructArray::new(
child_fields.clone(),
s,
child_struct.nulls().map(ToOwned::to_owned),
)) as ArrayRef)
} else if is_cast_required(col.data_type(), field.data_type()) {
cast_with_options(col, field.data_type(), cast_options)
} else {
Ok(col.clone())
let col_or_not = struct_array.column_by_name(field.name());
match col_or_not {
None => match add_missing {
true => Ok(new_null_array(field.data_type(), struct_array.len())),
false => Err(arrow_schema::ArrowError::SchemaError(format!(
"Could not find column {0}",
field.name()
))),
},
Some(col) => {
if let (DataType::Struct(_), DataType::Struct(child_fields)) =
(col.data_type(), field.data_type())
{
let child_struct = StructArray::from(col.into_data());
let s =
cast_struct(&child_struct, child_fields, cast_options, add_missing)?;
Ok(Arc::new(StructArray::new(
child_fields.clone(),
s,
child_struct.nulls().map(ToOwned::to_owned),
)) as ArrayRef)
} else if is_cast_required(col.data_type(), field.data_type()) {
cast_with_options(col, field.data_type(), cast_options)
} else {
Ok(col.clone())
}
}
}
})
.collect::<Result<Vec<_>, _>>()
Expand All @@ -51,6 +124,7 @@ pub fn cast_record_batch(
batch: &RecordBatch,
target_schema: ArrowSchemaRef,
safe: bool,
add_missing: bool,
) -> DeltaResult<RecordBatch> {
let cast_options = CastOptions {
safe,
Expand All @@ -62,8 +136,7 @@ pub fn cast_record_batch(
batch.columns().to_owned(),
None,
);

let columns = cast_struct(&s, target_schema.fields(), &cast_options)?;
let columns = cast_struct(&s, target_schema.fields(), &cast_options, add_missing)?;
Ok(RecordBatch::try_new(target_schema, columns)?)
}

Expand Down Expand Up @@ -93,7 +166,7 @@ mod tests {
)]);
let target_schema = Arc::new(Schema::new(fields)) as SchemaRef;

let result = cast_record_batch(&record_batch, target_schema, false);
let result = cast_record_batch(&record_batch, target_schema, false, false);

let schema = result.unwrap().schema();
let field = schema.column_with_name("list_column").unwrap().1;
Expand Down
11 changes: 9 additions & 2 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! .await?;
//! ````

use core::panic;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
Expand Down Expand Up @@ -167,9 +168,15 @@ async fn excute_non_empty_expr(
None,
writer_properties,
false,
false,
None,
)
.await?;
.await?
.into_iter()
.map(|a| match a {
Action::Add(a) => a,
_ => panic!("Expected Add action"),
})
.collect::<Vec<Add>>();

let read_records = scan.parquet_scan.metrics().and_then(|m| m.output_rows());
let filter_records = filter.metrics().and_then(|m| m.output_rows());
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1379,13 +1379,13 @@ async fn execute(
None,
writer_properties,
safe_cast,
false,
None,
)
.await?;

metrics.rewrite_time_ms = Instant::now().duration_since(rewrite_start).as_millis() as u64;

let mut actions: Vec<Action> = add_actions.into_iter().map(Action::Add).collect();
let mut actions: Vec<Action> = add_actions.clone();
metrics.num_target_files_added = actions.len();

let survivors = barrier
Expand Down
8 changes: 6 additions & 2 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,12 @@ impl MergePlan {
while let Some(maybe_batch) = read_stream.next().await {
let mut batch = maybe_batch?;

batch =
super::cast::cast_record_batch(&batch, task_parameters.file_schema.clone(), false)?;
batch = super::cast::cast_record_batch(
&batch,
task_parameters.file_schema.clone(),
false,
false,
)?;
partial_metrics.num_batches += 1;
writer.write(&batch).await.map_err(DeltaTableError::from)?;
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ async fn execute(
None,
writer_properties,
safe_cast,
false,
None,
)
.await?;

Expand All @@ -377,7 +377,7 @@ async fn execute(
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
let mut actions: Vec<Action> = add_actions.into_iter().map(Action::Add).collect();
let mut actions: Vec<Action> = add_actions.clone();

metrics.num_added_files = actions.len();
metrics.num_removed_files = candidates.candidates.len();
Expand Down
Loading

0 comments on commit b880f51

Please sign in to comment.