Skip to content

Commit

Permalink
Reduce DataFrame stack size and fix large futures warnings (#10123)
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk authored Apr 18, 2024
1 parent 1f71d79 commit d4eb72c
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 70 deletions.
174 changes: 122 additions & 52 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ impl Default for DataFrameWriteOptions {
/// ```
#[derive(Debug, Clone)]
pub struct DataFrame {
session_state: SessionState,
// Box the (large) SessionState to reduce the size of DataFrame on the stack
session_state: Box<SessionState>,
plan: LogicalPlan,
}

Expand All @@ -168,7 +169,7 @@ impl DataFrame {
/// `DataFrame` from an existing datasource.
pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self {
Self {
session_state,
session_state: Box::new(session_state),
plan,
}
}
Expand Down Expand Up @@ -230,7 +231,10 @@ impl DataFrame {
};
let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;

Ok(DataFrame::new(self.session_state, project_plan))
Ok(DataFrame {
session_state: self.session_state,
plan: project_plan,
})
}

/// Expand each list element of a column to multiple rows.
Expand Down Expand Up @@ -269,7 +273,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.unnest_column_with_options(column, options)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Return a DataFrame with only rows for which `predicate` evaluates to
Expand All @@ -294,7 +301,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.filter(predicate)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Return a new `DataFrame` that aggregates the rows of the current
Expand Down Expand Up @@ -325,7 +335,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.aggregate(group_expr, aggr_expr)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Return a new DataFrame that adds the result of evaluating one or more
Expand All @@ -334,7 +347,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.window(window_exprs)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Returns a new `DataFrame` with a limited number of rows.
Expand All @@ -359,7 +375,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.limit(skip, fetch)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Calculate the union of two [`DataFrame`]s, preserving duplicate rows.
Expand All @@ -383,7 +402,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.union(dataframe.plan)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Calculate the distinct union of two [`DataFrame`]s.
Expand All @@ -405,12 +427,13 @@ impl DataFrame {
/// # }
/// ```
pub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
Ok(DataFrame::new(
self.session_state,
LogicalPlanBuilder::from(self.plan)
.union_distinct(dataframe.plan)?
.build()?,
))
let plan = LogicalPlanBuilder::from(self.plan)
.union_distinct(dataframe.plan)?
.build()?;
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Return a new `DataFrame` with all duplicated rows removed.
Expand All @@ -428,10 +451,11 @@ impl DataFrame {
/// # }
/// ```
pub fn distinct(self) -> Result<DataFrame> {
Ok(DataFrame::new(
self.session_state,
LogicalPlanBuilder::from(self.plan).distinct()?.build()?,
))
let plan = LogicalPlanBuilder::from(self.plan).distinct()?.build()?;
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Return a new `DataFrame` that has statistics for a DataFrame.
Expand Down Expand Up @@ -599,15 +623,18 @@ impl DataFrame {
describe_record_batch.schema(),
vec![vec![describe_record_batch]],
)?;
Ok(DataFrame::new(
self.session_state,
LogicalPlanBuilder::scan(
UNNAMED_TABLE,
provider_as_source(Arc::new(provider)),
None,
)?
.build()?,
))

let plan = LogicalPlanBuilder::scan(
UNNAMED_TABLE,
provider_as_source(Arc::new(provider)),
None,
)?
.build()?;

Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Sort the DataFrame by the specified sorting expressions.
Expand All @@ -633,7 +660,10 @@ impl DataFrame {
/// ```
pub fn sort(self, expr: Vec<Expr>) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Join this `DataFrame` with another `DataFrame` using explicitly specified
Expand Down Expand Up @@ -687,7 +717,10 @@ impl DataFrame {
filter,
)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Join this `DataFrame` with another `DataFrame` using the specified
Expand Down Expand Up @@ -737,7 +770,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.join_on(right.plan, join_type, expr)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Repartition a DataFrame based on a logical partitioning scheme.
Expand All @@ -758,7 +794,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.repartition(partitioning_scheme)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Return the total number of rows in this `DataFrame`.
Expand Down Expand Up @@ -863,7 +902,7 @@ impl DataFrame {

/// Return a new [`TaskContext`] which would be used to execute this DataFrame
pub fn task_ctx(&self) -> TaskContext {
TaskContext::from(&self.session_state)
TaskContext::from(self.session_state.as_ref())
}

/// Executes this DataFrame and returns a stream over a single partition
Expand Down Expand Up @@ -969,7 +1008,7 @@ impl DataFrame {

/// Returns both the [`LogicalPlan`] and [`SessionState`] that comprise this [`DataFrame`]
pub fn into_parts(self) -> (SessionState, LogicalPlan) {
(self.session_state, self.plan)
(*self.session_state, self.plan)
}

/// Return the [`LogicalPlan`] represented by this DataFrame without running
Expand Down Expand Up @@ -1023,7 +1062,10 @@ impl DataFrame {
let plan = LogicalPlanBuilder::from(self.plan)
.explain(verbose, analyze)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Return a `FunctionRegistry` used to plan udf's calls
Expand All @@ -1042,7 +1084,7 @@ impl DataFrame {
/// # }
/// ```
pub fn registry(&self) -> &dyn FunctionRegistry {
&self.session_state
self.session_state.as_ref()
}

/// Calculate the intersection of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema
Expand All @@ -1062,10 +1104,11 @@ impl DataFrame {
pub fn intersect(self, dataframe: DataFrame) -> Result<DataFrame> {
let left_plan = self.plan;
let right_plan = dataframe.plan;
Ok(DataFrame::new(
self.session_state,
LogicalPlanBuilder::intersect(left_plan, right_plan, true)?,
))
let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, true)?;
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Calculate the exception of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema
Expand All @@ -1085,11 +1128,11 @@ impl DataFrame {
pub fn except(self, dataframe: DataFrame) -> Result<DataFrame> {
let left_plan = self.plan;
let right_plan = dataframe.plan;

Ok(DataFrame::new(
self.session_state,
LogicalPlanBuilder::except(left_plan, right_plan, true)?,
))
let plan = LogicalPlanBuilder::except(left_plan, right_plan, true)?;
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Execute this `DataFrame` and write the results to `table_name`.
Expand All @@ -1114,7 +1157,13 @@ impl DataFrame {
write_options.overwrite,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await

DataFrame {
session_state: self.session_state,
plan,
}
.collect()
.await
}

/// Execute the `DataFrame` and write the results to CSV file(s).
Expand Down Expand Up @@ -1162,7 +1211,13 @@ impl DataFrame {
options.partition_by,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await

DataFrame {
session_state: self.session_state,
plan,
}
.collect()
.await
}

/// Execute the `DataFrame` and write the results to JSON file(s).
Expand Down Expand Up @@ -1211,7 +1266,13 @@ impl DataFrame {
options.partition_by,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await

DataFrame {
session_state: self.session_state,
plan,
}
.collect()
.await
}

/// Add an additional column to the DataFrame.
Expand Down Expand Up @@ -1258,7 +1319,10 @@ impl DataFrame {

let project_plan = LogicalPlanBuilder::from(plan).project(fields)?.build()?;

Ok(DataFrame::new(self.session_state, project_plan))
Ok(DataFrame {
session_state: self.session_state,
plan: project_plan,
})
}

/// Rename one column by applying a new projection. This is a no-op if the column to be
Expand Down Expand Up @@ -1322,7 +1386,10 @@ impl DataFrame {
let project_plan = LogicalPlanBuilder::from(self.plan)
.project(projection)?
.build()?;
Ok(DataFrame::new(self.session_state, project_plan))
Ok(DataFrame {
session_state: self.session_state,
plan: project_plan,
})
}

/// Replace all parameters in logical plan with the specified
Expand Down Expand Up @@ -1384,7 +1451,10 @@ impl DataFrame {
/// ```
pub fn with_param_values(self, query_values: impl Into<ParamValues>) -> Result<Self> {
let plan = self.plan.with_param_values(query_values)?;
Ok(Self::new(self.session_state, plan))
Ok(DataFrame {
session_state: self.session_state,
plan,
})
}

/// Cache DataFrame as a memory table.
Expand All @@ -1401,7 +1471,7 @@ impl DataFrame {
/// # }
/// ```
pub async fn cache(self) -> Result<DataFrame> {
let context = SessionContext::new_with_state(self.session_state.clone());
let context = SessionContext::new_with_state((*self.session_state).clone());
// The schema is consistent with the output
let plan = self.clone().create_physical_plan().await?;
let schema = plan.schema();
Expand Down
7 changes: 6 additions & 1 deletion datafusion/core/src/dataframe/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ impl DataFrame {
options.partition_by,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
DataFrame {
session_state: self.session_state,
plan,
}
.collect()
.await
}
}

Expand Down
Loading

0 comments on commit d4eb72c

Please sign in to comment.