Merge remote-tracking branch 'apache/main' into alamb/improve_boolean_handling
alamb committed Sep 30, 2024
2 parents 36a2003 + ba4488f commit c985303
Showing 40 changed files with 693 additions and 150 deletions.
3 changes: 2 additions & 1 deletion datafusion/catalog/src/table.rs
@@ -25,6 +25,7 @@ use arrow_schema::SchemaRef;
 use async_trait::async_trait;
 use datafusion_common::Result;
 use datafusion_common::{not_impl_err, Constraints, Statistics};
+use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{
     CreateExternalTable, Expr, LogicalPlan, TableProviderFilterPushDown, TableType,
 };
@@ -274,7 +275,7 @@ pub trait TableProvider: Debug + Sync + Send {
         &self,
         _state: &dyn Session,
         _input: Arc<dyn ExecutionPlan>,
-        _overwrite: bool,
+        _insert_op: InsertOp,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         not_impl_err!("Insert into not implemented for this table")
     }
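The `InsertOp` referenced throughout this commit is not itself shown in the loaded hunks, but the new doc comments ("append, overwrite, or replace existing data") and the `format!("{} is not implemented ...", options.insert_op)` call sites below imply a small, `Display`-able enum. A hedged sketch of its likely shape — an illustration, not the actual definition from `datafusion_expr::dml`:

```rust
// Sketch only: variant set inferred from the doc comments in this diff;
// the real datafusion_expr::dml::InsertOp may differ in derives and wording.
use std::fmt;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InsertOp {
    /// Add new rows to the table, keeping existing data (old `overwrite: false`).
    Append,
    /// Replace the table's existing data entirely (old `overwrite: true`).
    Overwrite,
    /// Replace matching records within the existing data.
    Replace,
}

impl fmt::Display for InsertOp {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            InsertOp::Append => "Append",
            InsertOp::Overwrite => "Overwrite",
            InsertOp::Replace => "Replace",
        };
        write!(f, "{name}")
    }
}
```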
3 changes: 3 additions & 0 deletions datafusion/common/src/utils/mod.rs
@@ -291,6 +291,9 @@ pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
 }
 
 /// Construct a new [`Vec`] of [`ArrayRef`] from the rows of the `arrays` at the `indices`.
+///
+/// TODO: use implementation in arrow-rs when available:
+/// <https://github.com/apache/arrow-rs/pull/6475>
 pub fn take_arrays(arrays: &[ArrayRef], indices: &dyn Array) -> Result<Vec<ArrayRef>> {
     arrays
         .iter()
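A minimal usage sketch for `take_arrays`, based on the signature shown above; the column data, index values, and `arrow` import paths are illustrative:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, StringArray, UInt32Array};
use datafusion_common::utils::take_arrays;

fn main() -> datafusion_common::Result<()> {
    // Two parallel columns with three rows each.
    let arrays: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::from(vec![10, 20, 30])),
        Arc::new(StringArray::from(vec!["a", "b", "c"])),
    ];
    // Select row 2, then row 0, from every column.
    let indices = UInt32Array::from(vec![2u32, 0]);

    let taken = take_arrays(&arrays, &indices)?;
    assert_eq!(taken.len(), 2); // still two columns
    assert_eq!(taken[0].len(), 2); // now two rows per column
    Ok(())
}
```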
35 changes: 20 additions & 15 deletions datafusion/core/src/dataframe/mod.rs
@@ -52,6 +52,7 @@ use datafusion_common::config::{CsvOptions, JsonOptions};
 use datafusion_common::{
     plan_err, Column, DFSchema, DataFusionError, ParamValues, SchemaError, UnnestOptions,
 };
+use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{case, is_null, lit, SortExpr};
 use datafusion_expr::{
     utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE,
@@ -66,8 +67,9 @@ use datafusion_catalog::Session;
 /// Contains options that control how data is
 /// written out from a DataFrame
 pub struct DataFrameWriteOptions {
-    /// Controls if existing data should be overwritten
-    overwrite: bool,
+    /// Controls how new data should be written to the table, determining whether
+    /// to append, overwrite, or replace existing data.
+    insert_op: InsertOp,
     /// Controls if all partitions should be coalesced into a single output file
     /// Generally will have slower performance when set to true.
     single_file_output: bool,
@@ -80,14 +82,15 @@ impl DataFrameWriteOptions {
     /// Create a new DataFrameWriteOptions with default values
     pub fn new() -> Self {
         DataFrameWriteOptions {
-            overwrite: false,
+            insert_op: InsertOp::Append,
             single_file_output: false,
             partition_by: vec![],
         }
     }
-    /// Set the overwrite option to true or false
-    pub fn with_overwrite(mut self, overwrite: bool) -> Self {
-        self.overwrite = overwrite;
+
+    /// Set the insert operation
+    pub fn with_insert_operation(mut self, insert_op: InsertOp) -> Self {
+        self.insert_op = insert_op;
         self
     }
 
@@ -1525,7 +1528,7 @@ impl DataFrame {
             self.plan,
             table_name.to_owned(),
             &arrow_schema,
-            write_options.overwrite,
+            write_options.insert_op,
         )?
         .build()?;
 
@@ -1566,10 +1569,11 @@ impl DataFrame {
         options: DataFrameWriteOptions,
         writer_options: Option<CsvOptions>,
     ) -> Result<Vec<RecordBatch>, DataFusionError> {
-        if options.overwrite {
-            return Err(DataFusionError::NotImplemented(
-                "Overwrites are not implemented for DataFrame::write_csv.".to_owned(),
-            ));
+        if options.insert_op != InsertOp::Append {
+            return Err(DataFusionError::NotImplemented(format!(
+                "{} is not implemented for DataFrame::write_csv.",
+                options.insert_op
+            )));
         }
 
         let format = if let Some(csv_opts) = writer_options {
@@ -1626,10 +1630,11 @@ impl DataFrame {
         options: DataFrameWriteOptions,
         writer_options: Option<JsonOptions>,
     ) -> Result<Vec<RecordBatch>, DataFusionError> {
-        if options.overwrite {
-            return Err(DataFusionError::NotImplemented(
-                "Overwrites are not implemented for DataFrame::write_json.".to_owned(),
-            ));
+        if options.insert_op != InsertOp::Append {
+            return Err(DataFusionError::NotImplemented(format!(
+                "{} is not implemented for DataFrame::write_json.",
+                options.insert_op
+            )));
         }
 
         let format = if let Some(json_opts) = writer_options {
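A hedged end-to-end sketch of the reworked `DataFrame` write options. The table name and data are illustrative, and the `InsertOp` import path is assumed from the crate's re-exports; note that the in-memory table created here still rejects non-append operations, per the `MemTable` guard in `datafusion/core/src/datasource/memory.rs` further down:

```rust
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::logical_expr::dml::InsertOp; // assumed re-export path
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.sql("CREATE TABLE t (x INT) AS VALUES (1), (2)").await?;

    // The default is InsertOp::Append, the equivalent of the old `overwrite: false`.
    let df = ctx.sql("SELECT x FROM t").await?;
    df.write_table("t", DataFrameWriteOptions::new()).await?;

    // Requesting any other operation where only appends are supported now
    // surfaces as a NotImplemented error naming the rejected InsertOp.
    let df = ctx.sql("SELECT x FROM t").await?;
    let result = df
        .write_table(
            "t",
            DataFrameWriteOptions::new().with_insert_operation(InsertOp::Overwrite),
        )
        .await;
    assert!(result.is_err());
    Ok(())
}
```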
10 changes: 6 additions & 4 deletions datafusion/core/src/dataframe/parquet.rs
@@ -26,6 +26,7 @@ use super::{
 };
 
 use datafusion_common::config::TableParquetOptions;
+use datafusion_expr::dml::InsertOp;
 
 impl DataFrame {
     /// Execute the `DataFrame` and write the results to Parquet file(s).
@@ -57,10 +58,11 @@ impl DataFrame {
         options: DataFrameWriteOptions,
         writer_options: Option<TableParquetOptions>,
     ) -> Result<Vec<RecordBatch>, DataFusionError> {
-        if options.overwrite {
-            return Err(DataFusionError::NotImplemented(
-                "Overwrites are not implemented for DataFrame::write_parquet.".to_owned(),
-            ));
+        if options.insert_op != InsertOp::Append {
+            return Err(DataFusionError::NotImplemented(format!(
+                "{} is not implemented for DataFrame::write_parquet.",
+                options.insert_op
+            )));
         }
 
         let format = if let Some(parquet_opts) = writer_options {
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/file_format/arrow.rs
@@ -47,6 +47,7 @@ use datafusion_common::{
     not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
 };
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_expr::dml::InsertOp;
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
 use datafusion_physical_plan::metrics::MetricsSet;
@@ -181,7 +182,7 @@ impl FileFormat for ArrowFormat {
         conf: FileSinkConfig,
         order_requirements: Option<LexRequirement>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if conf.overwrite {
+        if conf.insert_op != InsertOp::Append {
             return not_impl_err!("Overwrites are not implemented yet for Arrow format");
         }
 
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/file_format/csv.rs
@@ -46,6 +46,7 @@ use datafusion_common::{
     exec_err, not_impl_err, DataFusionError, GetExt, DEFAULT_CSV_EXTENSION,
 };
 use datafusion_execution::TaskContext;
+use datafusion_expr::dml::InsertOp;
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_plan::metrics::MetricsSet;
 
@@ -382,7 +383,7 @@ impl FileFormat for CsvFormat {
         conf: FileSinkConfig,
         order_requirements: Option<LexRequirement>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if conf.overwrite {
+        if conf.insert_op != InsertOp::Append {
             return not_impl_err!("Overwrites are not implemented yet for CSV");
         }
 
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/file_format/json.rs
@@ -46,6 +46,7 @@ use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
 use datafusion_common::file_options::json_writer::JsonWriterOptions;
 use datafusion_common::{not_impl_err, GetExt, DEFAULT_JSON_EXTENSION};
 use datafusion_execution::TaskContext;
+use datafusion_expr::dml::InsertOp;
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_plan::metrics::MetricsSet;
 use datafusion_physical_plan::ExecutionPlan;
@@ -252,7 +253,7 @@ impl FileFormat for JsonFormat {
         conf: FileSinkConfig,
         order_requirements: Option<LexRequirement>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if conf.overwrite {
+        if conf.insert_op != InsertOp::Append {
             return not_impl_err!("Overwrites are not implemented yet for Json");
         }
 
9 changes: 5 additions & 4 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -53,6 +53,7 @@ use datafusion_common::{
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
 use datafusion_execution::TaskContext;
+use datafusion_expr::dml::InsertOp;
 use datafusion_expr::Expr;
 use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
 use datafusion_physical_expr::PhysicalExpr;
@@ -403,7 +404,7 @@ impl FileFormat for ParquetFormat {
         conf: FileSinkConfig,
         order_requirements: Option<LexRequirement>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if conf.overwrite {
+        if conf.insert_op != InsertOp::Append {
             return not_impl_err!("Overwrites are not implemented yet for Parquet");
         }
 
@@ -2269,7 +2270,7 @@ mod tests {
             table_paths: vec![ListingTableUrl::parse("file:///")?],
             output_schema: schema.clone(),
             table_partition_cols: vec![],
-            overwrite: true,
+            insert_op: InsertOp::Overwrite,
             keep_partition_by_columns: false,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
@@ -2364,7 +2365,7 @@ mod tests {
             table_paths: vec![ListingTableUrl::parse("file:///")?],
             output_schema: schema.clone(),
             table_partition_cols: vec![("a".to_string(), DataType::Utf8)], // add partitioning
-            overwrite: true,
+            insert_op: InsertOp::Overwrite,
             keep_partition_by_columns: false,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
@@ -2447,7 +2448,7 @@ mod tests {
             table_paths: vec![ListingTableUrl::parse("file:///")?],
             output_schema: schema.clone(),
             table_partition_cols: vec![],
-            overwrite: true,
+            insert_op: InsertOp::Overwrite,
             keep_partition_by_columns: false,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
8 changes: 5 additions & 3 deletions datafusion/core/src/datasource/listing/table.rs
@@ -34,6 +34,7 @@ use crate::datasource::{
 use crate::execution::context::SessionState;
 use datafusion_catalog::TableProvider;
 use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
 use datafusion_expr::{SortExpr, TableType};
 use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
@@ -916,7 +917,7 @@ impl TableProvider for ListingTable {
         &self,
         state: &dyn Session,
         input: Arc<dyn ExecutionPlan>,
-        overwrite: bool,
+        insert_op: InsertOp,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Check that the schema of the plan matches the schema of this table.
         if !self
@@ -975,7 +976,7 @@ impl TableProvider for ListingTable {
             file_groups,
             output_schema: self.schema(),
             table_partition_cols: self.options.table_partition_cols.clone(),
-            overwrite,
+            insert_op,
             keep_partition_by_columns,
         };
 
@@ -1990,7 +1991,8 @@ mod tests {
         // Therefore, we will have 8 partitions in the final plan.
         // Create an insert plan to insert the source data into the initial table
         let insert_into_table =
-            LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
+            LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, InsertOp::Append)?
+                .build()?;
         // Create a physical plan from the insert plan
         let plan = session_ctx
             .state()
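The builder call in the test above generalizes to any insert plan. A self-contained sketch, with an empty relation standing in for a real table scan and the table name purely illustrative:

```rust
use arrow_schema::{DataType, Field, Schema};
use datafusion_expr::{dml::InsertOp, LogicalPlanBuilder};

fn main() -> datafusion_common::Result<()> {
    // Schema of the hypothetical target table "t".
    let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);

    // Stand-in input; a real caller would build a scan of the source here.
    let source = LogicalPlanBuilder::empty(false).build()?;

    let insert_plan =
        LogicalPlanBuilder::insert_into(source, "t", &schema, InsertOp::Append)?
            .build()?;
    println!("{}", insert_plan.display_indent());
    Ok(())
}
```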
10 changes: 6 additions & 4 deletions datafusion/core/src/datasource/memory.rs
@@ -39,6 +39,7 @@ use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
 use datafusion_execution::TaskContext;
+use datafusion_expr::dml::InsertOp;
 use datafusion_physical_plan::metrics::MetricsSet;
 
 use async_trait::async_trait;
@@ -262,7 +263,7 @@ impl TableProvider for MemTable {
         &self,
         _state: &dyn Session,
         input: Arc<dyn ExecutionPlan>,
-        overwrite: bool,
+        insert_op: InsertOp,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // If we are inserting into the table, any sort order may be messed up so reset it here
         *self.sort_order.lock() = vec![];
@@ -289,8 +290,8 @@ impl TableProvider for MemTable {
                 .collect::<Vec<_>>()
             );
         }
-        if overwrite {
-            return not_impl_err!("Overwrite not implemented for MemoryTable yet");
+        if insert_op != InsertOp::Append {
+            return not_impl_err!("{insert_op} not implemented for MemoryTable yet");
         }
         let sink = Arc::new(MemSink::new(self.batches.clone()));
         Ok(Arc::new(DataSinkExec::new(
@@ -638,7 +639,8 @@ mod tests {
         let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?;
         // Create an insert plan to insert the source data into the initial table
         let insert_into_table =
-            LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
+            LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, InsertOp::Append)?
+                .build()?;
         // Create a physical plan from the insert plan
         let plan = session_ctx
             .state()
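In practice the `MemTable` guard means plain `INSERT INTO` (which maps to `InsertOp::Append`) keeps working, while overwrite-style writes would hit the `not_impl_err` above. An illustrative SQL-level sketch:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.sql("CREATE TABLE t (x INT) AS VALUES (1)").await?;

    // Appends succeed; the DML runs when the result is collected.
    ctx.sql("INSERT INTO t VALUES (2), (3)").await?.collect().await?;

    // t now holds three rows.
    let batches = ctx.sql("SELECT count(*) FROM t").await?.collect().await?;
    println!("{batches:?}");
    Ok(())
}
```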
6 changes: 4 additions & 2 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -36,6 +36,7 @@ pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory
 pub use arrow_file::ArrowExec;
 pub use avro::AvroExec;
 pub use csv::{CsvConfig, CsvExec, CsvExecBuilder, CsvOpener};
+use datafusion_expr::dml::InsertOp;
 pub use file_groups::FileGroupPartitioner;
 pub use file_scan_config::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
@@ -83,8 +84,9 @@ pub struct FileSinkConfig {
     /// A vector of column names and their corresponding data types,
     /// representing the partitioning columns for the file
     pub table_partition_cols: Vec<(String, DataType)>,
-    /// Controls whether existing data should be overwritten by this sink
-    pub overwrite: bool,
+    /// Controls how new data should be written to the file, determining whether
+    /// to append to, overwrite, or replace records in existing files.
+    pub insert_op: InsertOp,
     /// Controls whether partition columns are kept for the file
     pub keep_partition_by_columns: bool,
 }
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/stream.rs
@@ -33,6 +33,7 @@ use arrow_schema::SchemaRef;
 use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
 use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
 use datafusion_physical_plan::metrics::MetricsSet;
@@ -350,7 +351,7 @@ impl TableProvider for StreamTable {
         &self,
         _state: &dyn Session,
         input: Arc<dyn ExecutionPlan>,
-        _overwrite: bool,
+        _insert_op: InsertOp,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let ordering = match self.0.order.first() {
             Some(x) => {
25 changes: 14 additions & 11 deletions datafusion/core/src/execution/session_state.rs
@@ -174,27 +174,30 @@ pub struct SessionState {
 }
 
 impl Debug for SessionState {
+    /// Prefer having short fields at the top and long vector fields near the end
+    /// Group fields by
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("SessionState")
             .field("session_id", &self.session_id)
             .field("config", &self.config)
             .field("runtime_env", &self.runtime_env)
-            .field("catalog_list", &"...")
-            .field("serializer_registry", &"...")
+            .field("catalog_list", &self.catalog_list)
+            .field("serializer_registry", &self.serializer_registry)
             .field("file_formats", &self.file_formats)
             .field("execution_props", &self.execution_props)
             .field("table_options", &self.table_options)
-            .field("table_factories", &"...")
-            .field("function_factory", &"...")
-            .field("expr_planners", &"...")
-            .field("query_planner", &"...")
-            .field("analyzer", &"...")
-            .field("optimizer", &"...")
-            .field("physical_optimizers", &"...")
-            .field("table_functions", &"...")
+            .field("table_factories", &self.table_factories)
+            .field("function_factory", &self.function_factory)
+            .field("expr_planners", &self.expr_planners)
+            .field("query_planners", &self.query_planner)
+            .field("analyzer", &self.analyzer)
+            .field("optimizer", &self.optimizer)
+            .field("physical_optimizers", &self.physical_optimizers)
+            .field("table_functions", &self.table_functions)
             .field("scalar_functions", &self.scalar_functions)
             .field("aggregate_functions", &self.aggregate_functions)
             .field("window_functions", &self.window_functions)
-            .finish_non_exhaustive()
+            .finish()
     }
 }
+
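The reworked `Debug` impl prints real field contents instead of `"..."` placeholders. A quick way to exercise it, assuming the `SessionStateBuilder` API:

```rust
use datafusion::execution::session_state::SessionStateBuilder;

fn main() {
    let state = SessionStateBuilder::new().with_default_features().build();
    // Now dumps session_id, config, planners, registered functions, etc.
    println!("{state:?}");
}
```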
Diffs for the remaining changed files are not shown.
