Introduce schema evolution on RecordBatchWriter
This commit introduces the `WriteMode` enum and the ability to specify
writes which should enable [schema
evolution](https://delta.io/blog/2023-02-08-delta-lake-schema-evolution/).

The result is a new `metaData` action added to the transaction log alongside the write, reflecting the updated schema.

There are some caveats, however, such as the requirement that every write must still include the table's non-nullable columns.
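
A rough usage sketch of the new mode (assuming the `deltalake` facade re-exports these types and that `RecordBatchWriter::for_table` is used to construct the writer; the table path and batch contents are placeholders):

```rust
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::writer::{DeltaWriter, RecordBatchWriter, WriteMode};
use deltalake::{open_table, DeltaTableError};

/// Append a batch whose schema may add new nullable columns to the table.
async fn append_with_evolution(batch: RecordBatch) -> Result<(), DeltaTableError> {
    // Placeholder path to an existing Delta table.
    let mut table = open_table("./data/my_table").await?;
    let mut writer = RecordBatchWriter::for_table(&table)?;

    // MergeSchema asks the writer to widen the table schema instead of erroring,
    // which results in the extra `metaData` action described above.
    writer.write_with_mode(batch, WriteMode::MergeSchema).await?;

    // Flush the buffered files and commit the Add actions (plus the schema update).
    writer.flush_and_commit(&mut table).await?;
    Ok(())
}
```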

Fixes delta-io#1386

Sponsored-by: Raft, LLC.
rtyler committed Jan 3, 2024
1 parent 3aa6dc6 commit 72babcc
Showing 3 changed files with 399 additions and 106 deletions.
41 changes: 14 additions & 27 deletions crates/deltalake-core/src/writer/json.rs
@@ -21,11 +21,10 @@ use super::utils::{
arrow_schema_without_partitions, next_data_path, record_batch_from_message,
record_batch_without_partitions, stringified_partition_value,
};
use super::{utils::PartitionPath, DeltaWriter, DeltaWriterError};
use super::{utils::PartitionPath, DeltaWriter, DeltaWriterError, WriteMode};
use crate::errors::DeltaTableError;
use crate::kernel::{Add, StructType};
use crate::table::builder::DeltaTableBuilder;
use crate::table::DeltaTableMetaData;
use crate::writer::utils::ShareableBuffer;
use crate::DeltaTable;

@@ -226,30 +225,6 @@ impl JsonWriter {
})
}

/// Retrieves the latest schema from table, compares to the current and updates if changed.
/// When schema is updated then `true` is returned which signals the caller that parquet
/// created file or arrow batch should be revisited.
pub fn update_schema(
&mut self,
metadata: &DeltaTableMetaData,
) -> Result<bool, DeltaTableError> {
let schema: ArrowSchema =
<ArrowSchema as TryFrom<&StructType>>::try_from(&metadata.schema)?;

let schema_updated = self.arrow_schema_ref.as_ref() != &schema
|| self.partition_columns != metadata.partition_columns;

if schema_updated {
let _ = std::mem::replace(&mut self.arrow_schema_ref, Arc::new(schema));
let _ = std::mem::replace(
&mut self.partition_columns,
metadata.partition_columns.clone(),
);
}

Ok(schema_updated)
}

/// Returns the current byte length of the in memory buffer.
/// This may be used by the caller to decide when to finalize the file write.
pub fn buffer_len(&self) -> usize {
@@ -310,8 +285,20 @@ impl JsonWriter {

#[async_trait::async_trait]
impl DeltaWriter<Vec<Value>> for JsonWriter {
/// Writes the given values to internal parquet buffers for each represented partition.
/// Write a chunk of values into the internal write buffers with the default write mode
async fn write(&mut self, values: Vec<Value>) -> Result<(), DeltaTableError> {
self.write_with_mode(values, WriteMode::Default).await
}

/// Writes the given values to internal parquet buffers for each represented partition.
async fn write_with_mode(
&mut self,
values: Vec<Value>,
mode: WriteMode,
) -> Result<(), DeltaTableError> {
if mode != WriteMode::Default {
warn!("The JsonWriter does not currently support non-default write modes, falling back to default mode");
}
let mut partial_writes: Vec<(Value, ParquetError)> = Vec::new();
let arrow_schema = self.arrow_schema();
let divided = self.divide_by_partition_values(values)?;
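For `JsonWriter`, plain `write` now simply delegates to `write_with_mode` with `WriteMode::Default`, and any other mode falls back to the default behaviour with a warning. A minimal sketch of how a caller sees this (assuming `JsonWriter::for_table` and the `deltalake` facade paths; the table URI and rows are placeholders):

```rust
use deltalake::writer::{DeltaWriter, JsonWriter, WriteMode};
use deltalake::{open_table, DeltaTableError};
use serde_json::json;

async fn append_json_rows() -> Result<(), DeltaTableError> {
    // Placeholder path to an existing Delta table.
    let mut table = open_table("./data/events").await?;
    let mut writer = JsonWriter::for_table(&table)?;

    let rows = vec![
        json!({"id": 1, "value": "a"}),
        json!({"id": 2, "value": "b"}),
    ];

    // Equivalent to `writer.write(rows).await?`; passing MergeSchema to the
    // JsonWriter currently only logs a warning and uses the default behaviour.
    writer.write_with_mode(rows, WriteMode::Default).await?;
    writer.flush_and_commit(&mut table).await?;
    Ok(())
}
```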
69 changes: 47 additions & 22 deletions crates/deltalake-core/src/writer/mod.rs
@@ -116,17 +116,34 @@ impl From<DeltaWriterError> for DeltaTableError {
DeltaWriterError::Io { source } => DeltaTableError::Io { source },
DeltaWriterError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
DeltaWriterError::Parquet { source } => DeltaTableError::Parquet { source },
DeltaWriterError::SchemaMismatch { .. } => DeltaTableError::SchemaMismatch {
msg: err.to_string(),
},
_ => DeltaTableError::Generic(err.to_string()),
}
}
}

/// Write mode for the [DeltaWriter]
#[derive(Clone, Debug, PartialEq)]
pub enum WriteMode {
/// Default write mode which will return an error if schemas do not match correctly
Default,
/// Merge the schema of the table with the newly written data
///
/// [Read more here](https://delta.io/blog/2023-02-08-delta-lake-schema-evolution/)
MergeSchema,
}

#[async_trait]
/// Trait for writing data to Delta tables
pub trait DeltaWriter<T> {
/// write a chunk of values into the internal write buffers.
/// Write a chunk of values into the internal write buffers with the default write mode
async fn write(&mut self, values: T) -> Result<(), DeltaTableError>;

/// Write a chunk of values into the internal write buffers with the specified [WriteMode]
async fn write_with_mode(&mut self, values: T, mode: WriteMode) -> Result<(), DeltaTableError>;

/// Flush the internal write buffers to files in the delta table folder structure.
/// The corresponding delta [`Add`] actions are returned and should be committed via a transaction.
async fn flush(&mut self) -> Result<Vec<Add>, DeltaTableError>;
@@ -135,26 +152,34 @@ pub trait DeltaWriter<T> {
/// and commit the changes to the Delta log, creating a new table version.
async fn flush_and_commit(&mut self, table: &mut DeltaTable) -> Result<i64, DeltaTableError> {
let adds: Vec<_> = self.flush().await?.drain(..).map(Action::Add).collect();
let partition_cols = table.metadata()?.partition_columns.clone();
let partition_by = if !partition_cols.is_empty() {
Some(partition_cols)
} else {
None
};
let operation = DeltaOperation::Write {
mode: SaveMode::Append,
partition_by,
predicate: None,
};
let version = commit(
table.log_store.as_ref(),
&adds,
operation,
&table.state,
None,
)
.await?;
table.update().await?;
Ok(version)
flush_and_commit(adds, table).await
}
}

/// Method for flushing to be used by writers
pub(crate) async fn flush_and_commit(
adds: Vec<Action>,
table: &mut DeltaTable,
) -> Result<i64, DeltaTableError> {
let partition_cols = table.metadata()?.partition_columns.clone();
let partition_by = if !partition_cols.is_empty() {
Some(partition_cols)
} else {
None
};
let operation = DeltaOperation::Write {
mode: SaveMode::Append,
partition_by,
predicate: None,
};
let version = commit(
table.log_store.as_ref(),
&adds,
operation,
&table.state,
None,
)
.await?;
table.update().await?;
Ok(version)
}
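
For downstream writer implementations, the trait now exposes both entry points, while `flush_and_commit` keeps a default implementation that calls the shared helper above. A minimal sketch of implementing it (the `MyWriter` and `Row` types are hypothetical placeholders; crate paths assume the `deltalake` facade re-exports):

```rust
use async_trait::async_trait;
use deltalake::kernel::Add;
use deltalake::writer::{DeltaWriter, WriteMode};
use deltalake::DeltaTableError;

/// Hypothetical row type and writer that buffers rows in memory.
struct Row;
struct MyWriter {
    buffer: Vec<Row>,
}

#[async_trait]
impl DeltaWriter<Vec<Row>> for MyWriter {
    // Plain `write` picks the default mode, mirroring the built-in writers.
    async fn write(&mut self, values: Vec<Row>) -> Result<(), DeltaTableError> {
        self.write_with_mode(values, WriteMode::Default).await
    }

    async fn write_with_mode(
        &mut self,
        values: Vec<Row>,
        mode: WriteMode,
    ) -> Result<(), DeltaTableError> {
        match mode {
            WriteMode::Default => {}
            WriteMode::MergeSchema => {
                // A real implementation would widen its Arrow schema here and
                // remember to emit an updated `metaData` action at commit time.
            }
        }
        self.buffer.extend(values);
        Ok(())
    }

    async fn flush(&mut self) -> Result<Vec<Add>, DeltaTableError> {
        // Encode the buffered rows to parquet files and return their Add actions.
        self.buffer.clear();
        Ok(vec![])
    }

    // `flush_and_commit` is inherited from the trait's default implementation,
    // which now delegates to the module-level helper shown above.
}
```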
(The diff for the third changed file was not loaded in this view.)
