Skip to content

Commit

Permalink
Merge pull request #2 from alamb/alamb/improved_error_plan
Browse files Browse the repository at this point in the history
Use explicit enum for physical errors
  • Loading branch information
Lordworms authored Nov 3, 2024
2 parents 8001eed + 0b6f498 commit 82d9560
Show file tree
Hide file tree
Showing 13 changed files with 118 additions and 47 deletions.
7 changes: 6 additions & 1 deletion datafusion/common/src/display/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ pub enum PlanType {
FinalPhysicalPlanWithStats,
/// The final with schema, fully optimized physical plan which would be executed
FinalPhysicalPlanWithSchema,
/// An error creating the physical plan
PhysicalPlanError,
}

impl Display for PlanType {
Expand Down Expand Up @@ -91,6 +93,7 @@ impl Display for PlanType {
PlanType::FinalPhysicalPlanWithSchema => {
write!(f, "physical_plan_with_schema")
}
PlanType::PhysicalPlanError => write!(f, "physical_plan_error"),
}
}
}
Expand Down Expand Up @@ -118,7 +121,9 @@ impl StringifiedPlan {
/// `verbose_mode = true` will display all available plans
pub fn should_display(&self, verbose_mode: bool) -> bool {
match self.plan_type {
PlanType::FinalLogicalPlan | PlanType::FinalPhysicalPlan => true,
PlanType::FinalLogicalPlan
| PlanType::FinalPhysicalPlan
| PlanType::PhysicalPlanError => true,
_ => verbose_mode,
}
}
Expand Down
14 changes: 5 additions & 9 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1798,18 +1798,14 @@ impl DefaultPhysicalPlanner {
}
}
Err(err) => {
return Ok(Some(Arc::new(ExplainExec::new(
SchemaRef::new(Schema::new(vec![arrow_schema::Field::new(
"Err",
arrow_schema::DataType::Utf8,
false,
)])),
vec![StringifiedPlan::new(FinalLogicalPlan, err.to_string())],
e.verbose,
))))
stringified_plans.push(StringifiedPlan::new(
PhysicalPlanError,
err.to_string(),
));
}
}
}

Ok(Some(Arc::new(ExplainExec::new(
SchemaRef::new(e.schema.as_ref().to_owned().into()),
stringified_plans,
Expand Down
17 changes: 0 additions & 17 deletions datafusion/physical-plan/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,6 @@ impl ExplainExec {
self.verbose
}

/// check if current plan is a failed explain plan
pub fn is_failed_explain(&self) -> bool {
self.stringified_plans.len() == 1 && self.schema.fields().len() == 1
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
Expand Down Expand Up @@ -137,18 +132,6 @@ impl ExecutionPlan for ExplainExec {
if 0 != partition {
return internal_err!("ExplainExec invalid partition {partition}");
}
if self.is_failed_explain() {
let mut err_builder = StringBuilder::with_capacity(1, 1024);
err_builder.append_value(&*self.stringified_plans[0].plan);
let record_batch = RecordBatch::try_new(
Arc::clone(&self.schema),
vec![Arc::new(err_builder.finish())],
)?;
return Ok(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&self.schema),
futures::stream::iter(vec![Ok(record_batch)]),
)));
}
let mut type_builder =
StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
let mut plan_builder =
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ message PlanType {
datafusion_common.EmptyMessage FinalPhysicalPlan = 6;
datafusion_common.EmptyMessage FinalPhysicalPlanWithStats = 10;
datafusion_common.EmptyMessage FinalPhysicalPlanWithSchema = 12;
datafusion_common.EmptyMessage PhysicalPlanError = 13;
}
}

Expand Down
13 changes: 13 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::protobuf::{
AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
FinalPhysicalPlan, FinalPhysicalPlanWithStats, InitialLogicalPlan,
InitialPhysicalPlan, InitialPhysicalPlanWithStats, OptimizedLogicalPlan,
OptimizedPhysicalPlan,
OptimizedPhysicalPlan, PhysicalPlanError,
},
AnalyzedLogicalPlanType, CubeNode, GroupingSetNode, OptimizedLogicalPlanType,
OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
Expand Down Expand Up @@ -141,6 +141,7 @@ impl From<&protobuf::StringifiedPlan> for StringifiedPlan {
FinalPhysicalPlan(_) => PlanType::FinalPhysicalPlan,
FinalPhysicalPlanWithStats(_) => PlanType::FinalPhysicalPlanWithStats,
FinalPhysicalPlanWithSchema(_) => PlanType::FinalPhysicalPlanWithSchema,
PhysicalPlanError(_) => PlanType::PhysicalPlanError,
},
plan: Arc::new(stringified_plan.plan.clone()),
}
Expand Down
4 changes: 4 additions & 0 deletions datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::protobuf::{
FinalPhysicalPlan, FinalPhysicalPlanWithSchema, FinalPhysicalPlanWithStats,
InitialLogicalPlan, InitialPhysicalPlan, InitialPhysicalPlanWithSchema,
InitialPhysicalPlanWithStats, OptimizedLogicalPlan, OptimizedPhysicalPlan,
PhysicalPlanError,
},
AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList,
OptimizedLogicalPlanType, OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
Expand Down Expand Up @@ -115,6 +116,9 @@ impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
PlanType::FinalPhysicalPlanWithSchema => Some(protobuf::PlanType {
plan_type_enum: Some(FinalPhysicalPlanWithSchema(EmptyMessage {})),
}),
PlanType::PhysicalPlanError => Some(protobuf::PlanType {
plan_type_enum: Some(PhysicalPlanError(EmptyMessage {})),
}),
},
plan: stringified_plan.plan.to_string(),
}
Expand Down
11 changes: 9 additions & 2 deletions datafusion/sqllogictest/test_files/explain.slt
Original file line number Diff line number Diff line change
Expand Up @@ -419,10 +419,17 @@ create table t1(a int);
statement ok
create table t2(b int);

