From d4eb72c30d45c0f3f359c64f41a6caed30abe750 Mon Sep 17 00:00:00 2001 From: Sergii Mikhtoniuk Date: Thu, 18 Apr 2024 03:45:33 -0700 Subject: [PATCH] Reduce DataFrame stack size and fix large futures warnings (#10123) --- datafusion/core/src/dataframe/mod.rs | 174 +++++++++++++------ datafusion/core/src/dataframe/parquet.rs | 7 +- datafusion/core/src/execution/context/mod.rs | 47 +++-- 3 files changed, 158 insertions(+), 70 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index eea5fc1127ce..75ca93a4c9b9 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -156,7 +156,8 @@ impl Default for DataFrameWriteOptions { /// ``` #[derive(Debug, Clone)] pub struct DataFrame { - session_state: SessionState, + // Box the (large) SessionState to reduce the size of DataFrame on the stack + session_state: Box, plan: LogicalPlan, } @@ -168,7 +169,7 @@ impl DataFrame { /// `DataFrame` from an existing datasource. pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self { Self { - session_state, + session_state: Box::new(session_state), plan, } } @@ -230,7 +231,10 @@ impl DataFrame { }; let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?; - Ok(DataFrame::new(self.session_state, project_plan)) + Ok(DataFrame { + session_state: self.session_state, + plan: project_plan, + }) } /// Expand each list element of a column to multiple rows. @@ -269,7 +273,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .unnest_column_with_options(column, options)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a DataFrame with only rows for which `predicate` evaluates to @@ -294,7 +301,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .filter(predicate)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a new `DataFrame` that aggregates the rows of the current @@ -325,7 +335,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .aggregate(group_expr, aggr_expr)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a new DataFrame that adds the result of evaluating one or more @@ -334,7 +347,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .window(window_exprs)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Returns a new `DataFrame` with a limited number of rows. @@ -359,7 +375,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .limit(skip, fetch)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Calculate the union of two [`DataFrame`]s, preserving duplicate rows. @@ -383,7 +402,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .union(dataframe.plan)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Calculate the distinct union of two [`DataFrame`]s. @@ -405,12 +427,13 @@ impl DataFrame { /// # } /// ``` pub fn union_distinct(self, dataframe: DataFrame) -> Result { - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::from(self.plan) - .union_distinct(dataframe.plan)? - .build()?, - )) + let plan = LogicalPlanBuilder::from(self.plan) + .union_distinct(dataframe.plan)? + .build()?; + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a new `DataFrame` with all duplicated rows removed. @@ -428,10 +451,11 @@ impl DataFrame { /// # } /// ``` pub fn distinct(self) -> Result { - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::from(self.plan).distinct()?.build()?, - )) + let plan = LogicalPlanBuilder::from(self.plan).distinct()?.build()?; + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a new `DataFrame` that has statistics for a DataFrame. @@ -599,15 +623,18 @@ impl DataFrame { describe_record_batch.schema(), vec![vec![describe_record_batch]], )?; - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::scan( - UNNAMED_TABLE, - provider_as_source(Arc::new(provider)), - None, - )? - .build()?, - )) + + let plan = LogicalPlanBuilder::scan( + UNNAMED_TABLE, + provider_as_source(Arc::new(provider)), + None, + )? + .build()?; + + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Sort the DataFrame by the specified sorting expressions. @@ -633,7 +660,10 @@ impl DataFrame { /// ``` pub fn sort(self, expr: Vec) -> Result { let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Join this `DataFrame` with another `DataFrame` using explicitly specified @@ -687,7 +717,10 @@ impl DataFrame { filter, )? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Join this `DataFrame` with another `DataFrame` using the specified @@ -737,7 +770,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .join_on(right.plan, join_type, expr)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Repartition a DataFrame based on a logical partitioning scheme. @@ -758,7 +794,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .repartition(partitioning_scheme)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return the total number of rows in this `DataFrame`. @@ -863,7 +902,7 @@ impl DataFrame { /// Return a new [`TaskContext`] which would be used to execute this DataFrame pub fn task_ctx(&self) -> TaskContext { - TaskContext::from(&self.session_state) + TaskContext::from(self.session_state.as_ref()) } /// Executes this DataFrame and returns a stream over a single partition @@ -969,7 +1008,7 @@ impl DataFrame { /// Returns both the [`LogicalPlan`] and [`SessionState`] that comprise this [`DataFrame`] pub fn into_parts(self) -> (SessionState, LogicalPlan) { - (self.session_state, self.plan) + (*self.session_state, self.plan) } /// Return the [`LogicalPlan`] represented by this DataFrame without running @@ -1023,7 +1062,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .explain(verbose, analyze)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a `FunctionRegistry` used to plan udf's calls @@ -1042,7 +1084,7 @@ impl DataFrame { /// # } /// ``` pub fn registry(&self) -> &dyn FunctionRegistry { - &self.session_state + self.session_state.as_ref() } /// Calculate the intersection of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema @@ -1062,10 +1104,11 @@ impl DataFrame { pub fn intersect(self, dataframe: DataFrame) -> Result { let left_plan = self.plan; let right_plan = dataframe.plan; - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::intersect(left_plan, right_plan, true)?, - )) + let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, true)?; + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Calculate the exception of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema @@ -1085,11 +1128,11 @@ impl DataFrame { pub fn except(self, dataframe: DataFrame) -> Result { let left_plan = self.plan; let right_plan = dataframe.plan; - - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::except(left_plan, right_plan, true)?, - )) + let plan = LogicalPlanBuilder::except(left_plan, right_plan, true)?; + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Execute this `DataFrame` and write the results to `table_name`. @@ -1114,7 +1157,13 @@ impl DataFrame { write_options.overwrite, )? .build()?; - DataFrame::new(self.session_state, plan).collect().await + + DataFrame { + session_state: self.session_state, + plan, + } + .collect() + .await } /// Execute the `DataFrame` and write the results to CSV file(s). @@ -1162,7 +1211,13 @@ impl DataFrame { options.partition_by, )? .build()?; - DataFrame::new(self.session_state, plan).collect().await + + DataFrame { + session_state: self.session_state, + plan, + } + .collect() + .await } /// Execute the `DataFrame` and write the results to JSON file(s). @@ -1211,7 +1266,13 @@ impl DataFrame { options.partition_by, )? .build()?; - DataFrame::new(self.session_state, plan).collect().await + + DataFrame { + session_state: self.session_state, + plan, + } + .collect() + .await } /// Add an additional column to the DataFrame. @@ -1258,7 +1319,10 @@ impl DataFrame { let project_plan = LogicalPlanBuilder::from(plan).project(fields)?.build()?; - Ok(DataFrame::new(self.session_state, project_plan)) + Ok(DataFrame { + session_state: self.session_state, + plan: project_plan, + }) } /// Rename one column by applying a new projection. This is a no-op if the column to be @@ -1322,7 +1386,10 @@ impl DataFrame { let project_plan = LogicalPlanBuilder::from(self.plan) .project(projection)? .build()?; - Ok(DataFrame::new(self.session_state, project_plan)) + Ok(DataFrame { + session_state: self.session_state, + plan: project_plan, + }) } /// Replace all parameters in logical plan with the specified @@ -1384,7 +1451,10 @@ impl DataFrame { /// ``` pub fn with_param_values(self, query_values: impl Into) -> Result { let plan = self.plan.with_param_values(query_values)?; - Ok(Self::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Cache DataFrame as a memory table. @@ -1401,7 +1471,7 @@ impl DataFrame { /// # } /// ``` pub async fn cache(self) -> Result { - let context = SessionContext::new_with_state(self.session_state.clone()); + let context = SessionContext::new_with_state((*self.session_state).clone()); // The schema is consistent with the output let plan = self.clone().create_physical_plan().await?; let schema = plan.schema(); diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 7cc3201bf7e4..0ec46df0ae5d 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -68,7 +68,12 @@ impl DataFrame { options.partition_by, )? .build()?; - DataFrame::new(self.session_state, plan).collect().await + DataFrame { + session_state: self.session_state, + plan, + } + .collect() + .await } } diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 31f390607f04..89662062452b 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -466,24 +466,37 @@ impl SessionContext { /// [`SQLOptions::verify_plan`]. pub async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result { match plan { - LogicalPlan::Ddl(ddl) => match ddl { - DdlStatement::CreateExternalTable(cmd) => { - self.create_external_table(&cmd).await - } - DdlStatement::CreateMemoryTable(cmd) => { - self.create_memory_table(cmd).await - } - DdlStatement::CreateView(cmd) => self.create_view(cmd).await, - DdlStatement::CreateCatalogSchema(cmd) => { - self.create_catalog_schema(cmd).await + LogicalPlan::Ddl(ddl) => { + // Box::pin avoids allocating the stack space within this function's frame + // for every one of these individual async functions, decreasing the risk of + // stack overflows. + match ddl { + DdlStatement::CreateExternalTable(cmd) => { + Box::pin(async move { self.create_external_table(&cmd).await }) + as std::pin::Pin + Send>> + } + DdlStatement::CreateMemoryTable(cmd) => { + Box::pin(self.create_memory_table(cmd)) + } + DdlStatement::CreateView(cmd) => Box::pin(self.create_view(cmd)), + DdlStatement::CreateCatalogSchema(cmd) => { + Box::pin(self.create_catalog_schema(cmd)) + } + DdlStatement::CreateCatalog(cmd) => { + Box::pin(self.create_catalog(cmd)) + } + DdlStatement::DropTable(cmd) => Box::pin(self.drop_table(cmd)), + DdlStatement::DropView(cmd) => Box::pin(self.drop_view(cmd)), + DdlStatement::DropCatalogSchema(cmd) => { + Box::pin(self.drop_schema(cmd)) + } + DdlStatement::CreateFunction(cmd) => { + Box::pin(self.create_function(cmd)) + } + DdlStatement::DropFunction(cmd) => Box::pin(self.drop_function(cmd)), } - DdlStatement::CreateCatalog(cmd) => self.create_catalog(cmd).await, - DdlStatement::DropTable(cmd) => self.drop_table(cmd).await, - DdlStatement::DropView(cmd) => self.drop_view(cmd).await, - DdlStatement::DropCatalogSchema(cmd) => self.drop_schema(cmd).await, - DdlStatement::CreateFunction(cmd) => self.create_function(cmd).await, - DdlStatement::DropFunction(cmd) => self.drop_function(cmd).await, - }, + .await + } // TODO what about the other statements (like TransactionStart and TransactionEnd) LogicalPlan::Statement(Statement::SetVariable(stmt)) => { self.set_variable(stmt).await