Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: merge schema #2229

Closed
wants to merge 44 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
cd11c76
compiles ;)
aersam Feb 28, 2024
3ce861a
python compiles
aersam Feb 28, 2024
36d3463
fmt & clippy
aersam Feb 28, 2024
3c21940
renamings
aersam Feb 29, 2024
3c9ff11
clippy's feedback
aersam Feb 29, 2024
4d99d99
test seems ok
aersam Mar 1, 2024
d159020
fmt
aersam Mar 1, 2024
fd457d8
clippy feedback
aersam Mar 1, 2024
a8a711c
compiles again after refactoring
aersam Mar 1, 2024
f515f31
fmt
aersam Mar 1, 2024
6182cff
clippy
aersam Mar 1, 2024
ca761a2
wip on new merge method
aersam Mar 1, 2024
35027ed
fmt
aersam Mar 1, 2024
d95889a
next fix
aersam Mar 1, 2024
36fa567
WIP
aersam Mar 1, 2024
1602333
compiles again
aersam Mar 1, 2024
563bf30
fmt
aersam Mar 1, 2024
0f97fd7
might fixes test
aersam Mar 1, 2024
0f7fba5
better cast
aersam Mar 1, 2024
950cd23
test passes!
aersam Mar 1, 2024
449007c
Merge branch 'main' of https://github.com/bmsuisse/delta-rs into appe…
aersam Mar 4, 2024
3292de0
tests passing in both rust and python
aersam Mar 4, 2024
dfec2ac
fnt
aersam Mar 4, 2024
4a09921
format
aersam Mar 4, 2024
46c084a
thanks, clippy for your feedback
aersam Mar 4, 2024
e629f4c
fix ruff and mypy version and format
aersam Mar 4, 2024
360c43b
Merge branch 'linter_versions' into append-python
aersam Mar 4, 2024
f86d069
Merge branch 'main' into append-python
aersam Mar 4, 2024
d14b4b0
validate schema if schema_mode not given
aersam Mar 4, 2024
dc71771
use new schema_mode parameter and refactor tests to match new behavior
aersam Mar 4, 2024
4c7a9e1
docs
aersam Mar 4, 2024
9fbb9bb
fmt
aersam Mar 4, 2024
d70b716
remove parameter that causes trouble with pyarrow 8
aersam Mar 4, 2024
e816061
format again :)
aersam Mar 4, 2024
b07f219
fighting with py 3.8 ;)
aersam Mar 4, 2024
a7ee463
address feedback
aersam Mar 4, 2024
6a9012b
clippy
aersam Mar 4, 2024
06eb8b3
errors
aersam Mar 5, 2024
9b81041
fmt
aersam Mar 5, 2024
e3f8b8b
unused import
aersam Mar 5, 2024
e26add2
Merge branch 'main' into append-python
aersam Mar 5, 2024
15d4be3
Better exception handling
aersam Mar 5, 2024
2d36999
commit missing file
aersam Mar 5, 2024
82a0233
do not use 0.9.1 of object_store for now
aersam Mar 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ arrow-ord = { version = "50" }
arrow-row = { version = "50" }
arrow-schema = { version = "50" }
arrow-select = { version = "50" }
object_store = { version = "0.9" }
object_store = { version = "=0.9.0" }
parquet = { version = "50" }

# datafusion
Expand Down
115 changes: 94 additions & 21 deletions crates/core/src/operations/cast.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,109 @@
//! Provide common cast functionality for callers
//!
use arrow_array::{Array, ArrayRef, RecordBatch, StructArray};
use arrow::datatypes::DataType::Dictionary;
use arrow_array::{new_null_array, Array, ArrayRef, RecordBatch, StructArray};
use arrow_cast::{cast_with_options, CastOptions};
use arrow_schema::{DataType, Fields, SchemaRef as ArrowSchemaRef};

use arrow_schema::{
ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema,
SchemaRef as ArrowSchemaRef,
};
use std::sync::Arc;

use crate::DeltaResult;