query T
query TT
explain select a from t1 where exists (select count(*) from t2);
----
This feature is not implemented: Physical plan does not support logical expression Exists(Exists { subquery: <subquery>, negated: false })
logical_plan
01)Filter: EXISTS (<subquery>)
02)--Subquery:
03)----Projection: count(*)
04)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
05)--------TableScan: t2
06)--TableScan: t1 projection=[a]
physical_plan_error This feature is not implemented: Physical plan does not support logical expression Exists(Exists { subquery: <subquery>, negated: false })

statement ok
drop table t1;
Expand Down
9 changes: 7 additions & 2 deletions datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4070,14 +4070,19 @@ physical_plan
08)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], output_ordering=[c@0 ASC NULLS LAST], has_header=true

# we do not generate physical plan for Repartition yet (e.g Distribute By queries).
query T
query TT
EXPLAIN SELECT a, b, sum1
FROM (SELECT c, b, a, SUM(d) as sum1
FROM multiple_ordered_table_with_pk
GROUP BY c)
DISTRIBUTE BY a
----
This feature is not implemented: Physical plan does not support DistributeBy partitioning
logical_plan
01)Repartition: DistributeBy(multiple_ordered_table_with_pk.a)
02)--Projection: multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b, sum(multiple_ordered_table_with_pk.d) AS sum1
03)----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b]], aggr=[[sum(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
04)------TableScan: multiple_ordered_table_with_pk projection=[a, b, c, d]
physical_plan_error This feature is not implemented: Physical plan does not support DistributeBy partitioning

# union with aggregate
query TT
Expand Down
28 changes: 24 additions & 4 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4049,10 +4049,20 @@ physical_plan


# Test CROSS JOIN LATERAL syntax (planning)
query T
query TT
explain select t1_id, t1_name, i from join_t1 t1 cross join lateral (select * from unnest(generate_series(1, t1_int))) as series(i);
----
This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t1" }), name: "t1_int" })
logical_plan
01)Cross Join:
02)--SubqueryAlias: t1
03)----TableScan: join_t1 projection=[t1_id, t1_name]
04)--SubqueryAlias: series
05)----Subquery:
06)------Projection: unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)),depth=1) AS i
07)--------Unnest: lists[unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)))|depth=1] structs[]
08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t1.t1_int) AS Int64)) AS unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)))
09)------------EmptyRelation
physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t1" }), name: "t1_int" })


# Test CROSS JOIN LATERAL syntax (execution)
Expand All @@ -4062,10 +4072,20 @@ select t1_id, t1_name, i from join_t1 t1 cross join lateral (select * from unnes


# Test INNER JOIN LATERAL syntax (planning)
query T
query TT
explain select t1_id, t1_name, i from join_t1 t2 inner join lateral (select * from unnest(generate_series(1, t1_int))) as series(i) on(t1_id > i);
----
This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t2" }), name: "t1_int" })
logical_plan
01)Inner Join: Filter: CAST(t2.t1_id AS Int64) > series.i
02)--SubqueryAlias: t2
03)----TableScan: join_t1 projection=[t1_id, t1_name]
04)--SubqueryAlias: series
05)----Subquery:
06)------Projection: unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)),depth=1) AS i
07)--------Unnest: lists[unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)))|depth=1] structs[]
08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t2.t1_int) AS Int64)) AS unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)))
09)------------EmptyRelation
physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t2" }), name: "t1_int" })


