Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support scalar subquery in WHERE #111

Merged
merged 2 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions datafusion/core/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ use super::{dfschema::ToDFSchema, expr_rewriter::coerce_plan_expr_for_schema, Di
use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
use crate::logical_plan::{
columnize_expr, normalize_col, normalize_cols, rewrite_sort_cols_by_aggs, Column,
CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, Partitioning, Repartition, Values,
CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, Partitioning, Repartition,
SubqueryType, Values,
};
use crate::sql::utils::group_window_expr_by_sort_keys;

Expand Down Expand Up @@ -528,12 +529,15 @@ impl LogicalPlanBuilder {
/// Wrap the current plan in a `LogicalPlan::Subquery` node.
///
/// `subqueries` are converted into `LogicalPlan`s, and `types` supplies the
/// `SubqueryType` for each of them in the same order. The schema of the new
/// node is computed by `Subquery::merged_schema`.
// NOTE(review): the lengths of `subqueries` and `types` are not compared
// here — confirm that callers always pass equally long lists.
pub fn subquery(
&self,
subqueries: impl IntoIterator<Item = impl Into<LogicalPlan>>,
types: impl IntoIterator<Item = SubqueryType>,
) -> Result<Self> {
let subqueries = subqueries.into_iter().map(|l| l.into()).collect::<Vec<_>>();
// NOTE(review): this two-argument call appears to be the pre-change line of
// the diff; the three-argument call below supersedes it.
let schema = Arc::new(Subquery::merged_schema(&self.plan, &subqueries));
let types = types.into_iter().collect::<Vec<_>>();
let schema = Arc::new(Subquery::merged_schema(&self.plan, &subqueries, &types));
// The existing plan is cloned and becomes the `input` of the new node.
Ok(Self::from(LogicalPlan::Subquery(Subquery {
input: Arc::new(self.plan.clone()),
subqueries,
types,
schema,
})))
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ pub use plan::{
CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CrossJoin, Distinct,
DropTable, EmptyRelation, Filter, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, PlanVisitor, Repartition, StringifiedPlan, Subquery,
TableScan, ToStringifiedPlan, Union, Values,
SubqueryType, TableScan, ToStringifiedPlan, Union, Values,
};
pub use registry::FunctionRegistry;
74 changes: 64 additions & 10 deletions datafusion/core/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,22 +267,64 @@ pub struct Limit {
/// Logical plan node that evaluates correlated subqueries against an input plan
#[derive(Clone)]
pub struct Subquery {
// NOTE(review): `subqueries` is listed twice in this diff view; this first
// occurrence looks like the pre-change position of the field — confirm.
/// The list of subquery plans
pub subqueries: Vec<LogicalPlan>,
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
/// The list of subquery plans
pub subqueries: Vec<LogicalPlan>,
/// The type of each subquery, paired with `subqueries` by position
pub types: Vec<SubqueryType>,
/// The schema description of the output
pub schema: DFSchemaRef,
}

/// Kind of subquery attached to a `Subquery` plan node.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SubqueryType {
    /// A scalar subquery (usable in `SELECT` or `WHERE`) that evaluates to one value.
    Scalar,
    // `Exists` and `AnyAll` variants are planned as future additions.
}

/// Human-readable rendering of a `SubqueryType`: the bare variant name.
impl Display for SubqueryType {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        // Exhaustive on purpose, so that adding a variant forces a label to be chosen.
        let label = match *self {
            SubqueryType::Scalar => "Scalar",
        };
        f.write_str(label)
    }
}

impl Subquery {
/// Build the output schema of a `Subquery` node: start from a clone of the
/// input plan's schema and merge in each subquery's schema, transformed
/// according to its `SubqueryType` via `transform_dfschema`.
///
/// `subqueries` and `types` are paired positionally with `zip`, so if the two
/// slices differ in length the unmatched trailing entries are ignored.
// NOTE(review): the two-argument signature and `fold` immediately below appear
// to be the pre-change side of this diff; the three-argument version that
// follows replaces them.
pub fn merged_schema(input: &LogicalPlan, subqueries: &[LogicalPlan]) -> DFSchema {
subqueries.iter().fold((**input.schema()).clone(), |a, b| {
let mut res = a;
res.merge(b.schema());
res
})
pub fn merged_schema(
input: &LogicalPlan,
subqueries: &[LogicalPlan],
types: &[SubqueryType],
) -> DFSchema {
subqueries.iter().zip(types.iter()).fold(
// Accumulator seed: an owned copy of the input plan's schema.
(**input.schema()).clone(),
|schema, (plan, typ)| {
let mut schema = schema;
// Merge the subquery's schema after adapting it to the subquery type.
schema.merge(&Self::transform_dfschema(plan.schema(), *typ));
schema
},
)
}

/// Transform DataFusion schema according to subquery type
pub fn transform_dfschema(schema: &DFSchema, typ: SubqueryType) -> DFSchema {
match typ {
SubqueryType::Scalar => schema.clone(),
// Schema will be transformed for `Exists` and `AnyAll`
}
}

/// Transform Arrow field according to subquery type
pub fn transform_field(field: &Field, typ: SubqueryType) -> Field {
match typ {
SubqueryType::Scalar => field.clone(),
// Field will be transformed for `Exists` and `AnyAll`
}
}
}

Expand Down Expand Up @@ -475,13 +517,23 @@ impl LogicalPlan {
LogicalPlan::Values(Values { schema, .. }) => vec![schema],
LogicalPlan::Window(Window { input, schema, .. })
| LogicalPlan::Projection(Projection { input, schema, .. })
| LogicalPlan::Subquery(Subquery { input, schema, .. })
| LogicalPlan::Aggregate(Aggregate { input, schema, .. })
| LogicalPlan::TableUDFs(TableUDFs { input, schema, .. }) => {
let mut schemas = input.all_schemas();
schemas.insert(0, schema);
schemas
}
LogicalPlan::Subquery(Subquery {
input,
subqueries,
schema,
..
}) => {
let mut schemas = input.all_schemas();
schemas.extend(subqueries.iter().map(|s| s.schema()));
schemas.insert(0, schema);
schemas
}
LogicalPlan::Join(Join {
left,
right,
Expand Down Expand Up @@ -1063,7 +1115,9 @@ impl LogicalPlan {
}
Ok(())
}
LogicalPlan::Subquery(Subquery { .. }) => write!(f, "Subquery"),
LogicalPlan::Subquery(Subquery { types, .. }) => {
write!(f, "Subquery: types={:?}", types)
}
LogicalPlan::Filter(Filter {
predicate: ref expr,
..
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/optimizer/projection_drop_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ fn optimize_plan(
LogicalPlan::Subquery(Subquery {
input,
subqueries,
types,
schema,
}) => {
// TODO: subqueries are not optimized
Expand All @@ -269,6 +270,7 @@ fn optimize_plan(
.map(|(p, _)| p)?,
),
subqueries: subqueries.clone(),
types: types.clone(),
schema: schema.clone(),
}),
None,
Expand Down
8 changes: 6 additions & 2 deletions datafusion/core/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,10 @@ fn optimize_plan(
}))
}
LogicalPlan::Subquery(Subquery {
input, subqueries, ..
input,
subqueries,
types,
..
}) => {
let mut subquery_required_columns = HashSet::new();
for subquery in subqueries.iter() {
Expand Down Expand Up @@ -484,11 +487,12 @@ fn optimize_plan(
has_projection,
_optimizer_config,
)?;
let new_schema = Subquery::merged_schema(&input, subqueries);
let new_schema = Subquery::merged_schema(&input, subqueries, types);
Ok(LogicalPlan::Subquery(Subquery {
input: Arc::new(input),
schema: Arc::new(new_schema),
subqueries: subqueries.clone(),
types: types.clone(),
}))
}
// all other nodes: Add any additional columns used by
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,11 @@ pub fn from_plan(
alias: alias.clone(),
}))
}
LogicalPlan::Subquery(Subquery { schema, .. }) => {
LogicalPlan::Subquery(Subquery { schema, types, .. }) => {
Ok(LogicalPlan::Subquery(Subquery {
subqueries: inputs[1..inputs.len()].to_vec(),
input: Arc::new(inputs[0].clone()),
subqueries: inputs[1..inputs.len()].to_vec(),
types: types.clone(),
schema: schema.clone(),
}))
}
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ impl DefaultPhysicalPlanner {

Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
}
LogicalPlan::Subquery(Subquery { subqueries, input, schema }) => {
LogicalPlan::Subquery(Subquery { input, subqueries, types, schema }) => {
let cursor = Arc::new(OuterQueryCursor::new(schema.as_ref().to_owned().into()));
let mut new_session_state = session_state.clone();
new_session_state.execution_props = new_session_state.execution_props.with_outer_query_cursor(cursor.clone());
Expand All @@ -931,7 +931,7 @@ impl DefaultPhysicalPlanner {
})
.collect::<Vec<_>>();
let input = self.create_initial_plan(input, &new_session_state).await?;
Ok(Arc::new(SubqueryExec::try_new(subqueries, input, cursor)?))
Ok(Arc::new(SubqueryExec::try_new(input, subqueries, types.clone(), cursor)?))
}
LogicalPlan::CreateExternalTable(_) => {
// There is no default plan for "CREATE EXTERNAL
Expand Down Expand Up @@ -1033,6 +1033,7 @@ pub fn create_physical_expr(
let cursors = execution_props.outer_query_cursors.clone();
let cursor = cursors
.iter()
.rev()
.find(|cur| cur.schema().field_with_name(c.name.as_str()).is_ok())
.ok_or_else(|| {
DataFusionError::Execution(format!(
Expand Down
Loading
Loading