Skip to content

Commit

Permalink
fix: jsonwriter should checkpoint by default
Browse files Browse the repository at this point in the history
This fix enables `JsonWriter` to checkpoint in accordance with
`delta.checkpointInterval`. It changes the default `CommitBuilder` to
set a post-commit hook so that checkpointing is done by default.
We could also expose `CommitProperties` as an argument to
`flush_and_commit`, but that would change the function signature and
would therefore be a breaking change.

Signed-off-by: Justin Jossick <[email protected]>
  • Loading branch information
jusjosj authored and rtyler committed Nov 16, 2024
1 parent 9982e9c commit d4f18b3
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 2 deletions.
49 changes: 49 additions & 0 deletions crates/core/src/writer/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,4 +659,53 @@ mod tests {
assert_eq!(table.version(), 1);
}
}

/// Checks that `JsonWriter::flush_and_commit` produces a checkpoint file on
/// its own once enough commits have been made for a table configured with
/// `delta.checkpointInterval = 5`.
#[tokio::test]
async fn test_json_write_checkpoint() {
    use crate::operations::create::CreateBuilder;
    use std::fs;

    let table_dir = tempfile::tempdir().unwrap();
    let schema = get_delta_schema();
    let path = table_dir.path().to_str().unwrap().to_string();

    // Table properties under test: checkpoint every 5 commits.
    let config: HashMap<String, Option<String>> = vec![
        (
            "delta.checkpointInterval".to_string(),
            Some("5".to_string()),
        ),
        ("delta.checkpointPolicy".to_string(), Some("v2".to_string())),
    ]
    .into_iter()
    .collect();

    let mut table = CreateBuilder::new()
        .with_location(&path)
        .with_table_name("test-table")
        .with_comment("A table for running tests")
        .with_columns(schema.fields().cloned())
        .with_configuration(config)
        .await
        .unwrap();
    assert_eq!(table.version(), 0);

    let mut writer = JsonWriter::for_table(&table).unwrap();
    let data = serde_json::json!(
        {
            "id" : "A",
            "value": 42,
            "modified": "2021-02-01"
        }
    );

    // Five write+commit rounds on top of the initial create commit.
    for _ in 0..5 {
        writer.write(vec![data.clone()]).await.unwrap();
        writer.flush_and_commit(&mut table).await.unwrap();
    }

    // Build the log directory with `Path::join` rather than string
    // concatenation so the separator is always correct for the platform.
    let log_dir = table_dir.path().join("_delta_log");
    let target_file = "00000000000000000004.checkpoint.parquet";

    // Compare the `OsString` file name directly against the expected name;
    // this avoids panicking on unrelated entries whose names are not UTF-8.
    let checkpoint_count = fs::read_dir(&log_dir)
        .unwrap()
        .filter_map(Result::ok)
        .filter(|entry| entry.file_name() == target_file)
        .count();
    assert_eq!(
        checkpoint_count,
        1,
        "expected exactly one `{}` in {}",
        target_file,
        log_dir.display()
    );
}
}
4 changes: 2 additions & 2 deletions crates/core/src/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde_json::Value;

use crate::errors::DeltaTableError;
use crate::kernel::{Action, Add};
use crate::operations::transaction::CommitBuilder;
use crate::operations::transaction::{CommitBuilder, CommitProperties};
use crate::protocol::{ColumnCountStat, DeltaOperation, SaveMode};
use crate::DeltaTable;

Expand Down Expand Up @@ -174,7 +174,7 @@ pub(crate) async fn flush_and_commit(
predicate: None,
};

let version = CommitBuilder::default()
let version = CommitBuilder::from(CommitProperties::default())
.with_actions(adds)
.build(Some(snapshot), table.log_store.clone(), operation)
.await?
Expand Down

0 comments on commit d4f18b3

Please sign in to comment.