# Test INNER JOIN LATERAL syntax (execution)
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/prepare.slt
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@ query TT
EXPLAIN EXECUTE my_plan;
----
logical_plan Execute: my_plan params=[]
physical_plan_error This feature is not implemented: Unsupported logical plan: Execute

query TT
EXPLAIN EXECUTE my_plan(10*2 + 1, 'Foo');
----
logical_plan Execute: my_plan params=[Int64(21), Utf8("Foo")]
physical_plan_error This feature is not implemented: Unsupported logical plan: Execute

query error DataFusion error: Schema error: No field named a\.
EXPLAIN EXECUTE my_plan(a);
Expand Down
52 changes: 42 additions & 10 deletions datafusion/sqllogictest/test_files/update.slt
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,54 @@ create table t1(a int, b varchar, c double, d int);
statement ok
set datafusion.optimizer.max_passes = 0;

query T
query TT
explain update t1 set a=1, b=2, c=3.0, d=NULL;
----
This feature is not implemented: Unsupported logical plan: Dml(Update)
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: CAST(Int64(1) AS Int32) AS a, CAST(Int64(2) AS Utf8) AS b, Float64(3) AS c, CAST(NULL AS Int32) AS d
03)----TableScan: t1
physical_plan_error This feature is not implemented: Unsupported logical plan: Dml(Update)

query T
query TT
explain update t1 set a=c+1, b=a, c=c+1.0, d=b;
----
This feature is not implemented: Unsupported logical plan: Dml(Update)
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: CAST(t1.c + CAST(Int64(1) AS Float64) AS Int32) AS a, CAST(t1.a AS Utf8) AS b, t1.c + Float64(1) AS c, CAST(t1.b AS Int32) AS d
03)----TableScan: t1
physical_plan_error This feature is not implemented: Unsupported logical plan: Dml(Update)

statement ok
create table t2(a int, b varchar, c double, d int);

## set from subquery
query T
query TT
explain update t1 set b = (select max(b) from t2 where t1.a = t2.a)
----
This feature is not implemented: Physical plan does not support logical expression ScalarSubquery(<subquery>)
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t1.a AS a, (<subquery>) AS b, t1.c AS c, t1.d AS d
03)----Subquery:
04)------Projection: max(t2.b)
05)--------Aggregate: groupBy=[[]], aggr=[[max(t2.b)]]
06)----------Filter: outer_ref(t1.a) = t2.a
07)------------TableScan: t2
08)----TableScan: t1
physical_plan_error This feature is not implemented: Physical plan does not support logical expression ScalarSubquery(<subquery>)

# set from other table
query T
query TT
explain update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0;
----
This feature is not implemented: Unsupported logical plan: Dml(Update)
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t1.a AS a, t2.b AS b, CAST(t2.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d
03)----Filter: t1.a = t2.a AND t1.b > Utf8("foo") AND t2.c > Float64(1)
04)------Cross Join:
05)--------TableScan: t1
06)--------TableScan: t2
physical_plan_error This feature is not implemented: Unsupported logical plan: Dml(Update)

statement ok
create table t3(a int, b varchar, c double, d int);
Expand All @@ -59,7 +83,15 @@ query error DataFusion error: SQL error: ParserError\("Expected end of statement
explain update t1 set b = t2.b, c = t3.a, d = 1 from t2, t3 where t1.a = t2.a and t1.a = t3.a;

# test table alias
query T
query TT
explain update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0;
----
This feature is not implemented: Unsupported logical plan: Dml(Update)
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t.a AS a, t2.b AS b, CAST(t.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d
03)----Filter: t.a = t2.a AND t.b > Utf8("foo") AND t2.c > Float64(1)
04)------Cross Join:
05)--------SubqueryAlias: t
06)----------TableScan: t1
07)--------TableScan: t2
physical_plan_error This feature is not implemented: Unsupported logical plan: Dml(Update)

0 comments on commit 82d9560

Please sign in to comment.