/// Merges two Arrow fields into a single field, treating a dictionary-encoded
/// field as compatible with a plain field of the dictionary's value type.
///
/// Returns a clone of `left` when `right` is a dictionary whose value type
/// equals `left`'s data type, and a clone of `right` in the mirrored case.
/// Otherwise `left` is cloned and `right` is merged into the clone with
/// `try_merge`.
///
/// # Errors
///
/// Propagates the `ArrowError` returned by `try_merge`.
pub(crate) fn merge_field(left: &ArrowField, right: &ArrowField) -> Result<ArrowField, ArrowError> {
ion-elgreco marked this conversation as resolved.
Show resolved Hide resolved
// `right` is dictionary-encoded over `left`'s type: keep the plain `left` field.
if let Dictionary(_, value_type) = right.data_type() {
if value_type.equals_datatype(left.data_type()) {
return Ok(left.clone());
aersam marked this conversation as resolved.
Show resolved Hide resolved
}
}
// Mirrored case: `left` is dictionary-encoded over `right`'s type, keep `right`.
if let Dictionary(_, value_type) = left.data_type() {
if value_type.equals_datatype(right.data_type()) {
return Ok(right.clone());
}
}
// No dictionary shortcut applied: merge `right` into a copy of `left`.
let mut new_field = left.clone();
new_field.try_merge(right)?;
Ok(new_field)
}

/// Merges two Arrow schemas into one schema containing the fields of both.
///
/// Fields of `left` come first, in their original order. A left field that
/// also exists in `right` (matched by name) is combined with its counterpart
/// through `merge_field`; a left field without a counterpart is copied
/// unchanged. Fields that exist only in `right` are appended afterwards, in
/// `right`'s order. The result is built with `ArrowSchema::new` from the
/// merged fields only.
///
/// # Errors
///
/// Returns `ArrowError::SchemaError` when at least one pair of same-named
/// fields cannot be merged. Every conflicting field is checked and each
/// failure message appears exactly once, separated by newlines.
pub(crate) fn merge_schema(
    left: ArrowSchema,
    right: ArrowSchema,
) -> Result<ArrowSchema, ArrowError> {
    let mut errors: Vec<String> = Vec::new();
    let mut fields: Vec<ArrowField> = Vec::with_capacity(left.fields().len());

    // Visit every left field instead of stopping at the first failure, so the
    // caller sees all incompatible columns in a single error.
    for field in left.fields() {
        match right.field_with_name(field.name()) {
            Ok(right_field) => match merge_field(field.as_ref(), right_field) {
                Ok(merged) => fields.push(merged),
                Err(e) => errors.push(e.to_string()),
            },
            // Field is absent from `right`: keep it as it is.
            Err(_) => fields.push(field.as_ref().clone()),
        }
    }

    if !errors.is_empty() {
        return Err(ArrowError::SchemaError(errors.join("\n")));
    }

    // Append the fields that only `right` knows about.
    fields.extend(
        right
            .fields()
            .iter()
            .filter(|field| left.field_with_name(field.name()).is_err())
            .map(|field| field.as_ref().clone()),
    );

    Ok(ArrowSchema::new(fields))
}

