Skip to content

Commit

Permalink
Remove table reference mutating during logical planning, converting q…
Browse files Browse the repository at this point in the history
…uery when executing (datafusion-contrib#10)

* keep original qualified_field when wrap_projections

* only mutate the table reference when sending plan to execute

* formatting

* rename method
  • Loading branch information
y-f-u authored Jun 27, 2024
1 parent 776930f commit c3f696a
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
14 changes: 13 additions & 1 deletion datafusion-federation/src/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use datafusion::{
error::Result,
logical_expr::{Expr, LogicalPlan, Projection, TableScan, TableSource},
optimizer::analyzer::AnalyzerRule,
sql::TableReference,
};

use crate::{FederatedTableProviderAdaptor, FederatedTableSource, FederationProviderRef};
Expand Down Expand Up @@ -142,7 +143,18 @@ fn wrap_projection(plan: LogicalPlan) -> Result<LogicalPlan> {
.schema()
.fields()
.iter()
.map(|f| Expr::Column(Column::new_unqualified(f.name())))
.enumerate()
.map(|(i, f)| {
Expr::Column(Column::from_qualified_name(format!(
"{}.{}",
plan.schema()
.qualified_field(i)
.0
.map(TableReference::table)
.unwrap_or_default(),
f.name()
)))
})
.collect::<Vec<Expr>>();
Ok(LogicalPlan::Projection(Projection::try_new(
expr,
Expand Down
26 changes: 15 additions & 11 deletions sources/sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ impl SQLFederationAnalyzerRule {

impl AnalyzerRule for SQLFederationAnalyzerRule {
fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
// Find all table scans, recover the SQLTableSource, find the remote table name and replace the name of the TableScan table.
let mut known_rewrites = HashMap::new();
let plan = rewrite_table_scans(&plan, &mut known_rewrites)?;

let fed_plan = FederatedPlanNode::new(plan.clone(), Arc::clone(&self.planner));
let ext_node = Extension {
node: Arc::new(fed_plan),
Expand Down Expand Up @@ -631,6 +627,14 @@ impl VirtualExecutionPlan {
let df_schema = self.plan.schema().as_ref();
Arc::new(Schema::from(df_schema))
}

fn sql(&self) -> Result<String> {
// Find all table scans, recover the SQLTableSource, find the remote table name and replace the name of the TableScan table.
let mut known_rewrites = HashMap::new();
let ast = Unparser::new(self.executor.dialect().as_ref())
.plan_to_sql(&rewrite_table_scans(&self.plan, &mut known_rewrites)?)?;
Ok(format!("{ast}"))
}
}

impl DisplayAs for VirtualExecutionPlan {
Expand All @@ -643,7 +647,12 @@ impl DisplayAs for VirtualExecutionPlan {
if let Some(ctx) = self.executor.compute_context() {
write!(f, " compute_context={ctx}")?;
}
write!(f, " sql={ast}")
write!(f, " sql={ast}")?;
if let Ok(query) = self.sql() {
write!(f, " rewritten_sql={query}")?;
};

Ok(())
}
}

Expand Down Expand Up @@ -672,12 +681,7 @@ impl ExecutionPlan for VirtualExecutionPlan {
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let dialect = self.executor.dialect();
let unparser = Unparser::new(dialect.as_ref());
let ast = unparser.plan_to_sql(&self.plan)?;
let query = format!("{ast}");

self.executor.execute(query.as_str(), self.schema())
self.executor.execute(self.sql()?.as_str(), self.schema())
}

fn properties(&self) -> &PlanProperties {
Expand Down

0 comments on commit c3f696a

Please sign in to comment.