Commit 9b024cb: Merge remote-tracking branch 'apache/main' into alamb/debug

alamb committed Sep 21, 2024 (2 parents: a75220c + 515a64e)

Showing 64 changed files with 2,489 additions and 947 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -31,6 +31,7 @@ members = [
"datafusion/functions-aggregate-common",
"datafusion/functions-nested",
"datafusion/functions-window",
"datafusion/functions-window-common",
"datafusion/optimizer",
"datafusion/physical-expr",
"datafusion/physical-expr-common",
@@ -103,6 +104,7 @@ datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", vers
 datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "42.0.0" }
 datafusion-functions-nested = { path = "datafusion/functions-nested", version = "42.0.0" }
 datafusion-functions-window = { path = "datafusion/functions-window", version = "42.0.0" }
+datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "42.0.0" }
 datafusion-optimizer = { path = "datafusion/optimizer", version = "42.0.0", default-features = false }
 datafusion-physical-expr = { path = "datafusion/physical-expr", version = "42.0.0", default-features = false }
 datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "42.0.0", default-features = false }
10 changes: 10 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

11 changes: 6 additions & 5 deletions datafusion-examples/examples/advanced_udwf.rs
@@ -22,9 +22,11 @@ use arrow::{
     array::{ArrayRef, AsArray, Float64Array},
     datatypes::Float64Type,
 };
+use arrow_schema::Field;
 use datafusion::error::Result;
 use datafusion::prelude::*;
 use datafusion_common::ScalarValue;
+use datafusion_expr::function::WindowUDFFieldArgs;
 use datafusion_expr::{
     PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
 };
@@ -70,16 +72,15 @@ impl WindowUDFImpl for SmoothItUdf {
         &self.signature
     }
 
-    /// What is the type of value that will be returned by this function.
-    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-        Ok(DataType::Float64)
-    }
-
     /// Create a `PartitionEvaluator` to evaluate this function on a new
     /// partition.
     fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
         Ok(Box::new(MyPartitionEvaluator::new()))
     }
+
+    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+        Ok(Field::new(field_args.name(), DataType::Float64, true))
+    }
 }
 
 /// This implements the lowest level evaluation for a window function
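Taken together, these two hunks show the window-UDF API migration this commit carries: `return_type` is removed from `WindowUDFImpl` and replaced by `field`, which also conveys the output column's name and nullability. A minimal before/after sketch for a downstream implementation, assuming the surrounding trait items stay unchanged (everything below is taken from the hunks above; the rest of the impl is elided):

```rust
// Before this commit: only the output DataType could be declared.
// fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
//     Ok(DataType::Float64)
// }

// After this commit: `field` receives WindowUDFFieldArgs, including the
// resolved output name via `field_args.name()`, and returns a full Field,
// so nullability is now declared explicitly (here: nullable = true).
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
    Ok(Field::new(field_args.name(), DataType::Float64, true))
}
```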
12 changes: 6 additions & 6 deletions datafusion-examples/examples/simplify_udwf_expression.rs
@@ -17,12 +17,12 @@

 use std::any::Any;
 
-use arrow_schema::DataType;
+use arrow_schema::{DataType, Field};
 
 use datafusion::execution::context::SessionContext;
 use datafusion::functions_aggregate::average::avg_udaf;
 use datafusion::{error::Result, execution::options::CsvReadOptions};
-use datafusion_expr::function::WindowFunctionSimplification;
+use datafusion_expr::function::{WindowFunctionSimplification, WindowUDFFieldArgs};
 use datafusion_expr::{
     expr::WindowFunction, simplify::SimplifyInfo, Expr, PartitionEvaluator, Signature,
     Volatility, WindowUDF, WindowUDFImpl,
@@ -60,10 +60,6 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
         &self.signature
     }
 
-    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-        Ok(DataType::Float64)
-    }
-
     fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
         todo!()
     }
@@ -84,6 +80,10 @@

         Some(Box::new(simplify))
     }
+
+    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+        Ok(Field::new(field_args.name(), DataType::Float64, true))
+    }
 }
 
 // create local execution context with `cars.csv` registered as a table named `cars`
4 changes: 2 additions & 2 deletions datafusion/common/src/display/mod.rs
@@ -27,7 +27,7 @@ use std::{

 /// Represents which type of plan, when storing multiple
 /// for use in EXPLAIN plans
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
 pub enum PlanType {
     /// The initial LogicalPlan provided to DataFusion
     InitialLogicalPlan,
@@ -96,7 +96,7 @@ impl Display for PlanType {
 }
 
 /// Represents some sort of execution plan, in String form
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
 pub struct StringifiedPlan {
     /// An identifier of what type of plan this string represents
     pub plan_type: PlanType,
4 changes: 2 additions & 2 deletions datafusion/common/src/functional_dependencies.rs
@@ -30,7 +30,7 @@ use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType, Result};
 use sqlparser::ast::TableConstraint;
 
 /// This object defines a constraint on a table.
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
 pub enum Constraint {
     /// Columns with the given indices form a composite primary key (they are
     /// jointly unique and not nullable):
@@ -40,7 +40,7 @@ pub enum Constraint {
 }
 
 /// This object encapsulates a list of functional constraints:
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
 pub struct Constraints {
     inner: Vec<Constraint>,
 }
4 changes: 2 additions & 2 deletions datafusion/common/src/join_type.rs
@@ -26,7 +26,7 @@ use crate::error::_not_impl_err;
 use crate::{DataFusionError, Result};
 
 /// Join type
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
 pub enum JoinType {
     /// Inner Join
     Inner,
@@ -88,7 +88,7 @@ impl FromStr for JoinType {
 }
 
 /// Join constraint
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
 pub enum JoinConstraint {
     /// Join ON
     On,
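The `PartialOrd` additions in this and the two preceding files are plain derives. On a fieldless enum such as `JoinType` or `JoinConstraint`, derived `PartialOrd` compares variants by declaration order; a standalone illustration with a hypothetical enum (not from the commit):

```rust
use std::cmp::Ordering;

// Derived PartialOrd on a fieldless enum orders variants by the order they
// are declared, which is what makes plan components like these comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
enum MiniJoinType {
    Inner,
    Left,
    Right,
}

fn main() {
    assert_eq!(
        MiniJoinType::Inner.partial_cmp(&MiniJoinType::Left),
        Some(Ordering::Less)
    );
    assert!(MiniJoinType::Left < MiniJoinType::Right);
}
```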
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
@@ -145,6 +145,7 @@ bigdecimal = { workspace = true }
 criterion = { version = "0.5", features = ["async_tokio"] }
 csv = "1.1.6"
 ctor = { workspace = true }
+datafusion-functions-window-common = { workspace = true }
 doc-comment = { workspace = true }
 env_logger = { workspace = true }
 half = { workspace = true, default-features = true }
16 changes: 14 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
@@ -907,9 +907,21 @@ impl TableProvider for ListingTable {
             .schema()
             .logically_equivalent_names_and_types(&input.schema())
         {
+            // Return an error if schema of the input query does not match with the table schema.
             return plan_err!(
-                // Return an error if schema of the input query does not match with the table schema.
-                "Inserting query must have the same schema with the table."
+                "Inserting query must have the same schema with the table. \
+                Expected: {:?}, got: {:?}",
+                self.schema()
+                    .fields()
+                    .iter()
+                    .map(|field| field.data_type())
+                    .collect::<Vec<_>>(),
+                input
+                    .schema()
+                    .fields()
+                    .iter()
+                    .map(|field| field.data_type())
+                    .collect::<Vec<_>>()
             );
         }

31 changes: 26 additions & 5 deletions datafusion/core/src/datasource/listing_table_factory.rs
@@ -17,6 +17,7 @@

 //! Factory for creating ListingTables with default options
 
+use std::collections::HashSet;
 use std::path::Path;
 use std::sync::Arc;
 
@@ -27,7 +28,7 @@ use crate::datasource::listing::{
 use crate::execution::context::SessionState;
 
 use arrow::datatypes::{DataType, SchemaRef};
-use datafusion_common::{arrow_datafusion_err, DataFusionError};
+use datafusion_common::{arrow_datafusion_err, plan_err, DataFusionError, ToDFSchema};
 use datafusion_common::{config_datafusion_err, Result};
 use datafusion_expr::CreateExternalTable;
 
@@ -113,19 +114,39 @@ impl TableProviderFactory for ListingTableFactory {
             .with_collect_stat(state.config().collect_statistics())
             .with_file_extension(file_extension)
             .with_target_partitions(state.config().target_partitions())
-            .with_table_partition_cols(table_partition_cols)
-            .with_file_sort_order(cmd.order_exprs.clone());
+            .with_table_partition_cols(table_partition_cols);
 
         options
             .validate_partitions(session_state, &table_path)
             .await?;
 
         let resolved_schema = match provided_schema {
-            None => options.infer_schema(session_state, &table_path).await?,
+            // We will need to check the table columns against the schema
+            // this is done so that we can do an ORDER BY for external table creation
+            // specifically for parquet file format.
+            // See: https://github.com/apache/datafusion/issues/7317
+            None => {
+                let schema = options.infer_schema(session_state, &table_path).await?;
+                let df_schema = schema.clone().to_dfschema()?;
+                let column_refs: HashSet<_> = cmd
+                    .order_exprs
+                    .iter()
+                    .flat_map(|sort| sort.iter())
+                    .flat_map(|s| s.expr.column_refs())
+                    .collect();
+
+                for column in &column_refs {
+                    if !df_schema.has_column(column) {
+                        return plan_err!("Column {column} is not in schema");
+                    }
+                }
+
+                schema
+            }
             Some(s) => s,
         };
         let config = ListingTableConfig::new(table_path)
-            .with_listing_options(options)
+            .with_listing_options(options.with_file_sort_order(cmd.order_exprs.clone()))
             .with_schema(resolved_schema);
         let provider = ListingTable::try_new(config)?
             .with_cache(state.runtime_env().cache_manager.get_file_statistic_cache());
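The effect of the new check is that an ORDER BY column in `CREATE EXTERNAL TABLE ... WITH ORDER` is now validated against the inferred file schema at creation time instead of failing later during planning. A hedged end-to-end sketch; the path and column name are hypothetical, and the error text comes from the `plan_err!` call in the hunk above:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Hypothetical data path; assumes a Parquet file whose schema has no
    // column named `missing_col`. With this commit, the ORDER BY column is
    // checked against the inferred schema while the DDL statement runs.
    let result = ctx
        .sql(
            "CREATE EXTERNAL TABLE t \
             STORED AS PARQUET \
             WITH ORDER (missing_col ASC) \
             LOCATION './data/t.parquet'",
        )
        .await;

    // Expected to fail with: "Column missing_col is not in schema"
    assert!(result.is_err());
    Ok(())
}
```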
14 changes: 13 additions & 1 deletion datafusion/core/src/datasource/memory.rs
@@ -274,7 +274,19 @@ impl TableProvider for MemTable {
             .logically_equivalent_names_and_types(&input.schema())
         {
             return plan_err!(
-                "Inserting query must have the same schema with the table."
+                "Inserting query must have the same schema with the table. \
+                Expected: {:?}, got: {:?}",
+                self.schema()
+                    .fields()
+                    .iter()
+                    .map(|field| field.data_type())
+                    .collect::<Vec<_>>(),
+                input
+                    .schema()
+                    .fields()
+                    .iter()
+                    .map(|field| field.data_type())
+                    .collect::<Vec<_>>()
             );
         }
         if overwrite {
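This hunk and the `ListingTable` one above build the same diagnostic. A standalone sketch of the message it produces, using only the arrow crate and made-up schemas:

```rust
use arrow::datatypes::{DataType, Field, Schema};

// Standalone illustration: both sides of the improved error are the Debug
// rendering of the column data types, so a mismatch is visible at a glance.
fn main() {
    let table = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, true),
    ]);
    let input = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
    ]);

    let expected: Vec<_> = table.fields().iter().map(|f| f.data_type()).collect();
    let got: Vec<_> = input.fields().iter().map(|f| f.data_type()).collect();

    // Prints:
    //   Inserting query must have the same schema with the table.
    //   Expected: [Int32, Utf8], got: [Int64, Utf8]
    println!(
        "Inserting query must have the same schema with the table. \
         Expected: {:?}, got: {:?}",
        expected, got
    );
}
```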
30 changes: 30 additions & 0 deletions datafusion/core/src/execution/session_state.rs
@@ -1193,6 +1193,18 @@ impl SessionStateBuilder {
         self
     }
 
+    /// Add a [`TableProviderFactory`] to the map of factories
+    pub fn with_table_factory(
+        mut self,
+        key: String,
+        table_factory: Arc<dyn TableProviderFactory>,
+    ) -> Self {
+        let mut table_factories = self.table_factories.unwrap_or_default();
+        table_factories.insert(key, table_factory);
+        self.table_factories = Some(table_factories);
+        self
+    }
+
     /// Set the map of [`TableProviderFactory`]s
     pub fn with_table_factories(
         mut self,
@@ -1930,4 +1942,22 @@ mod tests {
             Optimizer::default().rules.len() + 1
         );
     }
+
+    #[test]
+    fn test_with_table_factories() -> Result<()> {
+        use crate::test_util::TestTableFactory;
+
+        let state = SessionStateBuilder::new().build();
+        let table_factories = state.table_factories();
+        assert!(table_factories.is_empty());
+
+        let table_factory = Arc::new(TestTableFactory {});
+        let state = SessionStateBuilder::new()
+            .with_table_factory("employee".to_string(), table_factory)
+            .build();
+        let table_factories = state.table_factories();
+        assert_eq!(table_factories.len(), 1);
+        assert!(table_factories.contains_key("employee"));
+        Ok(())
+    }
 }
9 changes: 9 additions & 0 deletions datafusion/core/src/physical_planner.rs
@@ -1974,6 +1974,7 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
 #[cfg(test)]
 mod tests {
     use std::any::Any;
+    use std::cmp::Ordering;
     use std::fmt::{self, Debug};
     use std::ops::{BitAnd, Not};
 
@@ -2528,6 +2529,14 @@ mod tests {
         }
     }
 
+    // Implementation needed for `UserDefinedLogicalNodeCore`, since the only field is
+    // a schema, we can't derive `PartialOrd`, and we can't compare these.
+    impl PartialOrd for NoOpExtensionNode {
+        fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
+            None
+        }
+    }
+
     impl UserDefinedLogicalNodeCore for NoOpExtensionNode {
         fn name(&self) -> &str {
             "NoOp"
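The manual impl above satisfies the new `PartialOrd` bound for a node whose only field, a schema, has no meaningful order: every pair of values is declared incomparable. A standalone illustration of the same pattern with a hypothetical type:

```rust
use std::cmp::Ordering;

// A type whose contents cannot be meaningfully ordered can still satisfy a
// `PartialOrd` bound by reporting that no two values are comparable.
#[derive(PartialEq)]
struct Opaque;

impl PartialOrd for Opaque {
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        None // incomparable: <, <=, > and >= are all false for every pair
    }
}

fn main() {
    assert_eq!(Opaque.partial_cmp(&Opaque), None);
    assert!(!(Opaque < Opaque) && !(Opaque > Opaque));
}
```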
2 changes: 1 addition & 1 deletion datafusion/core/tests/user_defined/user_defined_plan.rs
@@ -389,7 +389,7 @@ impl OptimizerRule for TopKOptimizerRule {
     }
 }
 
-#[derive(PartialEq, Eq, Hash)]
+#[derive(PartialEq, Eq, PartialOrd, Hash)]
 struct TopKPlanNode {
     k: usize,
     input: LogicalPlan,