fn cast_struct(
struct_array: &StructArray,
fields: &Fields,
cast_options: &CastOptions,
add_missing: bool,
) -> Result<Vec<Arc<(dyn Array)>>, arrow_schema::ArrowError> {
fields
.iter()
.map(|field| {
let col = struct_array.column_by_name(field.name()).unwrap();
if let (DataType::Struct(_), DataType::Struct(child_fields)) =
(col.data_type(), field.data_type())
{
let child_struct = StructArray::from(col.into_data());
let s = cast_struct(&child_struct, child_fields, cast_options)?;
Ok(Arc::new(StructArray::new(
child_fields.clone(),
s,
child_struct.nulls().map(ToOwned::to_owned),
)) as ArrayRef)
} else if is_cast_required(col.data_type(), field.data_type()) {
cast_with_options(col, field.data_type(), cast_options)
} else {
Ok(col.clone())
let col_or_not = struct_array.column_by_name(field.name());
match col_or_not {
None => match add_missing {
true => Ok(new_null_array(field.data_type(), struct_array.len())),
false => Err(arrow_schema::ArrowError::SchemaError(format!(
"Could not find column {0}",
field.name()
))),
},
Some(col) => {
if let (DataType::Struct(_), DataType::Struct(child_fields)) =
(col.data_type(), field.data_type())
{
let child_struct = StructArray::from(col.into_data());
let s =
cast_struct(&child_struct, child_fields, cast_options, add_missing)?;
Ok(Arc::new(StructArray::new(
child_fields.clone(),
s,
child_struct.nulls().map(ToOwned::to_owned),
)) as ArrayRef)
} else if is_cast_required(col.data_type(), field.data_type()) {
cast_with_options(col, field.data_type(), cast_options)
} else {
Ok(col.clone())
}
}
}
})
.collect::<Result<Vec<_>, _>>()
Expand All @@ -51,6 +124,7 @@ pub fn cast_record_batch(
batch: &RecordBatch,
target_schema: ArrowSchemaRef,
safe: bool,
add_missing: bool,
) -> DeltaResult<RecordBatch> {
let cast_options = CastOptions {
safe,
Expand All @@ -62,8 +136,7 @@ pub fn cast_record_batch(
batch.columns().to_owned(),
None,
);

let columns = cast_struct(&s, target_schema.fields(), &cast_options)?;
let columns = cast_struct(&s, target_schema.fields(), &cast_options, add_missing)?;
Ok(RecordBatch::try_new(target_schema, columns)?)
}

Expand Down Expand Up @@ -93,7 +166,7 @@ mod tests {
)]);
let target_schema = Arc::new(Schema::new(fields)) as SchemaRef;

let result = cast_record_batch(&record_batch, target_schema, false);
let result = cast_record_batch(&record_batch, target_schema, false, false);

let schema = result.unwrap().schema();
let field = schema.column_with_name("list_column").unwrap().1;
Expand Down
11 changes: 9 additions & 2 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! .await?;
//! ````

use core::panic;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
Expand Down Expand Up @@ -167,9 +168,15 @@ async fn excute_non_empty_expr(
None,
writer_properties,
false,
false,
None,
)
.await?;
.await?
.into_iter()
.map(|a| match a {
Action::Add(a) => a,
_ => panic!("Expected Add action"),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

am I allowed to do that? :)

})
.collect::<Vec<Add>>();

let read_records = scan.parquet_scan.metrics().and_then(|m| m.output_rows());
let filter_records = filter.metrics().and_then(|m| m.output_rows());
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1379,13 +1379,13 @@ async fn execute(
None,
writer_properties,
safe_cast,
false,
None,
)
.await?;

metrics.rewrite_time_ms = Instant::now().duration_since(rewrite_start).as_millis() as u64;

let mut actions: Vec<Action> = add_actions.into_iter().map(Action::Add).collect();
let mut actions: Vec<Action> = add_actions.clone();
metrics.num_target_files_added = actions.len();

let survivors = barrier
Expand Down
8 changes: 6 additions & 2 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,12 @@ impl MergePlan {
while let Some(maybe_batch) = read_stream.next().await {
let mut batch = maybe_batch?;

batch =
super::cast::cast_record_batch(&batch, task_parameters.file_schema.clone(), false)?;
batch = super::cast::cast_record_batch(
&batch,
task_parameters.file_schema.clone(),
false,
false,
)?;
partial_metrics.num_batches += 1;
writer.write(&batch).await.map_err(DeltaTableError::from)?;
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ async fn execute(
None,
writer_properties,
safe_cast,
false,
None,
)
.await?;

Expand All @@ -377,7 +377,7 @@ async fn execute(
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
let mut actions: Vec<Action> = add_actions.into_iter().map(Action::Add).collect();
let mut actions: Vec<Action> = add_actions.clone();

metrics.num_added_files = actions.len();
metrics.num_removed_files = candidates.candidates.len();
Expand Down
Loading
Loading