Skip to content

Commit

Permalink
feat: Support scalar subquery in WHERE
Browse files Browse the repository at this point in the history
  • Loading branch information
MazterQyou committed Jan 30, 2024
1 parent a0b4a6d commit 3d0c281
Show file tree
Hide file tree
Showing 10 changed files with 390 additions and 114 deletions.
8 changes: 6 additions & 2 deletions datafusion/core/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ use super::{dfschema::ToDFSchema, expr_rewriter::coerce_plan_expr_for_schema, Di
use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
use crate::logical_plan::{
columnize_expr, normalize_col, normalize_cols, rewrite_sort_cols_by_aggs, Column,
CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, Partitioning, Repartition, Values,
CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, Partitioning, Repartition,
SubqueryType, Values,
};
use crate::sql::utils::group_window_expr_by_sort_keys;

Expand Down Expand Up @@ -528,12 +529,15 @@ impl LogicalPlanBuilder {
pub fn subquery(
&self,
subqueries: impl IntoIterator<Item = impl Into<LogicalPlan>>,
types: impl IntoIterator<Item = SubqueryType>,
) -> Result<Self> {
let subqueries = subqueries.into_iter().map(|l| l.into()).collect::<Vec<_>>();
let schema = Arc::new(Subquery::merged_schema(&self.plan, &subqueries));
let types = types.into_iter().collect::<Vec<_>>();
let schema = Arc::new(Subquery::merged_schema(&self.plan, &subqueries, &types));
Ok(Self::from(LogicalPlan::Subquery(Subquery {
input: Arc::new(self.plan.clone()),
subqueries,
types,
schema,
})))
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ pub use plan::{
CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CrossJoin, Distinct,
DropTable, EmptyRelation, Filter, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, PlanVisitor, Repartition, StringifiedPlan, Subquery,
TableScan, ToStringifiedPlan, Union, Values,
SubqueryType, TableScan, ToStringifiedPlan, Union, Values,
};
pub use registry::FunctionRegistry;
74 changes: 64 additions & 10 deletions datafusion/core/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,22 +267,64 @@ pub struct Limit {
/// Evaluates correlated sub queries
#[derive(Clone)]
pub struct Subquery {
/// The list of sub queries
pub subqueries: Vec<LogicalPlan>,
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
/// The list of sub queries
pub subqueries: Vec<LogicalPlan>,
/// The list of subquery types
pub types: Vec<SubqueryType>,
/// The schema description of the output
pub schema: DFSchemaRef,
}

/// Subquery type
///
/// Describes how a subquery attached to a [`Subquery`] plan node is
/// evaluated. One value of this enum is stored per subquery in
/// `Subquery::types`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum SubqueryType {
/// Scalar (SELECT, WHERE) evaluating to one value
Scalar,
// This will be extended with `Exists` and `AnyAll` types.
// Code that matches on this enum without a wildcard arm will need a new
// arm for each variant added here.
}

impl Display for SubqueryType {
    /// Writes the variant name (e.g. `Scalar`) to the formatter.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        // The match is kept exhaustive (no wildcard arm) so that adding a
        // variant forces a display name to be chosen for it.
        f.write_str(match self {
            SubqueryType::Scalar => "Scalar",
        })
    }
}

impl Subquery {
/// Merge schema of main input and correlated subquery columns
pub fn merged_schema(input: &LogicalPlan, subqueries: &[LogicalPlan]) -> DFSchema {
subqueries.iter().fold((**input.schema()).clone(), |a, b| {
let mut res = a;
res.merge(b.schema());
res
})
pub fn merged_schema(
    input: &LogicalPlan,
    subqueries: &[LogicalPlan],
    types: &[SubqueryType],
) -> DFSchema {
    // Start from a copy of the outer (input) plan's schema and append each
    // subquery's schema, adjusted for that subquery's type.
    let mut merged = (**input.schema()).clone();
    // `zip` pairs subqueries with types positionally and stops at the
    // shorter of the two slices, so any unpaired trailing entries are ignored.
    for (subquery, subquery_type) in subqueries.iter().zip(types) {
        let subquery_schema =
            Self::transform_dfschema(subquery.schema(), *subquery_type);
        merged.merge(&subquery_schema);
    }
    merged
}

/// Transform DataFusion schema according to subquery type
pub fn transform_dfschema(schema: &DFSchema, typ: SubqueryType) -> DFSchema {
match typ {
SubqueryType::Scalar => schema.clone(),
// Schema will be transformed for `Exists` and `AnyAll`
}
}

/// Transform Arrow field according to subquery type
pub fn transform_field(field: &Field, typ: SubqueryType) -> Field {
match typ {
SubqueryType::Scalar => field.clone(),
// Field will be transformed for `Exists` and `AnyAll`
}
}
}

Expand Down Expand Up @@ -475,13 +517,23 @@ impl LogicalPlan {
LogicalPlan::Values(Values { schema, .. }) => vec![schema],
LogicalPlan::Window(Window { input, schema, .. })
| LogicalPlan::Projection(Projection { input, schema, .. })
| LogicalPlan::Subquery(Subquery { input, schema, .. })
| LogicalPlan::Aggregate(Aggregate { input, schema, .. })
| LogicalPlan::TableUDFs(TableUDFs { input, schema, .. }) => {
let mut schemas = input.all_schemas();
schemas.insert(0, schema);
schemas
}
LogicalPlan::Subquery(Subquery {
input,
subqueries,
schema,
..
}) => {
let mut schemas = input.all_schemas();
schemas.extend(subqueries.iter().map(|s| s.schema()));
schemas.insert(0, schema);
schemas
}
LogicalPlan::Join(Join {
left,
right,
Expand Down Expand Up @@ -1063,7 +1115,9 @@ impl LogicalPlan {
}
Ok(())
}
LogicalPlan::Subquery(Subquery { .. }) => write!(f, "Subquery"),
LogicalPlan::Subquery(Subquery { types, .. }) => {
write!(f, "Subquery: types={:?}", types)
}
LogicalPlan::Filter(Filter {
predicate: ref expr,
..
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/optimizer/projection_drop_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ fn optimize_plan(
LogicalPlan::Subquery(Subquery {
input,
subqueries,
types,
schema,
}) => {
// TODO: subqueries are not optimized
Expand All @@ -269,6 +270,7 @@ fn optimize_plan(
.map(|(p, _)| p)?,
),
subqueries: subqueries.clone(),
types: types.clone(),
schema: schema.clone(),
}),
None,
Expand Down
8 changes: 6 additions & 2 deletions datafusion/core/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,10 @@ fn optimize_plan(
}))
}
LogicalPlan::Subquery(Subquery {
input, subqueries, ..
input,
subqueries,
types,
..
}) => {
let mut subquery_required_columns = HashSet::new();
for subquery in subqueries.iter() {
Expand Down Expand Up @@ -484,11 +487,12 @@ fn optimize_plan(
has_projection,
_optimizer_config,
)?;
let new_schema = Subquery::merged_schema(&input, subqueries);
let new_schema = Subquery::merged_schema(&input, subqueries, types);
Ok(LogicalPlan::Subquery(Subquery {
input: Arc::new(input),
schema: Arc::new(new_schema),
subqueries: subqueries.clone(),
types: types.clone(),
}))
}
// all other nodes: Add any additional columns used by
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,11 @@ pub fn from_plan(
alias: alias.clone(),
}))
}
LogicalPlan::Subquery(Subquery { schema, .. }) => {
LogicalPlan::Subquery(Subquery { schema, types, .. }) => {
Ok(LogicalPlan::Subquery(Subquery {
subqueries: inputs[1..inputs.len()].to_vec(),
input: Arc::new(inputs[0].clone()),
subqueries: inputs[1..inputs.len()].to_vec(),
types: types.clone(),
schema: schema.clone(),
}))
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ impl DefaultPhysicalPlanner {

Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
}
LogicalPlan::Subquery(Subquery { subqueries, input, schema }) => {
LogicalPlan::Subquery(Subquery { input, subqueries, types, schema }) => {
let cursor = Arc::new(OuterQueryCursor::new(schema.as_ref().to_owned().into()));
let mut new_session_state = session_state.clone();
new_session_state.execution_props = new_session_state.execution_props.with_outer_query_cursor(cursor.clone());
Expand All @@ -931,7 +931,7 @@ impl DefaultPhysicalPlanner {
})
.collect::<Vec<_>>();
let input = self.create_initial_plan(input, &new_session_state).await?;
Ok(Arc::new(SubqueryExec::try_new(subqueries, input, cursor)?))
Ok(Arc::new(SubqueryExec::try_new(input, subqueries, types.clone(), cursor)?))
}
LogicalPlan::CreateExternalTable(_) => {
// There is no default plan for "CREATE EXTERNAL
Expand Down
Loading

0 comments on commit 3d0c281

Please sign in to comment.